Pregunta

Tengo algunas tareas asincrónicas en ejecución y necesito esperar hasta que finalice al menos una de ellas (en el futuro, probablemente tendré que esperar hasta que finalicen M de N tareas).Actualmente se presentan como Futuro, por lo que necesito algo como

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

¿Hay algo como esto?O algo similar, no necesario para Future.Actualmente reviso la colección de futuros, compruebo si uno está terminado, luego duermo un rato y vuelvo a comprobar.Esta no parece ser la mejor solución, porque si duermo durante un período prolongado se agrega un retraso no deseado; si duermo durante un período corto, puede afectar el rendimiento.

Podría intentar usar

new CountDownLatch(1)

y disminuir la cuenta regresiva cuando la tarea esté completa y hacer

countdown.await()

, pero lo encontré posible sólo si controlo la creación futura.Es posible, pero requiere un rediseño del sistema, porque actualmente la lógica de creación de tareas (enviar Callable a ExecutorService) está separada de la decisión de esperar qué futuro.También podría anular

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

y crear una implementación personalizada de RunnableFuture con la capacidad de adjuntar un oyente para recibir una notificación cuando finalice la tarea, luego adjuntar dicho oyente a las tareas necesarias y usar CountDownLatch, pero eso significa que tengo que anular newTaskFor para cada ExecutorService que uso, y potencialmente habrá implementación que no extienden AbstractExecutorService.También podría intentar envolver un ExecutorService dado para el mismo propósito, pero luego tengo que decorar todos los métodos que producen Futures.

Todas estas soluciones pueden funcionar, pero parecen muy antinaturales.Parece que me falta algo simple, como

WaitHandle.WaitAny(WaitHandle[] waitHandles)

Cª#.¿Existen soluciones conocidas para este tipo de problema?

ACTUALIZAR:

Originalmente no tenía acceso a la creación futura, por lo que no había una solución elegante.Después de rediseñar el sistema, obtuve acceso a la creación futura y pude agregar countDownLatch.countdown() al proceso de ejecución, luego puedo contarDownLatch.await() y todo funciona bien.Gracias por otras respuestas, no conocía ExecutorCompletionService y, de hecho, puede ser útil en tareas similares, pero en este caso particular no se pudo usar porque algunos Futures se crean sin ningún ejecutor: la tarea real se envía a otro servidor a través de la red. se completa de forma remota y se recibe una notificación de finalización.

¿Fue útil?

Solución

Hasta donde yo sé, Java no tiene una estructura análoga a la WaitHandle.WaitAny método.

Me parece que esto podría lograrse mediante un decorador "WaitableFuture":

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

Aunque esto sólo funcionaría si se puede insertar antes del código de ejecución, ya que de lo contrario el código de ejecución no tendría el nuevo doTask() método.Pero realmente no veo forma de hacer esto sin realizar una encuesta si de alguna manera no puedes obtener el control del objeto Future antes de la ejecución.

O si el futuro siempre corre en su propio hilo, y de alguna manera puedes conseguir ese hilo.Luego, podría generar un nuevo hilo para unirse entre sí y luego manejar el mecanismo de espera después de que regrese la unión...Sin embargo, esto sería realmente feo y generaría muchos gastos generales.Y si algunos objetos futuros no terminan, es posible que tenga muchos subprocesos bloqueados dependiendo de los subprocesos muertos.Si no tiene cuidado, esto podría perder memoria y recursos del sistema.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

Otros consejos

simple, echa un vistazo Servicio de finalización del ejecutor.

¿Por qué no simplemente crear una cola de resultados y esperar en la cola?O más simplemente, use CompletionService ya que eso es lo que es:una cola de resultados ExecutorService +.

En realidad, esto es bastante fácil con wait() y notifyAll().

Primero, defina un objeto de bloqueo.(Puedes usar cualquier clase para esto, pero me gusta ser explícito):

package com.javadude.sample;

public class Lock {}

A continuación, defina su hilo de trabajo.Debe notificar a ese objeto bloqueado cuando haya terminado con su procesamiento.Tenga en cuenta que la notificación debe estar en un bloqueo de bloque sincronizado en el objeto de bloqueo.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

Finalmente, puedes escribirle a tu cliente:

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

Dado que no le importa cuál termina, ¿por qué no tener un único WaitHandle para todos los subprocesos y esperar?El que termine primero podrá configurar el mango.

Vea esta opción:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top