多个生产者多个消费者无锁(甚至无等待)队列
-
08-09-2020 - |
题
我正在寻找有关如何编写MP/MC队列以无锁甚至无等待的文档。我正在使用.Net4.0。发现了很多C++代码,但我对内存模型不是很熟悉,所以很有可能我会在移植到C#时引入一些错误。
解决方案
为什么你认为你需要免费锁定队列?您是否尝试过使用 ConcurrentQueue<T>
,可能包含在 BlockingCollection<T>
?
编写多线程代码很难。写入锁定代码甚至更难,除非您真的不得不这样做,除非您真的不应该这样做。
其他提示
作为一个选择要考虑,有一个 DMitry Vyukov 的界多个生产商多个消费者队列。我已经将算法移植到.NET,可以找到 github 的来源。这很快。
eNqueue算法:
public bool TryEnqueue(object item)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var pos = _enqueuePos; // fetch the current position where to enqueue the item
var index = pos & _bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence wasn't touched by other producers
// and we can increment the enqueue position
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
// write the item we want to enqueue
Volatile.Write(ref buffer[index].Element, item);
// bump the sequence
buffer[index].Sequence = pos + 1;
return true;
}
// If the queue is full we cannot enqueue and just return false
if (cell.Sequence < pos)
{
return false;
}
// repeat the process if other producer managed to enqueue before us
} while (true);
}
.
Dequeue算法:
public bool TryDequeue(out object result)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var bufferMask = _bufferMask; // prefetch the buffer mask
var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
var index = pos & bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence was changed by a producer and wasn't changed by other consumers
// and we can increment the dequeue position
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
// read the item
result = Volatile.Read(ref cell.Element);
// update for the next round of the buffer
buffer[index] = new Cell(pos + bufferMask + 1, null);
return true;
}
// If the queue is empty return false
if (cell.Sequence < pos + 1)
{
result = default(object);
return false;
}
// repeat the process if other consumer managed to dequeue before us
} while (true);
}
. 我的第一个目标是 ConcurrentQueue<T>
但是,您可以将数据存储抽象在接口后面,以便轻松更改实现。然后对典型场景进行基准测试,并查看您遇到问题的位置。請記住!:过早优化是万恶之源。设计你的系统,这样它就不会与实现绑定,而是与合同绑定,然后你就可以随心所欲地优化你的实现。
我看了一下 ConcurrentQueue<T>
随着ILSpy,乍一看似乎是一个无锁的实现-所以很好的机会,这正是你正在寻找的。
不隶属于 StackOverflow