Вопрос

я создал очередь производителя-потребителя, заключающую в себе ConcurrentQueue .net 4.0 с сигнализацией SlimManualResetEvent между производящим (Enqueue) и потребляющим (while (true) потоком на основе.очередь выглядит следующим образом:

public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
{
    private bool _IsActive=true;

    public int Count
    {
        get
        {
            return this._workerQueue.Count;
        }
    }

    public bool IsActive
    {
        get { return _IsActive; }
        set { _IsActive = value; }
    }

    public event Dequeued<T> OnDequeued = delegate { };
    public event LoggedHandler OnLogged = delegate { };

    private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();

    private object _locker = new object();

    Thread[] _workers;

    #region IDisposable Members

    int _workerCount=0;

    ManualResetEventSlim _mres = new ManualResetEventSlim();

    public void Dispose()
    {
        _IsActive = false;

        _mres.Set();

        LogWriter.Write("55555555555");

          for (int i = 0; i < _workerCount; i++)
          // Wait for the consumer's thread to finish.
          {
             _workers[i].Join();        
          }
           LogWriter.Write("6666666666");
     // Release any OS resources.
    }
    public ProducerConsumerQueue(int workerCount)
    {
        try
        {
            _workerCount = workerCount;
            _workers = new Thread[workerCount];
            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Work)).Start();
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
        }

    }
    #endregion

    #region IProducerConsumerQueue<T> Members

    public void EnqueueTask(T task)
    {
        if (_IsActive)
        {
            _workerQueue.Enqueue(task);
            //Monitor.Pulse(_locker);
            _mres.Set();
        }
    }

    public void Work()
    {
      while (_IsActive)
      {
          try
          {
              T item = Dequeue();
              if (item != null)
                  OnDequeued(item);
          }
          catch (Exception ex)
          {
              OnLogged(ex.Message + ex.StackTrace);
          }              
      }
    }

    #endregion
    private T Dequeue()
    {
        try
        {
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            if (dequeueItem != null)
                return dequeueItem;
            //}
            if (_IsActive)
            {
                _mres.Wait();
                _mres.Reset();
            }
            //_workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }

    }


    public void Clear()
    {
        _workerQueue = new ConcurrentQueue<T>();
    }
}

}

при вызове Dispose он иногда блокируется при соединении (потребляет один поток), и метод dispose застревает.я предполагаю, что это зависает при ожидании resetEvents, но для этого я вызываю set в dispose.есть какие-нибудь предложения?

Это было полезно?

Решение

Обновлять: Я понимаю вашу точку зрения о том, чтобы нуждаться в очередь внутренне. Мое предложение использовать BlockingCollection<T> основан на том факте, что ваш код содержит много логики для обеспечения блокирующего поведения. Написание такой логики самостоятельно очень склонна к ошибкам (я знаю это из опыта); Таким образом, когда в рамках существующего класса в рамках, по крайней мере, некоторый Работа для вас, как правило, предпочтительнее идти с этим.

Полный пример того, как вы можете реализовать этот класс, используя BlockingCollection<T> немного слишком большой, чтобы включить в этот ответ, поэтому я разместил работу Пример на pastebin.com.; Не стесняйтесь взглянуть и посмотреть, что вы думаете.

Я также написал пример программы, демонстрируя вышеприведенный пример здесь.

Мой код правильно? Я бы не сказал да с слишком много уверенности; В конце концов, я не написал тесты на подразделение, запускающуюся на нее никакой диагностики и т. Д. Это просто базовая тяга, чтобы дать вам идею, как использовать BlockingCollection<T> вместо ConcurrentQueue<T> очищает много вашей логики (на мой взгляд) и облегчает сосредоточенность на главном цель вашего класса (потребляющие предметы из очереди и уведомления абонентов), а не несколько сложный аспект его реализация (блокирующее поведение внутренней очереди).


Вопрос представляется в комментарии:

Какая-либо причина, по которой вы не используете BlockingCollection<T>?

Ваш ответ:

...] Мне нужна очередь.

От MSDN Документация на конструктор по умолчанию для BlockingCollection<T> класс:

Основная коллекция по умолчанию является ConcurrentQueue<T>.

Если то Только Причина вы решили реализовать свой собственный класс вместо использования BlockingCollection<T> Это то, что вам нужна очередь ФИФО, ну тогда ... Вы можете переосмыслить ваше решение. А. BlockingCollection<T> Суммировано с использованием по умолчанию PARMEMERLELLE CONDROCTOR является очередь ФИФО.

Тем не менее, я не думаю, что я могу предложить комплексный анализ кода, который вы разместили, я могу хотя бы предложить пару указателей:

  1. Я был бы очень нерешительным использовать мероприятия таким образом, чтобы вы находитесь здесь для класса, который касается такого сложного многопотативного поведения. Код вызова может прикрепить любые обработчики событий, которые он хочет, и они могут в свою очередь, бросать исключения (которые вы не поймаете), блок в течение длительного времени или, возможно, даже тупик по причинам полностью за пределами вашего управления - что очень плохо случай блокировки очереди.
  2. В вашем состоянии гонки в вашем Dequeue и Dispose методы.

Посмотрите на эти строки вашего Dequeue Метод:

if (_IsActive) // point A
{
    _mres.Wait(); // point C
    _mres.Reset(); // point D
}

И теперь посмотрите на эти две линии от Dispose:

_IsActive = false;

_mres.Set(); // point B

Допустим, у вас есть три потока, т1, Т.2, и т3. Отказ Т1 и т2 оба в точке А., где каждая проверка _IsActive и нахожу true. Отказ Затем Dispose называется и т3 набор _IsActive к false (но т1 и т2 уже прошли смысл А.) а затем достигает точки Преступность, где он называет _mres.Set(). Отказ Тогда т1 попадает в точку зрения Слияние, переходит на точку Подразделение, и звонит _mres.Reset(). Отказ Теперь т2 достигает точки Слияние и будет застрять навсегда с тех пор _mres.Set не будет вызван снова (любая поток выполняет Enqueue найду _IsActive == false и вернуться немедленно, а тема выполняется Dispose уже прошел точку Преступность).

Я был бы счастлив попытаться предложить помощь по решению этой гонки, но я скептически это BlockingCollection<T> Не на самом деле не именно класс, который вам нужен для этого. Если вы можете предоставить еще несколько информации, чтобы убедить меня, что это не так, может быть, я сделаю другой взгляд.

Другие советы

С тех пор как _IsActive не помечен как volatile и там нет никакого lock при всем доступе каждое ядро может иметь отдельный кэш для этого значения, и этот кэш может никогда не обновляться.Таким образом , маркировка _IsActive к ложному в Dispose фактически это не повлияет на все запущенные потоки.

http://igoro.com/archive/volatile-keyword-in-c-memory-model-explained/

private volatile bool _IsActive=true;
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top