Future<T> のいずれかが完了するまで待ちます
-
02-07-2019 - |
質問
実行中の非同期タスクがほとんどないので、少なくとも 1 つが完了するまで待つ必要があります (将来的には、おそらく N 個のタスクのうち M 個が完了するまで待つ必要があるでしょう)。現在、それらはFutureとして提示されているので、次のようなものが必要です
/**
* Blocks current thread until one of specified futures is done and returns it.
*/
public static <T> Future<T> waitForAny(Collection<Future<T>> futures)
throws AllFuturesFailedException
このようなことはありますか?または同様のもので、Future には必要ありません。現在、先物のコレクションをループして、いずれかが完了したかどうかを確認し、しばらく寝てから再度確認します。長時間スリープすると不要な遅延が追加され、短時間スリープするとパフォーマンスに影響を与える可能性があるため、これは最良の解決策ではないようです。
使ってみることができました
new CountDownLatch(1)
タスクが完了したらカウントダウンを減らして実行します
countdown.await()
, 、しかし、私が未来の創造を制御する場合にのみ可能であることがわかりました。可能ではありますが、現在、タスク作成ロジック (Callable を ExecutorService に送信する) と、どの Future を待つかの決定が分離されているため、システムの再設計が必要です。オーバーライドすることもできます
<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)
そして、タスクの終了時に通知されるリスナーをアタッチする機能を備えた RunnableFuture のカスタム実装を作成し、そのようなリスナーを必要なタスクにアタッチして CountDownLatch を使用します。ただし、それは、使用する ExecutorService ごとに newTaskFor をオーバーライドする必要があることを意味します。実装される可能性があります。 AbstractExecutorService を拡張しません。同じ目的で指定された ExecutorService をラップしてみることもできますが、その場合は Future を生成するすべてのメソッドを修飾する必要があります。
これらの解決策はすべて機能するかもしれませんが、非常に不自然に見えます。次のような単純なものが欠けているようです
WaitHandle.WaitAny(WaitHandle[] waitHandles)
C#で。この種の問題に対するよく知られた解決策はありますか?
アップデート:
もともと私はFutureの作成にまったくアクセスできなかったので、エレガントな解決策はありませんでした。システムを再設計した後、Future 作成にアクセスできるようになり、実行プロセスに countDownLatch.countdown() を追加できるようになり、 countDownLatch.await() を実行できるようになり、すべてが正常に動作しました。他の回答をありがとうございます。ExecutorCompletionService について知りませんでした。同様のタスクでは確かに役立ちますが、この特定のケースでは、一部の Future がエグゼキューターなしで作成されるため、使用できませんでした。実際のタスクはネットワーク経由で別のサーバーに送信されます。リモートで完了し、完了通知を受け取ります。
解決
私の知る限り、Java にはこれに似た構造はありません。 WaitHandle.WaitAny
方法。
これは「WaitableFuture」デコレータを通じて実現できるように思えます。
public WaitableFuture<T>
extends Future<T>
{
private CountDownLatch countDownLatch;
WaitableFuture(CountDownLatch countDownLatch)
{
super();
this.countDownLatch = countDownLatch;
}
void doTask()
{
super.doTask();
this.countDownLatch.countDown();
}
}
ただし、これは実行コードの前に挿入できる場合にのみ機能します。そうでない場合、実行コードには新しい doTask()
方法。しかし、実行前に何らかの方法で Future オブジェクトを制御できない場合、ポーリングなしでこれを行う方法は実際にはありません。
あるいは、Future が常に独自のスレッドで実行され、何らかの方法でそのスレッドを取得できる場合。次に、新しいスレッドを生成して他のスレッドを結合し、結合が戻った後に待機メカニズムを処理できます。ただし、これは非常に見苦しく、多くのオーバーヘッドが発生します。また、一部の Future オブジェクトが終了しない場合、デッドスレッドによっては大量のスレッドがブロックされる可能性があります。注意しないと、メモリやシステム リソースがリークする可能性があります。
/**
* Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
*/
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);
foreach(Thread thread in threads)
{
(new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
}
countDownLatch.await();
}
class JoinThreadHelper
implements Runnable
{
Thread thread;
CountDownLatch countDownLatch;
JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
{
this.thread = thread;
this.countDownLatch = countDownLatch;
}
void run()
{
this.thread.join();
this.countDownLatch.countDown();
}
}
他のヒント
簡単です、チェックしてください ExecutorCompletionService.
結果キューを作成してキュー上で待機するだけではどうでしょうか?もっと簡単に言うと、CompletionService を使用します。ExecutorService + 結果キュー。
これは、wait() と NoticeAll() を使用すると、実際には非常に簡単です。
まず、ロック オブジェクトを定義します。(これには任意のクラスを使用できますが、私は明示的にしたいと思います):
package com.javadude.sample;
public class Lock {}
次に、ワーカー スレッドを定義します。処理が終了したら、そのロック オブジェクトに通知する必要があります。通知は、ロック オブジェクトをロックする同期ブロック内に存在する必要があることに注意してください。
package com.javadude.sample;
public class Worker extends Thread {
private Lock lock_;
private long timeToSleep_;
private String name_;
public Worker(Lock lock, String name, long timeToSleep) {
lock_ = lock;
timeToSleep_ = timeToSleep;
name_ = name;
}
@Override
public void run() {
// do real work -- using a sleep here to simulate work
try {
sleep(timeToSleep_);
} catch (InterruptedException e) {
interrupt();
}
System.out.println(name_ + " is done... notifying");
// notify whoever is waiting, in this case, the client
synchronized (lock_) {
lock_.notify();
}
}
}
最後に、クライアントを次のように記述できます。
package com.javadude.sample;
public class Client {
public static void main(String[] args) {
Lock lock = new Lock();
Worker worker1 = new Worker(lock, "worker1", 15000);
Worker worker2 = new Worker(lock, "worker2", 10000);
Worker worker3 = new Worker(lock, "worker3", 5000);
Worker worker4 = new Worker(lock, "worker4", 20000);
boolean started = false;
int numNotifies = 0;
while (true) {
synchronized (lock) {
try {
if (!started) {
// need to do the start here so we grab the lock, just
// in case one of the threads is fast -- if we had done the
// starts outside the synchronized block, a fast thread could
// get to its notification *before* the client is waiting for it
worker1.start();
worker2.start();
worker3.start();
worker4.start();
started = true;
}
lock.wait();
} catch (InterruptedException e) {
break;
}
numNotifies++;
if (numNotifies == 4) {
break;
}
System.out.println("Notified!");
}
}
System.out.println("Everyone has notified me... I'm done");
}
}
どちらが終了するかは気にしないので、すべてのスレッドに対して 1 つの WaitHandle を用意して、それを待機するだけではどうでしょうか。どちらが先に終了してもハンドルを設定できます。
このオプションを参照してください。
public class WaitForAnyRedux {
private static final int POOL_SIZE = 10;
public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {
List<Callable<T>> callables = new ArrayList<Callable<T>>();
for (final T t : collection) {
Callable<T> callable = Executors.callable(new Thread() {
@Override
public void run() {
synchronized (t) {
try {
t.wait();
} catch (InterruptedException e) {
}
}
}
}, t);
callables.add(callable);
}
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
return executorService.invokeAny(callables);
}
static public void main(String[] args) throws InterruptedException, ExecutionException {
final List<Integer> integers = new ArrayList<Integer>();
for (int i = 0; i < POOL_SIZE; i++) {
integers.add(i);
}
(new Thread() {
public void run() {
Integer notified = null;
try {
notified = waitForAny(integers);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("notified=" + notified);
}
}).start();
synchronized (integers) {
integers.wait(3000);
}
Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
System.out.println("Waking up " + randomInt);
synchronized (randomInt) {
randomInt.notify();
}
}
}