Question

(I am currently restricted to .NET 4.0)

I have a situation where I want to process items in parallel as much as possible while maintaining the order of results, and items can be added at any time until "stop" is pressed.

Items can come in "bursts", so it is possible that the queue will completely drain, there will be a pause, and then a large number of items will come in at once again.

I want the results to become available as soon as they are done.

Here is a simplified example:

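using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class Program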
{
    static void Main(string[] args)
    {
        BlockingCollection<int> itemsQueue = new BlockingCollection<int>();

        Random random = new Random();

        var results = itemsQueue
        .GetConsumingEnumerable()
        .AsParallel()
        .AsOrdered()
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .Select(i =>
        {
            int work = 0;

            Console.WriteLine("Working on " + i);

            //simulate work
            for (int busy = 0; busy <= 90000000; ++busy) { ++work; };

            Console.WriteLine("Finished " + i);


            return i;
        });

        TaskCompletionSource<bool> completion = new TaskCompletionSource<bool>();

        Task.Factory.StartNew(() =>
        {
            foreach (int i in results)
            {
                Console.WriteLine("Result Available: " + i);
            }
            completion.SetResult(true);
        });

        int iterations;
        iterations = random.Next(5, 50);
        Console.WriteLine("------- iterations: " + iterations + "-------");

        for (int i = 1; i <= iterations; ++i)
        {
            itemsQueue.Add(i);
        }

        while (true)
        {
            char c = Console.ReadKey().KeyChar;

            if (c == 's')
            {
                break;
            }
            else
            {
                ++iterations;

                Console.WriteLine("adding: " + iterations);
                itemsQueue.Add(iterations);
            }
        }


        itemsQueue.CompleteAdding();

        completion.Task.Wait();

        Console.WriteLine("Done!");
        Console.ReadKey();
        itemsQueue.Dispose();
    }
}

As the above example shows, results typically become available for all but the last few items (I'm not 100% sure of this, but the number of results that are held back seems roughly correlated with the number of cores on the machine). The remaining results only become available once itemsQueue.CompleteAdding(); is called (in the example, when the "s" key is pressed).

Why do the results not become available immediately even though I specify .WithMergeOptions(ParallelMergeOptions.NotBuffered), and how can I make them available immediately?


Solution

Note that the problem does not occur if you can call the BlockingCollection.CompleteAdding() instance method: that causes all of the remaining results to be produced.
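For context, here is a minimal sketch (independent of PLINQ) of why CompleteAdding() releases everything: GetConsumingEnumerable() only ends once adding has been marked complete and the collection is empty, so a consumer waiting for "one more item" is finally let go. The CompleteAddingDemo class and its method are hypothetical names used only for illustration; all APIs used exist in .NET 4.0.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical demo class: shows that a GetConsumingEnumerable() loop
// only finishes after CompleteAdding() is called.
public static class CompleteAddingDemo
{
    public static List<int> DrainAfterCompleteAdding(IEnumerable<int> items)
    {
        using (var queue = new BlockingCollection<int>())
        {
            // Single consumer, so items come out in FIFO order.
            Task<List<int>> consumer = Task.Factory.StartNew(() =>
            {
                var seen = new List<int>();
                foreach (int i in queue.GetConsumingEnumerable())
                    seen.Add(i);
                return seen;
            });

            foreach (int i in items)
                queue.Add(i);

            // Without this call the foreach above would keep waiting for more
            // items and consumer.Result would never return.
            queue.CompleteAdding();

            return consumer.Result;
        }
    }
}
```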

Short Answer

If, on the other hand, you need to maintain order, need the results to be available as soon as possible, and have no opportunity to call BlockingCollection.CompleteAdding(), then, if at all possible, you are much better off consuming the items in the queue sequentially and parallelizing the processing of each individual item instead.

E.g.

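using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    //The queue is consumed sequentially (which keeps results in order and
    //releases each one as soon as it is done); parallelism is applied to
    //the work inside each individual item instead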
    static void Main(string[] args)
    {
        BlockingCollection<int> itemsQueue = new BlockingCollection<int>();


        Random random = new Random();

        var results = itemsQueue.GetConsumingEnumerable()
        .Select(i =>
        {
            Console.WriteLine("Working on " + i);

            //Focus your parallelization efforts on the work of 
            //the individual task
            //E.g, simulated:
            double work = Enumerable.Range(0, 90000000 - (10 * (i % 3)))
            .AsParallel()
            .Select(w => w + 1)
            .Average();

            Console.WriteLine("Finished " + i);


            return i;
        });

        TaskCompletionSource<bool> completion = new TaskCompletionSource<bool>();

        Task.Factory.StartNew(() =>
        {
            foreach (int i in results)
            {
                Console.WriteLine("Result Available: " + i);
            }
            completion.SetResult(true);
        });

        int iterations;
        iterations = random.Next(5, 50);
        Console.WriteLine("------- iterations: " + iterations + "-------");

        for (int i = 1; i <= iterations; ++i)
        {
            itemsQueue.Add(i);
        }

        while (true)
        {
            char c = Console.ReadKey().KeyChar;

            if (c == 's')
            {
                break;
            }
            else
            {
                ++iterations;

                Console.WriteLine("adding: " + iterations);
                itemsQueue.Add(iterations);
            }
        }


        itemsQueue.CompleteAdding();

        completion.Task.Wait();

        Console.WriteLine("Done!");
        Console.ReadKey();
        itemsQueue.Dispose();
    }
}

Longer Answer

It appears that there is an interaction between BlockingCollection in particular and AsOrdered().

It seems that AsOrdered() stops the processing of tasks whenever one of the partition enumerators blocks.

The default partitioner hands out items in chunks that are typically larger than one item, and the consuming enumerable blocks until a chunk can be filled (or until CompleteAdding() is called).
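The chunking effect can be modelled with a simplified sketch. PLINQ's real chunking policy is internal (chunk sizes can grow as a query runs), so this is not its actual code. The hypothetical ReadChunk helper below keeps pulling from the consuming enumerator until it has chunkSize items or the enumerator ends. With fewer items than chunkSize, it stays blocked, and every item it already holds is held back with it, until CompleteAdding() is called.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical model of a chunking partitioner reading from a BlockingCollection.
public static class ChunkDemo
{
    // Pulls items until the chunk is full or the source ends
    // (i.e. CompleteAdding() was called and the collection is empty).
    public static List<T> ReadChunk<T>(IEnumerator<T> source, int chunkSize)
    {
        var chunk = new List<T>(chunkSize);
        while (chunk.Count < chunkSize && source.MoveNext())
            chunk.Add(source.Current);
        return chunk;
    }

    // Item1: was the chunk still incomplete after waitMs?
    // Item2: size of the chunk once CompleteAdding() released it.
    public static Tuple<bool, int> Run(int itemsAdded, int chunkSize, int waitMs)
    {
        using (var queue = new BlockingCollection<int>())
        using (IEnumerator<int> source = queue.GetConsumingEnumerable().GetEnumerator())
        {
            Task<List<int>> reader = Task.Factory.StartNew(() => ReadChunk(source, chunkSize));

            for (int i = 0; i < itemsAdded; ++i)
                queue.Add(i);

            // With fewer items than chunkSize, MoveNext() blocks inside
            // ReadChunk, so the partial chunk is not handed on yet.
            bool stillWaiting = !reader.Wait(waitMs);

            queue.CompleteAdding(); // ends the source, releasing the partial chunk
            int size = reader.Result.Count;
            return Tuple.Create(stillWaiting, size);
        }
    }
}
```

For example, ChunkDemo.Run(2, 4, 300) reports that the chunk was still waiting and then yields a partial chunk of 2 once adding is completed.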

However, even with a chunk size of 1, the problem does not completely go away.

To play around with this, you can implement your own partitioner; you can sometimes still see the behavior with one. (Note that if you specify .WithDegreeOfParallelism(1), the problem with results waiting to appear goes away, but of course a degree of parallelism of 1 rather defeats the purpose!)

e.g.

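using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class ImmediateOrderedPartitioner<T> : OrderablePartitioner<T>
{
    private readonly IEnumerable<T> _consumingEnumerable;
    private readonly Ordering _ordering = new Ordering();

    //keysOrderedInEachPartition: true, keysOrderedAcrossPartitions: false
    //(partitions take items in an interleaved fashion), keysNormalized: true
    public ImmediateOrderedPartitioner(BlockingCollection<T> collection) : base(true, false, true)
    {
        _consumingEnumerable = collection.GetConsumingEnumerable();
    }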

    private class Ordering
    {
        public int Order = -1;
    }

    private class MyEnumerator<S> : IEnumerator<KeyValuePair<long, S>>
    {
        private readonly IEnumerable<S> _enumerable;

        private KeyValuePair<long, S> _current;

        private bool _hasItem;

        //shared by every partition: supplies the next order key and is also
        //the lock that makes "take an item + assign its key" atomic
        private readonly Ordering _ordering;

        public MyEnumerator(IEnumerable<S> consumingEnumerable, Ordering ordering)
        {
            _enumerable = consumingEnumerable;
            _ordering = ordering;
        }

        public KeyValuePair<long, S> Current
        {
            get
            {
                if (_hasItem)
                {
                    return _current;
                }
                else
                    throw new InvalidOperationException();
            }
        }

        public void Dispose()
        {
        }

        object System.Collections.IEnumerator.Current
        {
            get 
            {
                return Current;
            }
        }

        public bool MoveNext()
        {
            lock (_ordering)
            {
                bool canMoveNext = false;

                //blocks until an item arrives or CompleteAdding() is called
                var next = _enumerable.Take(1).FirstOrDefault(s => { canMoveNext = true; return true; });

                if (canMoveNext)
                {
                    //increment exactly once per item so keys stay normalized (0, 1, 2, ...)
                    _current = new KeyValuePair<long, S>(++_ordering.Order, next);
                    _hasItem = true;
                }
                else
                {
                    _hasItem = false;
                }

                return canMoveNext;
            }
        }

        public void Reset()
        {
            throw new NotSupportedException();
        }
    }

    public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int partitionCount)
    {
        var result = new List<IEnumerator<KeyValuePair<long,T>>>();

        //give each partition its own enumerator: a single shared one would let
        //one thread overwrite Current before another thread has read it.
        //Ordering is still preserved because every enumerator locks on the
        //same _ordering instance.
        for (int i = 0; i < partitionCount; ++i)
        {
            result.Add(new MyEnumerator<T>(_consumingEnumerable, _ordering));
        }

        return result;
    }

    public override bool SupportsDynamicPartitions
    {
        get
        {
            return false;
        }
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        throw new NotSupportedException();
    }

    public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
    {
        throw new NotSupportedException();
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        throw new NotSupportedException();
    }
}

class Program
{
    static void Main(string[] args)
    {
        BlockingCollection<int> itemsQueue = new BlockingCollection<int>();

        var partitioner = new ImmediateOrderedPartitioner<int>(itemsQueue);

        Random random = new Random();

        var results = partitioner
        .AsParallel()
        .AsOrdered()
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        //.WithDegreeOfParallelism(1)
        .Select(i =>
        {
            int work = 0;

            Console.WriteLine("Working on " + i);

            for (int busy = 0; busy <= 90000000; ++busy) { ++work; };

            Console.WriteLine("Finished " + i);


            return i;
        });

        TaskCompletionSource<bool> completion = new TaskCompletionSource<bool>();

        Task.Factory.StartNew(() =>
        {
            foreach (int i in results)
            {
                Console.WriteLine("Result Available: " + i);
            }
            completion.SetResult(true);
        });

        int iterations;
        iterations = 1; // random.Next(5, 50);
        Console.WriteLine("------- iterations: " + iterations + "-------");

        for (int i = 1; i <= iterations; ++i)
        {
            itemsQueue.Add(i);
        }

        while (true)
        {
            char c = Console.ReadKey().KeyChar;

            if (c == 's')
            {
                break;
            }
            else
            {
                ++iterations;

                Console.WriteLine("adding: " + iterations);
                itemsQueue.Add(iterations);
            }
        }


        itemsQueue.CompleteAdding();

        completion.Task.Wait();

        Console.WriteLine("Done!");
        Console.ReadKey();
        itemsQueue.Dispose();
    }
}

Alternate Approach

If parallelizing the individual task (as recommended in the "Short Answer") is not possible, and all of the other problem constraints apply, you can implement your own kind of queue that spins up a task for each item. This lets the Task Parallel Library handle the scheduling of the work, while you synchronize the consumption of results yourself.
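At its core, synchronizing the consumption of results yourself is a reorder step. Items may finish in any order, but a result is released only once every earlier item has been released. Here is a minimal, single-threaded sketch of just that step, using a hypothetical ReorderBuffer<T> class that is not part of the answer's code. Calls from several threads would need to be wrapped in a lock.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: releases out-of-order completions in submission order.
// Not thread-safe; guard Complete() with a lock if several threads call it.
public class ReorderBuffer<T>
{
    // Finished results whose turn has not come yet, keyed by submission index.
    private readonly Dictionary<int, T> _pending = new Dictionary<int, T>();

    // Submission index of the next result the consumer is allowed to see.
    private int _next;

    // Records that item 'index' finished with 'result' and returns every
    // result that can now be released, in order (possibly an empty list).
    public List<T> Complete(int index, T result)
    {
        _pending.Add(index, result);

        var ready = new List<T>();
        T value;
        while (_pending.TryGetValue(_next, out value))
        {
            _pending.Remove(_next);
            ready.Add(value);
            ++_next;
        }
        return ready;
    }
}
```

If items 0 to 3 finish in the order 2, 0, 1, 3 with results "a" to "d", the successive calls return nothing, then "a", then "b" and "c" together, then "d".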

For example, something like the below (with the standard "no warranties" disclaimer!)

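using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class QueuedItem<TInput, TResult>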
{
    private readonly object _lockObject = new object();

    private TResult _result;

    private readonly TInput _input;

    private readonly TResult _notfinished;

    internal readonly bool IsEndQueue = false;

    internal QueuedItem()
    {
        IsEndQueue = true;
    }

    public QueuedItem(TInput input, TResult notfinished)
    {
        _input = input;
        _notfinished = notfinished;
        _result = _notfinished;
    }

    public TResult ReadResult()
    {
        lock (_lockObject)
        {
            if (!IsResultReady)
                throw new InvalidOperationException("Check IsResultReady before calling ReadResult()");

            return _result;
        }
    }

    public void WriteResult(TResult value)
    {
        lock (_lockObject)
        {
            if (IsResultReady)
                throw new InvalidOperationException("Result has already been written");

            _result = value;
        }
    }

    public TInput Input { get { return _input; } }

    public bool IsResultReady
    {
        get
        {
            lock (_lockObject)
            {
                return !object.Equals(_result, _notfinished) || IsEndQueue;
            }
        }
    }
}


public class ParallelImmediateOrderedProcessingQueue<TInput, TResult>
{
    private readonly ReaderWriterLockSlim _addLock = new ReaderWriterLockSlim();

    private readonly object _readingResultsLock = new object();

    private readonly ConcurrentQueue<QueuedItem<TInput, TResult>> _concurrentQueue = new ConcurrentQueue<QueuedItem<TInput, TResult>>();

    bool _isFinishedAdding = false;

    private readonly TResult _notFinished;

    private readonly Action<QueuedItem<TInput, TResult>> _processor;

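    /// <param name="notFinished">A sentinel value indicating that a result is not yet finished. The processor must never produce this value as a real result, or that item will never be reported as ready.</param>
    /// <param name="processor">Must call WriteResult() on its argument when finished.</param>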
    public ParallelImmediateOrderedProcessingQueue(TResult notFinished, Action<QueuedItem<TInput, TResult>> processor)
    {
        _notFinished = notFinished;
        _processor = processor;
    }

    public event Action ResultsReady = delegate { };

    private void SignalResult()
    {
        QueuedItem<TInput, TResult> item;
        if (_concurrentQueue.TryPeek(out item) && item.IsResultReady)
        {
            ResultsReady();
        }
    }

    public void Add(TInput input)
    {
        bool shouldThrow = false;

        _addLock.EnterReadLock();
        {
            shouldThrow = _isFinishedAdding;

            if (!shouldThrow)
            {
                var queuedItem = new QueuedItem<TInput, TResult>(input, _notFinished);

                _concurrentQueue.Enqueue(queuedItem);

                Task.Factory.StartNew(() => { _processor(queuedItem); SignalResult(); });
            }
        }
        _addLock.ExitReadLock();

        if (shouldThrow)
            throw new InvalidOperationException("An attempt was made to add an item, but adding items was marked as completed");
    }

    public IEnumerable<TResult> ConsumeReadyResults()
    {
        //lock necessary to preserve ordering
        lock (_readingResultsLock)
        {
            QueuedItem<TInput, TResult> queuedItem;

            while (_concurrentQueue.TryPeek(out queuedItem) && queuedItem.IsResultReady)
            {
                if (!_concurrentQueue.TryDequeue(out queuedItem))
                    throw new ApplicationException("this shouldn't happen");

                if (queuedItem.IsEndQueue)
                {
                    _completion.SetResult(true);
                }
                else
                {
                    yield return queuedItem.ReadResult();
                }
            }
        }
    }

    public void CompleteAddingItems()
    {
        _addLock.EnterWriteLock();
        {
            _isFinishedAdding = true;

            var queueCompletion = new QueuedItem<TInput, TResult>();

            _concurrentQueue.Enqueue(queueCompletion);
            Task.Factory.StartNew(() => { SignalResult(); });
        }
        _addLock.ExitWriteLock();
    }

    TaskCompletionSource<bool> _completion = new TaskCompletionSource<bool>();

    public void WaitForCompletion()
    {
        _completion.Task.Wait();
    }
}

class Program
{
    static void Main(string[] args)
    {
        const int notFinished = int.MinValue;

        var processingQueue = new ParallelImmediateOrderedProcessingQueue<int, int>(notFinished, qi =>
        {
            int work = 0;

            Console.WriteLine("Working on " + qi.Input);

            //simulate work
            int maxBusy = 90000000 - (10 * (qi.Input % 3));
            for (int busy = 0; busy <= maxBusy; ++busy) { ++work; };

            Console.WriteLine("Finished " + qi.Input);

            qi.WriteResult(qi.Input);
        });

        processingQueue.ResultsReady += new Action(() =>
        {
            Task.Factory.StartNew(() =>
                {
                    foreach (int result in processingQueue.ConsumeReadyResults())
                    {
                        Console.WriteLine("Results Available: " + result);
                    }
                });
        });


        int iterations = new Random().Next(5, 50);
        Console.WriteLine("------- iterations: " + iterations + "-------");

        for (int i = 1; i <= iterations; ++i)
        {
            processingQueue.Add(i);
        }

        while (true)
        {
            char c = Console.ReadKey().KeyChar;

            if (c == 's')
            {
                break;
            }
            else
            {
                ++iterations;

                Console.WriteLine("adding: " + iterations);
                processingQueue.Add(iterations);
            }
        }

        processingQueue.CompleteAddingItems();
        processingQueue.WaitForCompletion();

        Console.WriteLine("Done!");
        Console.ReadKey();
    }
}
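A simpler variation of the same idea, sketched under the same .NET 4.0 constraint, is to queue the Task objects themselves in submission order. Each Add() starts a task right away, and a single consumer takes tasks from the head and waits on each one. Each result is therefore yielded as soon as that item and all earlier items are done. OrderedTaskQueue is a hypothetical name, and this is a swapped-in alternative to the QueuedItem approach above, not a drop-in replacement for it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical simplification: the queue holds Task<TResult> objects in
// submission order; the TPL decides how many of them run in parallel.
public class OrderedTaskQueue<TInput, TResult>
{
    private readonly BlockingCollection<Task<TResult>> _tasks =
        new BlockingCollection<Task<TResult>>();

    private readonly Func<TInput, TResult> _process;

    public OrderedTaskQueue(Func<TInput, TResult> process)
    {
        _process = process;
    }

    public void Add(TInput input)
    {
        var task = new Task<TResult>(() => _process(input));
        _tasks.Add(task); // throws InvalidOperationException after CompleteAdding()
        task.Start();     // only started once it is safely queued
    }

    public void CompleteAdding()
    {
        _tasks.CompleteAdding();
    }

    // Intended for a single consumer. Blocks on the oldest queued task and
    // ends once CompleteAdding() has been called and every task is consumed.
    public IEnumerable<TResult> GetOrderedResults()
    {
        foreach (Task<TResult> task in _tasks.GetConsumingEnumerable())
        {
            // Rethrows a failed item's exception wrapped in an AggregateException.
            yield return task.Result;
        }
    }
}
```

Compared with QueuedItem, this needs no notFinished sentinel and no ResultsReady event. It also surfaces a failing item to the consumer, whereas in the version above a processor that throws never writes its result, so consumption stalls at that item. The trade-off is that the consumer thread sits blocked in GetOrderedResults() while it waits for the next item in order.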
Licensed under: CC-BY-SA with attribution