سؤال

لدي عدد قليل من المهام غير المتزامنة قيد التشغيل وأحتاج إلى الانتظار حتى تنتهي واحدة منها على الأقل (ربما سأحتاج في المستقبل إلى الانتظار حتى تنتهي مهام M من N).يتم تقديمها حاليًا على أنها مستقبل، لذا أحتاج إلى شيء من هذا القبيل

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

هل هناك أي شيء من هذا القبيل؟أو أي شيء مماثل، ليس ضروريا للمستقبل.أقوم حاليًا باستعراض مجموعة من العقود الآجلة، وأتحقق مما إذا كان قد تم الانتهاء منها، ثم أنام لبعض الوقت وأتحقق مرة أخرى.يبدو أن هذا ليس الحل الأفضل، لأنه إذا نمت لفترة طويلة فسيتم إضافة تأخير غير مرغوب فيه، وإذا نمت لفترة قصيرة فقد يؤثر ذلك على الأداء.

يمكنني أن أحاول استخدام

new CountDownLatch(1)

وتقليل العد التنازلي عند اكتمال المهمة والقيام بها

countdown.await()

, لكنني وجدت أن ذلك ممكن فقط إذا كنت أتحكم في خلق المستقبل.هذا ممكن، ولكنه يتطلب إعادة تصميم النظام، لأن منطق إنشاء المهام الحالي (إرسال Callable إلى ExecutorService) منفصل عن قرار انتظار المستقبل.يمكنني أيضًا التجاوز

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

وإنشاء تطبيق مخصص لـ RunnableFuture مع القدرة على إرفاق المستمع ليتم إعلامه عند انتهاء المهمة، ثم إرفاق هذا المستمع بالمهام المطلوبة واستخدام CountDownLatch، ولكن هذا يعني أنه يتعين علي تجاوز newTaskFor لكل خدمة ExecutorService التي أستخدمها - ومن المحتمل أن يكون هناك تنفيذ والتي لا تمتد AbstractExecutorService.يمكنني أيضًا محاولة تغليف ExecutorService المعطاة لنفس الغرض، ولكن بعد ذلك يجب أن أقوم بتزيين جميع طرق إنتاج العقود الآجلة.

كل هذه الحلول قد تنجح لكنها تبدو غير طبيعية على الإطلاق.يبدو أنني أفتقد شيئًا بسيطًا، مثل

WaitHandle.WaitAny(WaitHandle[] waitHandles)

شركة#.هل هناك أي حلول معروفة لمثل هذا النوع من المشاكل؟

تحديث:

في الأصل لم يكن لدي إمكانية الوصول إلى إنشاء المستقبل على الإطلاق، لذلك لم يكن هناك حل أنيق.بعد إعادة تصميم النظام، تمكنت من الوصول إلى إنشاء المستقبل وتمكنت من إضافة countDownLatch.countdown() إلى عملية التنفيذ، ثم يمكنني countDownLatch.await() وكل شيء يعمل بشكل جيد.شكرًا على الإجابات الأخرى، لم أكن أعرف شيئًا عن ExecutorCompletionService ويمكن أن يكون مفيدًا بالفعل في مهام مماثلة، ولكن في هذه الحالة بالذات لا يمكن استخدامه لأنه يتم إنشاء بعض العقود الآجلة دون أي منفذ - يتم إرسال المهمة الفعلية إلى خادم آخر عبر الشبكة، يكتمل عن بعد ويتم تلقي إشعار الإكمال.

هل كانت مفيدة؟

المحلول

بقدر ما أعرف، جافا ليس لديها بنية مماثلة ل WaitHandle.WaitAny طريقة.

يبدو لي أنه يمكن تحقيق ذلك من خلال مصمم الديكور "WaitableFuture":

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

على الرغم من أن هذا لن يعمل إلا إذا كان من الممكن إدراجه قبل رمز التنفيذ، وإلا فلن يحتوي رمز التنفيذ على الجديد doTask() طريقة.لكنني لا أرى حقًا أي طريقة للقيام بذلك دون الاقتراع إذا لم تتمكن بطريقة ما من التحكم في الكائن المستقبلي قبل التنفيذ.

أو إذا كان المستقبل دائمًا يسير في خيط خاص به، ويمكنك بطريقة ما الحصول على هذا الخيط.بعد ذلك، يمكنك إنشاء خيط جديد لربط كل خيط آخر، ثم التعامل مع آلية الانتظار بعد عودة الانضمام...سيكون هذا قبيحًا حقًا وسيؤدي إلى الكثير من النفقات العامة.وإذا لم تكتمل بعض الكائنات المستقبلية، فقد يكون لديك الكثير من سلاسل الرسائل المحظورة اعتمادًا على سلاسل الرسائل الميتة.إذا لم تكن حذرًا، فقد يؤدي ذلك إلى تسرب الذاكرة وموارد النظام.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

نصائح أخرى

بسيطة، تحقق من ExecutorCompletionService.

لماذا لا تقوم فقط بإنشاء قائمة انتظار النتائج والانتظار في قائمة الانتظار؟أو ببساطة، استخدم خدمة CompletionService لأن هذا هو ما هي عليه:ExecutorService + قائمة انتظار النتائج.

هذا في الواقع سهل جدًا باستخدام الانتظار () وإخطار الكل ().

أولاً، قم بتعريف كائن القفل.(يمكنك استخدام أي فئة لهذا، ولكن أود أن أكون واضحا):

package com.javadude.sample;

public class Lock {}

بعد ذلك، حدد مؤشر ترابط العامل الخاص بك.يجب عليه إخطار كائن القفل هذا عندما ينتهي من معالجته.لاحظ أن الإخطار يجب أن يكون في كتلة متزامنة مقفلة على كائن القفل.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

وأخيرًا، يمكنك كتابة عميلك:

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

نظرًا لأنك لا تهتم بأي واحد سينتهي، فلماذا لا يكون لديك WaitHandle واحد لجميع سلاسل الرسائل وتنتظر ذلك؟أيهما ينتهي أولاً يمكنه ضبط المقبض.

انظر هذا الخيار:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top