Ждать, пока любым из будущего<T> производится
-
02-07-2019 - |
Вопрос
У меня запущено несколько асинхронных задач, и мне нужно дождаться завершения хотя бы одной из них (в будущем, вероятно, мне нужно будет дождаться завершения использования M из N задач).В настоящее время они представлены как Будущие, поэтому мне нужно что-то вроде
/**
* Blocks current thread until one of specified futures is done and returns it.
*/
public static <T> Future<T> waitForAny(Collection<Future<T>> futures)
throws AllFuturesFailedException
Есть ли что-нибудь подобное?Или что-нибудь подобное, не нужное на Будущее.В настоящее время я перебираю коллекцию фьючерсов, проверяю, завершен ли один из них, затем некоторое время сплю и проверяю снова.Это выглядит не лучшим решением, потому что если я сплю в течение длительного периода, то добавляется нежелательная задержка, если я сплю в течение короткого периода, то это может повлиять на производительность.
Я мог бы попробовать использовать
new CountDownLatch(1)
и уменьшите обратный отсчет, когда задача будет выполнена, и выполните
countdown.await()
, но я обнаружил, что это возможно, только если я буду контролировать Будущее творение.Это возможно, но требует редизайна системы, поскольку в настоящее время логика создания задач (отправка вызываемого в ExecutorService) отделена от решения, какого будущего ждать.Я также мог бы переопределить
<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)
и создайте пользовательскую реализацию RunnableFuture с возможностью прикрепления прослушивателя для получения уведомлений о завершении задачи, затем прикрепите такого прослушивателя к необходимым задачам и используйте CountDownLatch, но это означает, что я должен переопределить newTaskFor для каждого используемого мной ExecutorService - и потенциально будет реализация, которая не расширяет AbstractExecutorService.Я мог бы также попробовать обернуть данный ExecutorService для той же цели, но тогда я должен украсить все методы, производящие фьючерсы.
Все эти решения могут работать, но кажутся очень неестественными.Похоже, я упускаю что-то простое, например
WaitHandle.WaitAny(WaitHandle[] waitHandles)
в c#.Существуют ли какие-либо хорошо известные решения для такого рода проблем?
Обновить:
Изначально у меня вообще не было доступа к Future creation, поэтому элегантного решения не было.После редизайна системы я получил доступ к будущему созданию и смог добавить CountDownLatch.countdown() в процесс выполнения, затем я могу CountDownLatch.await() и все работает нормально.Спасибо за другие ответы, я не знал об ExecutorCompletionService, и это действительно может быть полезно в подобных задачах, но в данном конкретном случае это не могло быть использовано, потому что некоторые фьючерсы создаются без какого-либо исполнителя - фактическая задача отправляется на другой сервер по сети, завершается удаленно и получено уведомление о завершении.
Решение
Насколько я знаю, Java не имеет аналогичной структуры для WaitHandle.WaitAny
способ.
Мне кажется, что этого можно было бы достичь с помощью декоратора "WaitableFuture":
public WaitableFuture<T>
extends Future<T>
{
private CountDownLatch countDownLatch;
WaitableFuture(CountDownLatch countDownLatch)
{
super();
this.countDownLatch = countDownLatch;
}
void doTask()
{
super.doTask();
this.countDownLatch.countDown();
}
}
Хотя это сработало бы только в том случае, если оно может быть вставлено перед кодом выполнения, поскольку в противном случае код выполнения не имел бы нового doTask()
способ.Но я действительно не вижу способа сделать это без опроса, если вы не можете каким-то образом получить контроль над Будущим объектом перед выполнением.
Или, если будущее всегда выполняется в своем собственном потоке, и вы можете каким-то образом получить этот поток.Затем вы могли бы создать новый поток для соединения друг с другом потоков, а затем обработать механизм ожидания после возврата соединения...Однако это было бы действительно некрасиво и привело бы к большим накладным расходам.И если некоторые будущие объекты не завершатся, у вас может быть много заблокированных потоков в зависимости от мертвых потоков.Если вы не будете осторожны, это может привести к утечке памяти и системных ресурсов.
/**
* Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
*/
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);
foreach(Thread thread in threads)
{
(new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
}
countDownLatch.await();
}
class JoinThreadHelper
implements Runnable
{
Thread thread;
CountDownLatch countDownLatch;
JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
{
this.thread = thread;
this.countDownLatch = countDownLatch;
}
void run()
{
this.thread.join();
this.countDownLatch.countDown();
}
}
Другие советы
все просто, зацените Исполнитель CompletionService.
Почему бы просто не создать очередь результатов и не подождать в этой очереди?Или, проще говоря, используйте CompletionService, поскольку это то, что это такое:ExecutorService + очередь результатов.
На самом деле это довольно просто с помощью wait() и notifyAll().
Сначала определите объект блокировки.(Вы можете использовать для этого любой класс, но я предпочитаю быть явным):
package com.javadude.sample;
public class Lock {}
Затем определите свой рабочий поток.Он должен уведомить этот объект блокировки, когда завершит свою обработку.Обратите внимание, что уведомление должно быть в синхронизированном блоке, блокирующем объект блокировки.
package com.javadude.sample;
public class Worker extends Thread {
private Lock lock_;
private long timeToSleep_;
private String name_;
public Worker(Lock lock, String name, long timeToSleep) {
lock_ = lock;
timeToSleep_ = timeToSleep;
name_ = name;
}
@Override
public void run() {
// do real work -- using a sleep here to simulate work
try {
sleep(timeToSleep_);
} catch (InterruptedException e) {
interrupt();
}
System.out.println(name_ + " is done... notifying");
// notify whoever is waiting, in this case, the client
synchronized (lock_) {
lock_.notify();
}
}
}
Наконец, вы можете написать своему клиенту:
package com.javadude.sample;
public class Client {
public static void main(String[] args) {
Lock lock = new Lock();
Worker worker1 = new Worker(lock, "worker1", 15000);
Worker worker2 = new Worker(lock, "worker2", 10000);
Worker worker3 = new Worker(lock, "worker3", 5000);
Worker worker4 = new Worker(lock, "worker4", 20000);
boolean started = false;
int numNotifies = 0;
while (true) {
synchronized (lock) {
try {
if (!started) {
// need to do the start here so we grab the lock, just
// in case one of the threads is fast -- if we had done the
// starts outside the synchronized block, a fast thread could
// get to its notification *before* the client is waiting for it
worker1.start();
worker2.start();
worker3.start();
worker4.start();
started = true;
}
lock.wait();
} catch (InterruptedException e) {
break;
}
numNotifies++;
if (numNotifies == 4) {
break;
}
System.out.println("Notified!");
}
}
System.out.println("Everyone has notified me... I'm done");
}
}
Поскольку вам все равно, какой из них завершается, почему бы просто не использовать один WaitHandle для всех потоков и не подождать этого?Тот, кто закончит первым, может установить ручку.
Смотрите этот параметр:
public class WaitForAnyRedux {
private static final int POOL_SIZE = 10;
public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {
List<Callable<T>> callables = new ArrayList<Callable<T>>();
for (final T t : collection) {
Callable<T> callable = Executors.callable(new Thread() {
@Override
public void run() {
synchronized (t) {
try {
t.wait();
} catch (InterruptedException e) {
}
}
}
}, t);
callables.add(callable);
}
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
return executorService.invokeAny(callables);
}
static public void main(String[] args) throws InterruptedException, ExecutionException {
final List<Integer> integers = new ArrayList<Integer>();
for (int i = 0; i < POOL_SIZE; i++) {
integers.add(i);
}
(new Thread() {
public void run() {
Integer notified = null;
try {
notified = waitForAny(integers);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("notified=" + notified);
}
}).start();
synchronized (integers) {
integers.wait(3000);
}
Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
System.out.println("Waking up " + randomInt);
synchronized (randomInt) {
randomInt.notify();
}
}
}