Исполнители Java:как получать уведомления без блокировки о завершении задачи?
Вопрос
Допустим, у меня есть очередь, полная задач, которые мне нужно отправить в службу-исполнитель.Я хочу, чтобы они обрабатывались по одному за раз.Самый простой способ, который я могу придумать, - это:
- Возьмите задачу из очереди
- Отправьте его исполнителю
- Вызовите.получите возвращенное будущее и заблокируйте до тех пор, пока не будет доступен результат
- Возьмите другую задачу из очереди...
Тем не менее, я пытаюсь полностью избежать блокировки.Если у меня будет 10 000 таких очередей, которым нужно обрабатывать свои задачи по одной за раз, у меня закончится место в стеке, потому что большинство из них будут привязаны к заблокированным потокам.
Чего я хотел бы, так это отправить задачу и обеспечить обратный вызов, который вызывается после завершения задачи.Я буду использовать это уведомление об обратном вызове в качестве флага для отправки следующей задачи.(functionaljava и jetlang, по-видимому, используют такие неблокирующие алгоритмы, но я не могу понять их код)
Как я могу это сделать, используя java.util.concurrent от JDK, за исключением написания моей собственной службы-исполнителя?
(очередь, которая передает мне эти задачи, сама может блокироваться, но это проблема, которую нужно решить позже)
Решение
Определите интерфейс обратного вызова для получения любых параметров, которые вы хотите передать в уведомлении о завершении. Затем вызовите его в конце задачи. Р>
Вы даже можете написать общую оболочку для задач Runnable и отправить их в ExecutorService
. Или см. Ниже механизм, встроенный в Java 8.
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
<Ч>
С CompletableFuture
, Java 8 включает более сложные средства для составления конвейеров, где процессы могут выполняться асинхронно и условно. Вот надуманный, но полный пример уведомления.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
Другие советы
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
Он выполняется асинхронно. Я использую два закрытых метода: mapUsersToUserViews
и updateView
.
Используйте API прослушиваемого будущего Guava и добавьте обратный вызов. Ср с сайта:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
Вы можете расширить класс FutureTask
и переопределить метод done ()
, а затем добавить объект FutureTask
в ExecutorService
, поэтому метод done ()
будет вызван, когда FutureTask
завершится немедленно.
ThreadPoolExecutor
также имеет методы ловушки beforeExecute
и afterExecute
, которые вы можете переопределить и использовать. Вот описание из ThreadPoolExecutor
Javadocs .
Хук-методы
Этот класс предоставляет защищенную переопределяемую
beforeExecute (java.lang.Thread, java.lang.Runnable)
иafterExecute (java .lang.Runnable, java.lang.Throwable)
методы, которые вызываются до и после выполнения каждой задачи. Их можно использовать для манипулирования средой исполнения; например, повторная инициализацияThreadLocals
, сбор статистики или добавление записей журнала. Кроме того, методterminated ()
может быть переопределен для выполнения любой специальной обработки, которую необходимо выполнить после полного завершенияExecutor
. Если методы ловушки или обратного вызова генерируют исключения, внутренние рабочие потоки могут, в свою очередь, завершиться сбоем и внезапно завершиться.
Используйте < код> CountDownLatch код> .
Это из java.util.concurrent
, и именно так можно дождаться завершения нескольких потоков, прежде чем продолжить.
Чтобы добиться эффекта обратного вызова, за которым вы ухаживаете, это требует немного дополнительной дополнительной работы. А именно, обрабатывая это самостоятельно в отдельном потоке, который использует CountDownLatch
и ожидает его, а затем продолжает уведомлять обо всем, что вам нужно уведомить. Нет встроенной поддержки для обратного вызова или чего-либо подобного этому эффекту.
РЕДАКТИРОВАТЬ: теперь, когда я понимаю ваш вопрос, я думаю, вы слишком далеко зашли слишком далеко. Если вы берете обычный SingleThreadExecutor
, передайте ему все задачи, и он будет выполнять очередную очередь.
Если вы хотите убедиться, что никакие задачи не будут выполняться одновременно, используйте SingleThreadedExecutor . Задачи будут обработаны в порядке их отправки. Вам даже не нужно держать задачи, просто отправьте их в exec.
Просто добавьте к ответу Мэтта, который помог, вот более конкретный пример, демонстрирующий использование обратного вызова.
private static Primes primes = new Primes();
public static void main(String[] args) throws InterruptedException {
getPrimeAsync((p) ->
System.out.println("onPrimeListener; p=" + p));
System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
CompletableFuture.supplyAsync(primes::getNextPrime)
.thenApply((prime) -> {
System.out.println("getPrimeAsync(); prime=" + prime);
if (listener != null) {
listener.onPrime(prime);
}
return prime;
});
}
Вывод:
getPrimeAsync(); prime=241
onPrimeListener; p=241
Adios mi amigito
Простой в реализации код Callback
механизм, использующий ExecutorService
import java.util.concurrent.*;
import java.util.*;
public class CallBackDemo{
public CallBackDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(5);
try{
for ( int i=0; i<5; i++){
Callback callback = new Callback(i+1);
MyCallable myCallable = new MyCallable((long)i+1,callback);
Future<Long> future = service.submit(myCallable);
//System.out.println("future status:"+future.get()+":"+future.isDone());
}
}catch(Exception err){
err.printStackTrace();
}
service.shutdown();
}
public static void main(String args[]){
CallBackDemo demo = new CallBackDemo();
}
}
class MyCallable implements Callable<Long>{
Long id = 0L;
Callback callback;
public MyCallable(Long val,Callback obj){
this.id = val;
this.callback = obj;
}
public Long call(){
//Add your business logic
System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
callback.callbackMethod();
return id;
}
}
class Callback {
private int i;
public Callback(int i){
this.i = i;
}
public void callbackMethod(){
System.out.println("Call back:"+i);
// Add your business logic
}
}
выходной сигнал:
creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4
Ключевые примечания:
- Если вы хотите обрабатывать задачи последовательно в порядке FIFO, замените
newFixedThreadPool(5)
сnewFixedThreadPool(1)
Если вы хотите обработать следующую задачу после анализа результата из
callback
из предыдущей задачи, просто отмените комментарий под строкой//System.out.println("future status:"+future.get()+":"+future.isDone());
Вы можете заменить
newFixedThreadPool()
с одним изExecutors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor
в зависимости от вашего варианта использования.
Если вы хотите обрабатывать метод обратного вызова асинхронно
a.Передать общий доступ
ExecutorService or ThreadPoolExecutor
к вызываемой задачеb.Преобразуйте свой
Callable
способ полученияCallable/Runnable
задачаc.Отправить задачу обратного вызова в
ExecutorService or ThreadPoolExecutor
Это расширение ответа Пача с использованием ListenableFuture
в Guava.
В частности, Futures.transform ()
возвращает ListenableFuture
, поэтому его можно использовать для объединения асинхронных вызовов. Futures.addCallback ()
возвращает void
, поэтому не может использоваться для цепочки, но подходит для обработки успеха / неудачи при асинхронном завершении.
// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());
// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
Futures.transform(database, database -> database.query(table, ...));
// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
Futures.transform(cursor, cursor -> cursorToFooList(cursor));
// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
public void onSuccess(List<Foo> foos) {
doSomethingWith(foos);
}
public void onFailure(Throwable thrown) {
log.error(thrown);
}
});
ПРИМЕЧАНИЕ. Помимо цепочки асинхронных задач, Futures.transform ()
также позволяет планировать каждую задачу для отдельного исполнителя (не показан в этом примере). р>
Вы можете использовать реализацию Callable так, чтобы
public class MyAsyncCallable<V> implements Callable<V> {
CallbackInterface ci;
public MyAsyncCallable(CallbackInterface ci) {
this.ci = ci;
}
public V call() throws Exception {
System.out.println("Call of MyCallable invoked");
System.out.println("Result = " + this.ci.doSomething(10, 20));
return (V) "Good job";
}
}
где CallbackInterface является чем-то очень простым, например
public interface CallbackInterface {
public int doSomething(int a, int b);
}
и теперь основной класс будет выглядеть так
ExecutorService ex = Executors.newFixedThreadPool(2);
MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);