Запрос на отзыв: резьбовая безопасная очередь (параллельный push pop)

StackOverflow https://stackoverflow.com/questions/4145438

Вопрос

Вот реализация потоковой безопасной очереди. Push и Pop не блокируют друг друга. Однако поп будет ждать, пока элемент не будет нажата, если очередь пуста. Несколько производителей и потребителей могут использовать очередь. Пожалуйста, скажите мне, если вы видите какие-либо проблемы.

Обновление: отредактировано согласно ответам 1) решил проблему «очереди полной» ситуации. 2) есть BlockingCollection<T> а также ConcurrentQueue<T> в .NET4. Так что нет необходимости изобретать колесо (для .NET4)

public class CustomQueue<T> where T: class
{
    class Node
    {
        public Node()
        {
            Value = null;
            NextNode = null;
        }

        public Node(T value)
        {
            Value = value;
            NextNode = null;
        }

        public T Value;
        public Node NextNode;
    }

    object PushLocker = new object();
    object PopLocker = new object();
    Semaphore QueueSemaphore;
    volatile int PushIncrement;
    volatile int PopIncrement;
    int MaxItems;
    Node FirstNode;
    Node LastNode;

    public CustomQueue(int maxItems)
    {
        QueueSemaphore = new Semaphore(0, maxItems);
        MaxItems = maxItems;
        FirstNode = LastNode = new Node();
        PushIncrement = 0;
        PopIncrement = 0;
    }

    public int Size()
    {
        return PushIncrement - PopIncrement;
    }

    public bool Push(T value)
    {
        lock(PushLocker)
        {
            if((Size()) >= MaxItems)
            {
                lock(PopLocker)
                {
                    PushIncrement = PushIncrement - PopIncrement;
                    PopIncrement = 0;
                    return false;
                }
            }
            Node newNode = new Node(value);                
            LastNode.NextNode = newNode;
            LastNode = newNode;
            PushIncrement++;
            QueueSemaphore.Release();
            return true;
        }            
    }

    public T Pop()
    {
        QueueSemaphore.WaitOne();
        lock(PopLocker)
        {
            Node tempFirst = FirstNode;
            Node tempNext = FirstNode.NextNode;
            T value = tempNext.Value;
            tempNext.Value = null;
            FirstNode = tempNext;
            PopIncrement++;
            return value;
        }
    }
}
Это было полезно?

Решение

Это похоже на хорошую реализацию с первого взгляда. Использование различных блокировков - это всегда красный флаг для меня, поэтому я хорошо посмотрел на немного краевых случаев с участием одновременно вызывает Pop а также Push И это выглядит безопасно. Я подозреваю, что вы, вероятно, воспитывали себя на подключенном списке, реализация блокирующей очереди HUH? Причина, по которой это безопасно, это потому, что вы только везде LastNode от Push а также FirstNode от Pop В противном случае вся хитрость упадет.

Единственное, что придерживается у меня прямо сейчас, это то, что когда вы пытаетесь освободить счет от Semaphore Он будет выбрасывать исключение, если оно уже полно, чтобы вы могли захотеть охранять против этого.1 В противном случае вы получите дополнительные узлы в связанном списке, а очередь будет 1) иметь больше, чем максимальное количество элементов и 2), он будет заблокирован в прямом эфире.

Обновлять:

Я дал это еще несколько мыслей. Это Release звонить в Push Метод будет очень проблематично. Я вижу только одно возможное решение. Удалить maxItems параметр от конструктора и позвольте семафору рассчитывать до Int.MaxValue. Отказ В противном случае вам придется радикально изменить ваш подход, приводящий к реализации, который нет, где рядом с тем, что у вас есть.

1Я думаю, что вы обнаружите, что это будет сложнее, чем то, что вы можете сразу реализовать.

Другие советы

1. Рассмотрите возможность добавления второго конструктора узла:

        public Node(T value)
        {
            Value = value;
        }

Тогда ваш код клиента:

            Node newNode = new Node();
            newNode.Value = value;

может относиться к значению как инвариант:

            Node newNode = new Node(value);

2. Затем сделайте свои публичные поля:

        public T Value;
        public Node NextNode;

в автоматические свойства:

        public T Value { get; private set; };
        public Node NextNode { get; set; };

Таким образом, вы можете абстрактное использование от реализации и добавления проверки, другой обработки и т. Д. После факта с минимальным нарушением к коду клиента.

Если вы делаете это для самообразования, отлично - в противном случае BlockingCollection<T> или ConcurrentQueue<T> хорошие альтернативы.

Одна проблема, которую я вижу вот то, что нет способа прерываться Pop Как только он запускается - предполагает, что объект ждет, когда он пробудит. Как бы вы очистили это о расторжении? А. TryPop Это возвращает true (с элементом) или false (Если никаких данных не может быть лучше, тогда вы можете сигнализировать ждущие потоки для очистки, когда очередь будет сливаться.

Как я фанат неизменных объектов, вот альтернатива моему предыдущему ответу, что я бы рассмотрел немного очистителя:

public sealed class CustomQueue<T> where T : class
{
    private readonly object pushLocker = new object();
    private readonly object popLocker = new object();
    private readonly Semaphore queueSemaphore;
    private readonly int maxItems;
    private volatile int pushIncrement;
    private volatile int popIncrement;
    private Node firstNode = new Node();
    private Node lastNode;

    public CustomQueue(int maxItems)
    {
        this.maxItems = maxItems;
        this.lastNode = this.firstNode;
        this.queueSemaphore = new Semaphore(0, this.maxItems);
    }

    public int Size
    {
        get
        {
            return this.pushIncrement - this.popIncrement;
        }
    }

    public bool Push(T value)
    {
        lock (this.pushLocker)
        {
            if (this.Size >= this.maxItems)
            {
                lock (this.popLocker)
                {
                    this.pushIncrement = this.pushIncrement - this.popIncrement;
                    this.popIncrement = 0;
                    return false;
                }
            }

            Node newNode = new Node(value, this.lastNode.NextNode);

            this.lastNode = new Node(this.lastNode.Value, newNode);
            this.firstNode = new Node(null, newNode);
            this.pushIncrement++;
            this.queueSemaphore.Release();
            return true;
        }
    }

    public T Pop()
    {
        this.queueSemaphore.WaitOne();
        lock (this.popLocker)
        {
            Node tempNext = this.firstNode.NextNode;
            T value = tempNext.Value;

            this.firstNode = tempNext;
            this.popIncrement++;
            return value;
        }
    }

    private sealed class Node
    {
        private readonly T value;

        private readonly Node nextNode;

        public Node()
        {
        }

        public Node(T value, Node nextNode)
        {
            this.value = value;
            this.nextNode = nextNode;
        }

        public T Value
        {
            get
            {
                return this.value;
            }
        }

        public Node NextNode
        {
            get
            {
                return this.nextNode;
            }
        }
    }
}
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top