Question

I'm looking to build a parallel cache. The requirement is that n data collectors need to be fired at once. Each of these data collectors will hit a boundary layer (call this the service layer) and retrieve data. However, since this is within the same request (WCF), if two data collectors need to invoke the same method on the service layer, I don't want the second request to wait for the first one to complete.

This needs to be built transparently to the developers building the data collectors (using Unity Interception to insert this caching aspect).

Here is what the flow would look like. Would Reactive Extensions be the correct fit for this kind of design? I haven't worked with Rx in the past and do not want to hit a brick wall 10 days into development. Otherwise, a combination of async, await and events might also serve well here.

EDIT: I implemented this using Rx, and it works well in a multithreaded context. The interesting bit was to call TryAdd first instead of TryGetValue. (This is a Unity Interception call handler.)

    /// <summary>
    /// Intercepts the calls and tries to retrieve from the cache
    /// </summary>
    class CacheCallHandler : ICallHandler
    {

        [Dependency]
        public ICache RequestCache { get; set; }

        public IMethodReturn Invoke(IMethodInvocation input, GetNextHandlerDelegate getNext)
        {
            IMethodReturn mesg = null;

            string cacheKey = CacheKeyGenerator.GetCacheKey(input);

            //create the task to retrieve the data
            var task = new Task<IMethodReturn>(() =>
            {
                return getNext()(input, getNext);
            });

            //make it observable
            var observableItem = task.ToObservable();

            //try to add it to the cache
            //we must Add first and only then try to Get, otherwise multiple threads might enter the same section
            if (RequestCache.TryAdd(cacheKey, observableItem))
            {
                //if the add succeeded, we are responsible for starting this task
                task.Start();
            }
            else
            {
                if ( RequestCache.TryGetValue(cacheKey, out observableItem) )
                {
                    //do nothing, observableItem now holds the required reference
                }
                else
                {
                    throw new CacheHandlerException("Could not add to cache AND could not retrieve from cache either. Something's wrong", input);
                }
            }

            //observe the return 
            if ( observableItem != null )
                mesg = observableItem.FirstOrDefault();

            if (mesg == null)
                throw new CacheHandlerException("No return value found. This should not happen", input);

            return mesg;
        }


        /// <summary>
        /// Should always be the first to execute on the boundary
        /// </summary>
        public int Order
        {
            get { return 1; }
            set { ; }
        }
    }

[Image: flow diagram]


Solution

Yes, Rx is an excellent fit for this.

I suggest you look at implementing the following dictionary to back your Key Cache:

Dictionary<K, AsyncSubject<V>>

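The part that fetches data asynchronously just needs to subscribe the subject to the fetch so that the subject is populated with the results. Every caller asking for the same key then observes that one subject.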
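As a rough illustration of this suggestion, here is a minimal sketch. It assumes the System.Reactive package is referenced. The class name SubjectCache and the method GetOrFetch are hypothetical, and it swaps the plain Dictionary for a ConcurrentDictionary so that concurrent callers can race safely. Only the caller whose TryAdd succeeds starts the fetch. Everyone else observes the same AsyncSubject, which replays its final value to late subscribers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical helper (not part of Rx): one AsyncSubject per cache key.
public sealed class SubjectCache<K, V>
{
    private readonly ConcurrentDictionary<K, AsyncSubject<V>> subjects =
        new ConcurrentDictionary<K, AsyncSubject<V>>();

    public IObservable<V> GetOrFetch(K key, Func<K, Task<V>> fetch)
    {
        var candidate = new AsyncSubject<V>();
        if (subjects.TryAdd(key, candidate))
        {
            // This caller won the race, so it starts the fetch and
            // pipes the result (or the error) into the subject.
            Observable.FromAsync(() => fetch(key)).Subscribe(candidate);
            return candidate.AsObservable();
        }

        // Another caller registered a subject first; share it.
        // Entries are never removed in this sketch, so the indexer is safe.
        return subjects[key].AsObservable();
    }
}
```

Note that in this sketch a failed fetch is cached as well: the subject replays the error to later subscribers, so a real implementation would probably want to evict failed entries.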

OTHER TIPS

https://github.com/reactiveui/ReactiveUI/blob/master/ReactiveUI/ObservableAsyncMRUCache.cs already does essentially what you want, specifically with respect to two requests for the same content being "debounced". From the comments:

/// ObservableAsyncMRUCache implements memoization for asynchronous or
/// expensive to compute methods. This memoization is an MRU-based cache
/// with a fixed limit for the number of items in the cache.     
///
/// This class guarantees that only one calculation for any given key is
/// in-flight at a time, subsequent requests will wait for the first one and
/// return its results (for example, an empty web image cache that receives
/// two concurrent requests for "Foo.jpg" will only issue one WebRequest -
/// this does not mean that a request for "Bar.jpg" will wait on "Foo.jpg").
///
/// Concurrency is also limited by the maxConcurrent parameter - when too
/// many in-flight operations are in progress, further operations will be
/// queued until a slot is available.

I would lean towards an async solution, specifically one using AsyncLazy<T> (from my blog):

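using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class MyCache<TKey, TValue>
{
  private readonly ConcurrentDictionary<TKey, AsyncLazy<TValue>> dictionary =
      new ConcurrentDictionary<TKey, AsyncLazy<TValue>>();

  // Asynchronous lookup, invoked at most once per key.
  private readonly Func<TKey, Task<TValue>> lookupAsync;

  public MyCache(Func<TKey, Task<TValue>> lookupAsync)
  {
    this.lookupAsync = lookupAsync;
  }

  public AsyncLazy<TValue> Get(TKey key)
  {
    // The lambda parameter is named k because it may not shadow the method parameter key.
    return dictionary.GetOrAdd(key,
        k => new AsyncLazy<TValue>(() => lookupAsync(k)));
  }
}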

This is a very simplistic "cache", since it has no expiration. It can be used as such:

MyCache<string, MyResource> cache = new MyCache<string, MyResource>(async key =>
{
  MyResource ret = await DataLayer.GetResourceAsync(key);
  ...
  return ret;
});
MyResource resource = await cache.Get("key");

In multithreading situations, GetOrAdd may create an extra AsyncLazy<TValue>, but it will never be awaited, so lookupAsync will only be called once per TKey. Also note that lookupAsync is always called from the thread pool.
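AsyncLazy<T> is not a framework type, so for readers without the blog post at hand, here is a minimal sketch of what such a type could look like. It assumes the type simply wraps Lazy<Task<T>> and starts the factory with Task.Run, which would match the note above about the thread pool. The actual type on the blog may differ in detail.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Sketch only: an awaitable, lazily started asynchronous value.
public sealed class AsyncLazy<T>
{
    private readonly Lazy<Task<T>> instance;

    public AsyncLazy(Func<Task<T>> factory)
    {
        // Lazy<T> defaults to thread-safe, run-once initialization,
        // so the factory is started at most once, on the thread pool.
        instance = new Lazy<Task<T>>(() => System.Threading.Tasks.Task.Run(factory));
    }

    // The underlying task; first access starts the factory.
    public Task<T> Task
    {
        get { return instance.Value; }
    }

    // Allows "await asyncLazy" directly.
    public TaskAwaiter<T> GetAwaiter()
    {
        return instance.Value.GetAwaiter();
    }
}
```

Because the factory only starts when the value is first awaited (or Task is first read), an extra instance created and discarded by GetOrAdd never triggers a lookup.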

P.S. If you do go async, you may find my async WCF post helpful.

Licensed under: CC-BY-SA with attribution