Esecutori Java: come ricevere una notifica, senza bloccare, al completamento di un'attività?
Domanda
Supponi di avere una coda piena di attività che devo sottoporre a un servizio di esecuzione. Li voglio elaborati uno alla volta. Il modo più semplice a cui riesco a pensare è:
- Prendi un'attività dalla coda
- Invialo all'esecutore
- Chiama .get sul Future restituito e blocca fino a quando non è disponibile un risultato
- Prendi un'altra attività dalla coda ...
Tuttavia, sto cercando di evitare il blocco completo. Se ho 10.000 di queste code, che necessitano che i loro compiti vengano elaborati uno alla volta, finirò lo spazio dello stack perché la maggior parte di loro si aggrapperà ai thread bloccati.
Quello che vorrei è inviare un'attività e fornire una richiamata che viene chiamata al termine dell'attività. Userò quella notifica di richiamata come flag per inviare l'attività successiva. (funzionalejava e jetlang apparentemente usano tali algoritmi non bloccanti, ma non riesco a capire il loro codice)
Come posso farlo usando java.util.concurrent di JDK, a corto di scrivere il mio servizio di esecutore?
(la coda che mi fornisce questi compiti potrebbe bloccarsi, ma questo è un problema da affrontare in seguito)
Soluzione
Definire un'interfaccia di callback per ricevere tutti i parametri che si desidera trasmettere nella notifica di completamento. Quindi invocalo alla fine dell'attività.
Puoi persino scrivere un wrapper generale per le attività eseguibili e inviarle a ExecutorService
. Oppure, vedi sotto per un meccanismo integrato in Java 8.
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
Con CompletableFuture
, Java 8 includeva un mezzo più elaborato per comporre pipeline in cui i processi possono essere completati in modo asincrono e condizionale. Ecco un esempio inventato ma completo di notifica.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
Altri suggerimenti
In Java 8 è possibile utilizzare CompletableFuture . Ecco un esempio che ho avuto nel mio codice in cui lo sto usando per recuperare gli utenti dal mio servizio utente, mapparli agli oggetti della mia vista e quindi aggiornare la mia vista o mostrare una finestra di dialogo di errore (questa è un'applicazione GUI):
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
Si esegue in modo asincrono. Sto usando due metodi privati: mapUsersToUserViews
e updateView
.
Utilizza l'API futura ascoltabile di Guava e aggiungi una richiamata. Cf. dal sito web:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
È possibile estendere la classe FutureTask
e sovrascrivere il metodo done ()
, quindi aggiungere l'oggetto FutureTask
all'oggetto ExecutorService
, quindi il metodo done ()
verrà richiamato quando FutureTask
sarà completato immediatamente.
ThreadPoolExecutor
ha anche i metodi hook beforeExecute
e afterExecute
che puoi sovrascrivere e usare. Ecco la descrizione da Javadocs .
Metodi hook
Questa classe fornisce l'overridable protetto
beforeExecute (java.lang.Thread, java.lang.Runnable)
eafterExecute (java .lang.Runnable, java.lang.Throwable)
che vengono chiamati prima e dopo l'esecuzione di ogni attività. Questi possono essere usati per manipolare l'ambiente di esecuzione; ad esempio, reinizializzandoThreadLocals
, raccogliendo statistiche o aggiungendo voci di registro. Inoltre, il metodoterminated ()
può essere ignorato per eseguire qualsiasi elaborazione speciale che deve essere eseguita una volta cheExecutor
è stato completamente chiuso. Se i metodi hook o callback generano eccezioni, i thread di lavoro interni potrebbero a loro volta fallire e terminare bruscamente.
Utilizza un < code> CountDownLatch .
Viene da java.util.concurrent
ed è esattamente il modo di aspettare che diversi thread completino l'esecuzione prima di continuare.
Per ottenere l'effetto di richiamata che stai cercando, è necessario un piccolo lavoro aggiuntivo. Vale a dire, gestirlo da solo in un thread separato che utilizza il CountDownLatch
e lo attende, quindi continua a notificare tutto ciò che è necessario notificare. Non esiste un supporto nativo per il callback o qualcosa di simile a tale effetto.
MODIFICA: ora che comprendo ulteriormente la tua domanda, penso che tu stia raggiungendo troppo, inutilmente. Se prendi regolarmente SingleThreadExecutor
, assegnagli tutte le attività e eseguirà le code in modo nativo.
Se vuoi assicurarti che nessuna attività verrà eseguita contemporaneamente, usa un SingleThreadedExecutor . Le attività verranno elaborate nell'ordine in cui sono state inviate. Non è nemmeno necessario conservare le attività, basta inviarle all'esecut.
Solo per aggiungere alla risposta di Matt, che ha aiutato, ecco un esempio più corposo per mostrare l'uso di un callback.
private static Primes primes = new Primes();
public static void main(String[] args) throws InterruptedException {
getPrimeAsync((p) ->
System.out.println("onPrimeListener; p=" + p));
System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
CompletableFuture.supplyAsync(primes::getNextPrime)
.thenApply((prime) -> {
System.out.println("getPrimeAsync(); prime=" + prime);
if (listener != null) {
listener.onPrime(prime);
}
return prime;
});
}
L'output è:
getPrimeAsync(); prime=241
onPrimeListener; p=241
Adios mi amigito
Codice semplice per implementare il meccanismo Callback
utilizzando ExecutorService
import java.util.concurrent.*;
import java.util.*;
public class CallBackDemo{
public CallBackDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(5);
try{
for ( int i=0; i<5; i++){
Callback callback = new Callback(i+1);
MyCallable myCallable = new MyCallable((long)i+1,callback);
Future<Long> future = service.submit(myCallable);
//System.out.println("future status:"+future.get()+":"+future.isDone());
}
}catch(Exception err){
err.printStackTrace();
}
service.shutdown();
}
public static void main(String args[]){
CallBackDemo demo = new CallBackDemo();
}
}
class MyCallable implements Callable<Long>{
Long id = 0L;
Callback callback;
public MyCallable(Long val,Callback obj){
this.id = val;
this.callback = obj;
}
public Long call(){
//Add your business logic
System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
callback.callbackMethod();
return id;
}
}
class Callback {
private int i;
public Callback(int i){
this.i = i;
}
public void callbackMethod(){
System.out.println("Call back:"+i);
// Add your business logic
}
}
uscita:
creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4
Note chiave:
- Se si desidera eseguire le attività di processo in sequenza nell'ordine FIFO, sostituire
newFixedThreadPool (5)
connewFixedThreadPool (1)
-
Se si desidera elaborare l'attività successiva dopo aver analizzato il risultato da
callback
dell'attività precedente, basta annullare il commento sotto la riga//System.out.println("future status:"+future.get()+":"+future.isDone());
-
Puoi sostituire
newFixedThreadPool ()
con uno diExecutors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor
a seconda del caso d'uso.
-
Se si desidera gestire il metodo di callback in modo asincrono
a. Passa un
ExecutorService o ThreadPoolExecutor
condiviso all'attività Callableb. Converti il ??tuo metodo
Callable
inCallable / Runnable
c. Invia attività di callback a
ExecutorService o ThreadPoolExecutor
Questa è un'estensione della risposta di Pache usando ListenableFuture
di Guava.
In particolare, Futures.transform ()
restituisce ListenableFuture
, quindi può essere utilizzato per concatenare le chiamate asincrone. Futures.addCallback ()
restituisce void
, quindi non può essere utilizzato per il concatenamento, ma è utile per gestire il successo / fallimento in un completamento asincrono.
// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());
// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
Futures.transform(database, database -> database.query(table, ...));
// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
Futures.transform(cursor, cursor -> cursorToFooList(cursor));
// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
public void onSuccess(List<Foo> foos) {
doSomethingWith(foos);
}
public void onFailure(Throwable thrown) {
log.error(thrown);
}
});
NOTA: oltre a concatenare attività asincrone, Futures.transform ()
consente anche di pianificare ciascuna attività su un esecutore separato (non mostrato in questo esempio).
È possibile utilizzare un'implementazione di Callable tale che
public class MyAsyncCallable<V> implements Callable<V> {
CallbackInterface ci;
public MyAsyncCallable(CallbackInterface ci) {
this.ci = ci;
}
public V call() throws Exception {
System.out.println("Call of MyCallable invoked");
System.out.println("Result = " + this.ci.doSomething(10, 20));
return (V) "Good job";
}
}
dove CallbackInterface è qualcosa di molto semplice come
public interface CallbackInterface {
public int doSomething(int a, int b);
}
e ora la classe principale sarà simile a questa
ExecutorService ex = Executors.newFixedThreadPool(2);
MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);