Вопрос

Поток байта должен быть передан, и есть один продюсер и потребительский. Скорость производителя выше, чем потребитель большую часть времени, и мне нужны достаточно буферизованные данные для QoS моего приложения. Я читал о своей проблеме, и есть такие решения, такие как общий буфер, класс Pipestream .NET ... Этот класс собирается создать много раз на сервере, поэтому мне нужно и оптимизировать решение. Это хорошая идея использовать очередь ByTearray?

Если да, я буду использовать алгоритм оптимизации, чтобы угадать размер очереди и каждую мощность ByTearray и теоретически это соответствует моему делу.

Если нет, я какой лучший подход?

Пожалуйста, дайте мне знать, если есть хорошая блокировка свободной резьбы Безопасная реализация очереди ByTearray в C # или VB.

заранее спасибо

Это было полезно?

Решение 6

Самая важная часть - это дизайн общего объекта. В моем сценарионом читателе и писателю могут использовать отдельные буферы (большие куски данных) независимо, а затем, только доступ к общему объекту FIFO, как очередь, должен быть синхронизирован. Таким образом, время блокировки сводится к минимуму, а резьбы могут завершить работу параллельно. И с .Net Framewok 4.0 Реализация этой концепции сделана легко:

В System.Colluctions.Concurrent Tampacte есть класс ConsullentQueue (из T). В пространстве имен существуют другие потоковые коллекции.

http://msdn.microsoft.com/en-us/library/system.colection.concurrent.aspx.

Другие советы

Вы, вероятно, получите гораздо более ускоренные, если вместо того, чтобы произвести и потребление байта, вы работаете в кусках. В этом случае «Lock-FreeDess» кода, вероятно, не имеет значения вообще - на самом деле традиционное, блокирующее решение может быть предпочтительным. Я постараюсь продемонстрировать.

Без блокировки, Один режиссер, Один потребитель, ограниченный Очередь дана в C #. (Листинг а)
Нет никаких эзотерических блокируемых операций, даже без явных барьеров памяти. Допустим, на первый взгляд он так быстро и без блокировки, как он получает. Не так ли?
Теперь давайте сравним его с блокирующим решением что Марк Грейль дал, здесь.

Мы будем использовать двойную машину ЦП, которая не имеет общего кэша L3 между сердечниками. Мы ожидаем не более 2x Speedup. 2x Speedup действительно означало, что решение без блокировки выполняется в идеале, на теоретических границах.
Для того, чтобы сделать идеальную среду для блокировки кода, мы даже устанавливаем сходство CPU производителя и потребительскую нить, используя класс утилиты из здесь.
Полученный код теста находится в (листинге b).

Это производит ок. 10 мбайт на одном потоке, потребляя его на другой.
Размер очереди фиксируется на 32кбайтах. Если он полон, производитель ждет.
Типичный пробег теста на моей машине выглядит так:

LockFreebytequeue: 799ms.
Bytequeue: 1843 мс.

Очередь блокировки быстрее. Вау, это более 2x так быстро! Это что-то похвастаться. :)
Давайте посмотрим на то, что происходит. Очередь блокировки Марка делает только что это. Это замки. Это делает это для каждого байта.

Надо ли нам заблокировать каждый байт и нажать на байт данных byte? Это наиболее уверенно приходит в куски в сети (как некоторые пакеты CA. 1K). Даже если он действительно приходит байт байтом из внутреннего источника, производитель может легко упаковать его в хорошие куски.
Давайте просто сделаем это - вместо того, чтобы производить и потреблять байт-байт, давайте будем работать в кусках и добавить два других теста в микро-эталон (листинг C, просто вставите его в тело Benchmark).
Теперь типичный пробег выглядит так:

LockFreepageQueue: 33 мс.
Специальность: 25 мс.

Теперь обе из них на самом деле 20x быстрее, чем оригинальный код без блокировки - Решение MARC С добавленной кусочкой на самом деле Быстрее Теперь, чем без блокировки кода с Chunking!
Вместо того, чтобы идти с безблокированной структурой, которая приведет к скорости 2 раза, мы предприняли другое решение, которое работает совсем хорошо с блокировкой и привело к ускорению 20x (!).
Ключ ко многим проблемам не так много, избегая блокировки - это гораздо больше, чтобы избежать обмена и минимизации блокировки. В приведенном выше случае мы можем избежать совместного использования в течение длительности байтового копирования.
Мы можем работать над частной структурой в большую часть времени, а затем вставлять один указатель, тем самым сокращая общий пробел и время в одну вставку одного указателя в очередь.

Листинг A, безблокированная, однопроизводитель, одна потребительская очередь:

public class BoundedSingleProducerSingleConsumerQueue<T>
{
    T[] queue;
    volatile int tail;
    volatile int head;

    public BoundedSingleProducerSingleConsumerQueue(int capacity)
    {
        queue = new T[capacity + 1];
        tail = head = 0;
    }

    public bool TryEnqueue(T item)
    {
        int newtail = (tail + 1) % queue.Length;
        if (newtail == head) return false;
        queue[tail] = item;
        tail = newtail;
        return true;
    }

    public bool TryDequeue(out T item)
    {
        item = default(T);
        if (head == tail) return false;
        item = queue[head];
        queue[head] = default(T);
        head = (head + 1) % queue.Length;
        return true;
    }
}

Листинг B, микро-ориентир:

class Program
{
    static void Main(string[] args)
    {
        for (int numtrials = 3; numtrials > 0; --numtrials)
        {
            using (ProcessorAffinity.BeginAffinity(0))
            {
                int pagesize = 1024 * 10;
                int numpages = 1024;
                int totalbytes = pagesize * numpages;

                BoundedSingleProducerSingleConsumerQueue<byte> lockFreeByteQueue = new BoundedSingleProducerSingleConsumerQueue<byte>(1024 * 32);
                Stopwatch sw = new Stopwatch();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            while (!lockFreeByteQueue.TryEnqueue((byte)(i & 0xFF))) ;
                        }
                    }
                });
                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp;
                    while (!lockFreeByteQueue.TryDequeue(out tmp)) ;
                }
                sw.Stop();
                Console.WriteLine("LockFreeByteQueue: {0}ms", sw.ElapsedMilliseconds);


                SizeQueue<byte> byteQueue = new SizeQueue<byte>(1024 * 32);
                sw.Reset();
                sw.Start();
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    using (ProcessorAffinity.BeginAffinity(1))
                    {
                        for (int i = 0; i < totalbytes; i++)
                        {
                            byteQueue.Enqueue((byte)(i & 0xFF));
                        }
                    }
                });

                for (int i = 0; i < totalbytes; i++)
                {
                    byte tmp = byteQueue.Dequeue();
                }
                sw.Stop();
                Console.WriteLine("ByteQueue: {0}ms", sw.ElapsedMilliseconds);

                Console.ReadKey();
            }
        }
    }
}

Листинг C, Coanked Tests:

BoundedSingleProducerSingleConsumerQueue<byte[]> lockfreePageQueue = new BoundedSingleProducerSingleConsumerQueue<byte[]>(32);
sw.Reset();
sw.Start();
ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            while (!lockfreePageQueue.TryEnqueue(page)) ;
        }
    }
});
for (int i = 0; i < numpages; i++)
{
    byte[] page;
    while (!lockfreePageQueue.TryDequeue(out page)) ;
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("LockFreePageQueue: {0}ms", sw.ElapsedMilliseconds);

SizeQueue<byte[]> pageQueue = new SizeQueue<byte[]>(32);

ThreadPool.QueueUserWorkItem(delegate(object state)
{
    using (ProcessorAffinity.BeginAffinity(1))
    {
        for (int i = 0; i < numpages; i++)
        {
            byte[] page = new byte[pagesize];
            for (int j = 0; j < pagesize; j++)
            {
                page[j] = (byte)(i & 0xFF);
            }
            pageQueue.Enqueue(page);
        }
    }
});
sw.Reset();
sw.Start();
for (int i = 0; i < numpages; i++)
{
    byte[] page = pageQueue.Dequeue();
    for (int j = 0; j < pagesize; j++)
    {
        byte tmp = page[j];
    }
}
sw.Stop();
Console.WriteLine("PageQueue: {0}ms", sw.ElapsedMilliseconds);

В .NET 4 есть System.Collections.Concurrent.Queue<T> Что касается блокировки, поскольку эти вещи могут быть (пока все еще общие).

Доктор Добс внедрил очередь без блокировки в C ++, который вы могли бы относительно легко принять к C #. Он работает, когда имеется ровно один производитель (может быть любое количество потребителей).

Основная идея состоит в том, чтобы использовать вдвойне связанный список в качестве основной структуры вместе с подвижной головкой и хвостом. Когда элемент изготовлен, он добавляется к концу, и все между началом списка и текущей «головой» удаляется. Потреблять, попытаться переместить голову; Если он попадает в хвост, не удается, если он не преуспевает и верните новый элемент. Особый порядок операций делает его по своей сути, безопасным.

Тем не менее, есть две основные проблемы с использованием такого «безблокированного» дизайна здесь:

  1. Нет никакого способа принуждать верхнюю привязку к размеру очереди, что может быть серьезной проблемой, если ваш производитель будет быстрее, чем ваш потребитель;

  2. По дизайну, Consume Метод должен просто не получить элемент, если ничего не было. Это означает, что вам нужно реализовать ваш собственный Блокировка для потребителя, и такая блокировка неизменно является либо занятым ожидающим (что намного хуже Чем блокировка в спектре исполнения) или время ожидания (что замедляет ваш потребитель еще больше).

По этим причинам я бы порекомендовал вам серьезно рассмотреть вопрос о том, действительно ли вам нужна бесплатная структура. Многие люди приходят на этот сайт, думая, что это будет «быстрее», чем эквивалентная структура, используя блокировку, но практическая разница для большинства приложений настолько незначительно, что она обычно не стоит добавленной сложности, а в некоторых случаях она на самом деле может выполнять хуже, потому что состояние ожидания (или оповещения ждет) намного дешевле, чем занято.

Многократные машины и необходимость барьеров памяти делают эффективную резьбу без блокировки еще более сложнее; При нормальной работе вы все еще можете получить выполнение вне заказа, а в .NET Джиттер может еще раз решить переупорядочить инструкции, поэтому вам, вероятно, понадобится перец кода с volatile Переменные и Thread.MemoryBarrier Звонки, которые снова могут внести свой вклад в создание бесплатной версии, чем базовая синхронизированная версия.

Как насчет использования простого старого синхронизированного производителя-потребительской очереди сначала и профилирование Ваше заявление, чтобы определить, может ли он соответствовать вашим требованиям производительности? Есть отличная, эффективная реализация очереди ПК в течение Сайт Джозефа Альбахари. Отказ Или, как упоминание Ричарда, если вы используете Framework .NET 4.0, вы можете просто использовать ConcularentQueue. или более вероятно БлокировкаCollection..

Испытайте первое - тест нагрузки Синхронизированная очередь, которая легко реализовать - и просмотреть, сколько времени на самом деле заблокировано. Нет ожидающий, который вам придется сделать все равно, но на самом деле приобретение а также выпуск замки после того, как они сигнализируются. Если это более 1% времени выполнения вашей программы, я был бы очень удивлен; Но если так, тогда Начните смотреть на бесплатные реализации, - и убедитесь, что вас тоже профиль, чтобы убедиться, что они на самом деле выполняют лучше.

Throttling важен здесь, по звуку этого, класс границ в этом статья в журнале соответствует законопроекту. Аналогичный класс будет доступен в .NET 4.0 как Класс блокировкиCollection. Отказ Настройка размера буфера все еще зависит от вас.

Джулиан М Бакналл имеет написано один в C #.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top