Pergunta

Here is an implementation of a thread safe queue. The push and pop won't block each other. However, pop will wait until an item is pushed, if the queue is empty. Multiple producers and consumers can use the queue. Please tell me if you see any problems.

Update: Edited as per answers 1)Resolved the issue of "Queue Full" situation. 2) There is BlockingCollection<T> and ConcurrentQueue<T> in .NET4. So there is no need to reinvent the wheel(for .NET4)

public class CustomQueue<T> where T: class
{
    class Node
    {
        public Node()
        {
            Value = null;
            NextNode = null;
        }

        public Node(T value)
        {
            Value = value;
            NextNode = null;
        }

        public T Value;
        public Node NextNode;
    }

    object PushLocker = new object();
    object PopLocker = new object();
    Semaphore QueueSemaphore;
    volatile int PushIncrement;
    volatile int PopIncrement;
    int MaxItems;
    Node FirstNode;
    Node LastNode;

    public CustomQueue(int maxItems)
    {
        QueueSemaphore = new Semaphore(0, maxItems);
        MaxItems = maxItems;
        FirstNode = LastNode = new Node();
        PushIncrement = 0;
        PopIncrement = 0;
    }

    public int Size()
    {
        return PushIncrement - PopIncrement;
    }

    public bool Push(T value)
    {
        lock(PushLocker)
        {
            if((Size()) >= MaxItems)
            {
                lock(PopLocker)
                {
                    PushIncrement = PushIncrement - PopIncrement;
                    PopIncrement = 0;
                    return false;
                }
            }
            Node newNode = new Node(value);                
            LastNode.NextNode = newNode;
            LastNode = newNode;
            PushIncrement++;
            QueueSemaphore.Release();
            return true;
        }            
    }

    public T Pop()
    {
        QueueSemaphore.WaitOne();
        lock(PopLocker)
        {
            Node tempFirst = FirstNode;
            Node tempNext = FirstNode.NextNode;
            T value = tempNext.Value;
            tempNext.Value = null;
            FirstNode = tempNext;
            PopIncrement++;
            return value;
        }
    }
}
Foi útil?

Solução

It looks like a good implementation at a glance. Using different locks is always a red flag to me so I took a good hard look at some of the edge cases involving simultaneously calls to Pop and Push and it appears safe. I suspect you probably educated yourself on the linked list implemenation of a blocking queue huh? The reason why this is safe is because you only ever reference LastNode from Push and FirstNode from Pop otherwise the whole trick would fall apart.

The only thing that is sticking out at me right now is that when you try to release a count from a Semaphore it will throw an exception if it is already full so you might want to guard against that.1 Otherwise you will end up with extra nodes in the linked list and the queue will 1) have more than the maximum number of items and 2) it will get live-locked.

Update:

I gave this some more thought. That Release call in the Push method is going to be very problematic. I see only one possible solution. Remove the maxItems parameter from the constructor and allow the semaphore to count up to Int.MaxValue. Otherwise, you are going to have to radically change your approach resulting in an implementation that is no where close to what you currently have.

1I think you will find that this will be more difficult than what you might immediately realize.

Outras dicas

1. Consider adding a second Node constructor:

        public Node(T value)
        {
            Value = value;
        }

then your client code:

            Node newNode = new Node();
            newNode.Value = value;

can treat the value as an invariant:

            Node newNode = new Node(value);

2. Then make your public fields:

        public T Value;
        public Node NextNode;

into auto properties:

        public T Value { get; private set; };
        public Node NextNode { get; set; };

So you can abstract the usage from the implementation and add validation, other processing, etc. after the fact with minimal disruption to client code.

If you are doing this for self-education, great - otherwise BlockingCollection<T> or ConcurrentQueue<T> are good alternatives.

One problem I do see here is that there is no way to interrupt Pop once it starts - it assumes an object is waiting when it awakens. How would you clear this down on termination? A TryPop that returns true (with an element) or false (if no data) might be better, then you could signal waiting threads to shut down cleanly once the queue is drained.

As I'm a fan of immutable objects, here's an alternative to my earlier answer that I would consider a bit cleaner:

public sealed class CustomQueue<T> where T : class
{
    private readonly object pushLocker = new object();
    private readonly object popLocker = new object();
    private readonly Semaphore queueSemaphore;
    private readonly int maxItems;
    private volatile int pushIncrement;
    private volatile int popIncrement;
    private Node firstNode = new Node();
    private Node lastNode;

    public CustomQueue(int maxItems)
    {
        this.maxItems = maxItems;
        this.lastNode = this.firstNode;
        this.queueSemaphore = new Semaphore(0, this.maxItems);
    }

    public int Size
    {
        get
        {
            return this.pushIncrement - this.popIncrement;
        }
    }

    public bool Push(T value)
    {
        lock (this.pushLocker)
        {
            if (this.Size >= this.maxItems)
            {
                lock (this.popLocker)
                {
                    this.pushIncrement = this.pushIncrement - this.popIncrement;
                    this.popIncrement = 0;
                    return false;
                }
            }

            Node newNode = new Node(value, this.lastNode.NextNode);

            this.lastNode = new Node(this.lastNode.Value, newNode);
            this.firstNode = new Node(null, newNode);
            this.pushIncrement++;
            this.queueSemaphore.Release();
            return true;
        }
    }

    public T Pop()
    {
        this.queueSemaphore.WaitOne();
        lock (this.popLocker)
        {
            Node tempNext = this.firstNode.NextNode;
            T value = tempNext.Value;

            this.firstNode = tempNext;
            this.popIncrement++;
            return value;
        }
    }

    private sealed class Node
    {
        private readonly T value;

        private readonly Node nextNode;

        public Node()
        {
        }

        public Node(T value, Node nextNode)
        {
            this.value = value;
            this.nextNode = nextNode;
        }

        public T Value
        {
            get
            {
                return this.value;
            }
        }

        public Node NextNode
        {
            get
            {
                return this.nextNode;
            }
        }
    }
}
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top