Question

I'm working on a project that has a thread pool to which it submits tasks. Each task is a chain, so to speak. When the task executes, it does what it needs to do, then checks the result. Each of these tasks contains a map of results (which is just an enum) and additional tasks. These are called within the same thread and the cycle repeats until there are no more tasks, at which point it goes back up the chain, adding each result to a collection and returning that to the main thread. Q&D example:

public abstract class MyCallable implements Callable<MyResponse> {

private Map<ResponseEnum, List<MyCallable>> callbacks;

public List<MyResponse> call() {
    List<MyResponse> resp = new ArrayList<MyResponse>();
    try{
        //Run the process method and collect the result
        MyResponse response = process();
        List<MyCallable> next = callbacks.get(response.getResult());

        if (next != null && !next.isEmpty()){
            //Run within same thread, return results
            for (MyCallable m : next){
                resp.addAll(m.call();
            }
            return resp;
        } else {
            //No more responses, pass them back up the chain
            resp.add(response);
            return list;
        }
    //Anything goes wrong, we catch it here and wrap it in a response
    } catch (Exception e){
         resp.add(new MyExceptionResponse(e));
         return resp;
    }
}

//Implemented by all child classes, does the actual work  
public abstract MyResponse process() throws Exception;

Bear in mind that this is also a prototype that I have not yet really tested out, so I'm aware that this may not be perfect or necessarily completely feasible.

The concern I have is this: A task is added to the thread pool and begins execution. In the main thread, a Future is created and a .get(N, TimeUnit) is called on it to retrieve the result. What if that task times out? We get a TimeoutException. Now, within a try/catch block I could cancel the Future, but is there any way for me to cancel the Future and extract the results, at least as far as they go? Three tasks may have executed and returned results before the the fourth stalled out. The try/catch in MyCallable should return a result and push it back up the chain if there's an exception (Ie, InterruptedException when .cancel(true) is called), but is it possible for me to get that result?

Of course, if I'm going about this completely wrong in the first place, that would also be good to know. This is my first big foray into multithreading.

EDIT: Okay, with that in mind, a wrapper has been placed around the MyCallable class. The wrapper implements Callable and returns the collection. The collection is passed down the chain of MyCallable objects and the results added, so if the Future.get times out, we can retrieve the collection and get the partial results.

However, this brings up a potential race condition. If the current MyCallable being invoked is waiting for an external service, then the Future.cancel(true) operation will cause an InterruptedException within MyCallable. This is caught and the exception is wrapped in a response object and added to the collection. The thing is, if the main thread cancels the Future, synchronizes on the wrapper or the collection in the wrapper, and then gets the collection, will that create a race condition between the getting of the collection and the try/catch block in MyCallable adding the wrapped exception to the collection? Or will the main thread wait for the catching of the exception and then execute the next line?

Was it helpful?

Solution

At the point when you get your TimeoutException, the task submitted to the Executor Service is merrily going forth on its way: it is only your waiting which has received the exception. That presumably means that the result map is still being populated.

What you could do is is use a concurrent map and safely extract whatever results are present after the timeout has occurred.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top