Question

I want to create a singleton ExecutorService with a fixed thread pool size. Another thread will feed that ExecutorService with Callables, and I want to parse the result of each Callable, ideally immediately after its execution is done.

I am really uncertain how to implement this properly. My initial thought was a method in the singleton-ES, which adds a Callable to the ExecutorService via "submit(callable)" and stores the resulting Future inside a HashMap or ArrayList inside the singleton. Another thread would check the Futures for results within a given interval.

But somehow this solution does not "feel right", and I didn't find a solution for this use case elsewhere, so I am asking you guys before I code something I regret later. How would you approach this problem?

I am looking forward to your responses!

Solution

import java.util.concurrent.*;

public class PostProcExecutor extends ThreadPoolExecutor {

  // adjust the constructor to your desired threading policy
  public PostProcExecutor(int corePoolSize, int maximumPoolSize,
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  }

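  // Wrap every submitted Callable in a FutureTask whose done() hook
  // is called as soon as the task completes.
  @Override
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable) {
      @Override
      protected void done()
      {
        if(!isCancelled()) try {
          processResult(get()); // does not block: the task is already complete
        } catch(InterruptedException ex) {
          throw new AssertionError("on complete task", ex);
        } catch(ExecutionException ex) {
          // the Callable threw an exception, so no result is available
        }
      }
    };
  }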

  protected void processResult(Object o)
  {
    System.out.println("Result "+o);// do your post-processing here
  }
}
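
To get the fixed pool size the question asks for, the constructor arguments can mirror what Executors.newFixedThreadPool uses: equal core and maximum pool sizes, no keep-alive time, and an unbounded LinkedBlockingQueue. The following is only a minimal, self-contained sketch of that wiring. The class names FixedPoolPostProcDemo and CollectingExecutor and the method runDemo are hypothetical, and the results are collected in a list instead of being printed so that they can be inspected afterwards.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

public class FixedPoolPostProcDemo {

  // Hypothetical variant of the executor above: same done() hook,
  // but results are collected in a list instead of being printed.
  static class CollectingExecutor extends ThreadPoolExecutor {
    final List<Object> results = Collections.synchronizedList(new ArrayList<>());

    CollectingExecutor(int threads) {
      // Same configuration that Executors.newFixedThreadPool(threads) uses.
      super(threads, threads, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
      return new FutureTask<T>(callable) {
        @Override
        protected void done() {
          if (isCancelled()) return;
          try {
            results.add(get()); // the task is complete, so get() returns at once
          } catch (InterruptedException | ExecutionException ex) {
            // failed task: nothing to collect
          }
        }
      };
    }
  }

  // Submits three tasks that square 1, 2 and 3, waits for the pool to
  // finish, and returns the collected results in ascending order.
  static List<Integer> runDemo() throws InterruptedException {
    CollectingExecutor executor = new CollectingExecutor(2);
    for (int i = 1; i <= 3; i++) {
      final int n = i;
      Callable<Integer> task = () -> n * n;
      executor.submit(task);
    }
    executor.shutdown();
    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
      throw new IllegalStateException("tasks did not finish in time");
    }
    List<Integer> squares = new ArrayList<>();
    synchronized (executor.results) {
      for (Object o : executor.results) squares.add((Integer) o);
    }
    Collections.sort(squares); // completion order is not deterministic
    return squares;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runDemo());
  }
}
```

Because done() is invoked on the worker thread as part of running the task, every result has been collected by the time awaitTermination returns true.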

Other suggestions

Use an ExecutorCompletionService. This way you can get the result of each Callable as soon as it is ready. The take method of the completion service blocks until the next task has completed and then returns its Future.

Here is an example from the Javadoc:

void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {
         Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
 }

You can use MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_NUMBER)) to create the service and use Guava's ListenableFuture to process the result immediately. Alternatively, you can write your own mechanism for listening to future results.

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
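}, MoreExecutors.directExecutor()); // recent Guava versions require an explicit executor for the callback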

You can use ExecutorCompletionService to implement it.

The following steps outline the approach.

  1. Get the number of available processors using Runtime.getRuntime().availableProcessors(). Let's keep the value in the variable availableProcessors.

  2. Initialize the ExecutorService, like service = Executors.newFixedThreadPool(availableProcessors)

  3. Initialize the ExecutorCompletionService. Assuming the result from the Callable is an Integer array (Integer[]): ExecutorCompletionService<Integer[]> completionService = new ExecutorCompletionService<>(service)

  4. Use completionService.submit to submit each task.

  5. Use completionService.take().get() to get the result of each task (Callable) as it completes.

Based on the above steps you can get the results of all Callables and process them as needed.
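
The steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class name CompletionServiceSteps and the method runAll are hypothetical, and summing each Integer[] stands in for whatever post-processing you actually need.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceSteps {

  // Runs all tasks on a fixed pool and returns the sum of each
  // Integer[] result, in the order in which the tasks completed.
  static List<Integer> runAll(List<Callable<Integer[]>> tasks)
      throws InterruptedException, ExecutionException {
    // Step 1: number of available processors
    int availableProcessors = Runtime.getRuntime().availableProcessors();
    // Step 2: fixed-size thread pool
    ExecutorService service = Executors.newFixedThreadPool(availableProcessors);
    try {
      // Step 3: completion service on top of the pool
      ExecutorCompletionService<Integer[]> completionService =
          new ExecutorCompletionService<>(service);
      // Step 4: submit every task
      for (Callable<Integer[]> task : tasks) {
        completionService.submit(task);
      }
      // Step 5: take() blocks until the next task has finished
      List<Integer> sums = new ArrayList<>();
      for (int i = 0; i < tasks.size(); i++) {
        Integer[] result = completionService.take().get();
        int sum = 0;
        for (Integer value : result) sum += value;
        sums.add(sum); // placeholder for the real post-processing
      }
      return sums;
    } finally {
      service.shutdown(); // let the worker threads exit
    }
  }

  public static void main(String[] args) throws Exception {
    List<Callable<Integer[]>> tasks = new ArrayList<>();
    tasks.add(() -> new Integer[] {1, 2});
    tasks.add(() -> new Integer[] {3, 4});
    System.out.println(runAll(tasks));
  }
}
```

Note that the results arrive in completion order, not submission order, so the caller should not rely on a particular ordering of the returned list.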

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow