Как (и если) написать очередь с одним потребителем, используя TPL?

StackOverflow https://stackoverflow.com/questions/2293976

Вопрос

Недавно я слышал кучу подкастов о TPL в .NET 4.0.Большинство из них описывают фоновые действия, такие как загрузка изображений или выполнение вычислений, использование задач, чтобы работа не мешала потоку GUI.

Большая часть кода, над которым я работаю, больше ориентирована на нескольких производителей / одного потребителя, где рабочие элементы из нескольких источников должны быть поставлены в очередь, а затем обработаны по порядку.Одним из примеров может быть ведение журнала, когда строки журнала из нескольких потоков упорядочиваются в единую очередь для последующей записи в файл или базу данных.Все записи из любого отдельного источника должны оставаться в порядке, и записи за один и тот же момент времени должны быть "близки" друг к другу в конечном результате.

Таким образом, несколько потоков, задач или что-то еще вызывают очередь:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

И выделенный рабочий поток обрабатывает очередь:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

Всегда казалось разумным выделить рабочий поток для потребительской стороны этих задач.Должен ли я писать будущие программы, используя вместо этого какую-нибудь конструкцию из TPL?Который из них?Почему?

Это было полезно?

Решение

Вы можете использовать длительно выполняющуюся задачу для обработки элементов из BlockingCollection, как предлагает Wilka.Вот пример, который в значительной степени соответствует требованиям ваших приложений.Вы увидите вывод чего-то вроде этого:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

Не то чтобы выходные данные из A, B, C & D казались случайными, потому что они зависят от времени запуска потоков, но B всегда появляется перед B1.

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}

Другие советы

Я знаю, что этот ответ запоздал примерно на год, но взгляните на MSDN.

в котором показано, как создать LimitedConcurrencyLevelTaskScheduler из класса TaskScheduler.Ограничивая параллелизм одной задачей, которая затем должна обрабатывать ваши задачи в порядке их постановки в очередь через:

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

factory.StartNew(()=> 
{
   // your code
});

Я не уверен, что TPL адекватен в вашем случае использования.Насколько я понимаю, основной вариант использования TPL заключается в разделении одной огромной задачи на несколько более мелких задач, которые можно запускать параллельно.Например, если у вас большой список и вы хотите применить одно и то же преобразование к каждому элементу.В этом случае у вас может быть несколько задач, применяющих преобразование к подмножеству списка.

Случай, который вы описываете, мне кажется, не вписывается в эту картину.В вашем случае у вас нет нескольких задач, которые выполняют одно и то же параллельно.У вас есть несколько разных задач, каждая из которых выполняет свою собственную работу (производители) и одну задачу, которая потребляет.Возможно, TPL можно было бы использовать для потребительской части, если вы хотите иметь несколько потребителей, потому что в этом случае каждый потребитель выполняет одну и ту же работу (при условии, что вы найдете логику для обеспечения искомой временной согласованности).

Ну, это, конечно, всего лишь мой личный взгляд на этот предмет

Живите долго и процветайте

Это звучит как Блокирующая коллекция было бы удобно для вас.Итак, для вашего приведенного выше кода вы могли бы использовать что-то вроде (предполагая _queue является BlockingCollection экземпляр):

// for your producers 
_queue.Add(some_work);

Выделенный рабочий поток, обрабатывающий очередь:

foreach (var some_work in _queue.GetConsumingEnumerable())
{
    deal_with(some_work);
}

Примечание:когда все ваши продюсеры закончат выпуск материала, вам нужно будет позвонить CompleteAdding() на _queue в противном случае ваш потребитель застрянет в ожидании дополнительной работы.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top