Java Testamentsvollstrecker: Wie benachrichtigt werden, ohne zu blockieren, wenn eine Aufgabe abgeschlossen ist?

StackOverflow https://stackoverflow.com/questions/826212

Frage

Sagen wir, ich habe eine Warteschlange voll von Aufgaben, die ich zu einem Testamentsvollstrecker Dienst einreichen müssen. Ich mag, dass sie einen nach dem anderen verarbeitet. Der einfachste Weg, die ich denken kann ist:

  1. Nehmen Sie eine Aufgabe aus der Warteschlange
  2. Senden sie dem Vollstrecker
  3. Rufen Sie .get auf der zurück Zukunft und Block, bis ein Ergebnis vorliegt
  4. Nehmen Sie eine andere Aufgabe aus der Warteschlange ...

Allerdings versuche ich, ganz zu vermeiden blockieren. Wenn ich 10.000 solche Warteschlangen haben, die ihre Aufgaben benötigen einer nach dem anderen verarbeitet, werde ich aus Stapelspeicher ausgeführt werden, da die meisten von ihnen werden in den gesperrten Threads festhalten.

Was ich möchte, ist, eine Aufgabe zu stellen und eine Rückruf bereitzustellen, die aufgerufen wird, wenn die Aufgabe abgeschlossen ist. Ich werde diese Rückrufbenachrichtigung als Flag verwenden, um die nächste Aufgabe zu senden. (Functionaljava und jetlang offenbar verwenden solche nicht-blockierende Algorithmen, aber ich kann nicht verstehen, ihren Code)

Wie kann ich das JDK java.util.concurrent verwenden, kurz meine eigenen Testamentsvollstrecker Dienst zu schreiben?

(die Warteschlange, die mir diese Aufgaben kann sich Block-Feeds, aber das ist ein Thema, später in Angriff genommen werden)

War es hilfreich?

Lösung

Definieren Sie eine Callback-Schnittstelle zu empfangen, was Parameter, die Sie wollen in der Abschlussmeldung an passieren entlang. Dann rufen Sie es am Ende der Aufgabe.

Sie könnten sogar eine allgemeine Wrapper für Runnable Aufgaben schreiben und senden diese an ExecutorService. Oder siehe unten für einen Mechanismus in Java gebaut 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

Mit CompletableFuture , Java 8 enthielt ein aufwendigeren Mittel Pipelines zu komponieren, wo Prozesse asynchron und bedingt abgeschlossen werden können. Hier ist ein erfundenes, aber vollständiges Beispiel der Mitteilung.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

Andere Tipps

In Java 8 können Sie verwenden CompletableFuture . Hier ist ein Beispiel, das ich in meinem Code, wo ich bin mit ihm Benutzer von meinem Benutzerdienst zu holen, Karte, sie zu meiner Ansicht Objekten und dann meine Ansicht aktualisieren oder einen Fehlerdialog zeigen (dies ist eine GUI-Anwendung):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

Es wird asynchron ausgeführt. Ich bin mit zwei privaten Methoden. mapUsersToUserViews und updateView

Verwenden Sie Guava die hörbare Zukunft API und einen Rückruf hinzuzufügen. Vgl von der Website:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

Sie könnten FutureTask Klasse erweitern und die done() Methode überschreiben, dann das FutureTask Objekt der ExecutorService hinzufügen, so dass die done() Methode aufrufe, wenn der FutureTask sofort abgeschlossen.

ThreadPoolExecutor hat auch beforeExecute und afterExecute Hook-Methoden, die Sie und nutzen überschreiben können. Hier ist die Beschreibung von ThreadPoolExecutor Javadocs .

  

Haken Methoden

     

stellt diese Klasse geschützt overridable beforeExecute(java.lang.Thread, java.lang.Runnable) und afterExecute(java.lang.Runnable, java.lang.Throwable) Methoden, die vor und nach der Ausführung jeder Aufgabe aufgerufen werden. Diese können verwendet werden, um die Ausführungsumgebung zu manipulieren; beispielsweise Reinitialisieren ThreadLocals, das Sammeln von Statistiken oder Log-Einträge hinzufügen. Zusätzlich Methode terminated() kann eine spezielle Verarbeitung auszuführen, außer Kraft gesetzt werden, die getan werden muss, sobald die Executor vollständig beendet werden. Wenn Haken oder Callback-Methoden Ausnahmen auslösen, interne Arbeitsthreads wiederum fehlschlagen und abrupt beenden.

Verwenden Sie einen CountDownLatch .

Es ist von java.util.concurrent und es ist genau so, wie für mehrere Threads warten Ausführung abgeschlossen ist, bevor Sie fortfahren.

Um den Callback-Effekt Sie suchen nach zu erreichen, die eine wenig zusätzlichen zusätzlichen Arbeit erfordert. Das heißt, diese in einem separaten Thread selbst Handhabung, die die CountDownLatch verwendet und auf sie nicht warten, geht dann auf zu benachrichtigen, was es ist, Sie benachrichtigen müssen. Es gibt keine native Unterstützung für Rückruf oder etwas ähnliches in diesem Sinne.


EDIT: jetzt, dass ich Ihre Frage weiter verstehen, ich glaube, Sie zu weit erreichen, unnötig. Wenn Sie eine regelmäßige SingleThreadExecutor , geben sie alle Aufgaben, und es wird nativ die Warteschlangen tun.

Wenn Sie wollen sicherstellen, dass keine Aufgaben zur gleichen Zeit ausgeführt werden dann mit einem SingleThreadedExecutor . Die Aufgaben werden in der Reihenfolge verarbeitet werden, die vorgelegt werden. Sie brauchen nicht einmal die Aufgaben zu halten, legt sie nur auf die exec.

Nur um Matts Antwort hinzuzufügen, die geholfen haben, hier ist ein prall-out Beispiel die Verwendung eines Callback zu zeigen.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

Die Ausgabe lautet:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

Einfacher Code Callback Mechanismus ExecutorService zu implementieren

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

Ausgabe:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Key Hinweise:

  1. Wenn Sie Prozessaufgaben in Folge in FIFO-Reihenfolge möchten, ersetzen newFixedThreadPool(5) mit newFixedThreadPool(1)
  2. Wenn Sie nächste Aufgabe zu verarbeiten, nachdem das Ergebnis aus callback der vorherigen Aufgabe zu analysieren, nur un-Kommentar unterhalb der Linie

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. Sie können newFixedThreadPool() mit einem

    ersetzen
    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    Je nach Anwendungsfall.

  4. Wenn Sie möchten, asynchron Callback-Methode behandeln

    a. Führen Sie einen gemeinsamen ExecutorService or ThreadPoolExecutor zu aufrufbare Aufgabe

    b. Konvertieren Sie Ihre Callable Methode Aufgabe Callable/Runnable

    c. Drücken Sie Rückruf Aufgabe ExecutorService or ThreadPoolExecutor

Dies ist eine Erweiterung zu Pache Antwort Guava des ListenableFuture verwenden.

Insbesondere Futures.transform() kehrt ListenableFuture so kann Kette async Anrufe verwendet werden. Futures.addCallback() kehrt void, so kann nicht für die Verkettung verwendet werden, aber es ist gut für den Umgang mit Erfolg / Misserfolg auf einem Asynchron-Abschluss.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

. Hinweis: Neben async Aufgaben Chaining Futures.transform() auch ermöglicht es Ihnen, jede Aufgabe auf einem separaten Testamentsvollstrecker zu planen (nicht in diesem Beispiel gezeigt)

Sie können eine Implementierung von Callable verwenden, so dass

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

wo CallbackInterface etwas sehr einfach wie

ist
public interface CallbackInterface {
    public int doSomething(int a, int b);
}

und jetzt die Hauptklasse wird wie folgt aussehen

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top