Javaエグゼキューター:タスクが完了したときに、ブロックせずに通知する方法

StackOverflow https://stackoverflow.com/questions/826212

質問

エクゼキュータサービスに送信する必要があるタスクがいっぱいのキューがあるとします。一度に1つずつ処理してほしい。私が考えることができる最も簡単な方法は次のとおりです:

  1. キューからタスクを取得
  2. エグゼキューターに送信
  3. 返されたFutureで.getを呼び出し、結果が利用可能になるまでブロックします
  4. キューから別のタスクを取得...

ただし、ブロッキングを完全に回避しようとしています。一度に1つのタスクを処理する必要があるこのようなキューが10,000個ある場合、それらのほとんどがブロックされたスレッドを保持するため、スタック領域が不足します。

私が望むのは、タスクを送信し、タスクが完了したときに呼び出されるコールバックを提供することです。そのコールバック通知をフラグとして使用して、次のタスクを送信します。 (functionaljavaとjetlangは明らかにそのようなノンブロッキングアルゴリズムを使用していますが、コードを理解できません)

JDKのjava.util.concurrentを使用して、独自のexecutorサービスを作成することなく、どうすればそれを実行できますか?

(これらのタスクをフィードするキュー自体がブロックする場合がありますが、それは後で取り組むべき問題です)

役に立ちましたか?

解決

コールバックインターフェイスを定義して、完了通知で渡すパラメータを受け取ります。次に、タスクの最後に呼び出します。

Runnableタスクの一般的なラッパーを作成して、 ExecutorService に送信することもできます。または、Java 8に組み込まれたメカニズムについては以下を参照してください。

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

CompletableFuture 、Java 8には、プロセスを非同期的かつ条件付きで完了できるパイプラインを構成するためのより精巧な手段が含まれていました。以下は、不自然ですが完全な通知の例です。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

他のヒント

Java 8では、 CompletableFuture 。これは、ユーザーサービスからユーザーを取得し、ビューオブジェクトにマップし、ビューを更新するか、エラーダイアログ(これはGUIアプリケーションです)を表示するために使用しているコードの例です:

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

非同期で実行されます。 mapUsersToUserViews updateView の2つのプライベートメソッドを使用しています。

Guavaのlistenable future API を使用して、コールバックを追加します。 Cf.ウェブサイトから:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

FutureTask クラスを拡張し、 done()メソッドをオーバーライドして、 FutureTask オブジェクトを ExecutorService 。したがって、 done()メソッドは、 FutureTask がすぐに完了したときに呼び出されます。

ThreadPoolExecutor には、オーバーライドして利用できる beforeExecute および afterExecute フックメソッドもあります。 ThreadPoolExecutor Javadocs

  

フックメソッド

     

このクラスは、保護されたオーバーライド可能な beforeExecute(java.lang.Thread、java.lang.Runnable) および afterExecute(java .lang.Runnable、java.lang.Throwable) メソッド。各タスクの実行前後に呼び出されます。これらを使用して、実行環境を操作できます。たとえば、 ThreadLocals の再初期化、統計の収集、ログエントリの追加などです。さらに、メソッド terminate() は、 Executor が完全に終了した後に実行する必要がある特別な処理を実行するためにオーバーライドできます。フックまたはコールバックメソッドが例外をスローすると、内部ワーカースレッドが失敗し、突然終了する可能性があります。

< code> CountDownLatch

java.util.concurrent から取得したものであり、続行する前に複数のスレッドの実行が完了するまで待機する方法です。

目的のコールバック効果を得るには、少し追加の作業が必要です。つまり、 CountDownLatch を使用して待機する別のスレッドでこれを自分で処理し、通知する必要があるものをすべて通知し続けます。コールバックのネイティブサポート、またはその効果に類似したものはありません。


編集:これであなたの質問がさらに理解できたので、不必要にあなたが行き過ぎていると思います。通常の SingleThreadExecutor 、すべてのタスクを与えると、ネイティブにキューイングを行います。

タスクが同時に実行されないようにする場合は、 SingleThreadedExecutor 。タスクは、送信された順に処理されます。タスクを保持する必要さえなく、単にそれらをexecに送信するだけです。

参考になったMattの回答に追加するだけで、コールバックの使用を示すより具体的な例を次に示します。

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

出力は次のとおりです。

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

ExecutorService

を使用して Callback メカニズムを実装するシンプルなコード
import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

出力:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

キーノート:

  1. FIFO順でタスクを順番に処理する場合は、 newFixedThreadPool(5) newFixedThreadPool(1)に置き換えます
  2. 前のタスクの callback の結果を分析した後、次のタスクを処理する場合は、行の下のコメントを外します

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. newFixedThreadPool()

    のいずれかに置き換えることができます
    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    ユースケースによって異なります。

  4. コールバックメソッドを非同期で処理する場合

    a。共有 ExecutorServiceまたはThreadPoolExecutor を呼び出し可能なタスクに渡します

    b。 Callable メソッドを Callable / Runnable タスクに変換します

    c。コールバックタスクを ExecutorServiceまたはThreadPoolExecutor

  5. にプッシュします

これは、Guavaの ListenableFuture を使用したPacheの答えの拡張です。

特に、 Futures.transform() ListenableFuture を返すため、非同期呼び出しのチェーンに使用できます。 Futures.addCallback() void を返すため、チェーンには使用できませんが、非同期完了の成功/失敗の処理には適しています。

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

注:非同期タスクのチェーンに加えて、 Futures.transform()では、各タスクを個別のエグゼキューターでスケジュールすることもできます(この例には示されていません)。

Callableの実装を使用して、

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

CallbackInterfaceは非常に基本的なものです

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

これでメインクラスは次のようになります

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top