Очередь производителя-потребителя не утилизирует
-
28-09-2019 - |
Вопрос
я создал очередь производителя-потребителя, заключающую в себе ConcurrentQueue .net 4.0 с сигнализацией SlimManualResetEvent между производящим (Enqueue) и потребляющим (while (true) потоком на основе.очередь выглядит следующим образом:
public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
{
private bool _IsActive=true;
public int Count
{
get
{
return this._workerQueue.Count;
}
}
public bool IsActive
{
get { return _IsActive; }
set { _IsActive = value; }
}
public event Dequeued<T> OnDequeued = delegate { };
public event LoggedHandler OnLogged = delegate { };
private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();
private object _locker = new object();
Thread[] _workers;
#region IDisposable Members
int _workerCount=0;
ManualResetEventSlim _mres = new ManualResetEventSlim();
public void Dispose()
{
_IsActive = false;
_mres.Set();
LogWriter.Write("55555555555");
for (int i = 0; i < _workerCount; i++)
// Wait for the consumer's thread to finish.
{
_workers[i].Join();
}
LogWriter.Write("6666666666");
// Release any OS resources.
}
public ProducerConsumerQueue(int workerCount)
{
try
{
_workerCount = workerCount;
_workers = new Thread[workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(_workers[i] = new Thread(Work)).Start();
}
catch (Exception ex)
{
OnLogged(ex.Message + ex.StackTrace);
}
}
#endregion
#region IProducerConsumerQueue<T> Members
public void EnqueueTask(T task)
{
if (_IsActive)
{
_workerQueue.Enqueue(task);
//Monitor.Pulse(_locker);
_mres.Set();
}
}
public void Work()
{
while (_IsActive)
{
try
{
T item = Dequeue();
if (item != null)
OnDequeued(item);
}
catch (Exception ex)
{
OnLogged(ex.Message + ex.StackTrace);
}
}
}
#endregion
private T Dequeue()
{
try
{
T dequeueItem;
//if (_workerQueue.Count > 0)
//{
_workerQueue.TryDequeue(out dequeueItem);
if (dequeueItem != null)
return dequeueItem;
//}
if (_IsActive)
{
_mres.Wait();
_mres.Reset();
}
//_workerQueue.TryDequeue(out dequeueItem);
return dequeueItem;
}
catch (Exception ex)
{
OnLogged(ex.Message + ex.StackTrace);
T dequeueItem;
//if (_workerQueue.Count > 0)
//{
_workerQueue.TryDequeue(out dequeueItem);
return dequeueItem;
}
}
public void Clear()
{
_workerQueue = new ConcurrentQueue<T>();
}
}
}
при вызове Dispose он иногда блокируется при соединении (потребляет один поток), и метод dispose застревает.я предполагаю, что это зависает при ожидании resetEvents, но для этого я вызываю set в dispose.есть какие-нибудь предложения?
Решение
Обновлять: Я понимаю вашу точку зрения о том, чтобы нуждаться в очередь внутренне. Мое предложение использовать BlockingCollection<T>
основан на том факте, что ваш код содержит много логики для обеспечения блокирующего поведения. Написание такой логики самостоятельно очень склонна к ошибкам (я знаю это из опыта); Таким образом, когда в рамках существующего класса в рамках, по крайней мере, некоторый Работа для вас, как правило, предпочтительнее идти с этим.
Полный пример того, как вы можете реализовать этот класс, используя BlockingCollection<T>
немного слишком большой, чтобы включить в этот ответ, поэтому я разместил работу Пример на pastebin.com.; Не стесняйтесь взглянуть и посмотреть, что вы думаете.
Я также написал пример программы, демонстрируя вышеприведенный пример здесь.
Мой код правильно? Я бы не сказал да с слишком много уверенности; В конце концов, я не написал тесты на подразделение, запускающуюся на нее никакой диагностики и т. Д. Это просто базовая тяга, чтобы дать вам идею, как использовать BlockingCollection<T>
вместо ConcurrentQueue<T>
очищает много вашей логики (на мой взгляд) и облегчает сосредоточенность на главном цель вашего класса (потребляющие предметы из очереди и уведомления абонентов), а не несколько сложный аспект его реализация (блокирующее поведение внутренней очереди).
Вопрос представляется в комментарии:
Какая-либо причина, по которой вы не используете
BlockingCollection<T>
?
Ваш ответ:
...] Мне нужна очередь.
От MSDN Документация на конструктор по умолчанию для BlockingCollection<T>
класс:
Основная коллекция по умолчанию является
ConcurrentQueue<T>
.
Если то Только Причина вы решили реализовать свой собственный класс вместо использования BlockingCollection<T>
Это то, что вам нужна очередь ФИФО, ну тогда ... Вы можете переосмыслить ваше решение. А. BlockingCollection<T>
Суммировано с использованием по умолчанию PARMEMERLELLE CONDROCTOR является очередь ФИФО.
Тем не менее, я не думаю, что я могу предложить комплексный анализ кода, который вы разместили, я могу хотя бы предложить пару указателей:
- Я был бы очень нерешительным использовать мероприятия таким образом, чтобы вы находитесь здесь для класса, который касается такого сложного многопотативного поведения. Код вызова может прикрепить любые обработчики событий, которые он хочет, и они могут в свою очередь, бросать исключения (которые вы не поймаете), блок в течение длительного времени или, возможно, даже тупик по причинам полностью за пределами вашего управления - что очень плохо случай блокировки очереди.
- В вашем состоянии гонки в вашем
Dequeue
иDispose
методы.
Посмотрите на эти строки вашего Dequeue
Метод:
if (_IsActive) // point A
{
_mres.Wait(); // point C
_mres.Reset(); // point D
}
И теперь посмотрите на эти две линии от Dispose
:
_IsActive = false;
_mres.Set(); // point B
Допустим, у вас есть три потока, т1, Т.2, и т3. Отказ Т1 и т2 оба в точке А., где каждая проверка _IsActive
и нахожу true
. Отказ Затем Dispose
называется и т3 набор _IsActive
к false
(но т1 и т2 уже прошли смысл А.) а затем достигает точки Преступность, где он называет _mres.Set()
. Отказ Тогда т1 попадает в точку зрения Слияние, переходит на точку Подразделение, и звонит _mres.Reset()
. Отказ Теперь т2 достигает точки Слияние и будет застрять навсегда с тех пор _mres.Set
не будет вызван снова (любая поток выполняет Enqueue
найду _IsActive == false
и вернуться немедленно, а тема выполняется Dispose
уже прошел точку Преступность).
Я был бы счастлив попытаться предложить помощь по решению этой гонки, но я скептически это BlockingCollection<T>
Не на самом деле не именно класс, который вам нужен для этого. Если вы можете предоставить еще несколько информации, чтобы убедить меня, что это не так, может быть, я сделаю другой взгляд.
Другие советы
С тех пор как _IsActive
не помечен как volatile
и там нет никакого lock
при всем доступе каждое ядро может иметь отдельный кэш для этого значения, и этот кэш может никогда не обновляться.Таким образом , маркировка _IsActive
к ложному в Dispose
фактически это не повлияет на все запущенные потоки.
http://igoro.com/archive/volatile-keyword-in-c-memory-model-explained/
private volatile bool _IsActive=true;