Frage

Ich habe einige asynchrone Aufgaben ausgeführt wird, und ich muss warten, bis zumindest einer von ihnen beendet ist (in der Zukunft wahrscheinlich werde ich util M aus N Aufgaben warten müssen, sind fertig). Aktuell sind sie als Zukunft vorgestellt, so brauche ich so etwas wie

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

Gibt es etwas wie das? Oder ähnliches, nicht notwendig für Zukunft. Derzeit ich Schleife durch Sammlung von Futures, prüfen Sie, ob ein fertig ist, dann für einige Zeit schlafen und wieder überprüfen. Das sieht aus wie nicht die beste Lösung, denn wenn ich für längere Zeit schlafen dann unerwünschte Verzögerung hinzugefügt wird, wenn ich für kurze Zeit schlafen, dann kann es die Leistung beeinträchtigen.

Ich könnte versuchen mit

new CountDownLatch(1)

und verringert Countdown, wenn Aufgabe abgeschlossen ist und tut

countdown.await()

, aber ich fand es möglich, nur wenn ich zukünftige Gestaltung steuern. Es ist möglich, erfordert aber System-Redesign, weil zur Zeit Logik der Aufgaben-Erstellung (Callable zu ExecutorService Senden) von Entscheidung getrennt ist, für die Zukunft zu warten. Ich könnte auch außer Kraft setzen

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

und benutzerdefinierte Implementierung von RunnableFuture mit Fähigkeit schaffen Hörer zu befestigen benachrichtigt werden, wenn Aufgabe beendet ist, dann bringt solche Hörer benötigten Aufgaben und verwenden CountDownLatch, aber das bedeutet, dass ich newTaskFor für jeden ExecutorService außer Kraft setzen muß ich verwenden - und möglicherweise dort wird die Umsetzung, die AbstractExecutorService nicht verlängern. Ich könnte versuchen, auch gegeben ExecutorService für denselben Zweck Einwickeln, aber dann muss ich alle Methoden produzieren Futures dekorieren.

Alle diese Lösungen können funktionieren, aber scheinen sehr unnatürlich. Es sieht aus, als ob ich etwas einfach, fehlt wie

WaitHandle.WaitAny(WaitHandle[] waitHandles)

in c #. Gibt es irgendwelche bekannten Lösungen für diese Art von Problem?

UPDATE:

Ursprünglich habe ich habe keinen Zugriff auf zukünftige Gestaltung überhaupt, so gab es keine elegante Lösung. Nachdem das System neu zu gestalten habe ich Zugriff auf zukünftige Gestaltung und konnte countDownLatch.countdown (), um Ausführungsprozess hinzuzufügen, dann kann ich countDownLatch.await () und alles funktioniert. Vielen Dank für die anderen Antworten, ich wusste nicht, über ExecutorCompletionService und es kann in der Tat hilfreich sein, in ähnlichen Aufgaben, aber in diesem speziellen Fall nicht verwendet werden könnte, weil einige Futures ohne Testamentsvollstrecker erstellt werden - eigentliche Aufgabe an einem anderen Server über das Netzwerk gesendet wird, abgeschlossen remote und Vollendungsmeldung empfangen wird.

War es hilfreich?

Lösung

Soweit ich weiß, Java hat keine analoge Struktur zu der WaitHandle.WaitAny Methode.

Es scheint mir, dass dies durch eine „WaitableFuture“ Dekorateur erreicht werden kann:

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

Obwohl dies würde nur funktionieren, wenn es vor der Ausführung Code eingesetzt werden kann, da sonst die Ausführung von Code nicht mit dem neuen doTask() Verfahren haben würde. Aber ich sehe wirklich keine Möglichkeit, dies ohne Abfrage zu tun, wenn Sie nicht irgendwie die Kontrolle über die Zukunft Objekt vor der Ausführung gewinnen.

Oder wenn die Zukunft immer in einem eigenen Thread läuft, und man kann irgendwie diesen Thread bekommen. Dann könnten Sie einen neuen Thread erzeugen sich Faden zu verbinden, dann den Wartemechanismus Griff nach der Rückkehr kommen ... Das wäre wirklich hässlich und würde eine Menge Aufwand obwohl induzieren. Und wenn einige zukünftige Objekte nicht fertig sind, können Sie eine Menge blockierter Threads auf totem Fäden abhängig. Wenn du nicht aufpasst, könnte diese Speicher- und Systemressourcen auslaufen.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

Andere Tipps

einfache Besuche ExecutorCompletionService .

Warum nicht nur eine Ergebniswarteschlange erstellen und in der Warteschlange warten? Oder einfacher, verwenden Sie einen CompletionService denn das ist, was es ist:. Eine ExecutorService + Ergebniswarteschlange

Das ist eigentlich ziemlich einfach, mit Wartezeit ist () und notifyAll ().

Zunächst definiert ein Sperrobjekt. (Sie können für diese jede Klasse verwenden, aber Ich mag explizit sein):

package com.javadude.sample;

public class Lock {}

Als nächstes definieren Sie Ihren Worker-Thread. Er muss dieses Sperrobjekt benachrichtigt werden, wenn er mit seiner Verarbeitung fertig ist. Beachten Sie, dass die benachrichtigen muss in einer synchronisierten Block Verriegelung auf dem Sperrobjekt werden.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

Schließlich können Sie Ihre Kunden schreiben

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

Da Sie kümmern sich nicht, welche Oberflächen, warum nicht einfach nur einen einzigen Waithandle für alle Threads und auf das warten? Unabhängig davon, welche ein Ende kann zuerst den Griff eingestellt.

Sehen Sie diese Option:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top