Domanda

Per un po 'alla mia azienda abbiamo usato un'implementazione ObjectPool<T> home-grown che fornisce bloccando l'accesso ai suoi contenuti. E 'piuttosto semplice: a. Queue<T>, un object per bloccare, e un AutoResetEvent per segnale ad un "prestito" thread quando viene aggiunto un elemento

La carne della classe è in realtà questi due metodi:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

Abbiamo usato questa e poche altre classi di raccolta al posto di quelli forniti da System.Collections.Concurrent perché stiamo usando .NET 3.5, non 4.0. Ma di recente abbiamo scoperto che, dal momento che stiamo utilizzando reattiva estensioni , abbiamo effettivamente < em> non avere lo spazio dei nomi Concurrent a nostra disposizione (in System.Threading.dll).

Naturalmente, ho pensato che, dal momento BlockingCollection<T> è uno dei core classi del namespace Concurrent, sarebbe probabilmente offrire prestazioni migliori di qualsiasi cosa io o miei compagni di squadra scritto.

Così ho provato a scrivere una nuova implementazione che funziona in modo molto semplice:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

Con mia grande sorpresa, secondo alcuni semplici test (prendendo in prestito / ritorno alla piscina di qualche migliaio di volte da più thread), la nostra implementazione originale batte significativamente BlockingCollection<T> in termini di prestazioni . Entrambi sembrano funzionare correttamente ; è solo che la nostra implementazione originale sembra essere molto più veloce.

La mia domanda:

  1. Perché questo sarebbe? E 'forse perché offerte BlockingCollection<T> una maggiore flessibilità (ho capito che funziona avvolgendo un IProducerConsumerCollection<T> ), che introduce necessariamente sovraccarico delle prestazioni?
  2. E 'solo un uso incauto flat-out della classe BlockingCollection<T>?
  3. Se questo è un uso appropriato di BlockingCollection<T>, sto solo che non utilizzano correttamente? Ad esempio, è l'approccio Take / Add eccessivamente semplicistico, e c'è un modo di gran lunga migliore prestazioni per ottenere la stessa funzionalità?

A meno che qualcuno ha una certa comprensione da offrire in risposta a tale terza questione, sembra che saremo attaccare con la nostra implementazione originale per ora.

È stato utile?

Soluzione

Ci sono un paio di potenziali possibilità, qui.

In primo luogo, BlockingCollection<T> nelle estensioni reattiva è un backport, e non esattamente la stessa della versione finale di .NET 4. Non sarei sorpreso se le prestazioni di questo backport differisce da .NET 4 RTM (anche se non ho profilato questa collezione, in particolare). Gran parte dei esegue TPL migliori in .NET 4 che in .NET 3.5 backport.

Detto questo, avrei il sospetto che l'implementazione verrà fuori eseguire BlockingCollection<T> se si dispone di un singolo thread produttore e un singolo thread consumatore. Con un produttore e un consumatore, il blocco sta per avere un minore impatto sulle prestazioni totali, e l'evento di reset è un mezzo molto efficace di attesa sul lato del consumatore.

Tuttavia, BlockingCollection<T> è stato progettato per consentire a molti produttori discussioni ai dati "accodamento" molto bene. Questo non funzionare bene con l'implementazione, come il conflitto di blocco inizierà a diventare problematico abbastanza rapidamente.

Detto questo, vorrei anche far notare un equivoco qui:

  

... sarebbe probabilmente offrire prestazioni migliori di qualsiasi cosa io o miei compagni di squadra scritto.

Questo spesso non è vero. Le classi di raccolta quadro eseguono solitamente molto bene , ma spesso non sono l'opzione più performante per un determinato scenario. Detto, tendono a funzionare bene pur essendo molto flessibile e molto robusto. Essi tendono spesso a scala molto bene. "Casa-scritto" classi di raccolta collezioni quadro spesso sovraperformare in scenari specifici, ma tendono ad essere problematico quando viene utilizzato in scenari di fuori di quello per cui sono stati progettati specificamente. Ho il sospetto che questa è una di quelle situazioni.

Altri suggerimenti

ho provato BlockingCollection contro una combo ConurrentQueue/AutoResetEvent (simile alla soluzione di OP, ma lockless) in .Net 4, e il secondo combo era così molto più veloce per il mio caso d'uso, che mi piantarono BlockingCollection. Purtroppo questo è stato quasi un anno fa e non sono riuscito a trovare i risultati dei benchmark.

Utilizzando un AutoResetEvent separata non rendere le cose troppo molto più complicato. In effetti, si potrebbe anche astratto via, una volta per tutte, in una BlockingCollectionSlim ....

BlockingCollection si basa internamente su un ConcurrentQueue pure, ma fa alcuni giocoleria supplementare con semafori sottili e cancellazione token , che produce funzionalità aggiuntive, ma ad un costo, anche quando non è utilizzato. Va inoltre notato che BlockingCollection non è sposato a ConcurrentQueue, ma può essere utilizzato con altri implementatori di IProducerConsumerCollection invece pure.


Una, piuttosto ossa nude applicazione illimitata BlockingCollectionSlim:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);

mi sono imbattuto gli stessi problemi di prestazioni con BlockingCollection in .Net 4.7.2 e ho trovato questo post. Il mio caso è MultipleProducers-MultipleConsumers, in particolare le piccole blocchi di dati vengono letti da molte fonti e devono essere trattati da molti filtri. Diversi (Env.ProcessorCount) BlockingCollections sono stati utilizzati e ho finito con un profiler prestazioni che mi diceva che BlockingCollection.GetConsumingEnumerable.MoveNext() mangia più tempo di CPU rispetto filtraggio reale!

Grazie, @Eugene Beresovsky, per il codice. FYI: Sul mio ambiente era quasi due volte più lento di BlockingCollection. Quindi, ecco la mia SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

E per il codice delle prestazioni critiche Vorrei suggerire di evitare readonly campo modificatore. Si aggiunge un controllo su ogni accesso campo nella IL. Con il seguente codice di prova

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

Il core i5-3210M con 2 core e HT abilitati ho il seguente output:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

Quindi, SpinLocked versione è due volte più veloce di .Net BlockingCollection. Ma, vorrei suggerire di usarlo solo! se davvero preferisce prestazione contro la semplicità del codice (e manutenibilità).

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top