Question

I'm looking to lazily create something and cache the results as an optimization. Is the code below safe and efficient, or is there a better way to do this? Is the compare and set loop needed here?

...
AtomicReference<V> fCachedValue = new AtomicReference<>();

public V getLazy() {
    V result = fCachedValue.get();
    if (result == null) {
        result = costlyIdempotentOperation();
        fCachedValue.set(result);
    }
    return result; 
} 

edit: The value being set in my example here from costlyIdempotentOperation() would always be the same no matter what thread called it.

Was it helpful?

Solution

That is not a great system. The problem is that two threads may find that the result == null, and both will set the fCachedValue to their new result value.

You want to use the compareAndSet(...) method:

AtomicReference<V> fCachedValue = new AtomicReference<>();

public V getLazy() {
    V result = fCachedValue.get();
    if (result == null) {
        result = costlyIdempotentOperation();
        if (!fCachedValue.compareAndSet(null, result)) {
            return fCachedValue.get();
        }
    }
    return result; 
} 

If multiple threads get in to the method before it has been initialized, they may all try to create the large result instance. They will all create their own version of it, but the first one to complete the process will be the one who gets to store their result in the AtomicReference. The other threads will complete their work, then dispose of their result and instead use the result instance created by the 'winner'.

OTHER TIPS

For a similar purpose I implemented OnceEnteredCallable which returns a ListenableFuture for a result. The advantage is that the other threads are not being blocked and this costly operation is being called once.

Usage (requires Guava):

Callable<V> costlyIdempotentOperation = new Callable<>() {...};

// this would block only the thread to execute the callable
ListenableFuture<V> future = new OnceEnteredCallable<>().runOnce(costlyIdempotentOperation);

// this would block all the threads and set the reference
fCachedValue.set(future.get());

// this would set the reference upon computation, Java 8 syntax
future.addListener(() -> {fCachedValue.set(future.get())}, executorService);

Try AtomicInitializer or AtomicSafeInitializer:

class CachedValue extends AtomicInitializer<V> {
  @Override
  public V initialize() {
    return costlyIdempotentOperation();
  }
}

This expands the answer by @TwoThe on how AtomicReference<Future<V>> may be used.

Basically, if you don't mind having (a little bit more expensive) synchronized sections in your code, the easiest (and the most readable) solution would be to use the Double-checked Locking idiom (with volatile).

If you still want to utilize the CAS (this is what the whole family of Atomic* types is about), you have to use AtomicReference<Future<V>>, not AtomicReference<V> (or you may end up having multiple threads computing the same expensive value).

But here's another catch: you may obtain a valid Future<V> instance and share it between multiple threads, but the instance itself may be unusable because your costly computation may have failed. This leads us to the need to re-set the atomic reference we have (fCachedValue.set(null)) in some or all exceptional situations.

The above implies that it's no longer sufficient to call fCachedValue.compareAndSet(null, new FutureTask(...)) once -- you'll have to atomically test whether the reference contains a non-null value and re-initialize it if necessary (on each invocation). Luckily, the AtomicReference class has the getAndUpdate(...) method which merely invokes compareAndSet(...) in a loop. So the resulting code might look like this:

class ConcurrentLazy<V> implements Callable<V> {
    private final AtomicReference<Future<V>> fCachedValue = new AtomicReference<>();

    private final Callable<V> callable;

    public ConcurrentLazy(final Callable<V> callable) {
        this.callable = callable;
    }

    /**
     * {@inheritDoc}
     *
     * @throws Error if thrown by the underlying callable task.
     * @throws RuntimeException if thrown by the underlying callable task,
     *         or the task throws a checked exception,
     *         or the task is interrupted (in this last case, it's the
     *         client's responsibility to process the cause of the
     *         exception).
     * @see Callable#call()
     */
    @Override
    public V call() {
        final RunnableFuture<V> newTask = new FutureTask<>(this.callable);
        final Future<V> oldTask = this.fCachedValue.getAndUpdate(f -> {
            /*
             * If the atomic reference is un-initialised or reset,
             * set it to the new task. Otherwise, return the
             * previous (running or completed) task.
             */
            return f == null ? newTask : f;
        });

        if (oldTask == null) {
            /*
             * Compute the new value on the current thread. 
             */
            newTask.run();
        }

        try {
            return (oldTask == null ? newTask : oldTask).get();
        } catch (final ExecutionException ee) {
            /*
             * Re-set the reference.
             */
            this.fCachedValue.set(null);

            final Throwable cause = ee.getCause();
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw toUnchecked(cause);
        } catch (final InterruptedException ie) {
            /*
             * Re-set the reference.
             */
            this.fCachedValue.set(null);

            /*
             * It's the client's responsibility to check the cause.
             */
            throw new RuntimeException(ie);
        }
    }

    private static RuntimeException toUnchecked(final Throwable t) {
        return t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t);
    }
}

In Kotlin, the above can be expressed in a much simpler way (() -> V denotes your lazy computation):

import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.FutureTask
import java.util.concurrent.atomic.AtomicReference

fun <V> (() -> V).concurrent(): () -> V {
  /*
   * The cached result of the computation.
   */
  val valueRef = AtomicReference<Future<V>?>()

  return {
    val newTask = FutureTask(this)

    val oldTaskOrNull = valueRef.getAndUpdate { oldTaskOrNull ->
      oldTaskOrNull ?: newTask
    }

    if (oldTaskOrNull == null) {
      /*
       * Compute the new value on the current thread.
       */
      newTask.run()
    }

    try {
      (oldTaskOrNull ?: newTask).get()
    }
    catch (ee: ExecutionException) {
      /*
       * Re-set the reference.
       */
      valueRef.set(null)

      /*
       * Don't mask the compilation failure with an ExecutionException.
       */
      throw ee.cause ?: ee
    }
    catch (e: Exception) {
      /*
       * Re-set the reference.
       */
      valueRef.set(null)

      /*
       * Most probably, an InterruptedException.
       */
      throw e
    }
  }
}

Usage example:

  val lambda = {
    println("Computing 2x2...")

    val timeNanos = System.nanoTime()
    if (timeNanos % 2L == 0L) {
      throw IOException(timeNanos.toString())
    }

    2 * 2
  }.concurrent()

  val resultSeq = sequence {
    while (true) {
      val element = try {
        lambda().toString()
      }
      catch (ioe: IOException) {
        ioe.toString()
      }

      yield(element)
    }
  }

  resultSeq.take(50).forEach(::println)

The above code, when run, produces the following output:

Computing 2x2...
java.io.IOException: 93224642168398
Computing 2x2...
4
4
4
4
4
4
...

The result of the computation can be re-set multiple times, but once a successful result is reached, it is always returned. At the same time, while the computation is being run, the lambda can be shared between multiple threads (e. g.: cached in a ConcurrentHashMap).

P. S. If you're limited to Java, you might also want to take a look at the CompletableFuture class.

You can properly double-check before you do the costly operation (tm) by using a secondary atomic boolean, like this:

AtomicReference<V> fCachedValue = new AtomicReference<>();
AtomicBoolean inProgress = new AtomicBoolean(false);

public V getLazy() {
  V result = fCachedValue.get();
  if (result == null) {
    if (inProgress.compareAndSet(false, true)) {          
      result = costlyIdempotentOperation();
      fCachedValue.set(result);
      notifyAllSleepers();
    } else {
      while ((result = fCachedValue.get()) == null) {
        awaitResultOfSet(); // block and sleep until above is done
      }
    }
  }
  return result; 
}

Even though this won't stop threads from blocking if the value is not set yet, it will at least guarantee that the calculation is only done once. And blocking as well means that the CPU is available for other tasks. But note that if you use standard wait/notify, this might cause a thread-lock, if the first notifies and after that the other one waits. You can either do wait(T_MS) or use a more sophisticated tool like AtomicReference<Future<V>>.

As @rolfl points out himself, under a CAS-based approach multiple threads might create their own instances of result, which is supposedly costly.

A well known solution is to use the lock-based lazy initialization pattern. It uses a single lock and it can handle well exceptions thrown while holding the lock, so if correctly applied, this approach is free from most complexities associated to locking.

You just need a synchronized block and a second null check inside it.

AtomicReference<V> fCachedValue = new AtomicReference<>();
private final Object forSettingCachedVal = new Object();

public V getLazy() {
    V result = fCachedValue.get();
    if (result == null) {

        // synchronizing inside the null check avoids thread blockage
        // where unnecessary, and only before initialization.
        synchronized(forSettingCachedVal) {
            // because the thread may have waited for another thread
            // when attempting to enter the synchronized block:
            result = fCachedValue.get();
            // check that this was the first thread to enter the
            // synchronized block. if not, the op is done, so we're done.
            if (result != null) return result;

            // the first thread can now generate that V
            result = costlyIdempotentOperation();
            // compareAndSet isn't strictly necessary, but it allows a
            // subsequent assertion that the code executed as expected,
            // for documentation purposes.
            boolean successfulSet = fCachedValue.compareAndSet(null, result);
            // assertions are good for documenting things you're pretty damn sure about
            assert successfulSet : "something fishy is afoot";
        }
    }
    return result; 
}

This solution, though slightly more complicated than rolfl's, will avoid executing the costly operation more than once. Hence:

  1. that costly operation doesn't have to be idempotent,
  2. thread contention during lazy initialization is out of the picture, and
  3. despite introduction of synchronization, your code may actually execute faster.
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top