문제

For a while at my company we've used a home-grown ObjectPool<T> implementation that provides blocking access to its contents. It's pretty straightforward: a Queue<T>, an object to lock on, and an AutoResetEvent to signal to a "borrowing" thread when an item is added.

The meat of the class is really these two methods:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

We have been using this and a few other collection classes instead of those provided by System.Collections.Concurrent because we are using .NET 3.5, not 4.0. But recently we discovered that since we are using Reactive Extensions, we actually do have the Concurrent namespace available to us (in System.Threading.dll).

Naturally, I figured that since BlockingCollection<T> is one of the core classes in the Concurrent namespace, it would probably offer better performance than anything I or my teammates wrote.

So I tried writing a new implementation that works very simply:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

To my surprise, according to some simple tests (borrowing/returning to the pool a few thousand times from multiple threads), our original implementation significantly beats BlockingCollection<T> in terms of performance. They both appear to work correctly; it's just that our original implementation seems to be much faster.

My question:

  1. Why would this be? Is it perhaps because BlockingCollection<T> offers greater flexibility (I understand it works by wrapping an IProducerConsumerCollection<T>), which necessarily introduces performance overhead?
  2. Is this just a flat-out misguided use of the BlockingCollection<T> class?
  3. If this is an appropriate use of BlockingCollection<T>, am I just not using properly? For example, is the Take/Add approach overly simplistic, and there's a far better-performing way to get the same functionality?

Unless anyone has some insight to offer in response to that third question, it looks like we'll be sticking with our original implementation for now.

도움이 되었습니까?

해결책

There are a couple of potential possibilities, here.

First, BlockingCollection<T> in the Reactive Extensions is a backport, and not exactly the same as the .NET 4 final version. I wouldn't be surprised if the performance of this backport differs from .NET 4 RTM (though I haven't profiled this collection, specifically). Much of the TPL performs better in .NET 4 than in the .NET 3.5 backport.

That being said, I'd suspect your implementation will out-perform BlockingCollection<T> if you have a single producer thread and a single consumer thread. With one producer and one consumer, your lock is going to have a smaller impact on the total performance, and the reset event is a very effective means of waiting on the consumer side.

However, BlockingCollection<T> is designed to allow many producer threads to "enqueue" data very well. This will not perform well with your implementation, as the locking contention will start to become problematic fairly quickly.

That being said, I'd also like to point out one misconception here:

...it would probably offer better performance than anything I or my teammates wrote.

This is often not true. The framework collection classes typically perform very well, but are often not the most performant option for a given scenario. That being said, they tend to perform well while being very flexible and very robust. They often tend to scale very well. "Home-written" collection classes often outperform framework collections in specific scenarios, but tend to be problematic when used in scenarios outside of the one for which they were specifically designed. I suspect this is one of those situations.

다른 팁

I tried BlockingCollection against a ConurrentQueue/AutoResetEvent combo (similar to OP's solution, but lockless) in .Net 4, and the latter combo was so much faster for my use case, that I ditched BlockingCollection. Unfortunately this was almost a year ago and I couldn't find the benchmark results.

Using a separate AutoResetEvent doesn't make things too much more complicated. In fact, one could even abstract it away, once and for all, into a BlockingCollectionSlim....

BlockingCollection internally relies on a ConcurrentQueue as well, but does some additional juggling with slim semaphores and cancellation tokens, which yields additional features, but at a cost, even when not used. It should also be noted that BlockingCollection is not married to ConcurrentQueue, but can be used with other implementors of IProducerConsumerCollection instead as well.


An unbounded, pretty bare bones BlockingCollectionSlim implementation:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);

I came across the same performance issues with BlockingCollection in .Net 4.7.2 and found this post. My case is MultipleProducers-MultipleConsumers, in particular small data chunks are read from many sources and should be processed by many filters. Several (Env.ProcessorCount) BlockingCollections were used and I ended up with a performance profiler telling me that BlockingCollection.GetConsumingEnumerable.MoveNext() eats more CPU time than actual filtering!

Thank you, @Eugene Beresovsky, for your code. FYI: On my environment it was almost twice slower than BlockingCollection. So, here is my SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

And for performance-critical code I would suggest to avoid readonly field modifier. It adds a check on every field access in the IL. With the following test code

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

On Core i5-3210M with 2 cores and HT enabled I've got the following output:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

So, SpinLocked version is twice faster than .Net BlockingCollection. But, I would suggest to use it only! if you really prefer performance against code simplicity (and maintainability).

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top