Múltiples Productor de Varias de Consumo libre de bloqueo (o incluso esperar libre) de la cola
-
08-09-2020 - |
Pregunta
Estoy buscando documentación sobre cómo escribir MP/MC cola para ser libre de bloqueo o incluso esperar libre.Estoy usando .Net 4.0.Se encontró una gran cantidad de código de C++, pero no estoy muy familiarizado con los modelos de memoria, por lo que hay una gran posibilidad de que voy a presentar algunos errores, mientras que la migración a C#.
Solución
¿Por qué crees que necesitas cola de bloqueo?¿Ha intentado usar ConcurrentQueue<T>
, posiblemente encerrado dentro de un BlockingCollection<T>
?
La escritura de código multi-roscado es difícil.Escribir código de bloqueo sin bloqueo es aún más difícil y no debe hacerlo usted mismo a menos que realmente tenga que hacerlo.
Otros consejos
Como opción de considerar, hay un algoritmo de El productor múltiple limitado múltiple cola de consumo por Dmitry Vyukov .He portado el algoritmo a .Net, puedes encontrar las fuentes en github .Es muy rápido.
El algoritmo de enquego:
public bool TryEnqueue(object item)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var pos = _enqueuePos; // fetch the current position where to enqueue the item
var index = pos & _bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence wasn't touched by other producers
// and we can increment the enqueue position
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
// write the item we want to enqueue
Volatile.Write(ref buffer[index].Element, item);
// bump the sequence
buffer[index].Sequence = pos + 1;
return true;
}
// If the queue is full we cannot enqueue and just return false
if (cell.Sequence < pos)
{
return false;
}
// repeat the process if other producer managed to enqueue before us
} while (true);
}
El algoritmo de DEQUEUE:
public bool TryDequeue(out object result)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var bufferMask = _bufferMask; // prefetch the buffer mask
var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
var index = pos & bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence was changed by a producer and wasn't changed by other consumers
// and we can increment the dequeue position
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
// read the item
result = Volatile.Read(ref cell.Element);
// update for the next round of the buffer
buffer[index] = new Cell(pos + bufferMask + 1, null);
return true;
}
// If the queue is empty return false
if (cell.Sequence < pos + 1)
{
result = default(object);
return false;
}
// repeat the process if other consumer managed to dequeue before us
} while (true);
}
Mi primera será con ConcurrentQueue<T>
pero usted puede abstracto de su almacén de datos de distancia detrás de una interfaz de modo que usted puede cambiar fácilmente las implementaciones.Luego de referencia escenarios típicos y ver donde te encuentras con problemas.Recuerde:Prematuro optimzation es la raíz de todos los males.El diseño de su sistema por lo que no está atado a una aplicación, sino de un contrato y, a continuación, puede optimizar las implementaciones de todo lo que quieras.
Eché un vistazo a ConcurrentQueue<T>
con ILSpy y parece ser un bloqueo implementación libre a primera vista tan buena oportunidad es exactamente lo que está buscando.