Question

I am experimenting with concurrency in Java 8.

In the ScheduledThreadPoolExecutor API, I can see the following two signatures:

schedule(Callable<V> callable, long delay, TimeUnit unit)
schedule(Runnable command, long delay, TimeUnit unit)

One takes a Callable and the other takes a Runnable.

The API also has the following two methods:

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

My question is: why are there no equivalents of these two for Callable?

scheduleAtFixedRate(Callable<V> callable, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay(Callable<V> callable, long initialDelay, long delay, TimeUnit unit)

I need to retrieve a boolean result from an operation.

Thank you.


Solution

What would you expect the return type of scheduleAtFixedRate(Callable<V>) to be? The return type of schedule(Callable<V>) is Future<V>, indicating that at some point in the future, the value of type V returned by the callable will be available. You can wait for this value to be available by calling get() on the Future.
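As a minimal sketch of the one-shot case described above: the class name OneShotSchedule and the helper runOnce are made up for illustration, and the Callable simply returns true. It schedules a single delayed call and blocks on get() until the value is available.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class OneShotSchedule {

    // Hypothetical helper: schedules one delayed call and waits for its result
    static boolean runOnce(long delayMillis) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Placeholder operation; a real Callable would do the actual work
            Callable<Boolean> callable = () -> true;

            // schedule(Callable<V>, ...) returns a ScheduledFuture<V>, which is a Future<V>
            ScheduledFuture<Boolean> future =
                    executor.schedule(callable, delayMillis, TimeUnit.MILLISECONDS);

            // Blocks until the single scheduled execution has produced its value
            return future.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Result: " + runOnce(100));
    }
}
```

There is exactly one execution here, so there is exactly one value for the Future to hold. That is what breaks down for a repeating task.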

The return type of scheduleAtFixedRate(Callable<V>) cannot be something like Future<List<V>>, as this would mean that at some point in the future, all values returned by the repeated calls to the callable are available. However, there will always be more executions of the callable scheduled, so this list will never exist.

What you need for something like this is the concept of an asynchronous stream of results to which you can subscribe in such a way that you handle each result as it arrives. As far as I know, this doesn't exist in the standard library. One third-party library I know of that contains such a thing is Netflix's RxJava. Using for example a ReplaySubject from that library, you can create the stream of results and handle each result as it is returned:

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import rx.subjects.ReplaySubject;

public class Callables {

    public static void main(String[] args) {
        // Example Callable that returns a boolean result
        Random random = new Random();
        Callable<Boolean> callable = () -> random.nextBoolean();

        // Turn the Callable into a Runnable that adds the last result to the stream of results
        ReplaySubject<Boolean> results = ReplaySubject.create();
        Runnable runnable = () -> {
            try {
                boolean result = callable.call();
                results.onNext(result);
            } catch (Exception e) {
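                // Callable.call() can throw a checked exception, but Runnable.run() cannot.
                // A failed call is skipped here; if the exception escaped, the executor
                // would suppress all subsequent executions of the task.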
            }
        };

        // Periodically run the Runnable
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        executor.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);

        // Handling the results as they arrive
        results.forEach(result -> System.out.println("Result: " + result));

        System.out.println("Waiting for results...");
    }

}

If you decide to use RxJava, it may be worth using more of its API instead of using an Executor directly. You can use Observable.interval to generate a stream that emits a number periodically, then map each emitted number to a call of your callable. This way you get the same stream of results more concisely:

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import rx.Observable;

public class MoreCallables {

    public static void main(String[] args) throws IOException {
        Observable<Long> periodic = Observable.interval(1, TimeUnit.SECONDS);

        Random random = new Random();
        Observable<Boolean> results = periodic.map(i -> random.nextBoolean());

        results.forEach(result -> System.out.println("Result: " + result));

        System.out.println("Waiting for results...");
        System.in.read();
    }

}
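If adding a third-party library is not an option, the same idea can be approximated with the JDK alone by handing each result to a callback. The following is only a sketch: PeriodicCallable, the static helper scheduleAtFixedRate(executor, callable, handler, ...) and collect are hypothetical names, not part of java.util.concurrent.

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PeriodicCallable {

    // Hypothetical helper: runs the callable at a fixed rate and passes each result to the handler
    static <V> ScheduledFuture<?> scheduleAtFixedRate(
            ScheduledExecutorService executor, Callable<V> callable, Consumer<? super V> handler,
            long initialDelay, long period, TimeUnit unit) {
        Runnable runnable = () -> {
            try {
                handler.accept(callable.call());
            } catch (Exception e) {
                // Skip this run; an escaping exception would suppress later executions
            }
        };
        return executor.scheduleAtFixedRate(runnable, initialDelay, period, unit);
    }

    // Collects the first `count` results, then stops the periodic task
    static List<Boolean> collect(int count, long periodMillis) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<Boolean> results = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(count);

        Random random = new Random();
        Callable<Boolean> callable = random::nextBoolean;

        // Runs on the single executor thread, so results are recorded one at a time
        Consumer<Boolean> handler = result -> {
            if (latch.getCount() > 0) {
                results.add(result);
                latch.countDown();
            }
        };

        scheduleAtFixedRate(executor, callable, handler, 0, periodMillis, TimeUnit.MILLISECONDS);
        latch.await();          // wait until `count` results have arrived
        executor.shutdownNow(); // stop the repeating task
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        collect(3, 1000).forEach(result -> System.out.println("Result: " + result));
    }
}
```

The returned ScheduledFuture<?> still carries no result; it is only useful for cancelling the task. Results reach the caller through the handler, one per execution.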

OTHER TIPS

Following @andersschuller's solution, a more concise form of it might look like this:

Observable.interval(1, TimeUnit.SECONDS)
    .map((Long i) -> "tick " + i)
    .forEach(result -> System.out.println("Result: " + result));
Licensed under: CC-BY-SA with attribution