Question

Let's say I have two sequences returning integers 1 to 5.

The first returns 1, 2 and 3 very fast, but 4 and 5 take 200ms each.

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}

The second returns 1, 2 and 3 with a 200ms delay, but 4 and 5 are returned fast.

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}

Unioning both these sequences gives me just the numbers 1 to 5.

FastFirst().Union(SlowFirst());

I cannot guarantee which of the two methods has delays at what point, so relying on the order of execution cannot solve this for me. Therefore, I would like to parallelise the union in order to minimise the (artificial) delay in my example.
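For illustration, the sequential cost of the plain Union can be measured with a Stopwatch (a sketch, assuming the FastFirst/SlowFirst methods defined above):

```csharp
var sw = System.Diagnostics.Stopwatch.StartNew();
var result = FastFirst().Union(SlowFirst()).ToList();
sw.Stop();
// Union drains FastFirst fully (~400ms for items 4 and 5), then drains
// SlowFirst (~600ms for items 1-3, whose fast 4 and 5 are duplicates),
// so the delays add up instead of overlapping: roughly a second in total.
Console.WriteLine(sw.ElapsedMilliseconds);
```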

A real-world scenario: I have a cache that returns some entities, and a datasource that returns all entities. I'd like to be able to return an iterator from a method that internally parallelises the request to both the cache and the datasource so that the cached results yield as fast as possible.

Note 1: I realise this is still wasting CPU cycles; I'm not asking how can I prevent the sequences from iterating over their slow elements, just how I can union them as fast as possible.

Update 1: I've tailored achitaka-san's great response to accept multiple producers, and to use ContinueWhenAll so that the BlockingCollection's CompleteAdding is called just once. I'm putting it here since it would get lost in the comment formatting. Any further feedback would be great!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producers)
{
    var resultsQueue = new BlockingCollection<TResult>();

    // One task per producer; each pushes its items into the shared queue.
    var taskList = new HashSet<Task>();
    foreach (var producer in producers)
    {
        // Local copy: in C# 4 the foreach variable is shared across closures,
        // so capturing it directly would make every task read the same producer.
        var source = producer;
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in source)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    // Signal completion exactly once, after every producer has finished,
    // so GetConsumingEnumerable terminates for the consumer.
    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());

    return resultsQueue.GetConsumingEnumerable();
}
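A minimal usage sketch of the method above, assuming the FastFirst/SlowFirst sequences from the question, with LINQ's Distinct providing the Union semantics:

```csharp
// Merge both sequences as their results arrive, then de-duplicate.
// Distinct preserves first-arrival order, so fast items stream out
// without waiting for the slow producer.
foreach (var n in SelectAsync(FastFirst(), SlowFirst()).Distinct())
{
    Console.WriteLine(n);
}
```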

Solution

Take a look at this. The first method just returns everything in the order the results come in. The second checks uniqueness. If you chain them, you will get the result you want, I think.

public static class Class1
{
    public static IEnumerable<TResult> SelectAsync<TResult>(
        IEnumerable<TResult> producer1,
        IEnumerable<TResult> producer2,
        int capacity)
    {
        var resultsQueue = new BlockingCollection<TResult>(capacity);
        var pendingProducers = 2;

        Action<IEnumerable<TResult>> consume = producer =>
        {
            foreach (var product in producer)
            {
                resultsQueue.Add(product);
            }
            // Interlocked avoids the race two plain bool flags would have:
            // each task could finish without observing the other's flag,
            // leaving CompleteAdding uncalled and the consumer blocked forever.
            if (Interlocked.Decrement(ref pendingProducers) == 0)
            {
                resultsQueue.CompleteAdding();
            }
        };

        Task.Factory.StartNew(() => consume(producer1));
        Task.Factory.StartNew(() => consume(producer2));

        return resultsQueue.GetConsumingEnumerable();
    }


    public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
    {
        HashSet<TResult> knownResults = new HashSet<TResult>();
        foreach (TResult result in source)
        {
            // HashSet<T>.Add returns false for duplicates, replacing the
            // separate Contains/Add pair.
            if (knownResults.Add(result))
            {
                yield return result;
            }
        }
    }
}
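Chaining the two methods as suggested might look like this (the capacity value is an arbitrary choice, and FastFirst/SlowFirst are the sequences from the question):

```csharp
// SelectAsync merges the producers as items arrive; SelectAsyncUnique
// then filters out duplicates, giving Union-like semantics.
foreach (var n in Class1.SelectAsync(FastFirst(), SlowFirst(), 10)
                        .SelectAsyncUnique())
{
    Console.WriteLine(n);
}
```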

OTHER TIPS

The cache would be nearly instant compared to fetching from the database, so you could read from the cache first and return those items, then read from the database and return the items except those that were found in the cache.

If you try to parallelise this, you will add a lot of complexity but get quite a small gain.
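The sequential cache-first approach described above can be sketched as follows (the method name, the key selector, and the idea of passing the cache and datasource as plain sequences are illustrative assumptions, not the asker's API):

```csharp
// Sketch: yield cached entities immediately, then fall back to the
// datasource, skipping anything the cache already returned.
public static IEnumerable<TEntity> CacheFirst<TEntity, TKey>(
    IEnumerable<TEntity> cache,
    IEnumerable<TEntity> datasource,
    Func<TEntity, TKey> getKey)
{
    var seen = new HashSet<TKey>();
    foreach (var entity in cache)
    {
        seen.Add(getKey(entity));
        yield return entity;   // cached items stream out with no datasource wait
    }
    foreach (var entity in datasource)
    {
        if (seen.Add(getKey(entity)))   // Add returns false for duplicates
        {
            yield return entity;
        }
    }
}
```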

Edit:

If there is no predictable difference in the speed of the sources, you could run them in threads, use a synchronised hash set to keep track of which items you have already seen, put new items in a queue, and let the main thread read from the queue:

public static IEnumerable<TItem> GetParallel<TItem, TKey>(Func<TItem, TKey> getKey, params IEnumerable<TItem>[] sources) {
  HashSet<TKey> found = new HashSet<TKey>();
  List<TItem> queue = new List<TItem>();
  object sync = new object();
  int alive = 0;
  object aliveSync = new object();
  foreach (IEnumerable<TItem> source in sources) {
    lock (aliveSync) {
      alive++;
    }
    new Thread(s => {
      foreach (TItem item in s as IEnumerable<TItem>) {
        TKey key = getKey(item);
        lock (sync) {
          if (found.Add(key)) {
            queue.Add(item);
          }
        }
      }
      lock (aliveSync) {
        alive--;
      }
    }).Start(source);
  }
  while (true) {
    bool done;
    lock (aliveSync) {
      done = (alive == 0);
    }
    // Check "done" before draining, so items added just before the last
    // thread exited are still returned on the final pass. Snapshot and
    // clear under the lock, but yield outside it so producers are not
    // blocked while the consumer processes items.
    List<TItem> batch = null;
    lock (sync) {
      if (queue.Count > 0) {
        batch = new List<TItem>(queue);
        queue.Clear();
      }
    }
    if (batch != null) {
      foreach (TItem item in batch) {
        yield return item;
      }
    }
    if (done) { break; }
    Thread.Sleep(100);
  }
}

Test stream:

public static IEnumerable<int> SlowRandomFeed(Random rnd) {
  int[] values = new int[100];
  for (int i = 0; i < 100; i++) {
    int pos = rnd.Next(i + 1);
    values[i] = i;
    int temp = values[pos];
    values[pos] = values[i];
    values[i] = temp;
  }
  foreach (int value in values) {
    yield return value;
    Thread.Sleep(rnd.Next(200));
  }
}

Test:

Random rnd = new Random();
foreach (int item in GetParallel(n => n, SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd), SlowRandomFeed(rnd))) {
  Console.Write("{0:0000 }", item);
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow