Как (и если) написать очередь с одним потребителем, используя TPL?
-
21-09-2019 - |
Вопрос
Недавно я слышал кучу подкастов о TPL в .NET 4.0.Большинство из них описывают фоновые действия, такие как загрузка изображений или выполнение вычислений, использование задач, чтобы работа не мешала потоку GUI.
Большая часть кода, над которым я работаю, больше ориентирована на нескольких производителей / одного потребителя, где рабочие элементы из нескольких источников должны быть поставлены в очередь, а затем обработаны по порядку.Одним из примеров может быть ведение журнала, когда строки журнала из нескольких потоков упорядочиваются в единую очередь для последующей записи в файл или базу данных.Все записи из любого отдельного источника должны оставаться в порядке, и записи за один и тот же момент времени должны быть "близки" друг к другу в конечном результате.
Таким образом, несколько потоков, задач или что-то еще вызывают очередь:
lock( _queue ) // or use a lock-free queue!
{
_queue.enqueue( some_work );
_queueSemaphore.Release();
}
И выделенный рабочий поток обрабатывает очередь:
while( _queueSemaphore.WaitOne() )
{
lock( _queue )
{
some_work = _queue.dequeue();
}
deal_with( some_work );
}
Всегда казалось разумным выделить рабочий поток для потребительской стороны этих задач.Должен ли я писать будущие программы, используя вместо этого какую-нибудь конструкцию из TPL?Который из них?Почему?
Решение
Вы можете использовать длительно выполняющуюся задачу для обработки элементов из BlockingCollection, как предлагает Wilka.Вот пример, который в значительной степени соответствует требованиям ваших приложений.Вы увидите вывод чего-то вроде этого:
Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C
Не то чтобы выходные данные из A, B, C & D казались случайными, потому что они зависят от времени запуска потоков, но B всегда появляется перед B1.
public class LogItem
{
public string Message { get; private set; }
public LogItem (string message)
{
Message = message;
}
}
public void Example()
{
BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();
// Start queue listener...
CancellationTokenSource canceller = new CancellationTokenSource();
Task listener = Task.Factory.StartNew(() =>
{
while (!canceller.Token.IsCancellationRequested)
{
LogItem item;
if (_queue.TryTake(out item))
Console.WriteLine(item.Message);
}
},
canceller.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
// Add some log messages in parallel...
Parallel.Invoke(
() => { _queue.Add(new LogItem("Log from task A")); },
() => {
_queue.Add(new LogItem("Log from task B"));
_queue.Add(new LogItem("Log from task B1"));
},
() => { _queue.Add(new LogItem("Log from task C")); },
() => { _queue.Add(new LogItem("Log from task D")); });
// Pretend to do other things...
Thread.Sleep(1000);
// Shut down the listener...
canceller.Cancel();
listener.Wait();
}
Другие советы
Я знаю, что этот ответ запоздал примерно на год, но взгляните на MSDN.
в котором показано, как создать LimitedConcurrencyLevelTaskScheduler из класса TaskScheduler.Ограничивая параллелизм одной задачей, которая затем должна обрабатывать ваши задачи в порядке их постановки в очередь через:
LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);
factory.StartNew(()=>
{
// your code
});
Я не уверен, что TPL адекватен в вашем случае использования.Насколько я понимаю, основной вариант использования TPL заключается в разделении одной огромной задачи на несколько более мелких задач, которые можно запускать параллельно.Например, если у вас большой список и вы хотите применить одно и то же преобразование к каждому элементу.В этом случае у вас может быть несколько задач, применяющих преобразование к подмножеству списка.
Случай, который вы описываете, мне кажется, не вписывается в эту картину.В вашем случае у вас нет нескольких задач, которые выполняют одно и то же параллельно.У вас есть несколько разных задач, каждая из которых выполняет свою собственную работу (производители) и одну задачу, которая потребляет.Возможно, TPL можно было бы использовать для потребительской части, если вы хотите иметь несколько потребителей, потому что в этом случае каждый потребитель выполняет одну и ту же работу (при условии, что вы найдете логику для обеспечения искомой временной согласованности).
Ну, это, конечно, всего лишь мой личный взгляд на этот предмет
Живите долго и процветайте
Это звучит как Блокирующая коллекция было бы удобно для вас.Итак, для вашего приведенного выше кода вы могли бы использовать что-то вроде (предполагая _queue
является BlockingCollection
экземпляр):
// for your producers
_queue.Add(some_work);
Выделенный рабочий поток, обрабатывающий очередь:
foreach (var some_work in _queue.GetConsumingEnumerable())
{
deal_with(some_work);
}
Примечание:когда все ваши продюсеры закончат выпуск материала, вам нужно будет позвонить CompleteAdding()
на _queue
в противном случае ваш потребитель застрянет в ожидании дополнительной работы.