Pregunta

Durante un tiempo en mi empresa hemos utilizado una implementación ObjectPool<T>-cosecha propia que proporciona el bloqueo del acceso a su contenido. Es bastante sencillo: a. Queue<T>, un object para bloquear en, y un AutoResetEvent a la señal de un hilo "tomar prestado" cuando se añade un elemento

La carne de la clase es en realidad estos dos métodos:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

Hemos estado utilizando esto y algunas otras clases de colección en lugar de los proporcionados por System.Collections.Concurrent porque estamos utilizando .NET 3.5, no 4.0. Sin embargo, recientemente hemos descubierto que ya que estamos utilizando reactiva Extensiones , que en realidad < em> no tener el espacio de nombres Concurrent disponible para nosotros (en System.Threading.dll).

Naturalmente, pensé que desde BlockingCollection<T> es uno del núcleo las clases del espacio de nombres Concurrent, es probable que ofrecen un mejor rendimiento que cualquier cosa que yo o mis compañeros de equipo escribí.

Así que traté de escribir una nueva aplicación que funciona de manera muy sencilla:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

Para mi sorpresa, según algunas pruebas simples (préstamos / volver a la piscina unos pocos miles de veces a partir de múltiples hilos), nuestra implementación original supera significativamente BlockingCollection<T> en términos de rendimiento . Ambos parecen funcionar correctamente ; es sólo que nuestra implementación original parece ser mucho más rápido.

Mi pregunta:

  1. ¿Por qué sería esto? ¿Es tal vez porque ofrece una mayor flexibilidad BlockingCollection<T> (entiendo funciona envolviendo un IProducerConsumerCollection<T> ), que necesariamente se introduce sobrecarga de rendimiento?
  2. Se trata sólo de un uso equivocado de plano de la clase BlockingCollection<T>?
  3. Si se trata de un uso adecuado de BlockingCollection<T>, estoy simplemente no usando correctamente? Por ejemplo, es el enfoque Take / Add demasiado simplista, y hay una manera mucho mejor rendimiento para obtener la misma funcionalidad?

A menos que alguien tiene una idea de una oferta en respuesta a la tercera pregunta, parece que vamos a la pervivencia de nuestra implementación original por ahora.

¿Fue útil?

Solución

Hay un par de posibilidades potenciales, aquí.

En primer lugar, BlockingCollection<T> en las extensiones reactivas es un backport, y no es exactamente la misma que la versión final .NET 4. No me sorprendería si el cumplimiento de este backport difiere de .NET 4 RTM (aunque no he perfilado esta colección, en concreto). Gran parte de los TPL realiza mejor en .NET 4 que en el backport .NET 3.5.

Una vez dicho esto, yo sospecha que su aplicación va a superar el rendimiento de BlockingCollection<T> si tiene un solo hilo productor y un único hilo consumidor. Con un productor y un consumidor, el bloqueo va a tener un menor impacto en el rendimiento total y el evento de reinicio es un medio muy eficaz de espera en el lado del consumidor.

Sin embargo, BlockingCollection<T> está diseñado para permitir muchos hilos de productores a los datos "enqueue" muy bien. Esto no va a funcionar bien con su aplicación, ya que la contención de bloqueo empieza a convertirse en un problema bastante rapidez.

Una vez dicho esto, también me gustaría señalar una idea equivocada aquí:

  

... es probablemente ofrecería un mejor rendimiento que cualquier cosa que yo o mis compañeros de equipo escribí.

Esto es a menudo no es cierto. Las clases de colección marco suelen realizar muy bien , pero a menudo no son la opción mas potente para un escenario dado. Una vez dicho esto, tienden a funcionar bien mientras que es muy flexible y muy robusto. A menudo tienden a escala muy bien. "Inicio-escrito" clases de colección a menudo colecciones marco superan en escenarios específicos, pero tienden a ser problemático cuando se utiliza en escenarios fuera de aquel para el cual fueron diseñados específicamente. Sospecho que esto es una de esas situaciones.

Otros consejos

Me trató BlockingCollection contra un combinado ConurrentQueue/AutoResetEvent (similar a la solución de OP, pero lockless) en .Net 4, y el segundo combo era de modo mucho más rápido para mi caso de uso, que se deshizo BlockingCollection. Por desgracia, esto fue hace casi un año y no pude encontrar los resultados de referencia.

El uso de un AutoResetEvent separada no hace las cosas mucho más complicadas. De hecho, uno podría incluso abstracta a la basura, una vez por todas, en un BlockingCollectionSlim ....

BlockingCollection basa internamente en un ConcurrentQueue también, pero hace algunos malabares adicional con semáforos delgados y cancelación tokens , que da características adicionales, pero a un costo, incluso cuando no se utiliza. También hay que señalar que BlockingCollection no está casada con ConcurrentQueue, pero se puede utilizar con otros implementadores de IProducerConsumerCollection lugar así.


Una, bastante básico aplicación sin límites BlockingCollectionSlim:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);

Me encontré con los mismos problemas de rendimiento con BlockingCollection en .Net 4.7.2 y encontró este post. Mi caso es MultipleProducers-MultipleConsumers, en particular los pequeños fragmentos de datos se leen de muchas fuentes y deben ser procesados ??por muchos filtros. Varios (Env.ProcessorCount) BlockingCollections fueron utilizados y que terminó con un perfilador de rendimiento diciéndome que come BlockingCollection.GetConsumingEnumerable.MoveNext() más tiempo de CPU que el filtrado real!

Gracias, @Eugene Beresovsky, por su código. Para su información: En mi entorno era casi dos veces más lento que BlockingCollection. Por lo tanto, aquí está mi SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

Y para el código de rendimiento crítico que sugeriría para evitar modificador del campo readonly. Se añade un control de todos los accesos en el campo de la IL. Con el siguiente código de prueba

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

El Core i5-3210M con 2 núcleos y HT habilitadas Tengo el siguiente resultado:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

Así, SpinLocked versión es dos veces más rápido que .Net BlockingCollection. Sin embargo, sugeriría usarlo sólo! si realmente prefiere rendimiento frente a la simplicidad del código (y facilidad de mantenimiento).

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top