複数のプロデューサー 複数のコンシューマーのロックフリー (または待機フリーの) キュー
-
08-09-2020 - |
質問
MP/MC キューをロックフリーまたはウェイトフリーで作成する方法に関するドキュメントを探しています。.Net 4.0を使用しています。たくさんの C++ コードが見つかりましたが、メモリ モデルについてはあまり詳しくないため、C# に移植するときにいくつかのバグが発生する可能性が大いにあります。
解決
なぜロックフリーキューが必要だと思いますか?使ってみましたか ConcurrentQueue<T>
, で囲まれている可能性があります。 BlockingCollection<T>
?
マルチスレッドのコードを書くのは難しいです。ロックフリーのコードを書くのはさらに難しいので、本当に必要な場合を除いて、自分で行うべきではありません。
他のヒント
考慮すべきオプションとして、
エンキューアルゴリズム:
public bool TryEnqueue(object item)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var pos = _enqueuePos; // fetch the current position where to enqueue the item
var index = pos & _bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence wasn't touched by other producers
// and we can increment the enqueue position
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
// write the item we want to enqueue
Volatile.Write(ref buffer[index].Element, item);
// bump the sequence
buffer[index].Sequence = pos + 1;
return true;
}
// If the queue is full we cannot enqueue and just return false
if (cell.Sequence < pos)
{
return false;
}
// repeat the process if other producer managed to enqueue before us
} while (true);
}
.
デキューアルゴリズム:
public bool TryDequeue(out object result)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var bufferMask = _bufferMask; // prefetch the buffer mask
var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
var index = pos & bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence was changed by a producer and wasn't changed by other consumers
// and we can increment the dequeue position
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
// read the item
result = Volatile.Read(ref cell.Element);
// update for the next round of the buffer
buffer[index] = new Cell(pos + bufferMask + 1, null);
return true;
}
// If the queue is empty return false
if (cell.Sequence < pos + 1)
{
result = default(object);
return false;
}
// repeat the process if other consumer managed to dequeue before us
} while (true);
}
. 私の最初の行動はConcurrentQueue<T>
と一緒になるでしょうが、あなたは簡単に実装を簡単に変更することができるように、あなたのデータストアをインターフェイスの後ろに抽象化することができます。その後、典型的なシナリオをベンチマークし、問題が発生した場所を確認してください。覚えておいてください:時期尚早の最適化はすべての悪の根源です。システムを設計して、実装に縛られていないが、契約に縛られてから、必要なすべての実装を最適化することができます。
私はIlSpyを使ってConcurrentQueue<T>
を見ていて、最初の一目でのロック無料の実装のようです - とても良いチャンスがまさにあなたが探しているものです。