Question

I'm looking for the best way to implement a one-producer, multiple-consumer multithreaded application. Currently I'm using a single queue as the shared buffer, but it's much slower than the one-producer, one-consumer case. I'm planning to do it like this:

const int N = 100;
static Queue<item>[] buffs = new Queue<item>[N];
static object[] _locks = new object[N];

static void Produce()
{
    item curItem;
    int curIndex = 0;
    while(true)
    {
        // Produce item;
        lock(_locks[curIndex])
        {
            buffs[curIndex].Enqueue(curItem);
            Monitor.Pulse(_locks[curIndex]);
        }
        curIndex = (curIndex+1)%N;
    }
}

static void Consume(object state)
{
    int myIndex = (int)state;
    item curItem;
    while(true)
    {
        lock(_locks[myIndex])
        {
            while(buffs[myIndex].Count == 0)
                Monitor.Wait(_locks[myIndex]);
            curItem = buffs[myIndex].Dequeue();
        }
        // Consume item;
    }
}

static void Main()
{
    // each consumer gets its own queue and lock object
    for(int i = 0; i < N; i++)
    {
        buffs[i] = new Queue<item>();
        _locks[i] = new object();
    }

    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start(i);
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}

Solution

Use a BlockingCollection

static BlockingCollection<item> _buffer = new BlockingCollection<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        _buffer.Add(curItem);
    }

    // eventually stop producing (break out of the loop above)
    // and signal the consumers that no more items will arrive
    _buffer.CompleteAdding();
}

static void Consume(object state)
{
    // Blocks while the collection is empty and ends once
    // CompleteAdding has been called and the buffer is drained.
    foreach (var curItem in _buffer.GetConsumingEnumerable())
    {
        // Consume item;
    }
}

static void Main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start(i);
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}

If you don't want to fix the number of threads up front, you can use Parallel.ForEach instead.

static void Consume(item curItem)
{
    // consume item
}

void Main()
{
    Thread producer = new Thread(Produce);
    producer.Start();

    Parallel.ForEach(_buffer.GetConsumingPartitioner(), Consume);
}
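
As far as I know, GetConsumingPartitioner is not part of the BCL itself; it's an extension method from Microsoft's ParallelExtensionsExtras samples. If you don't want that dependency, a rough fallback (my sketch, not from the original answer) is to pass GetConsumingEnumerable directly and accept that the default partitioner may buffer items in chunks:

void Main()
{
    Thread producer = new Thread(Produce);
    producer.Start();

    // Works with the plain BCL, but the default partitioner
    // may hand items to the worker tasks in buffered chunks.
    Parallel.ForEach(_buffer.GetConsumingEnumerable(), Consume);
}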

OTHER TIPS

Using more threads won't help; it may even reduce performance. I suggest you try the ThreadPool instead, where every work item is one item created by the producer (see the sketch below). Note, however, that this doesn't guarantee the produced items are consumed in the order they were produced.
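
A minimal sketch of that ThreadPool approach, assuming a hypothetical CreateItem() helper that stands in for the "// Produce item;" step:

static void Produce()
{
    while (true)
    {
        item curItem = CreateItem(); // hypothetical: however you produce an item

        // Hand each produced item straight to the thread pool;
        // the runtime decides how many threads actually run.
        ThreadPool.QueueUserWorkItem(state =>
        {
            var it = (item)state;
            // Consume item;
        }, curItem);
    }
}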


Another way could be to reduce the number of consumers to, say, 4, and change the way they work as follows:

The producer adds the new work to the queue; there is only one global queue for all worker threads. It then sets a flag to indicate that there is new work, like this:

static ManualResetEvent workPresent = new ManualResetEvent(false);
static Queue<item> workQueue = new Queue<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        lock(workQueue)
        {
            workQueue.Enqueue(newItem);
            workPresent.Set();
        }
    }
}

The consumers wait for work to be added to the queue. Only one consumer will get to do the job: it resets the flag and then takes all the work from the queue while holding the lock, so the producer cannot add new work until that is done.

static void Consume()
{
    while(true)
    {
        if (workPresent.WaitOne())
        {
            workPresent.Reset();

            Queue<item> localWorkQueue = new Queue<item>();
            lock(workQueue)
            {
                while (workQueue.Count > 0)
                    localWorkQueue.Enqueue(workQueue.Dequeue());
            }

            // Handle items in local work queue
            ...
        }
    }
}    

The outcome of this, however, is a bit unpredictable: it could be that one thread ends up doing all the work while the others do nothing.

I don't see why you have to use multiple queues. Just reduce the amount of locking. Here is a sample where you can have a large number of consumers that all wait for new work.

public class MyWorkGenerator
{
    ConcurrentQueue<object> _queuedItems = new ConcurrentQueue<object>();
    private object _lock = new object();

    public void Produce()
    {
        while (true)
        {
            _queuedItems.Enqueue(new object());

            // Pulse must be called while holding the lock,
            // otherwise Monitor throws SynchronizationLockException.
            lock (_lock)
                Monitor.Pulse(_lock);
        }
    }

    public object Consume(TimeSpan maxWaitTime)
    {
        // Wait must also be called while holding the lock.
        lock (_lock)
        {
            if (!Monitor.Wait(_lock, maxWaitTime))
                return null;
        }

        object workItem;
        if (_queuedItems.TryDequeue(out workItem))
        {
            return workItem;
        }

        return null;
    }
}

Do note that Pulse() will only trigger one consumer at a time.
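
If you instead need to wake every waiting consumer at once (for example when shutting down), Monitor.PulseAll does that; a minimal sketch of such a hypothetical addition to the class, not part of the original answer:

public void Shutdown()
{
    // Wake all waiting consumers; with an empty queue, Consume()
    // will return null and DoWork() can exit its loop.
    lock (_lock)
        Monitor.PulseAll(_lock);
}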

Example usage:

    static void Main()
    {
        var generator = new MyWorkGenerator();

        var consumers = new Thread[20];
        for (int i = 0; i < consumers.Length; i++)
        {
            consumers[i] = new Thread(DoWork);
            consumers[i].Start(generator);
        }

        generator.Produce();
    }

    public static void DoWork(object state)
    {
        var generator = (MyWorkGenerator) state;

        var workItem = generator.Consume(TimeSpan.FromHours(1));
        while (workItem != null)
        {
            // do work


            workItem = generator.Consume(TimeSpan.FromHours(1));
        }
    }

Note that the actual queue is hidden inside the producer, as it's imho an implementation detail. The consumers don't really need to know how the work items are generated.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow