Question

J'ai besoin d'une sorte d'objet qui agit comme un BroadcastBlock, mais avec une livraison garantie.J'ai donc utilisé une réponse de cette question.Mais je ne comprends pas vraiment clairement le flux d'exécution ici.J'ai une application console.Voici mon code :

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

    for (int i = 0; i <= 10; i++)
        blocks.Add(new ActionBlock<int>(num => 
        {
            int coef = i;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef); 
        }, execopt));

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);

    Task.WaitAll(ToWait.ToArray());

    Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

    broadcaster.Complete();
}

Chaque numéro doit être traité séquentiellement, je ne peux donc pas utiliser MaxDegreeOfParallelism dans le bloc de diffusion.Mais tous les blocs d'action qui reçoivent le numéro peuvent s'exécuter en parallèle.

Voici donc la question :

Dans la sortie, je peux voir différents identifiants de thread.Dois-je bien comprendre que cela fonctionne comme suit :

Coups d’exécution await block.SendAsync(num); dans un diffuseur.Si le bloc actuel n'est pas prêt à accepter le numéro, l'exécution quitte le diffuseur et se bloque au niveau de Task.WaitAll.Lorsque le bloc accepte le numéro, le reste de l'instruction foreach dans le diffuseur est exécuté dans un pool de threads.Et pareil jusqu'à la fin.Chaque itération de foreach est exécutée dans un pool de threads.Mais en réalité, cela se produit de manière séquentielle.

Ai-je raison ou tort dans ma compréhension ?Comment puis-je modifier ce code pour envoyer le numéro à tous les blocs de manière asynchrone ?

Pour m'assurer que si l'un des blocs n'est pas prêt à recevoir le numéro pour le moment, je ne l'attendrai pas et tous les autres blocs prêts recevront le numéro.Et que tous les blocs peuvent fonctionner en parallèle.Et garantir la livraison.

Était-ce utile?

La solution

En supposant que vous souhaitiez gérer un élément à la fois par le broadcaster tout en permettant aux blocs cibles de recevoir cet élément simultanément, vous devez modifier le broadcaster pour proposer le numéro à tous les blocs en même temps, puis les attendre tous de manière asynchrone ensemble pour l'accepter avant de passer au numéro suivant :

var broadcaster = new ActionBlock<int>(async num => 
{
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    await Task.WhenAll(tasks);
}, execopt);

Maintenant, dans ce cas où vous n'avez pas de travail après l'attente, vous pouvez légèrement optimiser tout en renvoyant une tâche attendue :

ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top