In Java, how to shut down an ExecutorService when it may submit additional tasks to itself

StackOverflow https://stackoverflow.com/questions/10948024

  •  13-06-2021

Question

I have a pipeline of tasks (each task in the pipeline has different parallelism requirements), and each task runs in a different ExecutorService. Tasks work on packets of data, so if we have 10 data packets then 10 tasks are submitted to service1, one task per data packet. Once a task submitted to service1 is actually running, it may submit a new task to work further on the data packet on service2 or service3, or not at all.

The following code works fine, i.e.:

  • shutdown() is invoked on service1 after everything has been submitted to service1
  • awaitTermination() then does not return until all the tasks that were submitted before the shutdown() have actually completed running
  • shutdown() is then invoked on service2; because all tasks submitted to service1 have completed, and all tasks on service2 are submitted from tasks on service1, every task has been submitted to service2 before shutdown() is called on service2
  • and so on for service3

    ExecutorService[] services = {
            service1,
            service2,
            service3};

    for(ExecutorService service: services)
    {
        service.shutdown();
        service.awaitTermination(1, TimeUnit.HOURS); // may throw InterruptedException
    }
    

However I have now added a case whereby service2 can break a data packet into smaller packets and submit additional tasks to service2 itself, and the code is now failing. The problem is that shutdown() is called on service2 once all the tasks on service1 have completed, but now we want to submit additional service2 tasks from a task that is already running in service2.

My questions:

  1. Does shutdown() return only after all submitted tasks have finished running, or does it return immediately and simply stop new tasks from being accepted, without stopping already-submitted tasks? Update: answered below
  2. How do I solve my new problem ?

Solution 5

Matt's suggestion looks like it may well work, but I'm concerned it may cause new issues.

I've come up with a solution that works without many code changes for my scenario, although it seems a bit clunky.

I've introduced a new service (service2a) that runs the same task as service2. When a task in service2 wants to submit a smaller data packet, it submits it to service2a rather than service2, so all sub-packets are submitted to service2a before service2 shuts down. This works for me because the smaller data packets don't need to be broken down into further sub-packets, and the sub-packet idea only applies to service2(a), not to any of the other services.
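With that extra stage, the shutdown loop from the question extends naturally: service2a is drained after service2 and before service3. A minimal sketch, assuming illustrative pool sizes and service names:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PipelineShutdown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service1  = Executors.newFixedThreadPool(4);
        ExecutorService service2  = Executors.newFixedThreadPool(2);
        ExecutorService service2a = Executors.newFixedThreadPool(2); // sub-packets only
        ExecutorService service3  = Executors.newFixedThreadPool(2);

        // ... submit the initial per-packet tasks to service1 here ...

        // Shut down in pipeline order: by the time service2 has drained,
        // every sub-packet has already been handed to service2a.
        ExecutorService[] services = {service1, service2, service2a, service3};
        for (ExecutorService service : services) {
            service.shutdown();
            service.awaitTermination(1, TimeUnit.HOURS);
        }
        System.out.println("pipeline terminated");
    }
}
```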

Other tips

"shutdown" simply tells the pool not to accept any more work. It does nothing more. All existing submitted work will be executed as normal. When the queue is drained, the pool will actually destroy all it's threads and terminate.

The problem here is that tasks in service2 submit additional tasks to service2 for processing, so there seems to be no way to know when you should actually call shutdown. But there is an alternative, assuming these smaller packets don't break down into further service2 tasks:

List<Future<Void>> service2Futures = new ArrayList<Future<Void>>();

service2Futures.add(service2.submit(new Callable<Void>() {
  public Void call() throws Exception {
    // do your work, submit more stuff to service2
    // if you submit Callables, you could use Future.get() to wait on those
    // results.
    return null;
  }
}));

for (Future<Void> future : service2Futures) {
  future.get();
}

service2.shutdown();
...

What's going on here is that you're storing Future objects for the top level submitted tasks (you'll have to use Callable and not Runnable). Instead of immediately shutting the pool down after submission, you simply collect up the Future objects. You then wait until they are all done running by cycling through them, and calling get() on each one. The "get()" method blocks until the thread running that task has completed.

At that point, all of the top level tasks are complete, and they will have submitted second level tasks. You can now issue a shutdown. This assumes the second level tasks don't submit more stuff to service2.

This all being said, if you're using java 7, you should consider taking a look at ForkJoinPool and RecursiveTask instead. It probably makes more sense for what you're doing.

ForkJoinPool forkJoinPool = new ForkJoinPool();
RecursiveAction action = new RecursiveAction() {
    protected void compute() {
         // break down here and build actions
         RecursiveAction[] smallerActions = ...; 
         invokeAll(smallerActions);
    }
};

Future<Void> future = forkJoinPool.submit(action);

ExecutorService#shutdown lets already submitted tasks finish whatever they are doing - javadoc extract:

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.
This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.

In practice, you can consider that a call to shutdown does several things:

  1. the ExecutorService can't accept new jobs any more
  2. the worker threads are terminated once all already-submitted tasks have finished running
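Both points can be observed directly. In this sketch, a task submitted before shutdown still runs, a task submitted after shutdown is rejected, and only awaitTermination actually waits:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> System.out.println("submitted before shutdown: runs"));

        pool.shutdown(); // returns immediately; the task above still runs

        try {
            pool.submit(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException e) {
            System.out.println("submitted after shutdown: rejected");
        }

        // shutdown() did not wait; this is the call that actually blocks
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```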

So to answer your questions:

  • if you have submitted all your tasks to service1 before you call service1.shutdown (if you submit anything after that call you will get an exception anyway), you are fine (i.e. if those tasks submit something to service2 and service2 is not shutdown, they will be executed).
  • shutdown returns immediately and does not guarantee that already submitted tasks will stop (they could run forever).
  • The problem you are having is probably linked to how you submit your tasks from one service to another and it seems difficult to solve it with only the information you have given.

The best way would be to include a SSCCE in your question that replicates the behaviour you are seeing.

Instead of shutting down the ExecutorService, you should track the tasks themselves. They can pass around a "job state" object which they use to keep track of outstanding work, e.g.:

public class JobState {
  private int _runningJobs;

  public synchronized void start() {
    ++_runningJobs;
  }

  public synchronized void finish() {
    --_runningJobs;
    if(_runningJobs == 0) { notifyAll(); }
  }

  public synchronized void awaitTermination() throws InterruptedException {
    while(_runningJobs > 0) { wait(); }
  }
}

public class SomeJob implements Runnable {
  private final JobState _jobState;

  public SomeJob(JobState jobState) {
    _jobState = jobState;
  }

  public void run() {
    try {
      // ... do work here, possibly submitting new jobs via submitJob(..., _jobState)
    } finally {
      _jobState.finish();
    }
  }
}

// utility method to start a new job
public static void submitJob(ExecutorService executor, Runnable runnable, JobState jobState) {
  // call start _before_ submitting, so the count can never drop to zero early
  jobState.start();
  executor.submit(runnable);
}

// start main work
JobState state = new JobState();
Runnable firstJob = new SomeJob(state);
submitJob(executor, firstJob, state);
state.awaitTermination();
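The same counting idea exists in the JDK as java.util.concurrent.Phaser, which supports registering parties dynamically, exactly what tasks that spawn sub-tasks need. A minimal sketch under that substitution (pool size and one level of sub-jobs are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserTracking {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1); // register the main thread as a party

        Runnable job = () -> {
            // a job registers each sub-job BEFORE submitting it,
            // and before deregistering itself, so the count never hits zero early
            phaser.register();
            pool.execute(phaser::arriveAndDeregister); // the sub-job
            phaser.arriveAndDeregister();              // this job is done
        };

        phaser.register();   // count the first job before submitting it
        pool.execute(job);

        phaser.arriveAndAwaitAdvance(); // main thread waits for all jobs
        pool.shutdown();                // now it is safe to shut down
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("all jobs done");
    }
}
```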

Calling shutdown does not wait for all tasks to finish; use awaitTermination for that, as you already do.

But once shutdown has been called, new tasks are rejected: the executor service refuses every new task. For ThreadPoolExecutor, rejected tasks are handled by its RejectedExecutionHandler. If you specify a custom handler, you can process tasks that are rejected after shutdown. This is one possible workaround.
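A sketch of that workaround: a custom RejectedExecutionHandler is passed to the ThreadPoolExecutor constructor and is invoked for tasks rejected after shutdown (what to do with the rejected task, here just logging it, is up to you):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                // custom handler: called instead of throwing
                // RejectedExecutionException for rejected tasks
                (task, executor) -> System.out.println("rejected: " + task));

        pool.shutdown();
        pool.execute(() -> {}); // arrives after shutdown: goes to the handler
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```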

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow