Question

Consider a long-running computation inside a Callable instance.

Suppose the precision of the result depends on how long the computation runs: if the task is cancelled, it should return whatever has been computed up to that point (for example, a pipeline that computes successive approximations of irrational numbers).

It would be desirable to implement this using the standard Java concurrency utilities, e.g.:

Callable<ValuableResult> task = new Callable<ValuableResult>() { ... };
Future<ValuableResult> future = Executors.newSingleThreadExecutor().submit(task);
try {
    return future.get(timeout, TimeUnit.SECONDS);
} catch (TimeoutException te) {
    future.cancel(true);
    // HERE! Get what was computed so far
}

It seems that this cannot be solved without fully reimplementing Future and ThreadPoolExecutor. Are there any convenient existing tools for this in Java 1.7?


Solution 2

It seems to me that the simplest approach here is to create a final wrapper object for the result and have the Callable write into it:

final ValuableResultWrapper wrapper = new ValuableResultWrapper();
final CountDownLatch latch = new CountDownLatch(1);

Callable<ValuableResultWrapper> task = new Callable<ValuableResultWrapper>() {
   ... 
   wrapper.setValue(...); // here we set what we have computed so far
   latch.countDown();
   return wrapper;
   ...  
};
Future<ValuableResultWrapper> future = Executors.newSingleThreadExecutor().submit(task);
try {
    return future.get(timeout, TimeUnit.SECONDS);
} catch (TimeoutException te) {
    future.cancel(true);
    // HERE! Get what was computed so far
    latch.await();
    return wrapper;
}

UPD: In this implementation (which is becoming too complicated) we have to introduce some kind of latch (a CountDownLatch in my example) to make sure the task has completed before we execute return wrapper;
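A minimal runnable sketch of the wrapper-plus-latch idea is given here, under a few assumptions: an AtomicLong stands in for ValuableResultWrapper, the workload is a made-up counting loop, and the class and method names (LatchedPartialResultDemo, computeWithTimeout) are hypothetical. The countDown() call sits in a finally block so the caller is released even if the task fails.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class LatchedPartialResultDemo {

    static long computeWithTimeout(long timeoutMillis) throws Exception {
        // Plays the role of the wrapper: the task publishes its progress here.
        final AtomicLong progress = new AtomicLong();
        final CountDownLatch finished = new CountDownLatch(1);

        Callable<Long> task = new Callable<Long>() {
            @Override
            public Long call() {
                try {
                    // Hypothetical workload: count until the thread is interrupted.
                    while (!Thread.currentThread().isInterrupted()) {
                        progress.incrementAndGet();
                    }
                    return progress.get();
                } finally {
                    finished.countDown();   // always signal, even on failure
                }
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Long> future = executor.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException te) {
            future.cancel(true);                    // interrupts the worker thread
            // Bounded wait, in case the task was cancelled before it ever started.
            finished.await(1, TimeUnit.SECONDS);
            return progress.get();                  // what was computed so far
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("steps completed: " + computeWithTimeout(100));
    }
}
```

The latch matters because cancel(true) only requests the interrupt; without waiting, the caller could read the wrapper while the task is still writing to it.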

Other tips

Instead of canceling it through the Future's API, tell it to finish through a mechanism of your own (such as a long that you pass into the constructor, which tells it how long to run before returning normally; or an AtomicBoolean you set to true).

Keep in mind that once the task has actually started, cancel(true) doesn't magically stop it. All it does at that point is interrupt the thread. A few blocking methods check the interrupt status and throw InterruptedException, but otherwise you have to check it yourself, for example with Thread.currentThread().isInterrupted(). So, given that you need to code that cooperative mechanism anyway, why not make it one that better suits your requirements?
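The cooperative mechanism suggested above could look roughly like this sketch. The names PiTask, requestStop and computeFor are hypothetical, and the Leibniz series for pi is only an illustrative stand-in for the real computation. Because the task returns normally once the flag is set, the ordinary Future.get() delivers the partial result and no cancellation is involved.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class CooperativeStopDemo {

    /** Hypothetical task: refines an approximation of pi until asked to stop. */
    static class PiTask implements Callable<Double> {
        private final AtomicBoolean stopRequested = new AtomicBoolean(false);

        void requestStop() {
            stopRequested.set(true);
        }

        @Override
        public Double call() {
            double sum = 0.0;
            double sign = 1.0;
            // Leibniz series: pi/4 = 1 - 1/3 + 1/5 - 1/7 + ...
            for (long k = 0; !stopRequested.get(); k++) {
                sum += sign / (2 * k + 1);
                sign = -sign;
            }
            return 4.0 * sum;   // normal return, so Future.get() hands it over
        }
    }

    static double computeFor(long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            PiTask task = new PiTask();
            Future<Double> future = executor.submit(task);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException te) {
                task.requestStop();     // used instead of future.cancel(true)
                return future.get();    // the task finishes promptly with its partial result
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pi is roughly " + computeFor(200));
    }
}
```

The same shape works with a deadline passed to the constructor instead of a flag; the key point is that the task decides when to return rather than being cancelled from outside.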

CompletionService is more powerful than a bare FutureTask and in many cases more suitable, and it gave me an idea for solving this problem. Its standard implementation, ExecutorCompletionService, is also simpler than FutureTask: just a few lines of code that are easy to read. So I modified that class to expose the partially computed result. I find the solution satisfying because it is simple and clear.

Demo code:

CompletionService<List<DeviceInfo>> completionService =
        new MyCompletionService<>(Executors.newCachedThreadPool());
Future<List<DeviceInfo>> task = completionService.submit(detector);
try {
    LogHelper.i(TAG, "result 111: ");
    Future<List<DeviceInfo>> result = completionService.take();
    LogHelper.i(TAG, "result: " + result.get());
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

This is the class code:

import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

/**
 * A CompletionService like java.util.concurrent.ExecutorCompletionService, except that
 * the FutureTask returned from submit still yields the partially computed result
 * even after it has been cancelled or interrupted.
 * The completion queue also guarantees that a FutureTask obtained from take or poll is done.
 */
public class MyCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion.
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

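    /**
     * FutureTask that remembers the value passed to set(), so that get() can
     * return it even when the task has been cancelled in the meantime.
     */
    private static class DoneFutureTask<V> extends FutureTask<V> {
        // Written by the worker thread, read by the caller of get().
        private volatile V outcome;

        DoneFutureTask(Callable<V> task) {
            super(task);
        }

        DoneFutureTask(Runnable task, V result) {
            super(task, result);
        }

        @Override
        protected void set(V v) {
            super.set(v);   // has no effect if the task was already cancelled
            outcome = v;    // but the value is still recorded here
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            try {
                return super.get();
            } catch (CancellationException e) {
                // Cancelled: fall back to whatever the task managed to return.
                return outcome;
            }
        }

    }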

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        return new DoneFutureTask<V>(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        return new DoneFutureTask<V>(task, result);
    }

    /**
     * Creates a MyCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public MyCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates a MyCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public MyCompletionService(Executor executor,
                               BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
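
For the class above to be useful, the submitted Callable has to react to the interrupt by returning its partial result rather than throwing. The following self-contained sketch isolates that mechanism. CapturingFutureTask, getOrPartial and runAndCancel are hypothetical names, the counting loop is a made-up workload, and worker.join() stands in for the completion queue. It also assumes that FutureTask.run() still invokes the protected set(...) after a cancellation, which matches the OpenJDK implementation as far as I know but is not promised by the Javadoc.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.FutureTask;

public class CapturingFutureDemo {

    /** Same idea as DoneFutureTask: keep the value even if the task was cancelled. */
    static class CapturingFutureTask<V> extends FutureTask<V> {
        private volatile V captured;

        CapturingFutureTask(Callable<V> callable) {
            super(callable);
        }

        @Override
        protected void set(V v) {
            captured = v;   // recorded even when super.set(v) is ignored after cancel
            super.set(v);
        }

        V getOrPartial() throws Exception {
            try {
                return get();
            } catch (CancellationException e) {
                return captured;    // null if the task never got to return a value
            }
        }
    }

    static Long runAndCancel(long runMillis) throws Exception {
        CapturingFutureTask<Long> task = new CapturingFutureTask<Long>(new Callable<Long>() {
            @Override
            public Long call() {
                long steps = 0;
                while (!Thread.currentThread().isInterrupted()) {
                    steps++;
                }
                return steps;   // return instead of throwing, so set(...) receives the value
            }
        });
        Thread worker = new Thread(task);
        worker.start();
        Thread.sleep(runMillis);
        task.cancel(true);      // marks the future cancelled and interrupts the worker
        worker.join();          // wait until the Callable has actually returned
        return task.getOrPartial();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("partial result: " + runAndCancel(100));
    }
}
```

Without the join (or the completion queue in MyCompletionService), get() could throw CancellationException before the Callable has returned, and the captured value would still be null.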
Licensed under: CC-BY-SA with attribution