Question

Dites que j'ai une file d'attente pleine de tâches que je dois soumettre à un service d'exécution. Je veux qu'ils soient traités un à la fois. Le moyen le plus simple auquel je puisse penser est de:

  1. Effectuer une tâche dans la file d'attente
  2. Soumettez-le à l'exécuteur
  3. Appelez .get sur le futur renvoyé et bloquez jusqu'à ce qu'un résultat soit disponible
  4. Prenez une autre tâche dans la file d'attente ...

Cependant, j'essaie d'éviter de bloquer complètement. Si j'ai 10 000 files d'attente de ce type, dont les tâches doivent être traitées une à la fois, je vais manquer d'espace de pile car la plupart d'entre elles conservent des threads bloqués.

Ce que je voudrais, c'est soumettre une tâche et fournir un rappel qui est appelé lorsque la tâche est terminée. J'utiliserai cette notification de rappel comme indicateur pour envoyer la tâche suivante. (functionaljava et jetlang utilisent apparemment de tels algorithmes, mais je ne comprends pas leur code)

Comment puis-je faire cela en utilisant java.util.concurrent du JDK, à moins d'écrire mon propre service d'exécuteur?

(la file d’alimentation qui me nourrit ces tâches peut elle-même bloquer, mais c’est un problème à traiter plus tard)

Était-ce utile?

La solution

Définissez une interface de rappel pour recevoir les paramètres que vous souhaitez transmettre dans la notification d'achèvement. Puis appelez-le à la fin de la tâche.

Vous pouvez même écrire un wrapper général pour les tâches exécutables et les soumettre à ExecutorService . Ou, voir ci-dessous un mécanisme intégré à Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

Avec CompletableFuture , Java 8 inclut un moyen plus élaboré de composer des pipelines dans lesquels les processus peuvent être terminés de manière asynchrone et conditionnelle. Voici un exemple de notification complexe mais complet.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

Autres conseils

En Java 8, vous pouvez utiliser CompletableFuture . Voici un exemple dans mon code où je l'utilise pour extraire des utilisateurs de mon service utilisateur, les mapper sur mes objets de vue, puis mettre à jour ma vue ou afficher une boîte de dialogue d'erreur (il s'agit d'une application graphique):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

Il s’exécute de manière asynchrone. J'utilise deux méthodes privées: mapUsersToUserViews et updateView .

Utilisez l'API future de Guava à l'écoute et ajoutez un rappel. Cf. sur le site:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

Vous pouvez étendre la classe FutureTask et remplacer la méthode done () , puis ajouter l'objet FutureTask à ExecutorService , donc la méthode done () sera appelée lorsque la FutureTask sera terminée immédiatement.

ThreadPoolExecutor contient également les méthodes de raccordement beforeExecute et afterExecute que vous pouvez remplacer et utiliser. Voici la description de ThreadPoolExecutor Javadocs .

  

Méthodes de hook

     

Cette classe fournit des éléments substituables protégés beforeExecute (java.lang.Thread, java.lang.Runnable) et afterExecute (java) .lang.Runnable, java.lang.Throwable) qui sont appelées avant et après l'exécution de chaque tâche. Ceux-ci peuvent être utilisés pour manipuler l'environnement d'exécution; Par exemple, réinitialiser ThreadLocals , collecter des statistiques ou ajouter des entrées de journal. De plus, la méthode terminated () peut être remplacé pour effectuer tout traitement spécial à effectuer une fois que exécuteur est complètement terminé. Si les méthodes de raccordement ou de rappel génèrent des exceptions, les threads de travail internes peuvent à leur tour échouer et se terminer brusquement.

Utilisez un < code> CountDownLatch .

Il provient de java.util.concurrent et constitue le moyen d'attendre l'exécution de plusieurs threads avant de poursuivre.

Pour obtenir l'effet de rappel que vous recherchez, cela nécessite un peu de travail supplémentaire. C'est-à-dire que vous gérez cela vous-même dans un thread séparé qui utilise CountDownLatch et l'attend, puis continue à notifier tout ce que vous devez notifier. Il n'y a pas de support natif pour le rappel, ou quelque chose de similaire à cet effet.

MODIFIER: maintenant que je comprends mieux votre question, je pense que vous allez trop loin, inutilement. Si vous prenez régulièrement un SingleThreadExecutor , confiez-lui toutes les tâches et effectuera la mise en file d'attente de manière native.

Si vous voulez vous assurer qu'aucune tâche ne s'exécutera en même temps, utilisez un SingleThreadedExecutor . Les tâches seront traitées dans l'ordre dans lequel elles sont soumises. Vous n'avez même pas besoin de conserver les tâches, il vous suffit de les soumettre à l'exécutif.

Juste pour ajouter à la réponse de Matt, ce qui a aidé, voici un exemple plus étoffé pour montrer l'utilisation d'un rappel.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

Le résultat est:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

Code simple pour implémenter le mécanisme Callback à l'aide de ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

sortie:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Remarques clés:

  1. Si vous souhaitez traiter les tâches en séquence dans l'ordre FIFO, remplacez newFixedThreadPool (5) par newFixedThreadPool (1)
  2. Si vous souhaitez traiter la tâche suivante après avoir analysé le résultat du rappel de la tâche précédente, supprimez simplement le commentaire sous la ligne

    .
    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. Vous pouvez remplacer newFixedThreadPool () par l'un des

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    en fonction de votre cas d'utilisation.

  4. Si vous souhaitez gérer la méthode de rappel de manière asynchrone

    a. Passez un ExecutorService ou un ThreadPoolExecutor partagé à la tâche appelable

    .

    b. Convertissez votre méthode Callable en tâche Callable / Runnable

    c. Envoyer la tâche de rappel à ExecutorService ou ThreadPoolExecutor

Il s'agit d'une extension de la réponse de Pache utilisant le ListenableFuture de Guava.

En particulier, Futures.transform () renvoie ListenableFuture et peut donc être utilisé pour chaîner des appels asynchrones. Futures.addCallback () renvoie void ; il ne peut donc pas être utilisé pour le chaînage, mais convient pour gérer la réussite / l'échec lors de l'achèvement asynchrone.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

REMARQUE: Outre le chaînage de tâches asynchrones, Futures.transform () vous permet également de planifier chaque tâche sur un exécuteur distinct (non illustré dans cet exemple).

Vous pouvez utiliser une implémentation de Callable telle que

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

où CallbackInterface est quelque chose de très basique comme

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

et maintenant la classe principale ressemblera à ceci

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top