Threadpool / Waithandleリソースリーク/クラッシュ
-
16-09-2020 - |
質問
私は私のデザインを再考する必要があるかもしれないと思います。私のコンピュータが完全にハングアップされているバグを絞り込むのに苦労しています。
私は、データベースキューに基づいてファイルの転送を処理するコンソールアプリケーション(後でサービスに変換する)を持っています。
転送に許可されたスレッドを調整しています。これは、私たちが接続しているシステムによっては、特定のアカウントからの特定の数の接続しか含まれていないためです。
たとえば、システムAは3つの同時接続のみを受け入れることができます(これは、3つの個別のスレッドを意味します)。これらのスレッドのそれぞれには独自の接続オブジェクトがありますので、接続を共有していないため、どんな同期の問題にも実行しないでください。それらのシステムからサイクルでファイルを処理したい。したがって、たとえば、接続ごとに最大100個のファイルを転送できる3つの接続を許可します。つまり、システムAから1000ファイルを移動するために、3つのスレッドがそれぞれ100個のファイルで許可されているため、サイクルごとに300ファイルのみを処理できます。したがって、この転送の存続期間にわたっては10個のスレッドがあります。私たちは一度に3つだけ実行できます。そのため、3サイクルがあり、最後のサイクルは最後の100個のファイルを転送するために1つのスレッドのみを使用します。 (3スレッド×100ファイル= 300ファイルあたり300ファイル)
例による現在のアーキテクチャは次のとおりです。
- System.Threading.Timerは、GetScheduledTask() を呼び出すことで何かをすることによって、5秒ごとにキューをチェックします。
- 何もない場合、getScheduledTask()は単に を意味します
- 作業がある場合は、作業を処理するためのスレッドプールスレッドを作成してください[ワークスレッドA]
- 作業スレッドAは転送する1000個のファイルがあることを見ています
- 作業スレッドAは、システムに3つのスレッドを実行できるようにしていることが からファイルを取得していることを確認します。
- 作業スレッドAは3つの新しい作業スレッド[B、C、D]と転送 を起動します。
- ワークスレッドAは、B、C、D
[WaitHandle.WaitAll(transfersArray)]
を待機します。
- ワークスレッドAは、キューにまだより多くのファイルがあることを見ています(700今すぐ)
- 作業スレッドAは、システムAのMAXであるがシステム では異なる場合がありますそれらの
- 作業スレッドAは3つの新しい作業スレッド[B、C、D]を起動し、それらを待機し、
[WaitHandle.WaitAll(transfersArray)]
- 移動するファイルがこれ以上ないまでプロセスが繰り返されます。
- 作業スレッド終了
- 私が経験しているリソースの漏洩や問題を引き起こす可能性のある視覚的な状況はありますか?
-
WaitHandle.WaitAll(array)
ごとにアレイをスルーして呼び出してarray[index].Dispose()?
を呼び出します。
- このプロセスのタスクマネージャの下のハンドルカウントはゆっくり登って
- 私はSystem.Threading.timerからワーカースレッドAの初期作成を呼び出しています。これに問題があるだろうか?そのタイマーのコードは次のとおりです。
[transfersArray = new TransferArray[3]
を待つ新しい配列を作成します。
ManualResetEventを使用してシグナリングを処理します。
私の質問は次のとおりです。
(スケジューリングのためのクラスコード)
private ManualResetEvent _ResetEvent;
private void Start()
{
_IsAlive = true;
ManualResetEvent transferResetEvent = new ManualResetEvent(false);
//Set the scheduler timer to 5 second intervals
_ScheduledTasks = new Timer(new TimerCallback(ScheduledTasks_Tick), transferResetEvent, 200, 5000);
}
private void ScheduledTasks_Tick(object state)
{
ManualResetEvent resetEvent = null;
try
{
resetEvent = (ManualResetEvent)state;
//Block timer until GetScheduledTasks() finishes
_ScheduledTasks.Change(Timeout.Infinite, Timeout.Infinite);
GetScheduledTasks();
}
finally
{
_ScheduledTasks.Change(5000, 5000);
Console.WriteLine("{0} [Main] GetScheduledTasks() finished", DateTime.Now.ToString("MMddyy HH:mm:ss:fff"));
resetEvent.Set();
}
}
private void GetScheduledTask()
{
try
{
//Check to see if the database connection is still up
if (!_IsAlive)
{
//Handle
_ConnectionLostNotification = true;
return;
}
//Get scheduled records from the database
ISchedulerTask task = null;
using (DataTable dt = FastSql.ExecuteDataTable(
_ConnectionString, "hidden for security", System.Data.CommandType.StoredProcedure,
new List<FastSqlParam>() { new FastSqlParam(ParameterDirection.Input, SqlDbType.VarChar, "@ProcessMachineName", Environment.MachineName) })) //call to static class
{
if (dt != null)
{
if (dt.Rows.Count == 1)
{ //Only 1 row is allowed
DataRow dr = dt.Rows[0];
//Get task information
TransferParam.TaskType taskType = (TransferParam.TaskType)Enum.Parse(typeof(TransferParam.TaskType), dr["TaskTypeId"].ToString());
task = ScheduledTaskFactory.CreateScheduledTask(taskType);
task.Description = dr["Description"].ToString();
task.IsEnabled = (bool)dr["IsEnabled"];
task.IsProcessing = (bool)dr["IsProcessing"];
task.IsManualLaunch = (bool)dr["IsManualLaunch"];
task.ProcessMachineName = dr["ProcessMachineName"].ToString();
task.NextRun = (DateTime)dr["NextRun"];
task.PostProcessNotification = (bool)dr["NotifyPostProcess"];
task.PreProcessNotification = (bool)dr["NotifyPreProcess"];
task.Priority = (TransferParam.Priority)Enum.Parse(typeof(TransferParam.SystemType), dr["PriorityId"].ToString());
task.SleepMinutes = (int)dr["SleepMinutes"];
task.ScheduleId = (int)dr["ScheduleId"];
task.CurrentRuns = (int)dr["CurrentRuns"];
task.TotalRuns = (int)dr["TotalRuns"];
SchedulerTask scheduledTask = new SchedulerTask(new ManualResetEvent(false), task);
//Queue up task to worker thread and start
ThreadPool.QueueUserWorkItem(new WaitCallback(this.ThreadProc), scheduledTask);
}
}
}
}
catch (Exception ex)
{
//Handle
}
}
private void ThreadProc(object taskObject)
{
SchedulerTask task = (SchedulerTask)taskObject;
ScheduledTaskEngine engine = null;
try
{
engine = SchedulerTaskEngineFactory.CreateTaskEngine(task.Task, _ConnectionString);
engine.StartTask(task.Task);
}
catch (Exception ex)
{
//Handle
}
finally
{
task.TaskResetEvent.Set();
task.TaskResetEvent.Dispose();
}
}
.解決 4
この奇妙な問題の原因はアーキテクチャとは関係なく、ソリューションを3.5から4.0に変換することによるものです。私は解決策を再作成し、コードの変更を実行しなくて、問題は再び発生しなかった。
他のヒント
0x8007000eはメモリ不足エラーです。それとハンドルカウントはリソースリークを指すように見えます。IDisposable
を実装するすべてのオブジェクトを廃棄してください。これには、使用しているManualResetEvent
sの配列が含まれます。
時間がある場合は、.NET 4.0 Task
クラスの使用に変換することもできます。これは、このような複雑なシナリオをよりきれいに処理するように設計されていました。Child Task
オブジェクトを定義することで、全体的なスレッド数を減らすことができます(スレッドはスケジューリングのためだけでなく、スタックスペースのためにも非常に高価です)。
私は同様の問題に対する答えを探しています(時間の経過とともに増加するハンドル数)。
私はあなたのアプリケーションアーキテクチャを見て、あなたがあなたを助けることができる何かを提案するのが好きです:
IOCP(入力出力完了ポート)について聞いたことがあります。
C#を使ってこれを実装するのは困難なことがわからない。 これを使用すると、一意のスレッドプールが作成されます(PCまたはサーバーのプロセッサーまたはプロセッサコアの数、プロセッサーまたはプロセッサーのコアの数が一般に定義されているスレッド数が一般的に定義されています)。 このプールをIOCPハンドルに関連付け、プールは作業を行います。 これらの機能のヘルプを参照してください。 CreateIoCompletionPort(); PostQueuedCompletionStatus(); getQueedCompletionStatus();
一般的に、スレッドを作成し終了すると、時間がかかり、パフォーマンスの罰金とメモリの断片化につながります。 MSDNとGoogleのIOCPについての何千もの文献があります。
あなたはあなたのアーキテクチャを完全に再考するべきだと思います。 3つの同時接続しか持つことができないという事実は、1つのスレッドを使用してファイルのリストを生成するために1つのスレッドを使用することをほとんど懇願しています。プロデューサスレッドはすべてのファイルをキューに挿入し、3つのコンシューマスレッドは、キューに到着するアイテムとして処理をデキューして続行します。ブロッキングキューはコードを大幅に単純化することができます。あなたが.NET 4.0を使用している場合は、 blockingCollection クラス。
public class Example
{
private BlockingCollection<string> m_Queue = new BlockingCollection<string>();
public void Start()
{
var threads = new Thread[]
{
new Thread(Producer),
new Thread(Consumer),
new Thread(Consumer),
new Thread(Consumer)
};
foreach (Thread thread in threads)
{
thread.Start();
}
}
private void Producer()
{
while (true)
{
Thread.Sleep(TimeSpan.FromSeconds(5));
ScheduledTask task = GetScheduledTask();
if (task != null)
{
foreach (string file in task.Files)
{
m_Queue.Add(task);
}
}
}
}
private void Consumer()
{
// Make a connection to the resource that is assigned to this thread only.
while (true)
{
string file = m_Queue.Take();
// Process the file.
}
}
}
.