线程池/WaitHandle资源泄漏/崩溃
-
16-09-2020 - |
题
我想我可能需要重新考虑我的设计。我很难缩小导致计算机完全挂起的错误范围,有时会从 VS 2010 抛出 HRESULT 0x8007000E。
我有一个控制台应用程序(稍后我将其转换为服务),用于处理基于数据库队列的文件传输。
我正在限制允许传输的线程。这是因为我们连接的某些系统只能包含来自某些帐户的一定数量的连接。
例如,系统 A 只能接受 3 个同时连接(这意味着 3 个独立的线程)。这些线程中的每一个都有自己独特的连接对象,因此我们不应该遇到任何同步问题,因为它们不共享连接。
我们希望循环处理来自这些系统的文件。例如,我们将允许 3 个连接,每个连接最多可以传输 100 个文件。这意味着,要从系统 A 移动 1000 个文件,我们每个周期只能处理 300 个文件,因为允许 3 个线程,每个线程 100 个文件。因此,在此传输的生命周期中,我们将有 10 个线程。我们一次只能运行 3 个。因此,将会有 3 个周期,最后一个周期将仅使用 1 个线程来传输最后 100 个文件。(3 个线程 x 100 个文件 = 每个周期 300 个文件)
当前的架构示例是:
- System.Threading.Timer 每 5 秒检查一次队列,通过调用 GetScheduledTask() 来执行某些操作
- 如果没有什么可做的,GetScheduledTask() 就什么也不做
- 如果有工作,则创建ThreadPool线程来处理工作【工作线程A】
- 工作线程 A 看到有 1000 个文件要传输
- 工作线程 A 发现它只能有 3 个线程运行到从中获取文件的系统
- 工作线程A启动三个新的工作线程[B,C,D]并传输
- 工作线程A等待B、C、D
[WaitHandle.WaitAll(transfersArray)]
- 工作线程 A 看到队列中还有更多文件(现在应该是 700 个)
- 工作线程 A 创建一个新数组来等待
[transfersArray = new TransferArray[3]
这是系统 A 的最大值,但可能因系统而异 - 工作线程 A 启动三个新的工作线程 [B,C,D] 并等待它们
[WaitHandle.WaitAll(transfersArray)]
- 重复该过程,直到没有更多文件可以移动。
- 工作线程 A 发出信号表示已完成
我正在使用 ManualResetEvent 来处理信号。
我的问题是:
- 是否有任何明显的情况会导致资源泄漏或我遇到的问题?
- 我应该在每次之后循环遍历数组吗
WaitHandle.WaitAll(array)
并打电话array[index].Dispose()?
- 任务管理器下此进程的句柄计数慢慢增加
- 我从 System.Threading.Timer 调用工作线程 A 的初始创建。这样会不会有什么问题呢?该计时器的代码是:
(一些用于调度的类代码)
private ManualResetEvent _ResetEvent;
private void Start()
{
_IsAlive = true;
ManualResetEvent transferResetEvent = new ManualResetEvent(false);
//Set the scheduler timer to 5 second intervals
_ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}
private void ScheduledTasks_Tick(object state)
{
ManualResetEvent resetEvent = null;
try
{
resetEvent = (ManualResetEvent)state;
//Block timer until GetScheduledTasks() finishes
_ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
GetScheduledTasks();
}
finally
{
_ScheduledTasks.Change(5000, 5000);
Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
resetEvent.Set();
}
}
private void GetScheduledTask()
{
try
{
//Check to see if the database connection is still up
if (!_IsAlive)
{
//Handle
_ConnectionLostNotification = true;
return;
}
//Get scheduled records from the database
ISchedulerTask task = null;
using (DataTable dt = FastSql.ExecuteDataTable(
_ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
{
if (dt != null)
{
if (dt.Rows.Count == 1)
{ //Only 1 row is allowed
DataRow dr = dt.Rows[0];
//Get task information
TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
task = ScheduledTaskFactory.CreateScheduledTask(taskType);
task.Description = dr["Description"].ToString();
task.IsEnabled = (bool)dr["IsEnabled"];
task.IsProcessing = (bool)dr["IsProcessing"];
task.IsManualLaunch = (bool)dr["IsManualLaunch"];
task.ProcessMachineName = dr["ProcessMachineName"].ToString();
task.NextRun = (DateTime)dr["NextRun"];
task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
task.SleepMinutes = (int)dr["SleepMinutes"];
task.ScheduleId = (int)dr["ScheduleId"];
task.CurrentRuns = (int)dr["CurrentRuns"];
task.TotalRuns = (int)dr["TotalRuns"];
SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
//Queue up task to worker thread and start
ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);
}
}
}
}
catch (Exception ex)
{
//Handle
}
}
private void ThreadProc(object taskObject)
{
SchedulerTask task = (SchedulerTask)taskObject;
ScheduledTaskEngine engine = null;
try
{
engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
engine.StartTask(task.Task);
}
catch (Exception ex)
{
//Handle
}
finally
{
task.TaskResetEvent.Set();
task.TaskResetEvent.Dispose();
}
}
解决方案 4
事实证明,这个奇怪问题的根源与架构无关,而是因为将解决方案从 3.5 转换为 4.0。我重新创建了解决方案,没有执行任何代码更改,并且问题再也没有发生。
其他提示
0x8007000E 是内存不足错误。这和句柄计数似乎表明存在资源泄漏。确保您正在处理实现的每个对象 IDisposable
. 。这包括以下数组 ManualResetEvent
你正在使用。
如果您有时间,您可能还想转换为使用 .NET 4.0 Task
班级;它旨在更干净地处理此类复杂场景。通过定义孩子 Task
对象,您可以减少总线程数(线程非常昂贵,不仅因为调度,还因为它们的堆栈空间)。
我正在寻找类似问题的答案(句柄数随着时间的推移而增加)。
我查看了您的应用程序架构,并想向您提出一些可以帮助您的建议:
您听说过 IOCP(输入输出完成端口)吗?
我不确定使用 C# 实现这一点是否困难,但在 C/C++ 中这是小菜一碟。通过使用此功能,您可以创建一个唯一的线程池(该池中的线程数通常定义为PC或PC或服务器中处理器内核的数量2 x)工作。请参阅这些功能的帮助:创建IoCompletionPort();PostQueuedCompletionStatus();获取队列完成状态();
一般来说,动态创建和退出线程可能非常耗时,并会导致性能损失和内存碎片。MSDN 和 google 上有数千篇有关 IOCP 的文献。
我认为你应该重新考虑你的架构。事实上,您只能同时拥有 3 个连接,这几乎要求您使用 1 个线程来生成文件列表,并使用 3 个线程来处理它们。您的生产者线程会将所有文件插入队列中,3 个消费者线程将在项目到达队列时出队并继续处理。阻塞队列可以显着简化代码。如果您使用 .NET 4.0,那么您可以利用 阻塞收集 班级。
public class Example
{
private BlockingCollection<string> m_Queue = new BlockingCollection<string>();
public void Start()
{
var threads = new Thread[]
{
new Thread(Producer),
new Thread(Consumer),
new Thread(Consumer),
new Thread(Consumer)
};
foreach (Thread thread in threads)
{
thread.Start();
}
}
private void Producer()
{
while (true)
{
Thread.Sleep(TimeSpan.FromSeconds(5));
ScheduledTask task = GetScheduledTask();
if (task != null)
{
foreach (string file in task.Files)
{
m_Queue.Add(task);
}
}
}
}
private void Consumer()
{
// Make a connection to the resource that is assigned to this thread only.
while (true)
{
string file = m_Queue.Take();
// Process the file.
}
}
}
我在上面的例子中确实过于简单化了,但我希望你能明白总体思路。请注意,这要简单得多,因为线程同步的方式并不多(大多数将嵌入到阻塞队列中),当然也没有使用 WaitHandle
对象。显然,您必须添加正确的机制来正常关闭线程,但这应该相当容易。