executores Java: como será notificado, sem bloquear, quando uma tarefa é concluída?
Pergunta
dizer que tenho uma fila cheia de tarefas que eu preciso submeter-se a um serviço executor. Eu quero que eles processados ??um de cada vez. A maneira mais simples que posso pensar é:
- Tome uma tarefa da fila
- Envie-o para o executor
- Chamada .GET sobre o Futuro voltou e bloco até que um resultado é disponível
- Tome outra tarefa da fila ...
No entanto, eu estou tentando evitar o bloqueio completo. Se eu tiver 10.000 essas filas, que precisam de suas tarefas processados ??um de cada vez, eu vou correr para fora do espaço de pilha porque a maioria deles será segurando a tópicos bloqueados.
O que eu gostaria é submeter uma tarefa e fornecer um call-back que é chamado quando a tarefa estiver concluída. Vou usar essa notificação call-back como um sinalizador para enviar a próxima tarefa. (Functionaljava e jetlang aparentemente utilizar tais algoritmos sem bloqueio, mas não posso compreender o seu código)
Como posso fazer isso usando java.util.concurrent do JDK, curta de escrever meu próprio serviço executor?
(a fila que alimenta me essas tarefas pode-se bloquear, mas isso é uma questão a ser abordada mais adiante)
Solução
Definir uma interface de retorno de chamada para receber tudo o que parâmetros que você quer passar ao longo da notificação de conclusão. Em seguida, invocá-lo no final da tarefa.
Você pode até escrever um invólucro geral para tarefas Execut�eis, e submetê-los para ExecutorService
. Ou, veja abaixo para um mecanismo incorporado Java 8.
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
Com CompletableFuture
, Java 8 incluídos uns meios mais elaborados para condutas de composição em que os processos podem ser completados de maneira assíncrona e condicionalmente. Aqui está um exemplo inventado, mas completo da notificação.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
Outras dicas
Em Java 8 você pode usar CompletableFuture . Aqui está um exemplo que eu tinha no meu código onde eu estou usando-o para buscar os usuários de meu serviço do usuário, mapeá-los aos meus ver objetos e, em seguida, atualizar meu ponto de vista ou mostrar um diálogo de erro (esta é uma aplicação GUI):
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
É executado de forma assíncrona. Eu estou usando dois métodos privados:. mapUsersToUserViews
e updateView
Use listenable futuro Guava API e adicionar uma chamada de retorno. Cf. a partir do site:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
Você poderia estender a classe FutureTask
, e substituir o método done()
, em seguida, adicione o objeto FutureTask
ao ExecutorService
, então o método done()
irá invocar quando o FutureTask
concluída imediatamente.
ThreadPoolExecutor
também tem beforeExecute
e métodos afterExecute
gancho que você pode substituir e fazer uso. Aqui está a descrição de ThreadPoolExecutor
é Javadocs .
métodos de gancho
Essa classe fornece protegida substituível
beforeExecute(java.lang.Thread, java.lang.Runnable)
e métodosafterExecute(java.lang.Runnable, java.lang.Throwable)
que são chamados antes e após a execução de cada tarefa. Estes podem ser usados ??para manipular o ambiente de execução; por exemplo, reinitializingThreadLocals
, reunindo estatísticas, ou adicionar entradas de registo. Além disso, o métodoterminated()
pode ser substituído para executar qualquer processamento especial que precisa ser feito uma vez que oExecutor
foi totalmente encerrado. Se gancho ou de retorno de chamada métodos lançam exceções, segmentos de trabalho internos podem, por sua vez falhar e abruptamente terminar.
Use a CountDownLatch
.
É de java.util.concurrent
e é exatamente do jeito que esperar por vários tópicos a execução completa antes de continuar.
A fim de alcançar o efeito de retorno de chamada que você está cuidando, que requer um pouco de trabalho extra adicional. Ou seja, a manipulação isso por si mesmo em um segmento separado que utiliza a CountDownLatch
e faz espera nele, em seguida, passa sobre notificando tudo o que você precisa notificar. Não há suporte nativo para retorno de chamada, ou qualquer coisa semelhante para o efeito.
EDIT: agora que eu entendo ainda mais a sua pergunta, eu acho que você está chegando longe demais, desnecessariamente. Se você tomar um SingleThreadExecutor
, dar-lhe todas as tarefas, e ele vai fazer o enfileiramento nativamente.
Se você quiser ter certeza de que há tarefas serão executadas ao mesmo tempo, em seguida, usar um SingleThreadedExecutor . As tarefas serão processados ??na ordem do são submetidos. Você não precisa mesmo de realizar as tarefas, basta enviá-los para o exec.
Apenas para adicionar à resposta de Matt, o que ajudou, aqui está um exemplo mais aprimorada para mostrar o uso de uma chamada de retorno.
private static Primes primes = new Primes();
public static void main(String[] args) throws InterruptedException {
getPrimeAsync((p) ->
System.out.println("onPrimeListener; p=" + p));
System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
CompletableFuture.supplyAsync(primes::getNextPrime)
.thenApply((prime) -> {
System.out.println("getPrimeAsync(); prime=" + prime);
if (listener != null) {
listener.onPrime(prime);
}
return prime;
});
}
A saída é:
getPrimeAsync(); prime=241
onPrimeListener; p=241
Adios mi amigito
código simples de implementar mecanismo Callback
usando ExecutorService
import java.util.concurrent.*;
import java.util.*;
public class CallBackDemo{
public CallBackDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(5);
try{
for ( int i=0; i<5; i++){
Callback callback = new Callback(i+1);
MyCallable myCallable = new MyCallable((long)i+1,callback);
Future<Long> future = service.submit(myCallable);
//System.out.println("future status:"+future.get()+":"+future.isDone());
}
}catch(Exception err){
err.printStackTrace();
}
service.shutdown();
}
public static void main(String args[]){
CallBackDemo demo = new CallBackDemo();
}
}
class MyCallable implements Callable<Long>{
Long id = 0L;
Callback callback;
public MyCallable(Long val,Callback obj){
this.id = val;
this.callback = obj;
}
public Long call(){
//Add your business logic
System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
callback.callbackMethod();
return id;
}
}
class Callback {
private int i;
public Callback(int i){
this.i = i;
}
public void callbackMethod(){
System.out.println("Call back:"+i);
// Add your business logic
}
}
saída:
creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4
Notas-chave:
- Se você quiser tarefas do processo em sequência na ordem FIFO, substitua
newFixedThreadPool(5)
comnewFixedThreadPool(1)
-
Se você quiser processar próxima tarefa depois de analisar o resultado da
callback
da tarefa anterior, apenas un-comentário abaixo da linha//System.out.println("future status:"+future.get()+":"+future.isDone());
-
Você pode substituir
newFixedThreadPool()
com um dosExecutors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor
dependendo do seu caso de uso.
-
Se você quiser método de retorno alça de forma assíncrona
a. Passar um
ExecutorService or ThreadPoolExecutor
compartilhado para tarefas Callableb. Converter o seu método
Callable
a tarefaCallable/Runnable
c. tarefa impulso de retorno de chamada para
ExecutorService or ThreadPoolExecutor
Esta é uma extensão para a resposta de Pache usando ListenableFuture
de goiaba.
Em particular, Futures.transform()
retornos ListenableFuture
assim que pode ser usado para chamadas de cadeia assíncronos. Futures.addCallback()
retornos void
, por isso não pode ser usado para encadear, mas é bom para lidar com sucesso / fracasso em uma conclusão assíncrona.
// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());
// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
Futures.transform(database, database -> database.query(table, ...));
// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
Futures.transform(cursor, cursor -> cursorToFooList(cursor));
// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
public void onSuccess(List<Foo> foos) {
doSomethingWith(foos);
}
public void onFailure(Throwable thrown) {
log.error(thrown);
}
});
NOTA: Além de encadeamento tarefas assíncronas, Futures.transform()
também permite agendar cada tarefa em um executor separado (Não mostrado neste exemplo)
Você pode usar uma implementação de Callable tal que
public class MyAsyncCallable<V> implements Callable<V> {
CallbackInterface ci;
public MyAsyncCallable(CallbackInterface ci) {
this.ci = ci;
}
public V call() throws Exception {
System.out.println("Call of MyCallable invoked");
System.out.println("Result = " + this.ci.doSomething(10, 20));
return (V) "Good job";
}
}
onde CallbackInterface é algo muito básico como
public interface CallbackInterface {
public int doSomething(int a, int b);
}
e agora a classe principal será parecido com este
ExecutorService ex = Executors.newFixedThreadPool(2);
MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);