Запрос на отзыв: резьбовая безопасная очередь (параллельный push pop)
-
30-09-2019 - |
Вопрос
Вот реализация потоковой безопасной очереди. Push и Pop не блокируют друг друга. Однако поп будет ждать, пока элемент не будет нажата, если очередь пуста. Несколько производителей и потребителей могут использовать очередь. Пожалуйста, скажите мне, если вы видите какие-либо проблемы.
Обновление: отредактировано согласно ответам 1) решил проблему «очереди полной» ситуации. 2) есть BlockingCollection<T>
а также ConcurrentQueue<T>
в .NET4. Так что нет необходимости изобретать колесо (для .NET4)
public class CustomQueue<T> where T: class
{
class Node
{
public Node()
{
Value = null;
NextNode = null;
}
public Node(T value)
{
Value = value;
NextNode = null;
}
public T Value;
public Node NextNode;
}
object PushLocker = new object();
object PopLocker = new object();
Semaphore QueueSemaphore;
volatile int PushIncrement;
volatile int PopIncrement;
int MaxItems;
Node FirstNode;
Node LastNode;
public CustomQueue(int maxItems)
{
QueueSemaphore = new Semaphore(0, maxItems);
MaxItems = maxItems;
FirstNode = LastNode = new Node();
PushIncrement = 0;
PopIncrement = 0;
}
public int Size()
{
return PushIncrement - PopIncrement;
}
public bool Push(T value)
{
lock(PushLocker)
{
if((Size()) >= MaxItems)
{
lock(PopLocker)
{
PushIncrement = PushIncrement - PopIncrement;
PopIncrement = 0;
return false;
}
}
Node newNode = new Node(value);
LastNode.NextNode = newNode;
LastNode = newNode;
PushIncrement++;
QueueSemaphore.Release();
return true;
}
}
public T Pop()
{
QueueSemaphore.WaitOne();
lock(PopLocker)
{
Node tempFirst = FirstNode;
Node tempNext = FirstNode.NextNode;
T value = tempNext.Value;
tempNext.Value = null;
FirstNode = tempNext;
PopIncrement++;
return value;
}
}
}
Решение
Это похоже на хорошую реализацию с первого взгляда. Использование различных блокировков - это всегда красный флаг для меня, поэтому я хорошо посмотрел на немного краевых случаев с участием одновременно вызывает Pop
а также Push
И это выглядит безопасно. Я подозреваю, что вы, вероятно, воспитывали себя на подключенном списке, реализация блокирующей очереди HUH? Причина, по которой это безопасно, это потому, что вы только везде LastNode
от Push
а также FirstNode
от Pop
В противном случае вся хитрость упадет.
Единственное, что придерживается у меня прямо сейчас, это то, что когда вы пытаетесь освободить счет от Semaphore
Он будет выбрасывать исключение, если оно уже полно, чтобы вы могли захотеть охранять против этого.1 В противном случае вы получите дополнительные узлы в связанном списке, а очередь будет 1) иметь больше, чем максимальное количество элементов и 2), он будет заблокирован в прямом эфире.
Обновлять:
Я дал это еще несколько мыслей. Это Release
звонить в Push
Метод будет очень проблематично. Я вижу только одно возможное решение. Удалить maxItems
параметр от конструктора и позвольте семафору рассчитывать до Int.MaxValue
. Отказ В противном случае вам придется радикально изменить ваш подход, приводящий к реализации, который нет, где рядом с тем, что у вас есть.
1Я думаю, что вы обнаружите, что это будет сложнее, чем то, что вы можете сразу реализовать.
Другие советы
1.
Рассмотрите возможность добавления второго конструктора узла:
public Node(T value)
{
Value = value;
}
Тогда ваш код клиента:
Node newNode = new Node();
newNode.Value = value;
может относиться к значению как инвариант:
Node newNode = new Node(value);
2.
Затем сделайте свои публичные поля:
public T Value;
public Node NextNode;
в автоматические свойства:
public T Value { get; private set; };
public Node NextNode { get; set; };
Таким образом, вы можете абстрактное использование от реализации и добавления проверки, другой обработки и т. Д. После факта с минимальным нарушением к коду клиента.
Если вы делаете это для самообразования, отлично - в противном случае BlockingCollection<T>
или ConcurrentQueue<T>
хорошие альтернативы.
Одна проблема, которую я вижу вот то, что нет способа прерываться Pop
Как только он запускается - предполагает, что объект ждет, когда он пробудит. Как бы вы очистили это о расторжении? А. TryPop
Это возвращает true
(с элементом) или false
(Если никаких данных не может быть лучше, тогда вы можете сигнализировать ждущие потоки для очистки, когда очередь будет сливаться.
Смотрите это, если у вас есть .NET 4
Как я фанат неизменных объектов, вот альтернатива моему предыдущему ответу, что я бы рассмотрел немного очистителя:
public sealed class CustomQueue<T> where T : class
{
private readonly object pushLocker = new object();
private readonly object popLocker = new object();
private readonly Semaphore queueSemaphore;
private readonly int maxItems;
private volatile int pushIncrement;
private volatile int popIncrement;
private Node firstNode = new Node();
private Node lastNode;
public CustomQueue(int maxItems)
{
this.maxItems = maxItems;
this.lastNode = this.firstNode;
this.queueSemaphore = new Semaphore(0, this.maxItems);
}
public int Size
{
get
{
return this.pushIncrement - this.popIncrement;
}
}
public bool Push(T value)
{
lock (this.pushLocker)
{
if (this.Size >= this.maxItems)
{
lock (this.popLocker)
{
this.pushIncrement = this.pushIncrement - this.popIncrement;
this.popIncrement = 0;
return false;
}
}
Node newNode = new Node(value, this.lastNode.NextNode);
this.lastNode = new Node(this.lastNode.Value, newNode);
this.firstNode = new Node(null, newNode);
this.pushIncrement++;
this.queueSemaphore.Release();
return true;
}
}
public T Pop()
{
this.queueSemaphore.WaitOne();
lock (this.popLocker)
{
Node tempNext = this.firstNode.NextNode;
T value = tempNext.Value;
this.firstNode = tempNext;
this.popIncrement++;
return value;
}
}
private sealed class Node
{
private readonly T value;
private readonly Node nextNode;
public Node()
{
}
public Node(T value, Node nextNode)
{
this.value = value;
this.nextNode = nextNode;
}
public T Value
{
get
{
return this.value;
}
}
public Node NextNode
{
get
{
return this.nextNode;
}
}
}
}