Domanda

I miei depone le uova di applicazione carichi di diversi thread di lavoro di piccole dimensioni tramite ThreadPool.QueueUserWorkItem che ho tenere traccia di via più istanze ManualResetEvent. Io uso il metodo WaitHandle.WaitAll per bloccare la mia applicazione di chiudere fino a quando questi fili sono state completate.

Non ho mai avuto problemi prima, però, come la mia domanda è in arrivo sotto un carico maggiore ossia più thread in fase di creazione, ora sto cominciando a ottenere questa eccezione:

WaitHandles must be less than or equal to 64 - missing documentation

Qual è la migliore soluzione alternativa a questo?

Snippet di codice

List<AutoResetEvent> events = new List<AutoResetEvent>();

// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
    // do work
    evt.Set();
});

...
WaitHandle.WaitAll(events.ToArray());

Soluzione

int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);

...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
    try
    {
         // do work
    }
    finally
    {
        if (Interlocked.Decrement(ref threadCount) == 0)
        {
             finished.Set();
        }
    }
});

...
finished.WaitOne();
È stato utile?

Soluzione

Creare una variabile che tiene traccia del numero di esecuzione di attività:

int numberOfTasks = 100;

Crea un segnale:

ManualResetEvent signal = new ManualResetEvent(false);

decremento il numero di attività ogni volta che un compito è finito:

if (Interlocked.Decrement(ref numberOftasks) == 0)
{

Se non c'è compito rimanente, impostare il segnale:

    signal.Set();
}

Nel frattempo, da qualche altra parte, attesa per il segnale da impostare:

signal.WaitOne();

Altri suggerimenti

A partire da .NET 4.0, si dispone di altri due (e IMO, più puliti) opzioni a vostra disposizione.

Il primo è quello di utilizzare la CountdownEvent classe . Previene la necessità di dover gestire l'incremento e il decremento da soli:

int tasks = <however many tasks you're performing>;

// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
    // Queue work.
    ThreadPool.QueueUserWorkItem(() => {
        // Do work
        ...

        // Signal when done.
        e.Signal();
    });

    // Wait till the countdown reaches zero.
    e.Wait();
}

Tuttavia, c'è una soluzione ancora più robusta, e che di utilizzare il class Task , in questo modo:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Factory.StartNew(() => {
        // Do work.
    }

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

Uso della classe Task e la chiamata a WaitAll è molto più pulito, IMO, come si 'ri tessitura primitive meno filettatura in tutto il codice (avviso, nessuna attesa maniglie); non c'è bisogno di creare un contatore, maniglia di incremento / decremento, basta impostare le attività e quindi attendere su di loro. In questo modo il codice di essere più espressivo nel cosa di ciò che si vuole fare e non le primitive di come (almeno, in termini di gestione della parallelizzazione di esso).

.NET 4.5 offre ancora più opzioni, è possibile semplificare la generazione della sequenza di istanze Task chiamando il metodo statico Run sulla classe Task :

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Run(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Tasks.WaitAll(tasks);

In alternativa, si potrebbe sfruttare la TPL DataFlow biblioteca (è nello spazio dei nomi System , quindi è ufficiale, anche se è un download da NuGet, come Entity Framework) e utilizzare un ActionBlock<TInput> , in questo modo:

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
    // Do work.
});

// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);

// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();

// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();

Si noti che il ActionBlock<TInput> di default elabora un elemento alla volta, quindi se si vuole avere elaborare più operazioni in una sola volta, è necessario impostare il numero di elementi concorrenti si desidera elaborare nel costruttore passando un < a href = "http://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.executiondataflowblockoptions.aspx"> ExecutionDataflowBlockOptions istanza e l'impostazione della MaxDegreeOfParallelism :

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

Se la vostra azione è veramente thread-safe, quindi è possibile impostare la proprietà MaxDegreeOfParallelsim a DataFlowBlockOptions.Unbounded :

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});

Il punto è, si ha il controllo a grana fine su come parallelo si desidera che le opzioni siano.

Naturalmente, se si dispone di una serie di elementi che si desidera passato nella tua istanza ActionBlock<TInput>, quindi è possibile collegare un ISourceBlock<TOutput> implementazione per alimentare il ActionBlock<TInput>, in questo modo:

// The buffer block.
var buffer = new BufferBlock<int>();

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
    // Do work.
});

// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock, 
    // Want to propagate completion state to the action block.
    new DataflowLinkOptions {
        PropagateCompletion = true,
    },
    // Can filter on items flowing through if you want.
    i => true)
{ 
    // Post 100 times to the *buffer*
    foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);

    // Signal complete, this doesn't actually stop
    // the block, but says that everything is done when the currently
    // posted items are completed.
    actionBlock.Complete();

    // Wait for everything to complete, the Completion property
    // exposes a Task which can be waited on.
    actionBlock.Completion.Wait();
}

A seconda di ciò che devi fare, la libreria TPL Dataflow diventa una molto un'opzione più interessante, in quanto gestisce la concorrenza attraverso tutti i compiti collegati tra loro, e che ti permette di essere molto specifiche su solo come parallelo si desidera che ogni pezzo di essere, pur mantenendo una corretta separazione degli interessi per ogni blocco.

Il tuo soluzione non è corretta. La ragione è che il Set e WaitOne potrebbe correre se l'ultimo elemento di lavoro fa sì che il threadCount di andare a zero prima il filo di accodamento ha dovuto possibilità di coda di tutti elementi di lavoro. La soluzione è semplice. Trattare il tuo thread in coda come se fosse un elemento di lavoro in sé. threadCount inizializzazione a 1 e fare un decremento e segnale quando la coda è completa.

int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount); 
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
        if (Interlocked.Decrement(ref threadCount) == 0) 
        { 
             finished.Set(); 
        } 
    } 
}); 
... 
if (Interlocked.Decrement(ref threadCount) == 0)
{
  finished.Set();
}
finished.WaitOne(); 

Come una preferenza personale che mi piace utilizzare la classe CountdownEvent di fare il conteggio per me.

var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
      finished.Signal();
    } 
}); 
... 
finished.Signal();
finished.Wait(); 

L'aggiunta alla risposta di DTB si può avvolgere questo in un bel semplice classe.

public class Countdown : IDisposable
{
    private readonly ManualResetEvent done;
    private readonly int total;
    private long current;

    public Countdown(int total)
    {
        this.total = total;
        current = total;
        done = new ManualResetEvent(false);
    }

    public void Signal()
    {
        if (Interlocked.Decrement(ref current) == 0)
        {
            done.Set();
        }
    }

    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
}

L'aggiunta alla risposta di DTB quando vogliamo avere callback.

using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Main m = new Main();
        m.TestMRE();
        Console.ReadKey();

    }
}

class Main
{
    CalHandler handler = new CalHandler();
    int numberofTasks =0;
    public void TestMRE()
    {

        for (int j = 0; j <= 3; j++)
        {
            Console.WriteLine("Outer Loop is :" + j.ToString());
            ManualResetEvent signal = new ManualResetEvent(false);
            numberofTasks = 4;
            for (int i = 0; i <= 3; i++)
            {
                CalHandler.count caller = new CalHandler.count(handler.messageHandler);
                caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
            }
            signal.WaitOne();
        }

    }

    private void NumberCallback(IAsyncResult result)
    {
        AsyncResult asyncResult = (AsyncResult)result;

        CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;

        int num = caller.EndInvoke(asyncResult);

        Console.WriteLine("Number is :"+ num.ToString());

        ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
        if (Interlocked.Decrement(ref numberofTasks) == 0)
        {
            mre.Set();
        }
    }

}
public class CalHandler
{
    public delegate int count(int number);

    public int messageHandler ( int number )
    {
        return number;
    }

}
protected void WaitAllExt(WaitHandle[] waitHandles)
{
    //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
    const int waitAllArrayLimit = 64;
    var prevEndInd = -1;
    while (prevEndInd < waitHandles.Length - 1)
    {
        var stInd = prevEndInd + 1;
        var eInd = stInd + waitAllArrayLimit - 1;
        if (eInd > waitHandles.Length - 1)
        {
            eInd = waitHandles.Length - 1;
        }
        prevEndInd = eInd;

        //do wait
        var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
        WaitHandle.WaitAll(whSubarray);
    }

}

ho risolto semplicemente impaginazione il numero di eventi di aspettare senza molta performace perduta, e sta funzionando perfettamente su ambiente di produzione. Segue il codice:

        var events = new List<ManualResetEvent>();

        // code omited

        var newEvent = new ManualResetEvent(false);
        events.Add(newEvent);
        ThreadPool.QueueUserWorkItem(c => {

            //thread code
            newEvent.Set();
        });

        // code omited

        var wait = true;
        while (wait)
        {
            WaitHandle.WaitAll(events.Take(60).ToArray());
            events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
            wait = events.Any();

        }

Ecco un'altra soluzione. Ecco gli "eventi" è un elenco di ManualResetEvent. La dimensione della lista può essere maggiore di 64 (MAX_EVENTS_NO).

int len = events.Count;
if (len <= MAX_EVENTS_NO)
    {
        WaitHandle.WaitAll(events.ToArray());
    } else {
        int start = 0;
        int num = MAX_EVENTS_NO;
        while (true)
            {
                if(start + num > len)
                {
                   num = len - start;
                }
                List<ManualResetEvent> sublist = events.GetRange(start, num);
                WaitHandle.WaitAll(sublist.ToArray());
                start += num;
                if (start >= len)
                   break;
           }
   }

Windows XP SP3 supporta al massimo due WaitHandles. Per i casi più di 2 WaitHandles applicazione termina prematuramente.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top