Java Executors : 작업이 완료되면 차단하지 않고 알림을받는 방법은 무엇입니까?

StackOverflow https://stackoverflow.com/questions/826212

문제

집행자 서비스에 제출 해야하는 작업으로 가득 찬 대기열이 있다고 가정 해 봅시다. 나는 그들이 한 번에 하나씩 처리하기를 원합니다. 내가 생각할 수있는 가장 간단한 방법은 다음과 같습니다.

  1. 대기열에서 작업을 수행하십시오
  2. 집행자에게 제출하십시오
  3. 반환 된 미래에 .get에 전화를 걸고 결과를 사용할 수있을 때까지 차단하십시오.
  4. 대기열에서 다른 작업을 수행하십시오 ...

그러나 나는 완전히 차단을 피하려고 노력하고 있습니다. 한 번에 하나씩 처리 된 작업이 필요한 10,000 개의 대기열이 있다면 스택 공간이 차단 된 스레드를 유지하기 때문에 스택 공간이 부족합니다.

내가 원하는 것은 작업을 제출하고 작업이 완료 될 때 호출되는 콜백을 제공하는 것입니다. 다음 작업을 보내기 위해 해당 콜백 알림을 플래그로 사용하겠습니다. (Functionaljava와 Jetlang은 분명히 비 블로킹 알고리즘을 사용하지만 코드를 이해할 수 없습니다)

JDK의 java.util.concurrent를 사용하여 어떻게 자신의 집행자 서비스를 작성하지 않습니까?

(이 작업을 자체적으로 제공하는 대기열 자체가 차단 될 수 있지만 나중에 해결해야 할 문제입니다).

도움이 되었습니까?

해결책

콜백 인터페이스를 정의하여 완료 알림에서 전달하려는 매개 변수를 수신하십시오. 그런 다음 작업이 끝날 때 호출하십시오.

실행 가능한 작업을 위해 일반 래퍼를 작성하고이를 다음으로 제출할 수도 있습니다. ExecutorService. 또는 Java 8에 내장 된 메커니즘은 아래를 참조하십시오.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

와 함께 CompletableFuture, Java 8에는 프로세스를 비동기 적으로 그리고 조건부로 완료 할 수있는 파이프 라인을 작성하는보다 정교한 수단이 포함되어 있습니다. 다음은 고안되었지만 완전한 알림의 예입니다.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

다른 팁

Java 8에서는 사용할 수 있습니다 완성성 문제. 다음은 코드에 사용하여 사용자 서비스에서 사용자를 가져오고 내보기 개체에 매핑 한 다음 내보기를 업데이트하거나 오류 대화 상자를 표시하는 예입니다 (GUI 응용 프로그램).

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

비동기 적으로 실행됩니다. 두 가지 개인 방법을 사용하고 있습니다. mapUsersToUserViews 그리고 updateView.

사용 구아바의 청취 가능한 미래 API 콜백을 추가하십시오. cf. 웹 사이트에서 :

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

당신은 확장 할 수 있습니다 FutureTask 클래스, 그리고 done() 방법을 추가하십시오 FutureTask 반대 ExecutorService, 그래서 done() 메소드가 호출됩니다 FutureTask 즉시 완료되었습니다.

ThreadPoolExecutor 또한 beforeExecute 그리고 afterExecute 재정의하고 사용할 수있는 후크 방법. 다음은 설명입니다 ThreadPoolExecutor'에스 Javadocs.

후크 방법

이 클래스는 보호 된 재정의 가능성을 제공합니다 beforeExecute(java.lang.Thread, java.lang.Runnable) 그리고 afterExecute(java.lang.Runnable, java.lang.Throwable) 각 작업의 실행 전후에 호출되는 메소드. 이들은 실행 환경을 조작하는 데 사용될 수 있습니다. 예를 들어, 재건 ThreadLocals, 통계 수집 또는 로그 항목 추가. 또한 방법 terminated() 한 번 수행 해야하는 특별 처리를 수행하기 위해 재정의 할 수 있습니다. Executor 완전히 종료되었습니다. 후크 또는 콜백 메소드가 예외를 제외하면 내부 작업자 스레드가 실패하고 갑자기 종료 될 수 있습니다.

a CountDownLatch.

그것은 ~로부터 java.util.concurrent 그리고 계속되기 전에 여러 스레드가 실행을 완료 할 때까지 기다리는 방법입니다.

돌보는 콜백 효과를 달성하려면 약간의 추가 작업이 필요합니다. 즉, 이것을 CountDownLatch 그리고 그것을 기다린 다음, 당신이 알리는 것이 무엇이든 알리는 것에 대해 계속합니다. 콜백에 대한 기본 지원이나 그 효과와 유사한 것은 없습니다.


편집하다: 이제 당신의 질문을 더 이해 했으므로, 당신이 불필요하게 너무 멀리 도달하고 있다고 생각합니다. 정기적으로 복용하는 경우 SingleThreadExecutor, 모든 작업을 제공하면 기본적으로 대기열이 될 것입니다.

작업이 동시에 실행되지 않도록하려면 단일 레드 레드 레드 executor. 작업은 제출 된 순서대로 처리됩니다. 작업을 보유 할 필요조차 없으며 임원에게 제출하십시오.

Matt의 답변에 추가하기 위해 도움이되었던 Matt의 대답에 콜백의 사용을 보여주는 더 많은 예가 있습니다.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

출력은 다음과 같습니다.

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

구현할 간단한 코드 Callback 사용 메커니즘 ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

산출:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

키 노트 :

  1. FIFO 순서로 순서대로 프로세스 작업을 원하는 경우 교체하십시오. newFixedThreadPool(5) ~와 함께 newFixedThreadPool(1)
  2. 결과를 분석 한 후 다음 작업을 처리하려면 callback 이전 과제의 경우 아래 줄 아래로 코팅을하지 않습니다

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. 당신은 교체 할 수 있습니다 newFixedThreadPool() 하나와 함께

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    사용 사례에 따라.

  4. 콜백 메소드를 비동기로 처리하려면

    ㅏ. 공유를 통과하십시오 ExecutorService or ThreadPoolExecutor 전화 가능한 작업에

    비. 당신의 변환 Callable 방법 Callable/Runnable 직무

    씨. 콜백 작업을 푸시하십시오 ExecutorService or ThreadPoolExecutor

이것은 Guava 's를 사용하여 Pache의 답변으로 확장됩니다. ListenableFuture.

특히, Futures.transform() 보고 ListenableFuture 따라서 비동기 호출을 체인하는 데 사용할 수 있습니다. Futures.addCallback() 보고 void, 체인에 사용될 수는 없지만 비동기 완료시 성공/실패를 처리하는 데 좋습니다.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

노트: 비동기 작업을 체인하는 것 외에도 Futures.transform() 또한 각 작업을 별도의 집행자로 예약 할 수 있습니다 (이 예제에는 표시되지 않음).

Callable의 구현을 사용할 수 있습니다

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

Callbackinterface는 매우 기본적인 곳입니다

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

그리고 이제 메인 클래스는 다음과 같이 보일 것입니다

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top