Вопрос

На некоторое время у моей компании мы использовали домашнюю ObjectPool<T> Реализация, которая обеспечивает блокировку доступа к его содержимому. Это довольно просто: Queue<T>, ан object заблокировать, и AutoResetEvent Чтобы сигнализировать на «заимствование» нити, когда добавляется элемент.

Мясо класса действительно эти два метода:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

Мы использовали это и несколько других классов сбора, а не тех, которые предоставляются System.Collections.Concurrent Потому что мы используем .NET 3.5, а не 4.0. Но недавно мы обнаружили, что, поскольку мы используем Реактивные расширения, мы на самом деле делать иметь Concurrent пространство имен доступно нам (в System.threading.dll).

Естественно, я подумал, что с BlockingCollection<T> является одним из основных классов в Concurrent Пространство имен, вероятно, предложило бы лучшую производительность, чем все, что я или мои товарищи по команде писали.

Поэтому я попытался написать новую реализацию, которая работает очень просто:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

К моему удивлению, по некоторым простым тестам (заимствование / возвращение в бассейн несколько тысяч раз из нескольких нитей), наша оригинальная реализация значительно побеждает BlockingCollection<T> С точки зрения производительности. Отказ Они оба кажутся работать правильно; Это просто то, что наша оригинальная реализация, кажется, намного быстрее.

Мой вопрос:

  1. Почему это будет? Это возможно потому, что BlockingCollection<T> предлагает большую гибкость (я понимаю, что работает, упаковывая IProducerConsumerCollection<T>), что обязательно вводит производительность наверху?
  2. Это просто безупречное использование BlockingCollection<T> сорт?
  3. Если это подходящее использование BlockingCollection<T>, я просто не использую правильно? Например, это Take/Add Подходить слишком упрощенным, и есть гораздо лучший способ получить ту же функциональность?

Если кто-то не имеет некоторого понимания, чтобы предложить в ответ на этот третий вопрос, похоже, что мы будем придерживаться нашей оригинальной реализации.

Это было полезно?

Решение

Здесь есть пара потенциальных возможностей, здесь.

Первый, BlockingCollection<T> В реактивных расширениях является задняя копия, а не совпадают, что и конечная версия .NET 4. Я бы не был удивлен, если производительность этого backport отличается от .NET 4 RTM (хотя я не профилировал эту коллекцию, конкретно). Большая часть TPL работает лучше в .NET 4, чем в Backport .NET 3.5.

Что говорится, я бы подозреваю, что ваша реализация выйдет BlockingCollection<T> Если у вас есть отдельная нить производителей и единая потребительская нить. С одним производителем и одним потребителем ваш замок будет иметь меньшее влияние на общую производительность, а событие сброса является очень эффективным средством ожидания на стороне потребителей.

Однако, BlockingCollection<T> Предназначен, чтобы позволить много продуктивных потоков в «Enqueue» данные очень хорошо. Это не будет хорошо работать с вашей реализацией, поскольку разблокировка начнет становиться проблематичным справедливо быстро.

Что говорят, я бы также хотел указать на одно заблуждение здесь:

... Вероятно, предложил бы лучшую производительность, чем все, что я или мои товарищи по команде писали.

Это часто не правда. Классы рымки, как правило, выполняют очень хорошо, но часто не являются самым исполнительным вариантом для данного сценария. Это, как говорят, они, как правило, работают хорошо, когда это очень гибкий и очень надежный. Они часто имеют тенденцию масштабироваться очень хорошо. «Домашние письменные» классы сбора часто превзойдуют рамки коллекций в конкретных сценариях, но имеют тенденцию быть проблематичными при использовании в сценариях за пределами того, что они были специально разработаны. Я подозреваю, что это одна из тех ситуаций.

Другие советы

Я старался BlockingCollection против А. ConurrentQueue/AutoResetEvent комбо (похожее на решение OP, но без белонов) в .NET 4, а последний комбинировал так Намного быстрее для моего использования в случае, чтобы я бросил блокировкуCollection. К сожалению, это было почти год назад, и я не смог найти результаты тестов.

Использование отдельного автореретета не делает вещи слишком сложнее. На самом деле, можно даже абстрагировать его, раз и для всех, в BlockingCollectionSlim....

Blockingcollection Внутренне опирается на ConcularentQueue, но делает некоторые дополнительные жонглирование с Тонкие семафоры а также токены отмены, что дает дополнительные функции, но по себестоимости, даже когда не используется. Следует также отметить, что блокировкаCollection не состоять в браке на ConcurrentQueue, но может использоваться с другими ремонтаторами IProducerConsumerCollection вместо этого тоже.


Неограниченные, довольно голые кости блокировки концепции10.

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);

Я наткнулся на те же проблемы с производительностью с блокировкойCollection в .NET 4.7.2 и нашел этот пост. Мой корпус - это несколькопроизводитель-мультипликаторы, в частности небольшие куски данных читаются от многих источников и должны обрабатываться многими фильтрами. Были использованы несколько (Env.ProcessorCount) блокировки, и я оказался профилировщиком производительности, рассказывая мне, что BlockingCollection.GetConsumingEnumerable.MoveNext() Ест больше процессорного времени, чем фактическая фильтрация!

Спасибо, @eugene Beresovsky, для вашего кода. FYI: На моей среде это было почти в два раза медленнее, чем блокировкаCollection. Итак, вот моя застежка-заблокированная блокировкаCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

И для эффективности критический код, который я предложил избежать readonly модификатор поля. Он добавляет проверку на каждом поле поле в IL. Со следующим тестовым кодом

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

На Core I5-3210M с 2 ядрами и HT включили, у меня есть следующий вывод:

BlockingCollection 0,0006 мс БлокировкаCollctionsLectsLECLECTSLECTIONSTAL 0,0010 мс (Евгений Берсовский Реализация) БлокировкаCollctionspin 0,0003 мс

Итак, спинтевая версия в два раза быстрее, чем .NET BlockingCollection. Отказ Но я бы предложил использовать это только! Если вы действительно предпочитаете производительность против простоты (и ремонтопригодности).

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top