Domanda

Ho poche attività asincrone in esecuzione e devo aspettare fino a quando almeno una di esse è terminata (in futuro probabilmente dovrò aspettare che M su N attività siano terminate). Attualmente sono presentati come Future, quindi ho bisogno di qualcosa come

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

C'è qualcosa del genere? O qualcosa di simile, non necessario per il futuro. Attualmente cerco la collezione di futures, controllo se uno è finito, poi dormo per un po 'e ricontrollo. Questa non sembra la soluzione migliore, perché se dormo per un lungo periodo viene aggiunto un ritardo indesiderato, se dormo per un breve periodo, ciò può influire sulle prestazioni.

Potrei provare a usare

new CountDownLatch(1)

e diminuisci il conto alla rovescia quando l'attività è completa e fai

countdown.await()

, ma l'ho trovato possibile solo se controllo la creazione futura. È possibile, ma richiede una riprogettazione del sistema, poiché attualmente la logica di creazione delle attività (invio di Callable a ExecutorService) è separata dalla decisione di attendere il futuro. Potrei anche ignorare

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

e creare un'implementazione personalizzata di RunnableFuture con la possibilità di collegare il listener per ricevere una notifica al termine dell'attività, quindi collegare tale listener alle attività necessarie e utilizzare CountDownLatch, ma ciò significa che devo sovrascrivere newTaskFor per ogni ExecutorService che uso - e potenzialmente lì sarà un'implementazione che non estende AbstractExecutorService. Potrei anche provare a confezionare ExecutorService per lo stesso scopo, ma poi devo decorare tutti i metodi che producono Futures.

Tutte queste soluzioni possono funzionare ma sembrano molto innaturali. Sembra che mi manchi qualcosa di semplice, come

WaitHandle.WaitAny(WaitHandle[] waitHandles)

in c #. Esistono soluzioni ben note per questo tipo di problema?

UPDATE:

Inizialmente non avevo affatto accesso alla creazione futura, quindi non c'erano soluzioni eleganti. Dopo aver riprogettato il sistema, ho avuto accesso alla creazione futura e sono stato in grado di aggiungere countDownLatch.countdown () al processo di esecuzione, quindi posso contareDownLatch.await () e tutto funziona perfettamente. Grazie per altre risposte, non sapevo di ExecutorCompletionService e in effetti può essere utile in attività simili, ma in questo caso particolare non potrebbe essere utilizzato perché alcuni Futures vengono creati senza alcun esecutore: l'attività effettiva viene inviata a un altro server tramite rete, viene completato in remoto e viene ricevuta la notifica di completamento.

È stato utile?

Soluzione

Per quanto ne so, Java non ha una struttura analoga al metodo WaitHandle.WaitAny .

Mi sembra che ciò possa essere ottenuto attraverso un "WaitableFuture" decorator:

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

Sebbene ciò funzionerebbe solo se potesse essere inserito prima del codice di esecuzione, poiché altrimenti il ??codice di esecuzione non avrebbe il nuovo metodo doTask () . Ma non vedo davvero alcun modo di farlo senza polling se non puoi in qualche modo ottenere il controllo dell'oggetto Future prima dell'esecuzione.

O se il futuro gira sempre nel suo thread e puoi in qualche modo ottenere quel thread. Quindi potresti generare un nuovo thread per unire l'altro, quindi gestire il meccanismo di attesa dopo il ritorno del join ... Questo sarebbe davvero brutto e indurrebbe comunque un sacco di spese generali. E se alcuni oggetti Future non finiscono, potresti avere molti thread bloccati a seconda dei thread morti. Se non stai attento, questo potrebbe perdere la memoria e le risorse di sistema.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

Altri suggerimenti

semplice, controlla ExecutorCompletionService .

Perché non creare semplicemente una coda dei risultati e attendere in coda? O più semplicemente, usa un CompletionService poiché è quello che è: una coda di risultati ExecutorService +.

Questo in realtà è abbastanza facile con wait () e notifyAll ().

Innanzitutto, definire un oggetto blocco. (Puoi usare qualsiasi classe per questo, ma mi piace essere esplicito):

package com.javadude.sample;

public class Lock {}

Successivamente, definisci il tuo thread di lavoro. Deve notificare l'oggetto lock quando ha terminato la sua elaborazione. Si noti che la notifica deve trovarsi in un blocco di blocco sincronizzato sull'oggetto di blocco.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

Infine, puoi scrivere il tuo client:

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

Dal momento che non ti interessa quale finisce, perché non avere un solo WaitHandle per tutti i thread e aspettare quello? Qualunque sia il primo arrivato, è possibile impostare la maniglia.

Vedi questa opzione:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top