質問

私のアプリケーションは、さまざまな小さなワーカーのスレッドを介して大量の負荷を発生させます ThreadPool.QueueUserWorkItem 私はそれを倍に追跡します ManualResetEvent インスタンス。私は使用します WaitHandle.WaitAll これらのスレッドが完了するまで、アプリケーションを閉じるのをブロックする方法。

私のアプリケーションがより多くの負荷にかかっているので、私はこれまでに問題を抱えたことはありません。

WaitHandles must be less than or equal to 64 - missing documentation

これに対する最良の代替ソリューションは何ですか?

コードスニペット

List<AutoResetEvent> events = new List<AutoResetEvent>();

// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
    // do work
    evt.Set();
});

...
WaitHandle.WaitAll(events.ToArray());

回避策

int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);

...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
    try
    {
         // do work
    }
    finally
    {
        if (Interlocked.Decrement(ref threadCount) == 0)
        {
             finished.Set();
        }
    }
});

...
finished.WaitOne();
役に立ちましたか?

解決

実行中のタスクの数を追跡する変数を作成します。

int numberOfTasks = 100;

信号を作成する:

ManualResetEvent signal = new ManualResetEvent(false);

タスクが終了するたびにタスクの数を減らす:

if (Interlocked.Decrement(ref numberOftasks) == 0)
{

残りのタスクがない場合は、信号を設定します。

    signal.Set();
}

一方、どこかで、信号が設定されるのを待ちます。

signal.WaitOne();

他のヒント

.NET 4.0から始めて、利用可能な2つの(およびIMO、クリーナー)オプションがあります。

1つ目は、を使用することです CountdownEvent クラス. 。これは、自分で増加と減少を処理する必要がないことを防ぎます。

int tasks = <however many tasks you're performing>;

// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
    // Queue work.
    ThreadPool.QueueUserWorkItem(() => {
        // Do work
        ...

        // Signal when done.
        e.Signal();
    });

    // Wait till the countdown reaches zero.
    e.Wait();
}

しかし、さらに堅牢なソリューションがあり、それは Task クラス, 、 そのようです:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Factory.StartNew(() => {
        // Do work.
    }

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

を使用して Task クラスと呼び出し WaitAll コード全体でプリミティブをスレッディングしないように織り込んでいるので、はるかにきれいです(注意、待機ハンドルなし)。カウンターをセットアップしたり、増加/減少を処理したりする必要はありません。タスクを設定してから待つだけです。これにより、コードをより表現力を高めることができます のプリミティブではなく、やりたいことの どうやって (少なくとも、それの並列化を管理するという点で)。

.NET 4.5はさらに多くのオプションを提供します。シーケンスの生成を簡素化できます Task 電話をかけることによるインスタンス 静的 Run の方法 Task クラス:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Run(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Tasks.WaitAll(tasks);

または、あなたはそれを利用することができます TPLデータフローライブラリ (それはにあります System 名前空間なので、エンティティフレームワークのようにnugetからダウンロードしていても公式です) ActionBlock<TInput>, 、 そのようです:

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
    // Do work.
});

// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);

// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();

// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();

に注意してください ActionBlock<TInput> デフォルトでは一度に1つのアイテムを処理するため、一度に複数のアクションを処理する場合は、コンストラクターで処理する同時のアイテムの数を、渡すことで設定する必要があります。 ExecutionDataflowBlockOptions インスタンスと設定 MaxDegreeOfParallelism 財産:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

あなたのアクションが本当にスレッド安全である場合、あなたは MaxDegreeOfParallelsim プロパティへ DataFlowBlockOptions.Unbounded:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});

ポイントは、あなたが細かい粒度を持っていることです どうやって 並列あなたはあなたのオプションを望んでいることを望みます。

もちろん、あなたがあなたの中に渡されたいアイテムのシーケンスがある場合 ActionBlock<TInput> インスタンス、次にリンクできます ISourceBlock<TOutput> フィードするための実装 ActionBlock<TInput>, 、 そのようです:

// The buffer block.
var buffer = new BufferBlock<int>();

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
    // Do work.
});

// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock, 
    // Want to propagate completion state to the action block.
    new DataflowLinkOptions {
        PropagateCompletion = true,
    },
    // Can filter on items flowing through if you want.
    i => true)
{ 
    // Post 100 times to the *buffer*
    foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);

    // Signal complete, this doesn't actually stop
    // the block, but says that everything is done when the currently
    // posted items are completed.
    actionBlock.Complete();

    // Wait for everything to complete, the Completion property
    // exposes a Task which can be waited on.
    actionBlock.Completion.Wait();
}

あなたがする必要があることに応じて、TPLデータフローライブラリは 多くの より魅力的なオプション、それは並行性を処理するという点で すべて タスクは一緒にリンクされており、それはあなたが非常に具体的になることを可能にします ただ 各ブロックの懸念の適切な分離を維持しながら、各作品をどれだけ平行にしたいか。

あなたの回避策は正しくありません。その理由は、 SetWaitOne 最後の作業アイテムが原因となった場合、レースできます threadCount ゼロに移動します キューイングスレッドは、キューにする機会が必要でした すべて ワークアイテム。修正は簡単です。キューイングスレッドをワークアイテム自体であるかのように扱います。初期化 threadCount 1に、キューイングが完了したときに減少と信号を行います。

int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount); 
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
        if (Interlocked.Decrement(ref threadCount) == 0) 
        { 
             finished.Set(); 
        } 
    } 
}); 
... 
if (Interlocked.Decrement(ref threadCount) == 0)
{
  finished.Set();
}
finished.WaitOne(); 

個人的な好みとして、私はそれを使うのが好きです CountdownEvent 私のためにカウントするクラス。

var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
      finished.Signal();
    } 
}); 
... 
finished.Signal();
finished.Wait(); 

DTBの回答に加えて、これを素敵なシンプルなクラスに包むことができます。

public class Countdown : IDisposable
{
    private readonly ManualResetEvent done;
    private readonly int total;
    private long current;

    public Countdown(int total)
    {
        this.total = total;
        current = total;
        done = new ManualResetEvent(false);
    }

    public void Signal()
    {
        if (Interlocked.Decrement(ref current) == 0)
        {
            done.Set();
        }
    }

    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
}

コールバックを持ちたいときにDTBの回答を追加します。

using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Main m = new Main();
        m.TestMRE();
        Console.ReadKey();

    }
}

class Main
{
    CalHandler handler = new CalHandler();
    int numberofTasks =0;
    public void TestMRE()
    {

        for (int j = 0; j <= 3; j++)
        {
            Console.WriteLine("Outer Loop is :" + j.ToString());
            ManualResetEvent signal = new ManualResetEvent(false);
            numberofTasks = 4;
            for (int i = 0; i <= 3; i++)
            {
                CalHandler.count caller = new CalHandler.count(handler.messageHandler);
                caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
            }
            signal.WaitOne();
        }

    }

    private void NumberCallback(IAsyncResult result)
    {
        AsyncResult asyncResult = (AsyncResult)result;

        CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;

        int num = caller.EndInvoke(asyncResult);

        Console.WriteLine("Number is :"+ num.ToString());

        ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
        if (Interlocked.Decrement(ref numberofTasks) == 0)
        {
            mre.Set();
        }
    }

}
public class CalHandler
{
    public delegate int count(int number);

    public int messageHandler ( int number )
    {
        return number;
    }

}
protected void WaitAllExt(WaitHandle[] waitHandles)
{
    //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
    const int waitAllArrayLimit = 64;
    var prevEndInd = -1;
    while (prevEndInd < waitHandles.Length - 1)
    {
        var stInd = prevEndInd + 1;
        var eInd = stInd + waitAllArrayLimit - 1;
        if (eInd > waitHandles.Length - 1)
        {
            eInd = waitHandles.Length - 1;
        }
        prevEndInd = eInd;

        //do wait
        var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
        WaitHandle.WaitAll(whSubarray);
    }

}

パフォーマンスが失われることなく待機するためにイベントの数を単純にパジングするだけで解決しましたが、それは生産環境で完全に機能しています。コードに従います:

        var events = new List<ManualResetEvent>();

        // code omited

        var newEvent = new ManualResetEvent(false);
        events.Add(newEvent);
        ThreadPool.QueueUserWorkItem(c => {

            //thread code
            newEvent.Set();
        });

        // code omited

        var wait = true;
        while (wait)
        {
            WaitHandle.WaitAll(events.Take(60).ToArray());
            events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
            wait = events.Any();

        }

これが別のソリューションです。 「イベント」はManualReseTeventのリストです。リストのサイズは64(max_events_no)を超える場合があります。

int len = events.Count;
if (len <= MAX_EVENTS_NO)
    {
        WaitHandle.WaitAll(events.ToArray());
    } else {
        int start = 0;
        int num = MAX_EVENTS_NO;
        while (true)
            {
                if(start + num > len)
                {
                   num = len - start;
                }
                List<ManualResetEvent> sublist = events.GetRange(start, num);
                WaitHandle.WaitAll(sublist.ToArray());
                start += num;
                if (start >= len)
                   break;
           }
   }

Windows XP SP3は、最大2つのウェイトハンドルをサポートしています。 2つ以上のウェイトハンドルアプリケーションの場合、時期尚早に終了します。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top