Frage

Für eine Weile in meinem Unternehmen haben wir eine home-grown ObjectPool<T> Implementierung verwendet, die seinen Inhalt blockiert den Zugriff bietet. Es ist ziemlich einfach: a. Queue<T>, ein object zu erfassen, und ein AutoResetEvent zu Signal an einen "borgen" Thread, wenn ein Element hinzugefügt

Das Fleisch der Klasse ist wirklich diese beiden Methoden:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

Wir haben diese und einige andere Sammlung Klassen wurden anstelle der von System.Collections.Concurrent bereitgestellt, da wir .NET 3.5, 4.0 nicht verwenden. Aber vor kurzem entdeckten wir, dass da wir verwenden Reactive Extensions wir eigentlich < em> Sie haben den Concurrent Namespace uns zur Verfügung (in System.Threading.dll).

Natürlich, dachte ich, dass da BlockingCollection<T> eine des Kerns Klassen im Concurrent Namespace, wäre es wahrscheinlich eine bessere Leistung bieten als alles, was ich schrieb oder meine Teamkollegen.

So habe ich versucht, eine neue Implementierung zu schreiben, die sehr einfach funktioniert:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

Zu meiner Überraschung nach einigen einfachen Tests (borgen / Rückkehr in den Pool ein paar tausend mal von mehreren Threads), unsere ursprüngliche Implementierung deutlich schlägt BlockingCollection<T> in Bezug auf Leistung . Beide scheinen Arbeit richtig ; es ist nur, dass unsere ursprüngliche Implementierung scheint viel schneller zu sein.

Meine Frage:

  1. Warum wäre das? Ist es vielleicht, weil BlockingCollection<T> bietet eine größere Flexibilität (Ich verstehe, es funktioniert durch Umwickeln ein IProducerConsumerCollection<T> ), die notwendigerweise Performance-Overhead führt?
  2. Ist das nur eine flat-out verfehlte Verwendung der BlockingCollection<T> Klasse?
  3. Wenn dies eine angemessene Verwendung von BlockingCollection<T> ist, bin ich mit nur nicht richtig? Zum Beispiel ist der Take / Add Ansatz zu einfach, und es gibt einen weit bessere Leistung Weg, um die gleiche Funktionalität zu erhalten?

Es sei denn, jemand einen kleinen Einblick zu bieten als Antwort auf diese dritte Frage hat, sieht es aus wie wir jetzt mit unserer ursprünglichen Implementierung kleben werden.

War es hilfreich?

Lösung

Es gibt ein paar potentiellen Möglichkeiten, hier.

Als erstes BlockingCollection<T> in dem Reactive Extensions ist ein Backport, und nicht genau das gleiche wie die .NET 4 endgültige Version. Ich würde nicht, wenn die Leistung dieses Backport unterscheidet sich von .NET 4 RTM überrascht sein (obwohl ich diese Sammlung nicht profiliert haben, speziell). Ein großer Teil der TPL eine bessere Leistung in .NET 4 als in dem .NET 3.5 Rückportierung.

Das wird gesagt, würde ich Ihre Implementierung wird vermuten BlockingCollection<T> aus-führen, wenn Sie einen einzigen Hersteller Faden und einen einzelnen Verbraucher Gewinde haben. Mit einem Hersteller und einem Verbraucher, Ihr Schloss wird einen geringeren Einfluss auf die Gesamtleistung haben, und das Reset-Ereignis ist ein sehr wirksames Mittel, auf der Verbraucherseite warten.

Allerdings BlockingCollection<T> ist so konzipiert, sehr gut viele Produzenten-Threads auf „enqueue“ Daten zu ermöglichen. Das wird nicht gut mit Ihrer Implementierung durchführen, da der Sperr Streit beginnt problematisch zu werden ziemlich schnell.

That being said, ich möchte auch darauf hinweisen, ein Missverständnis hier:

  

... es würde bieten wahrscheinlich eine bessere Leistung als alles, was ich oder meine Teamkollegen geschrieben haben.

Dies ist oft nicht wahr. Die Kollektionsrahmenklassen führen typischerweise sehr gut , ist aber oft nicht die performant Option für ein bestimmtes Szenario. That being said, neigen sie gut auszuführen, während sehr flexibel und sehr robust zu sein. Sie neigen dazu, oft sehr gut auf Skala. „Home-geschrieben“ Collection-Klassen oft outperform Rahmen Sammlungen in bestimmten Szenarien, sondern neigen dazu, problematisch zu sein, wenn sie in Szenarien außerhalb des einen verwendet, für die sie speziell entwickelt wurden. Ich vermute, das eine dieser Situationen ist.

Andere Tipps

Ich habe versucht BlockingCollection gegen eine ConurrentQueue/AutoResetEvent Combo (ähnlich OP-Lösung, aber lockless) in .NET 4 und diese Combo war so viel schneller für meinen Anwendungsfall, dass ich Blocking ditched. Leider war dies vor fast einem Jahr, und ich konnte die Benchmark-Ergebnisse nicht gefunden.

eine separate Autoreset Verwendung nicht die Dinge zu sehr kompliziert mehr. In der Tat, man könnte sogar abstrakt es weg, ein für allemal, in eine BlockingCollectionSlim ....

Blocking setzt intern auf einem ConcurrentQueue als gut, aber tut einige zusätzliche Jonglieren mit schlank Semaphore und Stornierung Token , die zusätzliche Funktionen, aber auf Kosten ergibt, auch wenn sie nicht verwendet wird. Es sollte auch beachtet werden, dass Blocking nicht ConcurrentQueue verheiratet ist, kann aber mit anderen Implementierer IProducerConsumerCollection stattdessen ebenfalls verwendet werden.


Eine unbegrenzte, ziemlich nackte Knochen BlockingCollectionSlim Umsetzung:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);

Ich kam über die gleichen Performance-Probleme mit Blocking in .Net 4.7.2 und diesen Beitrag vorhanden. Mein Fall ist MultipleProducers-MultipleConsumers, insbesondere kleine Datenblöcke aus vielen Quellen gelesen werden und sollten durch viele Filter verarbeitet werden. Mehrere (Env.ProcessorCount) BlockingCollections verwendet wurde, und ich mit einem Performance-Profiler am Ende mir, dass BlockingCollection.GetConsumingEnumerable.MoveNext() eats mehr CPU-Zeit als die tatsächliche Filterung zu sagen!

Danke, @Eugene Beresovsky, für Ihren Code. Zur Info: Bei meiner Umgebung war es fast zweimal langsamer als Blocking. So, hier ist mein SpinLocked Blocking:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

Und für leistungskritische Code würde ich vorschlagen, readonly Feld Modifikator zu vermeiden. Es fügt eine Überprüfung auf jedem Feld Zugang in der IL. Mit dem folgenden Testcode

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

Ein Kern i5-3210M mit zwei Kernen und HT aktiviert Ich habe die folgende Ausgabe bekommen:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

So, SpinLocked Version ist zweimal schneller als .Net BlockingCollection. Aber ich würde es nur vorschlagen, zu verwenden! wenn Sie wirklich Leistung gegen Code Einfachheit bevorzugen (und Wartbarkeit).

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top