スレッドプールで労働者を追跡する堅牢な方法
-
25-09-2019 - |
質問
私は、すべてのスレッドが終了するためにWaithandle.Waitall()を使用して、ThreadPoolでキューに留められたときに労働者が失敗した良い追跡方法(カウント)を探しています。
インターロックカウンターは優れたテクニックですか、それともより堅牢な戦略がありますか?
解決
さて、ここにあなたが取ることができるアプローチがあります。クラスに追跡したいデータをカプセル化しました TrackedWorkers
. 。このクラスには、労働者の数を設定できるようにするコンストラクターがあります。その後、労働者は使用されます LaunchWorkers
これには、食べる代表が必要です object
そして、aを返します bool
. 。 object
労働者への入力を表します bool
に応じて成功または失敗を表します true
また false
それぞれ返品値であること。
したがって、基本的に私たちがしていることは、労働者状態を追跡する配列を持っています。労働者を立ち上げ、労働者からの返品価値に応じて、その労働者に対応するステータスを設定します。労働者が戻ったとき、私たちは設定します AutoResetEvent
と WaitHandle.WaitAll
すべてのために AutoResetEvents
設定する。
作業を追跡するためのネストされたクラス(代表者)があることに注意してください。 ID
ステータスを設定するために使用されます AutoResetEvent
そのスレッドに対応しています。
作業が完了したら、作業代表への参照を保持していないことに非常に注意してください func
また input
. 。これは重要であるため、誤ってものが収集されないようにしないようにします。
特定の労働者のステータスを取得する方法、成功した労働者のすべてのインデックス、失敗した労働者のすべてのインデックスを取得する方法があります。
最後の注:このコード制作は準備ができているとは考えていません。それは私が取るアプローチの単なるスケッチです。テスト、例外処理、その他の詳細を追加するように注意する必要があります。
class TrackedWorkers {
class WorkerState {
public object Input { get; private set; }
public int ID { get; private set; }
public Func<object, bool> Func { get; private set; }
public WorkerState(Func<object, bool> func, object input, int id) {
Func = func;
Input = input;
ID = id;
}
}
AutoResetEvent[] events;
bool[] statuses;
bool _workComplete;
int _number;
public TrackedWorkers(int number) {
if (number <= 0 || number > 64) {
throw new ArgumentOutOfRangeException(
"number",
"number must be positive and at most 64"
);
}
this._number = number;
events = new AutoResetEvent[number];
statuses = new bool[number];
_workComplete = false;
}
void Initialize() {
_workComplete = false;
for (int i = 0; i < _number; i++) {
events[i] = new AutoResetEvent(false);
statuses[i] = true;
}
}
void DoWork(object state) {
WorkerState ws = (WorkerState)state;
statuses[ws.ID] = ws.Func(ws.Input);
events[ws.ID].Set();
}
public void LaunchWorkers(Func<object, bool> func, object[] inputs) {
Initialize();
for (int i = 0; i < _number; i++) {
WorkerState ws = new WorkerState(func, inputs[i], i);
ThreadPool.QueueUserWorkItem(this.DoWork, ws);
}
WaitHandle.WaitAll(events);
_workComplete = true;
}
void ThrowIfWorkIsNotDone() {
if (!_workComplete) {
throw new InvalidOperationException("work not complete");
}
}
public bool GetWorkerStatus(int i) {
ThrowIfWorkIsNotDone();
return statuses[i];
}
public IEnumerable<int> SuccessfulWorkers {
get {
return WorkersWhere(b => b);
}
}
public IEnumerable<int> FailedWorkers {
get {
return WorkersWhere(b => !b);
}
}
IEnumerable<int> WorkersWhere(Predicate<bool> predicate) {
ThrowIfWorkIsNotDone();
for (int i = 0; i < _number; i++) {
if (predicate(statuses[i])) {
yield return i;
}
}
}
}
サンプルの使用法:
class Program {
static Random rg = new Random();
static object lockObject = new object();
static void Main(string[] args) {
int count = 64;
Pair[] pairs = new Pair[count];
for(int i = 0; i < count; i++) {
pairs[i] = new Pair(i, 2 * i);
}
TrackedWorkers workers = new TrackedWorkers(count);
workers.LaunchWorkers(SleepAndAdd, pairs.Cast<object>().ToArray());
Console.WriteLine(
"Number successful: {0}",
workers.SuccessfulWorkers.Count()
);
Console.WriteLine(
"Number failed: {0}",
workers.FailedWorkers.Count()
);
}
static bool SleepAndAdd(object o) {
Pair pair = (Pair)o;
int timeout;
double d;
lock (lockObject) {
timeout = rg.Next(1000);
d = rg.NextDouble();
}
Thread.Sleep(timeout);
bool success = d < 0.5;
if (success) {
Console.WriteLine(pair.First + pair.Second);
}
return (success);
}
}
上記のプログラムは、64個のスレッドを起動する予定です。 i
THスレッドには、番号を追加するタスクがあります i
と 2 * i
結果をコンソールに印刷します。しかし、忙しさをシミュレートするためにランダムな量の睡眠(1秒未満)を追加し、コインをひっくり返してスレッドの成功または失敗を判断しました。成功した人は、彼らが任命された合計を印刷し、返品します true
. 。失敗したものは何も印刷して戻ってきます false
.
ここで私が使用しました
struct Pair {
public int First { get; private set; }
public int Second { get; private set; }
public Pair(int first, int second) : this() {
this.First = first;
this.Second = second;
}
}