Исполнители Java:как получать уведомления без блокировки о завершении задачи?

StackOverflow https://stackoverflow.com/questions/826212

Вопрос

Допустим, у меня есть очередь, полная задач, которые мне нужно отправить в службу-исполнитель.Я хочу, чтобы они обрабатывались по одному за раз.Самый простой способ, который я могу придумать, - это:

  1. Возьмите задачу из очереди
  2. Отправьте его исполнителю
  3. Вызовите.получите возвращенное будущее и заблокируйте до тех пор, пока не будет доступен результат
  4. Возьмите другую задачу из очереди...

Тем не менее, я пытаюсь полностью избежать блокировки.Если у меня будет 10 000 таких очередей, которым нужно обрабатывать свои задачи по одной за раз, у меня закончится место в стеке, потому что большинство из них будут привязаны к заблокированным потокам.

Чего я хотел бы, так это отправить задачу и обеспечить обратный вызов, который вызывается после завершения задачи.Я буду использовать это уведомление об обратном вызове в качестве флага для отправки следующей задачи.(functionaljava и jetlang, по-видимому, используют такие неблокирующие алгоритмы, но я не могу понять их код)

Как я могу это сделать, используя java.util.concurrent от JDK, за исключением написания моей собственной службы-исполнителя?

(очередь, которая передает мне эти задачи, сама может блокироваться, но это проблема, которую нужно решить позже)

Это было полезно?

Решение

Определите интерфейс обратного вызова для получения любых параметров, которые вы хотите передать в уведомлении о завершении. Затем вызовите его в конце задачи.

Вы даже можете написать общую оболочку для задач Runnable и отправить их в ExecutorService . Или см. Ниже механизм, встроенный в Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}
<Ч>

С CompletableFuture , Java 8 включает более сложные средства для составления конвейеров, где процессы могут выполняться асинхронно и условно. Вот надуманный, но полный пример уведомления.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

Другие советы

Используйте API прослушиваемого будущего Guava и добавьте обратный вызов. Ср с сайта:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

Вы можете расширить класс FutureTask и переопределить метод done () , а затем добавить объект FutureTask в ExecutorService , поэтому метод done () будет вызван, когда FutureTask завершится немедленно.

ThreadPoolExecutor также имеет методы ловушки beforeExecute и afterExecute , которые вы можете переопределить и использовать. Вот описание из ThreadPoolExecutor Javadocs .

  

Хук-методы

     

Этот класс предоставляет защищенную переопределяемую beforeExecute (java.lang.Thread, java.lang.Runnable) и afterExecute (java .lang.Runnable, java.lang.Throwable) методы, которые вызываются до и после выполнения каждой задачи. Их можно использовать для манипулирования средой исполнения; например, повторная инициализация ThreadLocals , сбор статистики или добавление записей журнала. Кроме того, метод terminated () может быть переопределен для выполнения любой специальной обработки, которую необходимо выполнить после полного завершения Executor . Если методы ловушки или обратного вызова генерируют исключения, внутренние рабочие потоки могут, в свою очередь, завершиться сбоем и внезапно завершиться.

Используйте < код> CountDownLatch .

Это из java.util.concurrent , и именно так можно дождаться завершения нескольких потоков, прежде чем продолжить.

Чтобы добиться эффекта обратного вызова, за которым вы ухаживаете, это требует немного дополнительной дополнительной работы. А именно, обрабатывая это самостоятельно в отдельном потоке, который использует CountDownLatch и ожидает его, а затем продолжает уведомлять обо всем, что вам нужно уведомить. Нет встроенной поддержки для обратного вызова или чего-либо подобного этому эффекту.

<Ч>

РЕДАКТИРОВАТЬ: теперь, когда я понимаю ваш вопрос, я думаю, вы слишком далеко зашли слишком далеко. Если вы берете обычный SingleThreadExecutor , передайте ему все задачи, и он будет выполнять очередную очередь.

Если вы хотите убедиться, что никакие задачи не будут выполняться одновременно, используйте SingleThreadedExecutor . Задачи будут обработаны в порядке их отправки. Вам даже не нужно держать задачи, просто отправьте их в exec.

Просто добавьте к ответу Мэтта, который помог, вот более конкретный пример, демонстрирующий использование обратного вызова.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

Вывод:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

Простой в реализации код Callback механизм, использующий ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

выходной сигнал:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Ключевые примечания:

  1. Если вы хотите обрабатывать задачи последовательно в порядке FIFO, замените newFixedThreadPool(5) с newFixedThreadPool(1)
  2. Если вы хотите обработать следующую задачу после анализа результата из callback из предыдущей задачи, просто отмените комментарий под строкой

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. Вы можете заменить newFixedThreadPool() с одним из

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    в зависимости от вашего варианта использования.

  4. Если вы хотите обрабатывать метод обратного вызова асинхронно

    a.Передать общий доступ ExecutorService or ThreadPoolExecutor к вызываемой задаче

    b.Преобразуйте свой Callable способ получения Callable/Runnable задача

    c.Отправить задачу обратного вызова в ExecutorService or ThreadPoolExecutor

Это расширение ответа Пача с использованием ListenableFuture в Guava.

В частности, Futures.transform () возвращает ListenableFuture , поэтому его можно использовать для объединения асинхронных вызовов. Futures.addCallback () возвращает void , поэтому не может использоваться для цепочки, но подходит для обработки успеха / неудачи при асинхронном завершении.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

ПРИМЕЧАНИЕ. Помимо цепочки асинхронных задач, Futures.transform () также позволяет планировать каждую задачу для отдельного исполнителя (не показан в этом примере).

Вы можете использовать реализацию Callable так, чтобы

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

где CallbackInterface является чем-то очень простым, например

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

и теперь основной класс будет выглядеть так

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top