Утечка/сбой ресурса Threadpool/WaitHandle
-
16-09-2020 - |
Вопрос
Я думаю, мне, возможно, придется переосмыслить свой дизайн.Мне трудно выявить ошибку, из-за которой мой компьютер полностью зависает, иногда выдавая HRESULT 0x8007000E из VS 2010.
У меня есть консольное приложение (которое я позже преобразую в сервис), которое обрабатывает передачу файлов на основе очереди базы данных.
Я ограничиваю количество потоков, разрешенных для передачи.Это связано с тем, что некоторые системы, к которым мы подключаемся, могут содержать только определенное количество подключений от определенных учетных записей.
Например, Система А может принимать только 3 одновременных соединения (что означает 3 отдельных потока).Каждый из этих потоков имеет свой собственный уникальный объект соединения, поэтому у нас не должно возникнуть проблем с синхронизацией, поскольку они не используют совместное соединение.
Мы хотим обрабатывать файлы из этих систем циклически.Так, например, мы разрешим 3 соединения, которые могут передавать до 100 файлов за одно соединение.Это означает, что для перемещения 1000 файлов из системы А мы можем обработать только 300 файлов за цикл, поскольку разрешено 3 потока по 100 файлов в каждом.Следовательно, за время существования этой передачи у нас будет 10 потоков.Мы можем запускать только 3 одновременно.Итак, будет 3 цикла, и последний цикл будет использовать только 1 поток для передачи последних 100 файлов.(3 потока x 100 файлов = 300 файлов за цикл)
Текущая архитектура в качестве примера:
- System.Threading.Timer проверяет очередь каждые 5 секунд на наличие необходимых действий, вызывая GetScheduledTask().
- Если делать нечего, GetScheduledTask() просто ничего не делает.
- Если есть работа, создайте поток ThreadPool для ее обработки [Рабочий поток A]
- Рабочий поток А видит, что нужно передать 1000 файлов.
- Рабочий поток A видит, что в системе, из которой он получает файлы, может быть запущено только 3 потока.
- Рабочий поток A запускает три новых рабочих потока [B,C,D] и передает
- Рабочий поток A ожидает B, C, D
[WaitHandle.WaitAll(transfersArray)]
- Рабочий поток A видит, что в очереди еще есть файлы (теперь их должно быть 700).
- Рабочий поток A создает новый массив для ожидания
[transfersArray = new TransferArray[3]
это максимум для системы А, но может варьироваться в зависимости от системы - Рабочий поток A запускает три новых рабочих потока [B,C,D] и ждет их.
[WaitHandle.WaitAll(transfersArray)]
- Процесс повторяется до тех пор, пока не останется файлов для перемещения.
- Рабочая нить A сигнализирует о завершении
Я использую ManualResetEvent для обработки сигналов.
Мои вопросы:
- Есть ли какие-либо явные обстоятельства, которые могут вызвать утечку ресурсов или проблему, с которой я столкнулся?
- Должен ли я перебирать массив после каждого
WaitHandle.WaitAll(array)
и позвониarray[index].Dispose()?
- Число дескрипторов этого процесса в диспетчере задач медленно увеличивается.
- Я вызываю первоначальное создание рабочего потока A из System.Threading.Timer.Будут ли с этим какие-то проблемы?Код этого таймера:
(Некоторый код класса для планирования)
private ManualResetEvent _ResetEvent;
private void Start()
{
_IsAlive = true;
ManualResetEvent transferResetEvent = new ManualResetEvent(false);
//Set the scheduler timer to 5 second intervals
_ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}
private void ScheduledTasks_Tick(object state)
{
ManualResetEvent resetEvent = null;
try
{
resetEvent = (ManualResetEvent)state;
//Block timer until GetScheduledTasks() finishes
_ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
GetScheduledTasks();
}
finally
{
_ScheduledTasks.Change(5000, 5000);
Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
resetEvent.Set();
}
}
private void GetScheduledTask()
{
try
{
//Check to see if the database connection is still up
if (!_IsAlive)
{
//Handle
_ConnectionLostNotification = true;
return;
}
//Get scheduled records from the database
ISchedulerTask task = null;
using (DataTable dt = FastSql.ExecuteDataTable(
_ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
{
if (dt != null)
{
if (dt.Rows.Count == 1)
{ //Only 1 row is allowed
DataRow dr = dt.Rows[0];
//Get task information
TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
task = ScheduledTaskFactory.CreateScheduledTask(taskType);
task.Description = dr["Description"].ToString();
task.IsEnabled = (bool)dr["IsEnabled"];
task.IsProcessing = (bool)dr["IsProcessing"];
task.IsManualLaunch = (bool)dr["IsManualLaunch"];
task.ProcessMachineName = dr["ProcessMachineName"].ToString();
task.NextRun = (DateTime)dr["NextRun"];
task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
task.SleepMinutes = (int)dr["SleepMinutes"];
task.ScheduleId = (int)dr["ScheduleId"];
task.CurrentRuns = (int)dr["CurrentRuns"];
task.TotalRuns = (int)dr["TotalRuns"];
SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
//Queue up task to worker thread and start
ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);
}
}
}
}
catch (Exception ex)
{
//Handle
}
}
private void ThreadProc(object taskObject)
{
SchedulerTask task = (SchedulerTask)taskObject;
ScheduledTaskEngine engine = null;
try
{
engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
engine.StartTask(task.Task);
}
catch (Exception ex)
{
//Handle
}
finally
{
task.TaskResetEvent.Set();
task.TaskResetEvent.Dispose();
}
}
Решение 4
Оказывается, источник этой странной проблемы был связан не с архитектурой, а с преобразованием решения из версии 3.5 в версию 4.0.Я воссоздал решение, не внося никаких изменений в код, и проблема больше никогда не возникала.
Другие советы
0x8007000E — ошибка нехватки памяти.Это, а также количество дескрипторов, похоже, указывают на утечку ресурсов.Убедитесь, что вы избавляетесь от каждого объекта, реализующего IDisposable
.Сюда входят массивы ManualResetEvent
вы используете.
Если у вас есть время, вы также можете перейти на использование .NET 4.0. Task
сорт;он был разработан для более четкой обработки подобных сложных сценариев.Определяя ребенка Task
объектов, вы можете уменьшить общее количество потоков (потоки довольно дороги не только из-за планирования, но и из-за их стекового пространства).
Я ищу ответы на аналогичную проблему (количество дескрипторов увеличивается с течением времени).
Я изучил архитектуру вашего приложения и хотел бы предложить вам кое-что, что могло бы вам помочь:
Слышали ли вы о IOCP (порты завершения ввода-вывода).
Я не уверен в сложности реализации этого с помощью C#, но на C/C++ это проще простого.Используя это, вы создаете уникальный пул потоков (количество потоков в этом пуле, в целом, определяется как 2 x количество процессоров или процессоров ядер на ПК или на сервере). работа.См. справку по этим функциям:CreateIoCompletionPort();PostQueuedCompletionStatus();GetQueuedCompletionStatus();
В общем, создание потоков и выход из них на лету может занять много времени и привести к снижению производительности и фрагментации памяти.В MSDN и Google есть тысячи литературы по IOCP.
Я думаю, вам следует полностью пересмотреть свою архитектуру.Тот факт, что вы можете иметь только 3 одновременных соединения, практически заставляет вас использовать 1 поток для генерации списка файлов и 3 потока для их обработки.Ваш поток-производитель вставит все файлы в очередь, а три потока-потребителя выйдут из очереди и продолжат обработку по мере поступления элементов в очередь.Блокирующая очередь может существенно упростить код.Если вы используете .NET 4.0, вы можете воспользоваться преимуществами БлокированиеСбор сорт.
public class Example
{
private BlockingCollection<string> m_Queue = new BlockingCollection<string>();
public void Start()
{
var threads = new Thread[]
{
new Thread(Producer),
new Thread(Consumer),
new Thread(Consumer),
new Thread(Consumer)
};
foreach (Thread thread in threads)
{
thread.Start();
}
}
private void Producer()
{
while (true)
{
Thread.Sleep(TimeSpan.FromSeconds(5));
ScheduledTask task = GetScheduledTask();
if (task != null)
{
foreach (string file in task.Files)
{
m_Queue.Add(task);
}
}
}
}
private void Consumer()
{
// Make a connection to the resource that is assigned to this thread only.
while (true)
{
string file = m_Queue.Take();
// Process the file.
}
}
}
В приведенном выше примере я определенно слишком упростил ситуацию, но надеюсь, что вы уловили общую идею.Обратите внимание, что это намного проще, поскольку в синхронизации потоков не так уж много (большинство из них будет встроено в очередь блокировки) и, конечно же, нет никакой пользы от WaitHandle
объекты.Очевидно, вам придется добавить правильные механизмы для корректного закрытия потоков, но это должно быть довольно легко.