Java Executors : 작업이 완료되면 차단하지 않고 알림을받는 방법은 무엇입니까?
문제
집행자 서비스에 제출 해야하는 작업으로 가득 찬 대기열이 있다고 가정 해 봅시다. 나는 그들이 한 번에 하나씩 처리하기를 원합니다. 내가 생각할 수있는 가장 간단한 방법은 다음과 같습니다.
- 대기열에서 작업을 수행하십시오
- 집행자에게 제출하십시오
- 반환 된 미래에 .get에 전화를 걸고 결과를 사용할 수있을 때까지 차단하십시오.
- 대기열에서 다른 작업을 수행하십시오 ...
그러나 나는 완전히 차단을 피하려고 노력하고 있습니다. 한 번에 하나씩 처리 된 작업이 필요한 10,000 개의 대기열이 있다면 스택 공간이 차단 된 스레드를 유지하기 때문에 스택 공간이 부족합니다.
내가 원하는 것은 작업을 제출하고 작업이 완료 될 때 호출되는 콜백을 제공하는 것입니다. 다음 작업을 보내기 위해 해당 콜백 알림을 플래그로 사용하겠습니다. (Functionaljava와 Jetlang은 분명히 비 블로킹 알고리즘을 사용하지만 코드를 이해할 수 없습니다)
JDK의 java.util.concurrent를 사용하여 어떻게 자신의 집행자 서비스를 작성하지 않습니까?
(이 작업을 자체적으로 제공하는 대기열 자체가 차단 될 수 있지만 나중에 해결해야 할 문제입니다).
해결책
콜백 인터페이스를 정의하여 완료 알림에서 전달하려는 매개 변수를 수신하십시오. 그런 다음 작업이 끝날 때 호출하십시오.
실행 가능한 작업을 위해 일반 래퍼를 작성하고이를 다음으로 제출할 수도 있습니다. ExecutorService
. 또는 Java 8에 내장 된 메커니즘은 아래를 참조하십시오.
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
와 함께 CompletableFuture
, Java 8에는 프로세스를 비동기 적으로 그리고 조건부로 완료 할 수있는 파이프 라인을 작성하는보다 정교한 수단이 포함되어 있습니다. 다음은 고안되었지만 완전한 알림의 예입니다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
다른 팁
Java 8에서는 사용할 수 있습니다 완성성 문제. 다음은 코드에 사용하여 사용자 서비스에서 사용자를 가져오고 내보기 개체에 매핑 한 다음 내보기를 업데이트하거나 오류 대화 상자를 표시하는 예입니다 (GUI 응용 프로그램).
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
비동기 적으로 실행됩니다. 두 가지 개인 방법을 사용하고 있습니다. mapUsersToUserViews
그리고 updateView
.
사용 구아바의 청취 가능한 미래 API 콜백을 추가하십시오. cf. 웹 사이트에서 :
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
당신은 확장 할 수 있습니다 FutureTask
클래스, 그리고 done()
방법을 추가하십시오 FutureTask
반대 ExecutorService
, 그래서 done()
메소드가 호출됩니다 FutureTask
즉시 완료되었습니다.
ThreadPoolExecutor
또한 beforeExecute
그리고 afterExecute
재정의하고 사용할 수있는 후크 방법. 다음은 설명입니다 ThreadPoolExecutor
'에스 Javadocs.
후크 방법
이 클래스는 보호 된 재정의 가능성을 제공합니다
beforeExecute(java.lang.Thread, java.lang.Runnable)
그리고afterExecute(java.lang.Runnable, java.lang.Throwable)
각 작업의 실행 전후에 호출되는 메소드. 이들은 실행 환경을 조작하는 데 사용될 수 있습니다. 예를 들어, 재건ThreadLocals
, 통계 수집 또는 로그 항목 추가. 또한 방법terminated()
한 번 수행 해야하는 특별 처리를 수행하기 위해 재정의 할 수 있습니다.Executor
완전히 종료되었습니다. 후크 또는 콜백 메소드가 예외를 제외하면 내부 작업자 스레드가 실패하고 갑자기 종료 될 수 있습니다.
그것은 ~로부터 java.util.concurrent
그리고 계속되기 전에 여러 스레드가 실행을 완료 할 때까지 기다리는 방법입니다.
돌보는 콜백 효과를 달성하려면 약간의 추가 작업이 필요합니다. 즉, 이것을 CountDownLatch
그리고 그것을 기다린 다음, 당신이 알리는 것이 무엇이든 알리는 것에 대해 계속합니다. 콜백에 대한 기본 지원이나 그 효과와 유사한 것은 없습니다.
편집하다: 이제 당신의 질문을 더 이해 했으므로, 당신이 불필요하게 너무 멀리 도달하고 있다고 생각합니다. 정기적으로 복용하는 경우 SingleThreadExecutor
, 모든 작업을 제공하면 기본적으로 대기열이 될 것입니다.
작업이 동시에 실행되지 않도록하려면 단일 레드 레드 레드 executor. 작업은 제출 된 순서대로 처리됩니다. 작업을 보유 할 필요조차 없으며 임원에게 제출하십시오.
Matt의 답변에 추가하기 위해 도움이되었던 Matt의 대답에 콜백의 사용을 보여주는 더 많은 예가 있습니다.
private static Primes primes = new Primes();
public static void main(String[] args) throws InterruptedException {
getPrimeAsync((p) ->
System.out.println("onPrimeListener; p=" + p));
System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
CompletableFuture.supplyAsync(primes::getNextPrime)
.thenApply((prime) -> {
System.out.println("getPrimeAsync(); prime=" + prime);
if (listener != null) {
listener.onPrime(prime);
}
return prime;
});
}
출력은 다음과 같습니다.
getPrimeAsync(); prime=241
onPrimeListener; p=241
Adios mi amigito
구현할 간단한 코드 Callback
사용 메커니즘 ExecutorService
import java.util.concurrent.*;
import java.util.*;
public class CallBackDemo{
public CallBackDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(5);
try{
for ( int i=0; i<5; i++){
Callback callback = new Callback(i+1);
MyCallable myCallable = new MyCallable((long)i+1,callback);
Future<Long> future = service.submit(myCallable);
//System.out.println("future status:"+future.get()+":"+future.isDone());
}
}catch(Exception err){
err.printStackTrace();
}
service.shutdown();
}
public static void main(String args[]){
CallBackDemo demo = new CallBackDemo();
}
}
class MyCallable implements Callable<Long>{
Long id = 0L;
Callback callback;
public MyCallable(Long val,Callback obj){
this.id = val;
this.callback = obj;
}
public Long call(){
//Add your business logic
System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
callback.callbackMethod();
return id;
}
}
class Callback {
private int i;
public Callback(int i){
this.i = i;
}
public void callbackMethod(){
System.out.println("Call back:"+i);
// Add your business logic
}
}
산출:
creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4
키 노트 :
- FIFO 순서로 순서대로 프로세스 작업을 원하는 경우 교체하십시오.
newFixedThreadPool(5)
~와 함께newFixedThreadPool(1)
결과를 분석 한 후 다음 작업을 처리하려면
callback
이전 과제의 경우 아래 줄 아래로 코팅을하지 않습니다//System.out.println("future status:"+future.get()+":"+future.isDone());
당신은 교체 할 수 있습니다
newFixedThreadPool()
하나와 함께Executors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor
사용 사례에 따라.
콜백 메소드를 비동기로 처리하려면
ㅏ. 공유를 통과하십시오
ExecutorService or ThreadPoolExecutor
전화 가능한 작업에비. 당신의 변환
Callable
방법Callable/Runnable
직무씨. 콜백 작업을 푸시하십시오
ExecutorService or ThreadPoolExecutor
이것은 Guava 's를 사용하여 Pache의 답변으로 확장됩니다. ListenableFuture
.
특히, Futures.transform()
보고 ListenableFuture
따라서 비동기 호출을 체인하는 데 사용할 수 있습니다. Futures.addCallback()
보고 void
, 체인에 사용될 수는 없지만 비동기 완료시 성공/실패를 처리하는 데 좋습니다.
// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());
// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
Futures.transform(database, database -> database.query(table, ...));
// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
Futures.transform(cursor, cursor -> cursorToFooList(cursor));
// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
public void onSuccess(List<Foo> foos) {
doSomethingWith(foos);
}
public void onFailure(Throwable thrown) {
log.error(thrown);
}
});
노트: 비동기 작업을 체인하는 것 외에도 Futures.transform()
또한 각 작업을 별도의 집행자로 예약 할 수 있습니다 (이 예제에는 표시되지 않음).
Callable의 구현을 사용할 수 있습니다
public class MyAsyncCallable<V> implements Callable<V> {
CallbackInterface ci;
public MyAsyncCallable(CallbackInterface ci) {
this.ci = ci;
}
public V call() throws Exception {
System.out.println("Call of MyCallable invoked");
System.out.println("Result = " + this.ci.doSomething(10, 20));
return (V) "Good job";
}
}
Callbackinterface는 매우 기본적인 곳입니다
public interface CallbackInterface {
public int doSomething(int a, int b);
}
그리고 이제 메인 클래스는 다음과 같이 보일 것입니다
ExecutorService ex = Executors.newFixedThreadPool(2);
MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);