Несколько производителей нескольких потребительских блокировков (или даже без ожидания) очередь
-
08-09-2020 - |
Вопрос
Я ищу документацию о том, как записать очередь MC / MC, чтобы быть безблокированным или даже без необходимости.Я использую .NET 4.0.Нашел много кода C ++, но я не очень знаком с моделями памяти, поэтому есть большой шанс, что я буду введен некоторые ошибки во время портирования к C #.
Решение
Почему вы думаете, что вам нужна бесплатная очередь?Вы пытались использовать использование ConcurrentQueue<T>
, возможно, заключенный в BlockingCollection<T>
?
Написание многопоточного кода сложно.Запись кода без блокировки еще более сложнее, и вы не должны делать это сами, если вы действительно должны быть.
Другие советы
Как вариант рассмотреть, есть алгоритм Ограниченный множественный производитель множественной потребительской очереди Дмитрия Вюков .Я портировал алгоритм к .NET, вы можете найти Источники на Github .Это очень быстро.
Алгоритм enqueue:
public bool TryEnqueue(object item)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var pos = _enqueuePos; // fetch the current position where to enqueue the item
var index = pos & _bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence wasn't touched by other producers
// and we can increment the enqueue position
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
// write the item we want to enqueue
Volatile.Write(ref buffer[index].Element, item);
// bump the sequence
buffer[index].Sequence = pos + 1;
return true;
}
// If the queue is full we cannot enqueue and just return false
if (cell.Sequence < pos)
{
return false;
}
// repeat the process if other producer managed to enqueue before us
} while (true);
}
.
Удаленный алгоритм:
public bool TryDequeue(out object result)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var bufferMask = _bufferMask; // prefetch the buffer mask
var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
var index = pos & bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence was changed by a producer and wasn't changed by other consumers
// and we can increment the dequeue position
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
// read the item
result = Volatile.Read(ref cell.Element);
// update for the next round of the buffer
buffer[index] = new Cell(pos + bufferMask + 1, null);
return true;
}
// If the queue is empty return false
if (cell.Sequence < pos + 1)
{
result = default(object);
return false;
}
// repeat the process if other consumer managed to dequeue before us
} while (true);
}
. Мое первое место будет с ConcurrentQueue<T>
, но вы можете абстрактные данные о вашем хранилище данных за интерфейсом, чтобы вы могли легко изменить реализации.Затем прозовел типичные сценарии и посмотрите, где вы столкнулись с проблемами.Помните: преждевременная оптизация - корень всего зла.Разработайте свою систему, чтобы она не связана с реализацией, но к договору, а затем вы можете оптимизировать ваши реализации, все, что вы хотите.
Я посмотрел на ConcurrentQueue<T>
с ILSPY и кажется бесплатной реализацией блокировки на первый взгляд - настолько удачный шанс, что это именно то, что вы ищете.