Pergunta

Eu tenho algumas tarefas assíncronas em execução e eu preciso esperar até que pelo menos um deles está acabado (no futuro, provavelmente, eu vou ter de esperar util M fora de tarefas N são acabado). Atualmente eles são apresentados como futuro, então eu preciso de algo como

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

Existe alguma coisa assim? Ou qualquer coisa semelhante, não é necessário para o futuro. laço Atualmente eu através da recolha de futuros, verifique se alguém está terminado, e depois dormir por algum tempo e verificar novamente. Isso parece não gostar da melhor solução, porque se eu dormir por um longo período, então atraso indesejado é adicionado, se eu dormir por curto período de tempo, então ele pode afetar o desempenho.

Eu poderia tentar usar

new CountDownLatch(1)

e diminuição da contagem regressiva quando a tarefa estiver concluída e fazer

countdown.await()

, mas eu achei que possível apenas se eu controlar a criação de Futuro. É possível, mas requer redesenho do sistema, porque atualmente a lógica de criação de tarefas (enviando mobilizável para ExecutorService) é separada da decisão de esperar para o qual Futuro. Eu também poderia substituir

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

e criar implementação personalizada do RunnableFuture com capacidade de anexar ouvinte para ser notificado quando a tarefa estiver concluída, em seguida, anexar tais ouvinte para tarefas necessárias e uso CountDownLatch, mas isso significa que eu tenho que substituir newTaskFor para cada uso ExecutorService I - e potencialmente lá será a implementação que não se estendem AbstractExecutorService. Eu também poderia tentar envolver dada ExecutorService para mesma finalidade, mas então eu tenho que decorar todos os métodos de produção Futures.

Todas estas soluções podem funcionar, mas parece muito natural. Parece que eu estou faltando algo simples, como

WaitHandle.WaitAny(WaitHandle[] waitHandles)

em c #. Há alguma solução bem conhecidos para esse tipo de problema?

UPDATE:

Inicialmente eu não tinha acesso à criação Futuro em tudo, por isso não havia nenhuma solução elegante. Depois que o sistema redesenhar eu tenho acesso a criação Futuro e foi capaz de adicionar countDownLatch.countdown () para processo de execução, então eu posso countDownLatch.await () e tudo funciona bem. Obrigado por outras respostas, eu não sabia sobre ExecutorCompletionService e que, efectivamente pode ser útil em tarefas semelhantes, mas, neste caso particular, não poderia ser usado porque alguns Futures são criados sem qualquer executor - tarefa real é enviado para outro servidor através da rede, concluída remotamente e notificação de conclusão é recebido.

Foi útil?

Solução

Tanto quanto eu sei, Java não tem estrutura semelhante ao método WaitHandle.WaitAny.

Parece-me que isso poderia ser conseguido através de um decorador "WaitableFuture":

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

Embora isso só funciona se ele pode ser inserido antes do código de execução, pois caso contrário o código de execução não teria o novo método doTask(). Mas eu realmente não vejo nenhuma maneira de fazer isso sem votação, se você não pode de alguma forma obter o controle do objeto futuro antes da execução.

Ou se o futuro sempre é executado em seu próprio segmento, e você pode de alguma forma obter esse segmento. Então você poderia gerar um novo segmento para se juntar a outro tópico, então lidar com o mecanismo de espera após a juntar-se retornos ... Isso seria realmente feio e induziria um monte de que em cima. E se alguns objetos futuros não terminar, você poderia ter um monte de tópicos bloqueados, dependendo tópicos mortos. Se você não tiver cuidado, isso pode vazar memória e recursos do sistema.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

Outras dicas

Por que não criar um resultado fila e esperar na fila? Ou mais simplesmente, usar um CompletionService já que é o que é:. Uma fila ExecutorService + resultado

Isso é realmente muito fácil com wait () e notifyAll ().

Primeiro, definir um objeto de bloqueio. (Você pode usar qualquer classe para isso, mas eu gosto de ser explícito):

package com.javadude.sample;

public class Lock {}

Em seguida, definir o seu segmento de trabalho. Ele deve notificar esse objeto de bloqueio quando ele está acabado com o seu processamento. Note-se que a notificar deve ser um bloqueamento bloco sincronizado sobre o objeto de bloqueio.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

Finalmente, você pode escrever o seu cliente:

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

Uma vez que você não se importa qual deles acabamentos, porque não basta ter um único WaitHandle para os tópicos e esperar em que? Qualquer um que termina primeiro pode definir o identificador.

Veja esta opção:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top