Question

I am using a third-party library that iterates over some very large flat files, which can take many minutes. The library provides an enumerator, so you can process each result as it is yielded while the enumerator goes on to extract the next item from the flat file.

eg:

IEnumerable<object> GetItems()
{
    // Cursor is the third-party library's reader over the flat file.
    var cursor = new Cursor();

    try
    {
        cursor.Open();

        while (!cursor.EOF)
        {
            // Placeholder: build the item for the cursor's current record here.
            yield return new object();

            cursor.MoveNext();
        }
    }
    finally
    {
        // In an iterator, this finally block runs both when the loop finishes and
        // when the consumer disposes the enumerator early (e.g. breaks out of a
        // foreach), so the cursor is not left open.
        if (cursor.IsOpen)
        {
            cursor.Close();
        }
    }
}

What I am trying to achieve is to have two consumers of the same enumerable, so that I don't have to extract the information twice, and so that each consumer can still process each item as it arrives without having to wait for all the items to arrive at once.

IEnumerable<object> items = GetItems();

// Pass a delegate to Thread. The lambda defers each call so that it runs on the
// new thread. Writing new Thread(SaveToDateBase(items)) would instead invoke
// SaveToDateBase right here, on the current thread, and hand its return value to
// the Thread constructor.
// Note: each thread still enumerates `items` independently, so GetItems runs once
// per thread unless the sequence is shared through a cache.
new Thread(() => SaveToDateBase(items)).Start();
new Thread(() => SaveSomewhereElse(items)).Start();

I guess what I am trying to achieve is something like

"If the item the consumer is asking for has already been extracted, yield it; otherwise move next and wait." However, I am concerned about possible MoveNext() clashes between the two threads.

Does something like this already exist? If not, any thoughts on how it could be achieved?

Thanks

Was it helpful?

Solution

Essentially what you want is to cache an IEnumerable<T>'s data, but without waiting for it to finish before storing it. You can do something like this:

/// <summary>
/// Wraps <paramref name="source"/> so that the source is enumerated only once (its
/// GetEnumerator is called a single time), even when the returned sequence is
/// enumerated repeatedly or by several consumers at once. Items are cached as they
/// are pulled, so every consumer can see an item as soon as any consumer has fetched
/// it, without waiting for the whole source to finish.
/// </summary>
/// <remarks>
/// The source enumerator is disposed only after some consumer reaches the end of the
/// sequence. If every consumer stops early, it is never disposed.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IEnumerable<T> Cache<T>(this IEnumerable<T> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    return new CacheEnumerator<T>(source);
}

/// <summary>
/// Enumerable view over a shared <see cref="CacheEntry{T}"/>. Every enumerator it
/// hands out reads from the same cache, so the underlying source is consumed only once.
/// </summary>
private class CacheEnumerator<T> : IEnumerable<T>
{
    private readonly CacheEntry<T> entry;

    /// <summary>
    /// Captures the source's enumerator immediately, together with an empty cache.
    /// </summary>
    public CacheEnumerator(IEnumerable<T> sequence)
    {
        entry = new CacheEntry<T>
        {
            Sequence = sequence.GetEnumerator(),
            CachedValues = new List<T>()
        };
    }

    /// <summary>
    /// Once the source is exhausted, the cached list is enumerated directly.
    /// Otherwise items are served lazily, pulling more from the source as needed.
    /// </summary>
    public IEnumerator<T> GetEnumerator()
    {
        IEnumerable<T> source = entry.FullyPopulated
            ? entry.CachedValues
            : iterateSequence<T>(entry);

        return source.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

/// <summary>
/// Lazily yields the items of <paramref name="entry"/> in order, asking the entry to
/// fetch the next item from its source whenever this consumer is ahead of the cache.
/// Stops as soon as <c>ensureItemAt</c> reports there is no item at the next position.
/// </summary>
private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry)
{
    int position = 0;
    while (entry.ensureItemAt(position))
    {
        yield return entry.CachedValues[position];
        position++;
    }
}

/// <summary>
/// Shared state behind a cached sequence: the source enumerator, the items pulled
/// from it so far, and whether the source has been exhausted. All consumers of one
/// cache share a single instance, and pulls from <see cref="Sequence"/> are
/// serialized under a per-instance lock.
/// </summary>
private class CacheEntry<T>
{
    // Per-instance lock. A static field in a generic class is shared by every
    // CacheEntry<T> with the same T, which would let a slow MoveNext on one cache
    // block unrelated caches.
    private readonly object key = new object();

    // Number of leading items of CachedValues that may be read without the lock.
    // It is raised only under the lock, after the item has been added. Because it is
    // volatile, a reader that observes the new count also observes the stored item.
    // List<T> gives no such guarantee to readers racing a writer, so its Count must
    // not drive the lock-free path.
    private volatile int publishedCount;

    // Backing field for FullyPopulated. Volatile, so lock-free readers get a fresh
    // value and, once they see true, also see every item added before it was set.
    private volatile bool fullyPopulated;

    /// <summary>True once <see cref="Sequence"/> has reported that it has no more items.</summary>
    public bool FullyPopulated
    {
        get { return fullyPopulated; }
        private set { fullyPopulated = value; }
    }

    /// <summary>
    /// Source enumerator. It is advanced only under the lock, and disposed once it
    /// reports no more items.
    /// </summary>
    public IEnumerator<T> Sequence { get; set; }

    /// <summary>Items pulled from <see cref="Sequence"/> so far, in order; appended only under the lock.</summary>
    public List<T> CachedValues { get; set; }

    /// <summary>
    /// Ensures that the cache holds an item at <paramref name="index"/>. If it does
    /// not, items are taken from the source sequence and appended to the cache until
    /// it does, or until the source runs out.
    ///
    /// Safe to call from several threads at once on the same instance.
    /// </summary>
    /// <returns>True if the cache already held the item or it was fetched;
    /// false if the sequence ended before reaching that index.</returns>
    public bool ensureItemAt(int index)
    {
        // Fast path: every item below publishedCount is fully stored, so no lock is needed.
        if (index < publishedCount)
            return true;

        // Source exhausted. Re-read the count *after* observing the flag: the count
        // read above may predate the final items added just before the flag was set.
        if (FullyPopulated)
            return index < publishedCount;

        lock (key)
        {
            // Pull items until the requested index is cached or the source ends.
            while (index >= CachedValues.Count)
            {
                // Another thread may have exhausted the source while we waited for the lock.
                if (FullyPopulated)
                    return false;

                if (!Sequence.MoveNext())
                {
                    Sequence.Dispose();
                    FullyPopulated = true;
                    return false;
                }

                CachedValues.Add(Sequence.Current);
                // Publish only after the Add has completed (volatile write).
                publishedCount = CachedValues.Count;
            }

            return true;
        }
    }
}

Example usage:

/// <summary>
/// Demo producer: lazily yields 0 .. maxValue-1. Before each value it sleeps one
/// second and logs that the value is being generated, so that repeated generation
/// is visible in the console.
/// </summary>
private static IEnumerable<int> interestingIntGenertionMethod(int maxValue)
{
    int current = 0;
    while (current < maxValue)
    {
        Thread.Sleep(1000);
        Console.WriteLine("actually generating value: {0}", current);
        yield return current;
        current++;
    }
}

/// <summary>
/// Demo: three tasks enumerate the same cached sequence concurrently. Each value is
/// generated once but printed by every task.
/// </summary>
public static void Main(string[] args)
{
    IEnumerable<int> sequence = interestingIntGenertionMethod(10)
        .Cache();

    int numThreads = 3;
    var tasks = new Task[numThreads];
    for (int i = 0; i < numThreads; i++)
    {
        // Copy the loop variable so that each lambda captures its own ID.
        int taskID = i;
        tasks[i] = Task.Factory.StartNew(() =>
        {
            foreach (int value in sequence)
            {
                Console.WriteLine("Task: {0} Value:{1}",
                    taskID, value);
            }
        });
    }

    // Wait for every consumer to finish before prompting. This also rethrows any
    // consumer exception, instead of racing the prompt against the output and
    // letting an early key press cut the demo short.
    Task.WaitAll(tasks);

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey(true);
}

OTHER TIPS

A pipeline-pattern implementation using the .NET 4 BlockingCollection<T> and TPL tasks is what you are looking for. See my answer with a complete example in this StackOverflow post.

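Example: three simultaneous consumers. Note that consumers of a single BlockingCollection<T> compete for items: each item is taken by exactly one consumer. If, as in the question, every consumer must see every item, give each consumer its own BlockingCollection<T> and have the producer add each item to all of them.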

// Hand-off buffer between the producer and the consumers. Each item added here is
// taken by exactly one consumer.
private readonly BlockingCollection<string> queue = new BlockingCollection<string>();

/// <summary>
/// Runs one producer and three consumers concurrently and blocks until all of them
/// have finished.
/// </summary>
/// <remarks>
/// The consumers' GetConsumingEnumerable loops end only after CompleteAdding has been
/// called on <c>queue</c> and the queue is drained. Task.WaitAll therefore returns
/// only once the producer signals completion.
/// </remarks>
public void Start()
{
    var producerWorker = Task.Factory.StartNew(() => ProducerImpl());
    var consumer1 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer2 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer3 = Task.Factory.StartNew(() => ConsumerImpl());

    Task.WaitAll(producerWorker, consumer1, consumer2, consumer3);
}

/// <summary>
/// Reads items from the source, adds them to <c>queue</c>, and always marks the queue
/// complete when done, including when an exception is thrown.
/// </summary>
private void ProducerImpl()
{
    try
    {
        // For each record in the source file:
        //   1. Read the raw data from the file
        //   2. Preprocess it
        //   3. Add the item to the queue
        queue.Add(item);
    }
    finally
    {
        // Signal the consumers that no more items are coming. Without this,
        // GetConsumingEnumerable() blocks forever once the queue is empty, and
        // Task.WaitAll in Start never returns.
        queue.CompleteAdding();
    }
}

// ConsumerImpl must be thread-safe so that several consumers can run
// simultaneously. Each call takes items off the shared queue until the producer
// has called CompleteAdding and the queue has been drained.
private void ConsumerImpl()
{
    var pending = queue.GetConsumingEnumerable();
    foreach (var item in pending)
    {
        // TODO: process item
    }
}

If something is still not clear, please let me know.

High level diagram of pipelines flow:

(Diagram image not available.)

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top