Question

J'ai peu de tâches asynchrones en cours d'exécution et j'ai besoin d'attendre qu'au moins une d'entre elles soit terminée (à l'avenir, j'aurai probablement besoin d'attendre que M tâches sur N soient terminées). Actuellement, ils sont présentés comme futurs, il me faut donc quelque chose comme

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

Y a-t-il quelque chose comme ça? Ou quelque chose de similaire, pas nécessaire pour l'avenir. Actuellement, je passe en revue une collection de contrats à terme, vérifie si l'un est terminé, puis passe un certain temps en veille et vérifie à nouveau. Cela ne semble pas être la meilleure solution, car si je dors pendant une longue période, un délai indésirable est ajouté. Si je dors pendant une courte période, cela peut affecter les performances.

Je pourrais essayer d'utiliser

new CountDownLatch(1)

et diminuez le compte à rebours une fois la tâche terminée et faites-le

countdown.await()

, mais je ne trouvais cela possible que si je contrôlais la création future. Cela est possible, mais nécessite une refonte du système, car actuellement, la logique de création des tâches (envoi de Callable à ExecutorService) est séparée de la décision d’attendre quel avenir. Je pourrais aussi remplacer

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

et créer une implémentation personnalisée de RunnableFuture avec possibilité d'attacher un écouteur pour être averti lorsque la tâche est terminée, puis attacher cet écouteur aux tâches nécessaires et utiliser CountDownLatch, mais cela signifie que je dois remplacer newTaskFor pour chaque service ExecutorService que j'utilise - et potentiellement là-bas seront des implémentations qui n’étendent pas AbstractExecutorService. Je pourrais aussi essayer d’emballer ExecutorService dans le même but, mais je dois ensuite décorer toutes les méthodes produisant des contrats à terme.

Toutes ces solutions peuvent fonctionner mais semblent très peu naturelles. Il semble que je manque quelque chose de simple, comme

WaitHandle.WaitAny(WaitHandle[] waitHandles)

dans c #. Existe-t-il des solutions bien connues à ce type de problème?

MISE À JOUR:

À l'origine, je n'avais pas du tout accès à la création de Future, il n'y avait donc pas de solution élégante. Après avoir redessiné le système, j'ai eu accès à la création de Future et j'ai pu ajouter countDownLatch.countdown () au processus d'exécution, puis je peux countDownLatch.await () et tout fonctionne correctement. Merci pour les autres réponses. Je ne connaissais pas ExecutorCompletionService et cela peut effectivement être utile pour des tâches similaires, mais dans ce cas particulier, il ne pourrait pas être utilisé car certains contrats à terme sont créés sans exécuteur - la tâche réelle est envoyée à un autre serveur via le réseau, se termine à distance et la notification d'achèvement est reçue.

Était-ce utile?

La solution

Pour autant que je sache, Java n'a pas de structure analogue à la méthode WaitHandle.WaitAny .

Il me semble que cela pourrait être réalisé grâce à un "WaitableFuture". décorateur:

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

Bien que cela ne fonctionne que s'il peut être inséré avant le code d'exécution, sinon le code d'exécution n'aurait pas la nouvelle méthode doTask () . Mais je ne vois vraiment aucun moyen de le faire sans interrogation si vous ne pouvez pas en quelque sorte prendre le contrôle de l'objet Future avant son exécution.

Ou si le futur tourne toujours dans son propre fil, et vous pouvez en quelque sorte obtenir ce fil. Ensuite, vous pourriez créer un nouveau thread pour se joindre à un autre thread, puis gérer le mécanisme d’attente après le retour de la jointure ... Ce serait vraiment moche et entraînerait toutefois beaucoup de surcharge. Et si certains objets Future ne finissent pas, vous pourriez avoir beaucoup de threads bloqués en fonction de threads morts. Si vous ne faites pas attention, vous risquez de perdre de la mémoire et des ressources système.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

Autres conseils

simple, consultez ExecutorCompletionService .

Pourquoi ne pas simplement créer une file d'attente de résultats et attendre sur la file d'attente? Ou plus simplement, utilisez un CompletionService puisque c’est ce qu’il est: une file d’exécution ExecutorService +.

Ceci est en fait assez facile avec wait () et notifyAll ().

Définissez d’abord un objet verrou. (Vous pouvez utiliser n'importe quelle classe pour cela, mais j'aime être explicite):

package com.javadude.sample;

public class Lock {}

Ensuite, définissez votre thread de travail. Il doit notifier cet objet de verrouillage lorsqu'il a terminé son traitement. Notez que la notification doit être dans un verrouillage de bloc synchronisé sur l'objet verrou.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

Enfin, vous pouvez écrire votre client:

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

Puisque vous ne vous souciez pas de savoir lequel termine, pourquoi ne pas simplement avoir un seul WaitHandle pour tous les threads et attendre cela? Celui qui termine le premier peut définir la poignée.

Voir cette option:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top