Waithandle的解决方法。WAITALL64手柄限制?
-
01-10-2019 - |
题
我的应用程序通过 ThreadPool.QueueUserWorkItem
我跟踪通过多个 ManualResetEvent
实例。我用 WaitHandle.WaitAll
阻止我的应用程序关闭直到完成这些线程的方法。
但是,我从未有任何问题,因为我的应用程序正在受到更多负载,即创建更多的线程,我现在开始获得此例外:
WaitHandles must be less than or equal to 64 - missing documentation
最好的替代解决方案是什么?
代码段
List<AutoResetEvent> events = new List<AutoResetEvent>();
// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
// do work
evt.Set();
});
...
WaitHandle.WaitAll(events.ToArray());
解决方法
int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
}
});
...
finished.WaitOne();
解决方案
创建一个可以跟踪运行任务数量的变量:
int numberOfTasks = 100;
创建一个信号:
ManualResetEvent signal = new ManualResetEvent(false);
每当完成任务时减少任务的数量:
if (Interlocked.Decrement(ref numberOftasks) == 0)
{
如果没有任务,请设置信号:
signal.Set();
}
同时,其他地方,等待设置信号:
signal.WaitOne();
其他提示
从.NET 4.0开始,您还有两个(IMO,更清洁)的选项。
首先是使用 CountdownEvent
班级. 。它可以防止需要自己处理增量和减少:
int tasks = <however many tasks you're performing>;
// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
// Queue work.
ThreadPool.QueueUserWorkItem(() => {
// Do work
...
// Signal when done.
e.Signal();
});
// Wait till the countdown reaches zero.
e.Wait();
}
但是,还有一个更强大的解决方案,那就是使用 Task
班级, ,像这样:
// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
// Create task here.
Task.Factory.StartNew(() => {
// Do work.
}
// No signalling, no anything.
).ToArray();
// Wait on all the tasks.
Task.WaitAll(tasks);
使用 Task
上课和电话 WaitAll
IMO更加干净,因为您在整个代码中编织较少的螺纹原始图(请注意,无等待手柄);您不必设置计数器,处理递增/减少,而只需设置任务,然后等待它们即可。这使代码在 什么 您想做的事,而不是 如何 (至少在管理它的并行化方面)。
.NET 4.5提供更多选项,您可以简化序列的生成 Task
打电话给 静止的 Run
方法 Task
班级:
// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
// Create task here.
Task.Run(() => {
// Do work.
})
// No signalling, no anything.
).ToArray();
// Wait on all the tasks.
Tasks.WaitAll(tasks);
或者,您可以利用 TPL数据流库 (在 System
名称空间,因此是正式的,即使是从Nuget下载,例如实体框架)并使用 ActionBlock<TInput>
, ,像这样:
// Create the action block. Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
// Do work.
});
// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);
// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();
// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();
请注意 ActionBlock<TInput>
默认情况下,一次处理一个项目,因此,如果您想一次处理多个操作,则必须通过传递A来设置要在构造函数中处理的并发项目的数量 ExecutionDataflowBlockOptions
实例并设置 MaxDegreeOfParallelism
财产:
var actionBlock = new ActionBlock<object>(o => {
// Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
如果您的操作确实是安全的,那么您可以设置 MaxDegreeOfParallelsim
财产为 DataFlowBlockOptions.Unbounded
:
var actionBlock = new ActionBlock<object>(o => {
// Do work.
}, new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});
关键是,您对 如何 并行您希望您的选择。
当然,如果您有一系列想要传递到您的项目 ActionBlock<TInput>
实例,您可以链接 ISourceBlock<TOutput>
实施以喂养 ActionBlock<TInput>
, ,像这样:
// The buffer block.
var buffer = new BufferBlock<int>();
// Create the action block. Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
// Do work.
});
// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock,
// Want to propagate completion state to the action block.
new DataflowLinkOptions {
PropagateCompletion = true,
},
// Can filter on items flowing through if you want.
i => true)
{
// Post 100 times to the *buffer*
foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);
// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();
// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();
}
根据您需要做的事情,TPL DataFlow库成为一个 很多 更具吸引力的选择,因为它可以处理跨越的并发 全部 任务链接在一起,它使您可以非常具体 只是 您希望每件作品的平行程度如何,同时保持每个块的关注点的适当分离。
您的解决方法是不正确的。原因是 Set
和 WaitOne
如果最后一个工作项目导致 threadCount
零 前 排队线必须有机会排队 全部 工作项目。修复很简单。将您的排队线程视为工作项目本身。初始化 threadCount
到1,并在排队完成时进行降低和信号。
int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
}
});
...
if (Interlocked.Decrement(ref threadCount) == 0)
{
finished.Set();
}
finished.WaitOne();
作为个人喜好,我喜欢使用 CountdownEvent
上课为我计数。
var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate
{
try
{
// do work
}
finally
{
finished.Signal();
}
});
...
finished.Signal();
finished.Wait();
添加到DTB的答案中,您可以将其包装成一个不错的简单类。
public class Countdown : IDisposable
{
private readonly ManualResetEvent done;
private readonly int total;
private long current;
public Countdown(int total)
{
this.total = total;
current = total;
done = new ManualResetEvent(false);
}
public void Signal()
{
if (Interlocked.Decrement(ref current) == 0)
{
done.Set();
}
}
public void Wait()
{
done.WaitOne();
}
public void Dispose()
{
((IDisposable)done).Dispose();
}
}
当我们想要回电时,添加到DTB的答案。
using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;
class Program
{
static void Main(string[] args)
{
Main m = new Main();
m.TestMRE();
Console.ReadKey();
}
}
class Main
{
CalHandler handler = new CalHandler();
int numberofTasks =0;
public void TestMRE()
{
for (int j = 0; j <= 3; j++)
{
Console.WriteLine("Outer Loop is :" + j.ToString());
ManualResetEvent signal = new ManualResetEvent(false);
numberofTasks = 4;
for (int i = 0; i <= 3; i++)
{
CalHandler.count caller = new CalHandler.count(handler.messageHandler);
caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
}
signal.WaitOne();
}
}
private void NumberCallback(IAsyncResult result)
{
AsyncResult asyncResult = (AsyncResult)result;
CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;
int num = caller.EndInvoke(asyncResult);
Console.WriteLine("Number is :"+ num.ToString());
ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
if (Interlocked.Decrement(ref numberofTasks) == 0)
{
mre.Set();
}
}
}
public class CalHandler
{
public delegate int count(int number);
public int messageHandler ( int number )
{
return number;
}
}
protected void WaitAllExt(WaitHandle[] waitHandles)
{
//workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
const int waitAllArrayLimit = 64;
var prevEndInd = -1;
while (prevEndInd < waitHandles.Length - 1)
{
var stInd = prevEndInd + 1;
var eInd = stInd + waitAllArrayLimit - 1;
if (eInd > waitHandles.Length - 1)
{
eInd = waitHandles.Length - 1;
}
prevEndInd = eInd;
//do wait
var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
WaitHandle.WaitAll(whSubarray);
}
}
我确实通过简单地拨打了不再丢失表演的事件的数量来解决它,并且它在生产环境中正常工作。遵循代码:
var events = new List<ManualResetEvent>();
// code omited
var newEvent = new ManualResetEvent(false);
events.Add(newEvent);
ThreadPool.QueueUserWorkItem(c => {
//thread code
newEvent.Set();
});
// code omited
var wait = true;
while (wait)
{
WaitHandle.WaitAll(events.Take(60).ToArray());
events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
wait = events.Any();
}
这是另一个解决方案。这是“事件”是ManualResetevent的列表。列表的大小可能大于64(max_events_no)。
int len = events.Count;
if (len <= MAX_EVENTS_NO)
{
WaitHandle.WaitAll(events.ToArray());
} else {
int start = 0;
int num = MAX_EVENTS_NO;
while (true)
{
if(start + num > len)
{
num = len - start;
}
List<ManualResetEvent> sublist = events.GetRange(start, num);
WaitHandle.WaitAll(sublist.ToArray());
start += num;
if (start >= len)
break;
}
}
Windows XP SP3最多支持两个Waithandles。对于超过2个Waithandles申请的情况,请过早终止。