Frage

ich habe einen Producer Consumer Warteschlange Einwickeln eine ConcurrentQueue von .net gebaut 4.0 mit SlimManualResetEvent Signalisierung zwischen der Herstellung (Enqueue) und dem Verbraucher (while (true) Gewinde basiert. die Warteschlange sieht aus wie:

public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
{
    private bool _IsActive=true;

    public int Count
    {
        get
        {
            return this._workerQueue.Count;
        }
    }

    public bool IsActive
    {
        get { return _IsActive; }
        set { _IsActive = value; }
    }

    public event Dequeued<T> OnDequeued = delegate { };
    public event LoggedHandler OnLogged = delegate { };

    private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();

    private object _locker = new object();

    Thread[] _workers;

    #region IDisposable Members

    int _workerCount=0;

    ManualResetEventSlim _mres = new ManualResetEventSlim();

    public void Dispose()
    {
        _IsActive = false;

        _mres.Set();

        LogWriter.Write("55555555555");

          for (int i = 0; i < _workerCount; i++)
          // Wait for the consumer's thread to finish.
          {
             _workers[i].Join();        
          }
           LogWriter.Write("6666666666");
     // Release any OS resources.
    }
    public ProducerConsumerQueue(int workerCount)
    {
        try
        {
            _workerCount = workerCount;
            _workers = new Thread[workerCount];
            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Work)).Start();
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
        }

    }
    #endregion

    #region IProducerConsumerQueue<T> Members

    public void EnqueueTask(T task)
    {
        if (_IsActive)
        {
            _workerQueue.Enqueue(task);
            //Monitor.Pulse(_locker);
            _mres.Set();
        }
    }

    public void Work()
    {
      while (_IsActive)
      {
          try
          {
              T item = Dequeue();
              if (item != null)
                  OnDequeued(item);
          }
          catch (Exception ex)
          {
              OnLogged(ex.Message + ex.StackTrace);
          }              
      }
    }

    #endregion
    private T Dequeue()
    {
        try
        {
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            if (dequeueItem != null)
                return dequeueItem;
            //}
            if (_IsActive)
            {
                _mres.Wait();
                _mres.Reset();
            }
            //_workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }

    }


    public void Clear()
    {
        _workerQueue = new ConcurrentQueue<T>();
    }
}

}

, wenn Entsorgen nannte es manchmal Blöcke auf dem Join (ein Thread raubend) und die dispose-Methode ist verklemmt. Ich denke, es zu bekommen ist auf dem Warten der resetEvents stecken aber, dass ich das Set auf der dispose nennen. irgendwelche Vorschläge?

War es hilfreich?

Lösung

Aktualisieren : Ich verstehe Ihren Standpunkt über eine Warteschlange intern benötigen. Mein Vorschlag eine BlockingCollection<T> verwenden beruht auf der Tatsache, dass Ihr Code eine Menge Logik enthält das Sperrverhalten bereitzustellen. Schreiben eine solche Logik selbst ist sehr anfällig für Fehler (ich weiß das aus Erfahrung); so, wenn es eine vorhandene Klasse im Rahmen, der zumindest tut einig der Arbeit für Sie, ist es im Allgemeinen bevorzugt, damit zu gehen.

Ein vollständiges Beispiel dafür, wie Sie diese Klasse implementieren können eine BlockingCollection<T> verwendet, ist ein bisschen zu groß, in dieser Antwort zu schließen, so dass ich eine Arbeits Beispiel auf pastebin.com ; fühlen sich frei, um einen Blick zu nehmen und sehen, was Sie denken.

Ich schrieb auch ein Beispielprogramm das obige Beispiel hier rel="nofollow">.

Ist mein Code korrekt? Ich würde nicht sagen, ja mit zu viel Vertrauen; Immerhin habe ich läuft nicht Unit-Tests geschrieben, keine Diagnose darauf, etc. Es ist nur ein grundlegender Entwurf Ihnen eine Vorstellung zu geben, wie BlockingCollection<T> statt ConcurrentQueue<T> aufräumt viel Ihrer Logik (meiner Meinung nach) und macht es leichter auf der Haupt Ziel konzentrieren Ihre Klasse (Elemente aus einer Warteschlange raubend und Abonnent Benachrichtigung), anstatt ein etwas schwieriger Aspekte seines Implementierung (Sperrverhalten der internen Warteschlange).


Frage in einem Kommentar gefragt:

  

Gibt es Gründe, die Sie nicht BlockingCollection<T> mit?

Ihre Antwort:

  

[...] brauchte ich eine Warteschlange.

Von der MSDN-Dokumentation auf dem Standard-Konstruktor für die Klasse BlockingCollection<T> :

  

Die Standardsammlung zugrunde liegt, ist ein ConcurrentQueue<T> .

Wenn die nur Grund, warum Sie entschieden sich Ihre eigene Klasse zu implementieren, anstatt BlockingCollection<T> zu verwenden, ist, dass Sie eine FIFO-Warteschlange müssen, auch dann ... Sie könnten Ihre Entscheidung zu überdenken wollen. Ein BlockingCollection<T> instanziiert die Standardparameterlos Konstruktor mit ist ein FIFO-Warteschlange.

Das heißt, während das glaube ich nicht, dass ich eine umfassende Analyse des Codes bieten Ihnen geschrieben habe, ich kann zumindest ein paar Hinweise bieten:

  1. würde ich sehr zögerlich Ereignisse in der Art und Weise zu verwenden, dass Sie hier für eine Klasse sind die sich mit so heikel multithreaded Verhalten. Telefonvorwahl können alle Event-Handler anhängen es will, und diese können wiederum werfen Ausnahmen (die Sie nicht fangen tun), Block für längere Zeit oder möglicherweise sogar aus Gründen Deadlock völlig außerhalb Ihrer Kontrolle - was sehr schlecht in ist der Fall einer Blockierung Warteschlange.
  2. Es gibt eine Race-Bedingung in Ihren Dequeue und Dispose Methoden.

Sehen sie sich diese Zeilen Ihrer Dequeue Methode:

if (_IsActive) // point A
{
    _mres.Wait(); // point C
    _mres.Reset(); // point D
}

Und nun einen Blick auf diesen beiden Linien von Dispose:

_IsActive = false;

_mres.Set(); // point B

Angenommen, Sie haben drei Threads, T 1 , T 2 und T 3 . T 1 und T 2 sind beide an dem Punkt A , wobei jeweils überprüft _IsActive und Funde true. Dann wird Dispose genannt, und T 3 Sätze _IsActive zu false (aber T 1 und T 2 sind bereits vergangen Punkt A ) und dann Punkt erreicht B , wo es nennt _mres.Set(). Dann T 1 zu Punkt bekommt C , auf Züge zu Punkt D und fordern _mres.Reset(). Jetzt T 2 erreicht Punkt C und wird für immer da _mres.Set stecken wird not wieder aufgerufen werden (jeder Thread ausgeführt wird Enqueue wird _IsActive == false finden und sofort zurück, und der Faden Ausführung Dispose bereits Punkt überschritten B ).

Ich würde gerne etwas Hilfe bei der Lösung dieser Racebedingung, um zu versuchen und bieten, aber ich bin skeptisch, dass BlockingCollection<T> ist nicht in der Tat genau die Klasse, die Sie dafür brauchen. Wenn Sie weitere Informationen zur Verfügung stellen können, mich zu überzeugen, dass dies nicht der Fall ist, werde ich vielleicht noch einen Blick.

Andere Tipps

Da _IsActive nicht als volatile markiert und es gibt keinen lock um allen Zugang kann jeder Kern für diesen Wert eine separate Cache hat und dass Cache nie aktualisiert bekommen. So Markierung _IsActive auf false in Dispose wird nicht wirklich alle laufenden Fäden beeinflussen.

http://igoro.com/archive/volatile -keyword-in-c-Speicher-Modell erläuterte /

private volatile bool _IsActive=true;
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top