Question

Pendant un certain temps dans mon entreprise, nous avons utilisé une implémentation ObjectPool<T>-Home Grown qui permet de bloquer l'accès à son contenu. Il est assez simple:. Un Queue<T>, un de object pour verrouiller et un AutoResetEvent de signal à un fil « emprunt » lorsqu'un élément est ajouté

La viande de la classe est vraiment ces deux méthodes:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

Nous avons utilisé cela et quelques autres classes de collection au lieu de ceux fournis par System.Collections.Concurrent parce que nous utilisons .NET 3.5, pas 4.0. Mais récemment, nous avons découvert que, puisque nous utilisons réactive Extensions , nous avons en fait < em> ne ont l'espace de noms Concurrent disponible pour nous (en System.Threading.dll).

Naturellement, je compris que depuis BlockingCollection<T> est un noyau classes dans l'espace de noms Concurrent, il offrirait probablement de meilleures performances que tout ce que moi ou mes coéquipiers écrit.

J'ai donc essayé d'écrire une nouvelle implémentation qui fonctionne très simplement:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

À ma grande surprise, selon quelques tests simples (emprunt / retour à la piscine quelques milliers de fois de plusieurs threads), notre implémentation originale bat de manière significative BlockingCollection<T> en termes de performance . Ils semblent tous deux au travail correctement ; il est juste que notre mise en œuvre originale semble être beaucoup plus rapide.

Ma question:

  1. Pourquoi serait-ce? Est-ce peut-être parce BlockingCollection<T> offre une plus grande flexibilité (je comprends cela fonctionne en enroulant un IProducerConsumerCollection<T> ), qui introduit nécessairement les frais généraux de performance?
  2. Est-ce juste une utilisation peu judicieuse plat de la classe BlockingCollection<T>?
  3. Si cela est une utilisation appropriée de BlockingCollection<T>, suis-je tout simplement pas correctement à l'aide? Par exemple, est le Take / approche Add trop simpliste, et il y a un moyen beaucoup plus performants pour obtenir les mêmes fonctionnalités?

À moins que quelqu'un a une idée de l'offre en réponse à cette troisième question, il semble que nous allons collons avec notre mise en œuvre originale pour l'instant.

Était-ce utile?

La solution

Il y a deux possibilités potentielles, ici.

D'abord, BlockingCollection<T> dans les réactifs Extensions est un backport, et pas exactement le même que le .NET 4 version finale. Je ne serais pas surpris si la performance de celle-ci diffère de Backport de .NET 4 RTM (bien que je ne l'ai pas profilée cette collection, en particulier). Une grande partie des préformes de TPL mieux dans .NET 4 que dans le backport Framework 3.5.

Cela étant dit, je soupçonne votre mise en œuvre surclasser BlockingCollection<T> si vous avez un seul fil de producteur et un seul fil à la consommation. Avec un producteur et un consommateur, votre serrure va avoir un impact moindre sur la performance totale, et l'événement de remise à zéro est un moyen très efficace d'attendre du côté des consommateurs.

Cependant, BlockingCollection<T> est conçu pour permettre à de nombreux fils de producteurs de données « enqueue » très bien. Cela ne fonctionnera pas bien avec votre mise en œuvre, comme l'affirmation de verrouillage va commencer à devenir problématique assez rapidement.

Cela étant dit, je voudrais aussi signaler une idée fausse ici:

  

... il offrirait probablement de meilleures performances que tout ce que moi ou mes coéquipiers écrit.

Ceci est souvent vrai. Les classes de collection cadres effectuent généralement très bien , mais ne sont souvent pas la solution la plus performante pour un scénario donné. Cela étant dit, ils ont tendance à bien performer tout en étant très souple et très robuste. Ils ont souvent tendance à l'échelle très bien. « Home-écrit » classes de collection de collections cadres souvent surperformer dans des scénarios spécifiques, mais ont tendance à être problématique quand il est utilisé dans des scénarios en dehors de celui pour lequel ils ont été spécialement conçus. Je soupçonne que c'est l'une de ces situations.

Autres conseils

J'ai essayé BlockingCollection contre un combo ConurrentQueue/AutoResetEvent (similaire à la solution de l'OP, mais lockless) en .Net 4, et ce dernier combo était si beaucoup plus rapide pour mon cas d'utilisation, que j'abandonné BlockingCollection. Malheureusement, ce fut presque il y a un an et je ne pouvais pas trouver les résultats de référence.

L'utilisation d'un AutoResetEvent séparé ne rend pas les choses trop compliquées. En fait, on pourrait même abstraite loin, une fois pour toutes, en BlockingCollectionSlim ....

BlockingCollection repose en interne sur un ConcurrentQueue aussi, mais fait jongler supplémentaire sémaphores mince et annulation des jetons , ce qui donne des fonctionnalités supplémentaires, mais à un coût, même lorsqu'ils ne sont pas utilisés. Il convient également de noter que BlockingCollection est pas marié à ConcurrentQueue, mais peut être utilisé avec d'autres implementors de IProducerConsumerCollection au lieu ainsi.


Un non borné, les os assez nus mise en œuvre BlockingCollectionSlim:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);

Je suis tombé sur les mêmes problèmes de performance avec BlockingCollection en .Net 4.7.2 et a trouvé ce poste. Mon cas est MultipleProducers-MultipleConsumers, en petits morceaux de données particulières sont lues à partir de nombreuses sources et doivent être traitées par de nombreux filtres. Plusieurs (Env.ProcessorCount) BlockingCollections ont été utilisés et j'ai fini avec un profileur de performance me disant que plus de temps BlockingCollection.GetConsumingEnumerable.MoveNext() Eats CPU que le filtrage réel!

Merci, @Eugene Beresovsky, votre code. Pour votre information: Sur mon environnement, il a été presque deux fois plus lent que BlockingCollection. Donc, voici mon SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

Et pour le code de performance critiques Je suggère d'éviter modificateur de champ readonly. Il ajoute un contrôle sur tous les accès aux champs dans l'IL. Avec le code de test suivant

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

On Core i5-3210M avec 2 noyaux et HT ont permis que j'ai la sortie suivante:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

Ainsi, la version SpinLocked est deux fois plus rapide que .Net BlockingCollection. Mais, je vous conseille de l'utiliser seulement! si vous préférez vraiment la performance contre la simplicité du code (et maintenabilité).

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top