如何(以及如果)编写一个消费者的队列中使用的TPL?
-
21-09-2019 - |
题
我听说了一大堆的播客,最近关于TPL。净4.0.他们中的大多数描述背景的活动,如下载图像或做的计算中,采用的任务,使该工作不会干扰GUI线。
大多数代码我工作上有更多制作人/单的消费者口味,那里的工作的项目,从多个来源,必须排队,然后按顺序进行处理。一个例子就是日志记录,其中登录线从多个线程sequentialized入一个单一的队列为最终写入文件或数据库。所有记录任何单一来源必须保持秩序,并记录从同一时刻应该是"接近"每个其他最终的输出。
这么多线程或任务或什么都调用queuer:
lock( _queue ) // or use a lock-free queue!
{
_queue.enqueue( some_work );
_queueSemaphore.Release();
}
和一个专用的工作人员的线进程的队列:
while( _queueSemaphore.WaitOne() )
{
lock( _queue )
{
some_work = _queue.dequeue();
}
deal_with( some_work );
}
它似乎总是合理的专门一线工作人员对消费者方面的这些任务。我应该写入未来的方案采用的一些构造从TPL,而不是?哪一个?为什么?
解决方案
可以使用一个长期运行的任务到过程项从BlockingCollection通过Wilka的建议。下面是其中非常满足您的应用需求为例。你会看到输出是这样的:
Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C
不是从A,B,C&d输出随机出现,因为它们依赖于线程的开始时间,但乙总是B1之前出现。
public class LogItem
{
public string Message { get; private set; }
public LogItem (string message)
{
Message = message;
}
}
public void Example()
{
BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();
// Start queue listener...
CancellationTokenSource canceller = new CancellationTokenSource();
Task listener = Task.Factory.StartNew(() =>
{
while (!canceller.Token.IsCancellationRequested)
{
LogItem item;
if (_queue.TryTake(out item))
Console.WriteLine(item.Message);
}
},
canceller.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
// Add some log messages in parallel...
Parallel.Invoke(
() => { _queue.Add(new LogItem("Log from task A")); },
() => {
_queue.Add(new LogItem("Log from task B"));
_queue.Add(new LogItem("Log from task B1"));
},
() => { _queue.Add(new LogItem("Log from task C")); },
() => { _queue.Add(new LogItem("Log from task D")); });
// Pretend to do other things...
Thread.Sleep(1000);
// Shut down the listener...
canceller.Cancel();
listener.Wait();
}
其他提示
我知道这个答案是有关一年的晚期,但是看一看 MSDN.
这显示如何创建一个LimitedConcurrencyLevelTaskScheduler从TaskScheduler类。通过限制并发给一个单一的任务,即应当处理您的任务,以便为他们排队等待通过:
LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);
factory.StartNew(()=>
{
// your code
});
我不知道这是太平人寿在您的使用案例充分。从我的理解为TPL主要用例是一个艰巨的任务分割成可以并排运行侧几个更小的任务。例如,如果你有一个大名单,你要应用的每个元素在同一个转化。在这种情况下,你可以应用转换的列表中的一个子集多项任务。
您所描述的情况似乎并不适合在这张照片给我。在你的情况你没有的几个任务做同样的事情在并行。你有几个不同的任务,每做的是自己的工作(生产商)和一个任务消耗。也许可以用于消费的部分TPL如果你想有多个消费者,因为在这种情况下,每个消费者做相同的工作(假设你发现强制执行您要查找的时间一致性逻辑)。
那么,这当然是关于这个问题
只是我的personnal视图现场长和繁荣
这听起来像 BlockingCollection 将正适合你。因此,对于上面的代码,您可以使用类似(假设_queue
是BlockingCollection
实例):
// for your producers
_queue.Add(some_work);
一个专用辅助线程处理所述队列:
foreach (var some_work in _queue.GetConsumingEnumerable())
{
deal_with(some_work);
}
请注意:当你所有的生产商已经生产完毕的东西,你就需要调用CompleteAdding()
_queue
否则你的消费者将定在等待更多的工作。