This expands the answer by @TwoThe on how `AtomicReference<Future<V>>` may be used.

Basically, if you don't mind having (somewhat more expensive) synchronized sections in your code, the easiest (and the most readable) solution is the double-checked locking idiom (with a `volatile` field).
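For reference, a minimal sketch of that idiom might look like the following (the `SynchronizedLazy` class and its member names are illustrative, not part of the CAS-based solution below):

```java
import java.util.concurrent.Callable;

// A minimal double-checked-locking sketch (illustrative names).
class SynchronizedLazy<V> {
    private final Callable<V> callable;

    /*
     * volatile is essential here: it guarantees safe publication
     * of the computed value to other threads.
     */
    private volatile V value;

    SynchronizedLazy(final Callable<V> callable) {
        this.callable = callable;
    }

    V get() {
        V result = value;
        if (result == null) {                  // 1st check, no locking
            synchronized (this) {
                result = value;
                if (result == null) {          // 2nd check, under the lock
                    try {
                        result = callable.call();
                    } catch (final Exception e) {
                        throw new RuntimeException(e);
                    }
                    value = result;
                }
            }
        }
        return result;
    }
}
```

Note that if the computation fails, the field simply stays `null`, so a later call retries; no explicit reset is needed, which is part of why this version is the more readable one.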
If you still want to utilize CAS (which is what the whole family of `Atomic*` types is about), you have to use `AtomicReference<Future<V>>`, not `AtomicReference<V>` (otherwise you may end up with multiple threads computing the same expensive value).

But here's another catch: you may obtain a valid `Future<V>` instance and share it between multiple threads, yet the instance itself may be unusable because your costly computation may have failed. This leads us to the need to re-set the atomic reference (`fCachedValue.set(null)`) in some or all exceptional situations.
The above implies that it's no longer sufficient to call `fCachedValue.compareAndSet(null, new FutureTask<>(...))` once -- on each invocation you have to atomically test whether the reference holds a non-`null` value and re-initialise it if necessary. Luckily, the `AtomicReference` class has the `getAndUpdate(...)` method, which simply invokes `compareAndSet(...)` in a loop. So the resulting code might look like this:
```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ConcurrentLazy<V> implements Callable<V> {
    private final AtomicReference<Future<V>> fCachedValue = new AtomicReference<>();

    private final Callable<V> callable;

    public ConcurrentLazy(final Callable<V> callable) {
        this.callable = callable;
    }

    /**
     * {@inheritDoc}
     *
     * @throws Error if thrown by the underlying callable task.
     * @throws RuntimeException if thrown by the underlying callable task,
     *         or the task throws a checked exception,
     *         or the task is interrupted (in this last case, it's the
     *         client's responsibility to process the cause of the
     *         exception).
     * @see Callable#call()
     */
    @Override
    public V call() {
        final RunnableFuture<V> newTask = new FutureTask<>(this.callable);
        final Future<V> oldTask = this.fCachedValue.getAndUpdate(f -> {
            /*
             * If the atomic reference is un-initialised or reset,
             * set it to the new task. Otherwise, return the
             * previous (running or completed) task.
             */
            return f == null ? newTask : f;
        });

        if (oldTask == null) {
            /*
             * Compute the new value on the current thread.
             */
            newTask.run();
        }

        try {
            return (oldTask == null ? newTask : oldTask).get();
        } catch (final ExecutionException ee) {
            /*
             * Re-set the reference.
             */
            this.fCachedValue.set(null);

            final Throwable cause = ee.getCause();
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw toUnchecked(cause);
        } catch (final InterruptedException ie) {
            /*
             * Re-set the reference.
             */
            this.fCachedValue.set(null);

            /*
             * It's the client's responsibility to check the cause.
             */
            throw new RuntimeException(ie);
        }
    }

    private static RuntimeException toUnchecked(final Throwable t) {
        return t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
    }
}
```
In Kotlin, the above can be expressed in a much simpler way (`() -> V` denotes your lazy computation):
```kotlin
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.atomic.AtomicReference

fun <V> (() -> V).concurrent(): () -> V {
    /*
     * The cached result of the computation.
     */
    val valueRef = AtomicReference<Future<V>?>()

    return {
        val newTask = FutureTask(this)
        val oldTaskOrNull = valueRef.getAndUpdate { oldTaskOrNull ->
            oldTaskOrNull ?: newTask
        }
        if (oldTaskOrNull == null) {
            /*
             * Compute the new value on the current thread.
             */
            newTask.run()
        }
        try {
            (oldTaskOrNull ?: newTask).get()
        }
        catch (ee: ExecutionException) {
            /*
             * Re-set the reference.
             */
            valueRef.set(null)
            /*
             * Don't mask the computation failure with an ExecutionException.
             */
            throw ee.cause ?: ee
        }
        catch (e: Exception) {
            /*
             * Re-set the reference.
             */
            valueRef.set(null)
            /*
             * Most probably, an InterruptedException.
             */
            throw e
        }
    }
}
```
Usage example:
```kotlin
import java.io.IOException

val lambda = {
    println("Computing 2x2...")
    val timeNanos = System.nanoTime()
    if (timeNanos % 2L == 0L) {
        throw IOException(timeNanos.toString())
    }
    2 * 2
}.concurrent()

val resultSeq = sequence {
    while (true) {
        val element = try {
            lambda().toString()
        }
        catch (ioe: IOException) {
            ioe.toString()
        }
        yield(element)
    }
}

resultSeq.take(50).forEach(::println)
```
The above code, when run, produces the following output:
```
Computing 2x2...
java.io.IOException: 93224642168398
Computing 2x2...
4
4
4
4
4
4
...
```
The result of the computation can be re-set multiple times (as long as the computation keeps failing), but once a successful result is obtained, it is returned from then on. At the same time, while the computation is running, the `lambda` can be shared between multiple threads (e.g. cached in a `ConcurrentHashMap`).
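That per-key variant of the same pattern (essentially the classic memoizer from *Java Concurrency in Practice*) can be sketched in Java as follows; the `Memoizer` class and its names are my own illustration, not part of the original solution, but the CAS-style insert and the reset-on-failure behaviour mirror the code above:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

// A sketch: per-key lazy computation with reset-on-failure semantics.
class Memoizer<K, V> {
    private final ConcurrentMap<K, Future<V>> cache = new ConcurrentHashMap<>();

    private final Function<K, V> compute;

    Memoizer(final Function<K, V> compute) {
        this.compute = compute;
    }

    V get(final K key) {
        final FutureTask<V> newTask = new FutureTask<>(() -> compute.apply(key));
        /*
         * putIfAbsent() plays the role of compareAndSet(null, ...):
         * only one thread per key wins the race.
         */
        Future<V> task = cache.putIfAbsent(key, newTask);
        if (task == null) {
            task = newTask;
            newTask.run();                    // this thread won: compute here
        }
        try {
            return task.get();
        } catch (final ExecutionException ee) {
            cache.remove(key, task);          // re-set, so a later call may retry
            throw new RuntimeException(ee.getCause());
        } catch (final InterruptedException ie) {
            cache.remove(key, task);
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }
    }
}
```

Note `cache.remove(key, task)` rather than `cache.remove(key)`: the two-argument form only removes the exact failed future, so it can't accidentally discard a newer task installed by another thread.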
P. S. If you're limited to Java, you might also want to take a look at the `CompletableFuture` class.
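To illustrate (a sketch of my own, assuming Java 8+; the `CompletableLazy` name is hypothetical): with `CompletableFuture`, the thread that wins the CAS completes the shared future directly instead of running a `FutureTask`, while the re-setting logic stays the same:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// A CompletableFuture-based sketch of the same CAS-on-a-Future idea.
class CompletableLazy<V> {
    private final AtomicReference<CompletableFuture<V>> ref = new AtomicReference<>();

    private final Supplier<V> supplier;

    CompletableLazy(final Supplier<V> supplier) {
        this.supplier = supplier;
    }

    V get() {
        final CompletableFuture<V> newFuture = new CompletableFuture<>();
        final CompletableFuture<V> oldFuture =
                ref.getAndUpdate(f -> f == null ? newFuture : f);
        if (oldFuture == null) {
            try {
                /*
                 * This thread won the race: compute and publish the value.
                 */
                newFuture.complete(supplier.get());
            } catch (final RuntimeException e) {
                ref.set(null);                // re-set, allowing a retry
                newFuture.completeExceptionally(e);
            }
            return newFuture.join();
        }
        /*
         * join() rethrows a failure as an unchecked CompletionException.
         */
        return oldFuture.join();
    }
}
```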