Pergunta

dizer que tenho uma fila cheia de tarefas que eu preciso submeter-se a um serviço executor. Eu quero que eles processados ??um de cada vez. A maneira mais simples que posso pensar é:

  1. Tome uma tarefa da fila
  2. Envie-o para o executor
  3. Chamada .GET sobre o Futuro voltou e bloco até que um resultado é disponível
  4. Tome outra tarefa da fila ...

No entanto, eu estou tentando evitar o bloqueio completo. Se eu tiver 10.000 essas filas, que precisam de suas tarefas processados ??um de cada vez, eu vou correr para fora do espaço de pilha porque a maioria deles será segurando a tópicos bloqueados.

O que eu gostaria é submeter uma tarefa e fornecer um call-back que é chamado quando a tarefa estiver concluída. Vou usar essa notificação call-back como um sinalizador para enviar a próxima tarefa. (Functionaljava e jetlang aparentemente utilizar tais algoritmos sem bloqueio, mas não posso compreender o seu código)

Como posso fazer isso usando java.util.concurrent do JDK, curta de escrever meu próprio serviço executor?

(a fila que alimenta me essas tarefas pode-se bloquear, mas isso é uma questão a ser abordada mais adiante)

Foi útil?

Solução

Definir uma interface de retorno de chamada para receber tudo o que parâmetros que você quer passar ao longo da notificação de conclusão. Em seguida, invocá-lo no final da tarefa.

Você pode até escrever um invólucro geral para tarefas Execut�eis, e submetê-los para ExecutorService. Ou, veja abaixo para um mecanismo incorporado Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

Com CompletableFuture , Java 8 incluídos uns meios mais elaborados para condutas de composição em que os processos podem ser completados de maneira assíncrona e condicionalmente. Aqui está um exemplo inventado, mas completo da notificação.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

Outras dicas

Em Java 8 você pode usar CompletableFuture . Aqui está um exemplo que eu tinha no meu código onde eu estou usando-o para buscar os usuários de meu serviço do usuário, mapeá-los aos meus ver objetos e, em seguida, atualizar meu ponto de vista ou mostrar um diálogo de erro (esta é uma aplicação GUI):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

É executado de forma assíncrona. Eu estou usando dois métodos privados:. mapUsersToUserViews e updateView

Use listenable futuro Guava API e adicionar uma chamada de retorno. Cf. a partir do site:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

Você poderia estender a classe FutureTask, e substituir o método done(), em seguida, adicione o objeto FutureTask ao ExecutorService, então o método done() irá invocar quando o FutureTask concluída imediatamente.

ThreadPoolExecutor também tem beforeExecute e métodos afterExecute gancho que você pode substituir e fazer uso. Aqui está a descrição de ThreadPoolExecutor é Javadocs .

métodos de gancho

Essa classe fornece protegida substituível beforeExecute(java.lang.Thread, java.lang.Runnable) e métodos afterExecute(java.lang.Runnable, java.lang.Throwable) que são chamados antes e após a execução de cada tarefa. Estes podem ser usados ??para manipular o ambiente de execução; por exemplo, reinitializing ThreadLocals, reunindo estatísticas, ou adicionar entradas de registo. Além disso, o método terminated() pode ser substituído para executar qualquer processamento especial que precisa ser feito uma vez que o Executor foi totalmente encerrado. Se gancho ou de retorno de chamada métodos lançam exceções, segmentos de trabalho internos podem, por sua vez falhar e abruptamente terminar.

Use a CountDownLatch .

É de java.util.concurrent e é exatamente do jeito que esperar por vários tópicos a execução completa antes de continuar.

A fim de alcançar o efeito de retorno de chamada que você está cuidando, que requer um pouco de trabalho extra adicional. Ou seja, a manipulação isso por si mesmo em um segmento separado que utiliza a CountDownLatch e faz espera nele, em seguida, passa sobre notificando tudo o que você precisa notificar. Não há suporte nativo para retorno de chamada, ou qualquer coisa semelhante para o efeito.


EDIT: agora que eu entendo ainda mais a sua pergunta, eu acho que você está chegando longe demais, desnecessariamente. Se você tomar um SingleThreadExecutor , dar-lhe todas as tarefas, e ele vai fazer o enfileiramento nativamente.

Se você quiser ter certeza de que há tarefas serão executadas ao mesmo tempo, em seguida, usar um SingleThreadedExecutor . As tarefas serão processados ??na ordem do são submetidos. Você não precisa mesmo de realizar as tarefas, basta enviá-los para o exec.

Apenas para adicionar à resposta de Matt, o que ajudou, aqui está um exemplo mais aprimorada para mostrar o uso de uma chamada de retorno.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

A saída é:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito

código simples de implementar mecanismo Callback usando ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

saída:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Notas-chave:

  1. Se você quiser tarefas do processo em sequência na ordem FIFO, substitua newFixedThreadPool(5) com newFixedThreadPool(1)
  2. Se você quiser processar próxima tarefa depois de analisar o resultado da callback da tarefa anterior, apenas un-comentário abaixo da linha

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. Você pode substituir newFixedThreadPool() com um dos

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    dependendo do seu caso de uso.

  4. Se você quiser método de retorno alça de forma assíncrona

    a. Passar um ExecutorService or ThreadPoolExecutor compartilhado para tarefas Callable

    b. Converter o seu método Callable a tarefa Callable/Runnable

    c. tarefa impulso de retorno de chamada para ExecutorService or ThreadPoolExecutor

Esta é uma extensão para a resposta de Pache usando ListenableFuture de goiaba.

Em particular, Futures.transform() retornos ListenableFuture assim que pode ser usado para chamadas de cadeia assíncronos. Futures.addCallback() retornos void, por isso não pode ser usado para encadear, mas é bom para lidar com sucesso / fracasso em uma conclusão assíncrona.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

NOTA: Além de encadeamento tarefas assíncronas, Futures.transform() também permite agendar cada tarefa em um executor separado (Não mostrado neste exemplo)

.

Você pode usar uma implementação de Callable tal que

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

onde CallbackInterface é algo muito básico como

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

e agora a classe principal será parecido com este

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top