Pergunta

Estou procurando documentação sobre como escrever uma fila MP/MC sem bloqueio ou até mesmo sem espera.Estou usando o .Net 4.0.Encontrei muito código C++, mas não estou muito familiarizado com modelos de memória, então há uma grande chance de apresentar alguns bugs ao migrar para C#.

Foi útil?

Solução

Por que você acha que precisa de uma fila sem bloqueios?Você já tentou usar ConcurrentQueue<T>, possivelmente encerrado dentro de um BlockingCollection<T>?

Escrever código multithread é difícil.Escrever código sem bloqueio é ainda mais difícil e você não deve fazer isso sozinho, a menos que seja realmente necessário.

Outras dicas

Como opção a considerar, existe um algoritmo de a fila limitada de múltiplos produtores e múltiplos consumidores por Dmitry Vyukov.Eu portei o algoritmo para .NET, você pode encontrar as fontes no github.É muito rápido.

O algoritmo de enfileiramento:

public bool TryEnqueue(object item)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // write the item we want to enqueue
            Volatile.Write(ref buffer[index].Element, item);
            // bump the sequence
            buffer[index].Sequence = pos + 1;
            return true;
        }

        // If the queue is full we cannot enqueue and just return false
        if (cell.Sequence < pos)
        {
            return false;
        }

        // repeat the process if other producer managed to enqueue before us
    } while (true);
}

O algoritmo de desenfileiramento:

public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
            // read the item
            result = Volatile.Read(ref cell.Element);
            // update for the next round of the buffer
            buffer[index] = new Cell(pos + bufferMask + 1, null);
            return true;
        }

        // If the queue is empty return false
        if (cell.Sequence < pos + 1)
        {
            result = default(object);
            return false;
        }

        // repeat the process if other consumer managed to dequeue before us
    } while (true);
}

Minha primeira tentativa seria com ConcurrentQueue<T> mas você pode abstrair seu armazenamento de dados atrás de uma interface para poder alterar facilmente as implementações.Em seguida, compare cenários típicos e veja onde você encontra problemas.Lembrar:A otimização prematura é a raiz de todos os males.Projete seu sistema de forma que ele não esteja vinculado a uma implementação, mas a um contrato e então você poderá otimizar suas implementações o quanto quiser.

eu dei uma olhada ConcurrentQueue<T> com ILSpy e parece ser uma implementação sem bloqueio à primeira vista - há boas chances de que seja exatamente o que você está procurando.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top