Question

Mes applications différentes zones d'apparition des charges de petits threads de travail via ThreadPool.QueueUserWorkItem que je garde une trace de via plusieurs instances de ManualResetEvent. J'utilise la méthode WaitHandle.WaitAll pour bloquer ma demande de fermer jusqu'à ce que ces fils sont terminés.

Je n'ai jamais eu de problèmes auparavant, cependant, que ma demande est plus CORNING sous charge à savoir plus de threads créés, je commence maintenant pour obtenir cette exception:

WaitHandles must be less than or equal to 64 - missing documentation

Quelle est la meilleure solution alternative à cela?

Code Snippet

List<AutoResetEvent> events = new List<AutoResetEvent>();

// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
    // do work
    evt.Set();
});

...
WaitHandle.WaitAll(events.ToArray());

Solution

int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);

...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
    try
    {
         // do work
    }
    finally
    {
        if (Interlocked.Decrement(ref threadCount) == 0)
        {
             finished.Set();
        }
    }
});

...
finished.WaitOne();
Était-ce utile?

La solution

Créer une variable qui permet de suivre le nombre de tâches en cours d'exécution:

int numberOfTasks = 100;

Créer un signal:

ManualResetEvent signal = new ManualResetEvent(false);

Décrémenter le nombre de tâches à chaque fois qu'une tâche est terminée:

if (Interlocked.Decrement(ref numberOftasks) == 0)
{

S'il n'y a pas de tâche restante, régler le signal:

    signal.Set();
}

Pendant ce temps, ailleurs, attendre le signal à régler:

signal.WaitOne();

Autres conseils

A partir de .NET 4.0, vous avez deux options plus (et l'OMI, plus propres) à vous.

La première consiste à utiliser la classe CountdownEvent . Il évite la nécessité d'avoir à gérer la incrémenter et décrémenter sur votre propre:

int tasks = <however many tasks you're performing>;

// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
    // Queue work.
    ThreadPool.QueueUserWorkItem(() => {
        // Do work
        ...

        // Signal when done.
        e.Signal();
    });

    // Wait till the countdown reaches zero.
    e.Wait();
}

Cependant, il y a une solution encore plus robuste, et c'est d'utiliser le classe Task , comme suit:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Factory.StartNew(() => {
        // Do work.
    }

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

Utilisation de la classe Task et l'appel à WaitAll est beaucoup plus propre, l'OMI, comme vous « re tissage primitives moins de filetage tout au long de votre code (avis, pas de poignées d'attente); vous ne disposez pas de mettre en place un compteur, poignée incrémenter / décrémenter, vous venez de configurer vos tâches puis attendre sur eux. Cela permet le code soit plus expressif dans la ce de ce que vous voulez faire et non les primitives de comment (au moins en termes de gestion de la parallélisation de celui-ci).

.NET 4.5 offre encore plus d'options, vous pouvez simplifier la génération de la séquence des instances de Task en appelant le méthode de Run statique de la classe de Task :

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Run(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Tasks.WaitAll(tasks);

Ou, vous pouvez profiter de la TPL bibliothèque DataFlow (il est dans l'espace de noms System , il est donc officiel, même si elle est un téléchargement de NuGet, comme Entity Framework) et utiliser un ActionBlock<TInput> , comme ceci:

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
    // Do work.
});

// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);

// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();

// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();

Notez que le ActionBlock<TInput> par défaut traite un élément à la fois, donc si vous voulez avoir traiter plusieurs actions à un moment donné, vous devez définir le nombre d'éléments simultanés que vous voulez traiter dans le constructeur en passant un < a href = "http://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.executiondataflowblockoptions.aspx"> ExecutionDataflowBlockOptions instance et le réglage de la MaxDegreeOfParallelism :

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

Si votre action est enfilez vraiment sûr, vous pouvez définir la propriété MaxDegreeOfParallelsim DataFlowBlockOptions.Unbounded :

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});

Le point étant, vous avez le contrôle à grain fin sur comment parallèle que vous voulez que vos options soient.

Bien sûr, si vous avez une séquence d'éléments que vous voulez dans votre passé instance de ActionBlock<TInput>, vous pouvez lier un ISourceBlock<TOutput> mise en œuvre pour nourrir la ActionBlock<TInput>, comme suit:

// The buffer block.
var buffer = new BufferBlock<int>();

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
    // Do work.
});

// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock, 
    // Want to propagate completion state to the action block.
    new DataflowLinkOptions {
        PropagateCompletion = true,
    },
    // Can filter on items flowing through if you want.
    i => true)
{ 
    // Post 100 times to the *buffer*
    foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);

    // Signal complete, this doesn't actually stop
    // the block, but says that everything is done when the currently
    // posted items are completed.
    actionBlock.Complete();

    // Wait for everything to complete, the Completion property
    // exposes a Task which can be waited on.
    actionBlock.Completion.Wait();
}

En fonction de ce que vous devez faire, la bibliothèque TPL Dataflow devient beaucoup plus attrayante dans la mesure où il gère la concurrence dans tous les tâches reliées entre elles, et il vous permet d'être très précis sur juste comment vous voulez que chaque parallèle pièce à, tout en maintenant une séparation correcte des préoccupations pour chaque bloc.

Votre solution de contournement est incorrect. La raison en est que le Set et WaitOne pourrait courir si le dernier élément de travail provoque l'threadCount aller à zéro avant le fil de mise en attente a eu au hasard de file d'attente tous éléments de travail. La solution est simple. Traiter votre fil faire la queue comme si elle était un élément de travail lui-même. Initialize threadCount à 1 et faire un décrément et signal lorsque la file d'attente est terminée.

int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount); 
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
        if (Interlocked.Decrement(ref threadCount) == 0) 
        { 
             finished.Set(); 
        } 
    } 
}); 
... 
if (Interlocked.Decrement(ref threadCount) == 0)
{
  finished.Set();
}
finished.WaitOne(); 

Comme une préférence personnelle Je aime utiliser la classe CountdownEvent pour faire le comptage pour moi.

var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
      finished.Signal();
    } 
}); 
... 
finished.Signal();
finished.Wait(); 

Ajout à la réponse de DTB vous pouvez envelopper cela dans une belle classe simple.

public class Countdown : IDisposable
{
    private readonly ManualResetEvent done;
    private readonly int total;
    private long current;

    public Countdown(int total)
    {
        this.total = total;
        current = total;
        done = new ManualResetEvent(false);
    }

    public void Signal()
    {
        if (Interlocked.Decrement(ref current) == 0)
        {
            done.Set();
        }
    }

    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
}

Ajout à la réponse de DTB quand nous voulons avoir callbacks.

using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Main m = new Main();
        m.TestMRE();
        Console.ReadKey();

    }
}

class Main
{
    CalHandler handler = new CalHandler();
    int numberofTasks =0;
    public void TestMRE()
    {

        for (int j = 0; j <= 3; j++)
        {
            Console.WriteLine("Outer Loop is :" + j.ToString());
            ManualResetEvent signal = new ManualResetEvent(false);
            numberofTasks = 4;
            for (int i = 0; i <= 3; i++)
            {
                CalHandler.count caller = new CalHandler.count(handler.messageHandler);
                caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
            }
            signal.WaitOne();
        }

    }

    private void NumberCallback(IAsyncResult result)
    {
        AsyncResult asyncResult = (AsyncResult)result;

        CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;

        int num = caller.EndInvoke(asyncResult);

        Console.WriteLine("Number is :"+ num.ToString());

        ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
        if (Interlocked.Decrement(ref numberofTasks) == 0)
        {
            mre.Set();
        }
    }

}
public class CalHandler
{
    public delegate int count(int number);

    public int messageHandler ( int number )
    {
        return number;
    }

}
protected void WaitAllExt(WaitHandle[] waitHandles)
{
    //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
    const int waitAllArrayLimit = 64;
    var prevEndInd = -1;
    while (prevEndInd < waitHandles.Length - 1)
    {
        var stInd = prevEndInd + 1;
        var eInd = stInd + waitAllArrayLimit - 1;
        if (eInd > waitHandles.Length - 1)
        {
            eInd = waitHandles.Length - 1;
        }
        prevEndInd = eInd;

        //do wait
        var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
        WaitHandle.WaitAll(whSubarray);
    }

}

Je ne résolu par paginant simplement le nombre d'événements à attendre sans beaucoup performace perdu, et il fonctionne parfaitement sur l'environnement de production. Suit le code:

        var events = new List<ManualResetEvent>();

        // code omited

        var newEvent = new ManualResetEvent(false);
        events.Add(newEvent);
        ThreadPool.QueueUserWorkItem(c => {

            //thread code
            newEvent.Set();
        });

        // code omited

        var wait = true;
        while (wait)
        {
            WaitHandle.WaitAll(events.Take(60).ToArray());
            events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
            wait = events.Any();

        }

Voici une autre solution. Voici les « événements » est une liste de ManualResetEvent. La taille de la liste peut être supérieure à 64 (MAX_EVENTS_NO).

int len = events.Count;
if (len <= MAX_EVENTS_NO)
    {
        WaitHandle.WaitAll(events.ToArray());
    } else {
        int start = 0;
        int num = MAX_EVENTS_NO;
        while (true)
            {
                if(start + num > len)
                {
                   num = len - start;
                }
                List<ManualResetEvent> sublist = events.GetRange(start, num);
                WaitHandle.WaitAll(sublist.ToArray());
                start += num;
                if (start >= len)
                   break;
           }
   }

Windows XP SP3 prend en charge deux WaitHandles maximale. Pour les cas plus de 2 WaitHandles application se termine prématurément.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top