Esecutori Java: come ricevere una notifica, senza bloccare, al completamento di un'attività?

StackOverflow https://stackoverflow.com/questions/826212

Domanda

Supponi di avere una coda piena di attività che devo sottoporre a un servizio di esecuzione. Li voglio elaborati uno alla volta. Il modo più semplice a cui riesco a pensare è:

  1. Prendi un'attività dalla coda
  2. Invialo all'esecutore
  3. Chiama .get sul Future restituito e blocca fino a quando non è disponibile un risultato
  4. Prendi un'altra attività dalla coda ...

Tuttavia, sto cercando di evitare il blocco completo. Se ho 10.000 di queste code, che necessitano che i loro compiti vengano elaborati uno alla volta, finirò lo spazio dello stack perché la maggior parte di loro si aggrapperà ai thread bloccati.

Quello che vorrei è inviare un'attività e fornire una richiamata che viene chiamata al termine dell'attività. Userò quella notifica di richiamata come flag per inviare l'attività successiva. (funzionalejava e jetlang apparentemente usano tali algoritmi non bloccanti, ma non riesco a capire il loro codice)

Come posso farlo usando java.util.concurrent di JDK, a corto di scrivere il mio servizio di esecutore?

(la coda che mi fornisce questi compiti potrebbe bloccarsi, ma questo è un problema da affrontare in seguito)

È stato utile?

Soluzione

Definire un'interfaccia di callback per ricevere tutti i parametri che si desidera trasmettere nella notifica di completamento. Quindi invocalo alla fine dell'attività.

Puoi persino scrivere un wrapper generale per le attività eseguibili e inviarle a ExecutorService . Oppure, vedi sotto per un meccanismo integrato in Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

Con CompletableFuture , Java 8 includeva un mezzo più elaborato per comporre pipeline in cui i processi possono essere completati in modo asincrono e condizionale. Ecco un esempio inventato ma completo di notifica.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

Altri suggerimenti

In Java 8 è possibile utilizzare CompletableFuture . Ecco un esempio che ho avuto nel mio codice in cui lo sto usando per recuperare gli utenti dal mio servizio utente, mapparli agli oggetti della mia vista e quindi aggiornare la mia vista o mostrare una finestra di dialogo di errore (questa è un'applicazione GUI):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

Si esegue in modo asincrono. Sto usando due metodi privati: mapUsersToUserViews e updateView .

Utilizza l'API futura ascoltabile di Guava e aggiungi una richiamata. Cf. dal sito web:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

È possibile estendere la classe FutureTask e sovrascrivere il metodo done () , quindi aggiungere l'oggetto FutureTask all'oggetto ExecutorService , quindi il metodo done () verrà richiamato quando FutureTask sarà completato immediatamente.

ThreadPoolExecutor ha anche i metodi hook beforeExecute e afterExecute che puoi sovrascrivere e usare. Ecco la descrizione da Javadocs .

  

Metodi hook

     

Questa classe fornisce l'overridable protetto beforeExecute (java.lang.Thread, java.lang.Runnable) e afterExecute (java .lang.Runnable, java.lang.Throwable) che vengono chiamati prima e dopo l'esecuzione di ogni attività. Questi possono essere usati per manipolare l'ambiente di esecuzione; ad esempio, reinizializzando ThreadLocals , raccogliendo statistiche o aggiungendo voci di registro. Inoltre, il metodo terminated () può essere ignorato per eseguire qualsiasi elaborazione speciale che deve essere eseguita una volta che Executor è stato completamente chiuso. Se i metodi hook o callback generano eccezioni, i thread di lavoro interni potrebbero a loro volta fallire e terminare bruscamente.

Utilizza un < code> CountDownLatch .

Viene da java.util.concurrent ed è esattamente il modo di aspettare che diversi thread completino l'esecuzione prima di continuare.

Per ottenere l'effetto di richiamata che stai cercando, è necessario un piccolo lavoro aggiuntivo. Vale a dire, gestirlo da solo in un thread separato che utilizza il CountDownLatch e lo attende, quindi continua a notificare tutto ciò che è necessario notificare. Non esiste un supporto nativo per il callback o qualcosa di simile a tale effetto.


MODIFICA: ora che comprendo ulteriormente la tua domanda, penso che tu stia raggiungendo troppo, inutilmente. Se prendi regolarmente SingleThreadExecutor , assegnagli tutte le attività e eseguirà le code in modo nativo.

Se vuoi assicurarti che nessuna attività verrà eseguita contemporaneamente, usa un SingleThreadedExecutor . Le attività verranno elaborate nell'ordine in cui sono state inviate. Non è nemmeno necessario conservare le attività, basta inviarle all'esecut.

Solo per aggiungere alla risposta di Matt, che ha aiutato, ecco un esempio più corposo per mostrare l'uso di un callback.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

L'output è:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

Codice semplice per implementare il meccanismo Callback utilizzando ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

uscita:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Note chiave:

  1. Se si desidera eseguire le attività di processo in sequenza nell'ordine FIFO, sostituire newFixedThreadPool (5) con newFixedThreadPool (1)
  2. Se si desidera elaborare l'attività successiva dopo aver analizzato il risultato da callback dell'attività precedente, basta annullare il commento sotto la riga

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. Puoi sostituire newFixedThreadPool () con uno di

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    a seconda del caso d'uso.

  4. Se si desidera gestire il metodo di callback in modo asincrono

    a. Passa un ExecutorService o ThreadPoolExecutor condiviso all'attività Callable

    b. Converti il ??tuo metodo Callable in Callable / Runnable

    c. Invia attività di callback a ExecutorService o ThreadPoolExecutor

Questa è un'estensione della risposta di Pache usando ListenableFuture di Guava.

In particolare, Futures.transform () restituisce ListenableFuture , quindi può essere utilizzato per concatenare le chiamate asincrone. Futures.addCallback () restituisce void , quindi non può essere utilizzato per il concatenamento, ma è utile per gestire il successo / fallimento in un completamento asincrono.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

NOTA: oltre a concatenare attività asincrone, Futures.transform () consente anche di pianificare ciascuna attività su un esecutore separato (non mostrato in questo esempio).

È possibile utilizzare un'implementazione di Callable tale che

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

dove CallbackInterface è qualcosa di molto semplice come

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

e ora la classe principale sarà simile a questa

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top