BlockingCollection(T) 性能
-
27-09-2019 - |
题
在我的公司有一段时间,我们使用了国产的 ObjectPool<T>
提供对其内容的阻塞访问的实现。这非常简单:A Queue<T>
, , 一个 object
锁定,以及 AutoResetEvent
添加项目时向“借用”线程发出信号。
该类的核心实际上是这两个方法:
public T Borrow() {
lock (_queueLock) {
if (_queue.Count > 0)
return _queue.Dequeue();
}
_objectAvailableEvent.WaitOne();
return Borrow();
}
public void Return(T obj) {
lock (_queueLock) {
_queue.Enqueue(obj);
}
_objectAvailableEvent.Set();
}
我们一直在使用这个和其他一些集合类,而不是由 System.Collections.Concurrent
因为我们使用的是 .NET 3.5,而不是 4.0。但最近我们发现,由于我们正在使用 反应式扩展, ,我们实际上 做 有 Concurrent
我们可以使用的命名空间(在 System.Threading.dll 中)。
自然地,我认为自从 BlockingCollection<T>
是核心课程之一 Concurrent
命名空间,它可能会提供比我或我的队友编写的任何内容更好的性能。
所以我尝试编写一个工作非常简单的新实现:
public T Borrow() {
return _blockingCollection.Take();
}
public void Return(T obj) {
_blockingCollection.Add(obj);
}
令我惊讶的是,根据一些简单的测试(从多个线程借入/返回池几千次),我们最初的实现 明显节拍 BlockingCollection<T>
在性能方面. 。他们似乎都工作 正确地;只是我们原来的实现似乎要快得多。
我的问题:
- 为什么会这样呢?或许是因为
BlockingCollection<T>
提供了更大的灵活性(我知道它的工作原理是包装一个IProducerConsumerCollection<T>
),这必然会带来性能开销吗? - 这是否只是一种彻头彻尾的误导性使用?
BlockingCollection<T>
班级? - 如果这是适当的使用
BlockingCollection<T>
, ,我只是没有正确使用吗?例如,是Take
/Add
方法过于简单化,并且有一种性能更好的方法来获得相同的功能吗?
除非有人对第三个问题有一些见解,否则我们现在似乎将坚持原来的实现。
解决方案
这里有一些潜在的可能性。
第一的, BlockingCollection<T>
Reactive Extensions 中的 .NET 4 是向后移植的,与 .NET 4 最终版本并不完全相同。如果此向后移植的性能与 .NET 4 RTM 不同,我不会感到惊讶(尽管我没有具体介绍此集合)。大多数 TPL 在 .NET 4 中的性能比在 .NET 3.5 向后移植中的性能更好。
话虽这么说,我怀疑你的实现会表现出色 BlockingCollection<T>
如果您有一个生产者线程和一个消费者线程。对于一个生产者和一个消费者,你的锁对总体性能的影响会更小,并且重置事件是消费者端等待的一种非常有效的手段。
然而, BlockingCollection<T>
旨在允许许多生产者线程很好地“排队”数据。这对于您的实现来说效果不佳,因为锁定争用很快就会开始出现问题。
话虽这么说,我还想在这里指出一个误解:
...它可能会提供比我或我的队友写的任何东西更好的性能。
这通常是不正确的。框架集合类通常执行 很好, ,但对于给定场景来说通常不是性能最佳的选项。话虽如此,它们往往表现良好,同时非常灵活和强大。它们往往能够很好地扩展。“自制”集合类在特定场景中通常优于框架集合,但在专门设计的场景之外使用时往往会出现问题。我怀疑这是其中一种情况。
其他提示
我试过 BlockingCollection
反对 ConurrentQueue/AutoResetEvent
.Net 4 中的组合(类似于 OP 的解决方案,但无锁),后一个组合是 所以 对于我的用例来说,速度要快得多,所以我放弃了 BlockingCollection。不幸的是,这已经是一年前的事了,我找不到基准测试结果。
使用单独的 AutoResetEvent 不会使事情变得更加复杂。事实上,人们甚至可以将其一劳永逸地抽象为一种 BlockingCollectionSlim
....
BlockingCollection 内部也依赖于 ConcurrentQueue,但是 做 一些额外的杂耍 细长信号量 和 取消令牌, ,这会产生额外的功能,但要付出一定的代价,即使不使用也是如此。还应该注意的是,BlockingCollection 并未与 ConcurrentQueue 联姻,但可以与 ConcurrentQueue 的其他实现者一起使用 IProducerConsumerCollection
相反也是如此。
一个无限制的、非常简单的 BlockingCollectionSlim 实现:
class BlockingCollectionSlim<T>
{
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
public void Add(T item)
{
_queue.Enqueue(item);
_autoResetEvent.Set();
}
public bool TryPeek(out T result)
{
return _queue.TryPeek(out result);
}
public T Take()
{
T item;
while (!_queue.TryDequeue(out item))
_autoResetEvent.WaitOne();
return item;
}
public bool TryTake(out T item, TimeSpan patience)
{
if (_queue.TryDequeue(out item))
return true;
var stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed < patience)
{
if (_queue.TryDequeue(out item))
return true;
var patienceLeft = (patience - stopwatch.Elapsed);
if (patienceLeft <= TimeSpan.Zero)
break;
else if (patienceLeft < MinWait)
// otherwise the while loop will degenerate into a busy loop,
// for the last millisecond before patience runs out
patienceLeft = MinWait;
_autoResetEvent.WaitOne(patienceLeft);
}
return false;
}
private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);
我在 .Net 4.7.2 中遇到了与 BlockingCollection 相同的性能问题,并找到了这篇文章。我的案例是多个生产者-多个消费者,特别是从多个来源读取小数据块,并且应该由多个过滤器处理。使用了几个(Env.ProcessorCount)BlockingCollections,最后我得到了一个性能分析器告诉我 BlockingCollection.GetConsumingEnumerable.MoveNext()
比实际过滤消耗更多的 CPU 时间!
谢谢@Eugene Beresovsky 的代码。供参考:在我的环境中,它几乎比 BlockingCollection 慢两倍。所以,这是我的 SpinLocked BlockingCollection:
public class BlockingCollectionSpin<T>
{
private SpinLock _lock = new SpinLock(false);
private Queue<T> _queue = new Queue<T>();
public void Add(T item)
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
_queue.Enqueue(item);
}
finally
{
if (gotLock) _lock.Exit(false);
}
}
public bool TryPeek(out T result)
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
if (_queue.Count > 0)
{
result = _queue.Peek();
return true;
}
else
{
result = default(T);
return false;
}
}
finally
{
if (gotLock) _lock.Exit(false);
}
}
public T Take()
{
var spin = new SpinWait();
do
{
bool gotLock = false;
try
{
_lock.Enter(ref gotLock);
if (_queue.Count > 0)
return _queue.Dequeue();
}
finally
{
if (gotLock) _lock.Exit(false);
}
spin.SpinOnce();
} while (true);
}
}
对于性能关键的代码,我建议避免 readonly
字段修饰符。它添加了对 IL 中每个字段访问的检查。用下面的测试代码
private static void TestBlockingCollections()
{
const int workAmount = 10000000;
var workerCount = Environment.ProcessorCount * 2;
var sw = new Stopwatch();
var source = new long[workAmount];
var rnd = new Random();
for (int i = 0; i < workAmount; i++)
source[i] = rnd.Next(1000000);
var swOverhead = 0.0;
for (int i = 0; i < workAmount; i++)
{
sw.Restart();
swOverhead += sw.Elapsed.TotalMilliseconds;
}
swOverhead /= workAmount;
var sum1 = new long[workerCount];
var queue1 = new BlockingCollection<long>(10000);
var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
foreach (var l in queue1.GetConsumingEnumerable())
sum1[n] += l;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue1.Add(l);
queue1.CompleteAdding();
Task.WaitAll(workers);
var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);
var sum2 = new long[workerCount];
var queue2 = new BlockingCollectionSlim<long?>();
workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
long? l;
while ((l = queue2.Take()).HasValue)
sum2[n] += l.Value;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue2.Add(l);
for (int i = 0; i < workerCount; i++)
queue2.Add(null);
Task.WaitAll(workers);
elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);
var sum3 = new long[workerCount];
var queue3 = new BlockingCollectionSpin<long?>();
workers = Enumerable.Range(0, workerCount - 1).Select(n =>
Task.Factory.StartNew(() =>
{
long? l;
while ((l = queue3.Take()).HasValue)
sum3[n] += l.Value;
})).ToArray();
Thread.Sleep(1000);
sw.Restart();
foreach (var l in source)
queue3.Add(l);
for (int i = 0; i < workerCount; i++)
queue3.Add(null);
Task.WaitAll(workers);
elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);
if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
Console.WriteLine("Wrong sum in the end!");
Console.ReadLine();
}
在具有 2 个内核且启用 HT 的 Core i5-3210M 上,我得到以下输出:
BlockingCollection 0.0006ms BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation) BlockingCollectionSpin 0.0003ms
因此,SpinLocked 版本比 .Net 快两倍 BlockingCollection
. 。但是,我建议只使用它!如果您真的更喜欢性能而不是代码简单性(和可维护性)。