Pregunta

Necesito tener algún tipo de objeto que actúe como un BroadcastBlock, pero con entrega garantizada.Entonces usé una respuesta de esta pregunta.Pero realmente no entiendo claramente el flujo de ejecución aquí.Tengo una aplicación de consola.Aquí está mi código:

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

    for (int i = 0; i <= 10; i++)
        blocks.Add(new ActionBlock<int>(num => 
        {
            int coef = i;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef); 
        }, execopt));

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);

    Task.WaitAll(ToWait.ToArray());

    Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

    broadcaster.Complete();
}

Cada número debe manejarse secuencialmente, por lo que no puedo usar MaxDegreeOfParallelism en el bloque de emisoras.Pero todos los bloques de acción que reciben el número pueden ejecutarse en paralelo.

Así que aquí está la cuestión:

En el resultado puedo ver diferentes ID de subprocesos.¿Lo entiendo correctamente? Funciona de la siguiente manera:

Golpes de ejecución await block.SendAsync(num); en una emisora.Si el bloque actual no está listo para aceptar el número, la ejecución sale de la emisora ​​y se bloquea en Task.WaitAll.Cuando el bloque acepta el número, el resto de la declaración foreach en la emisora ​​se ejecuta en un grupo de subprocesos.Y lo mismo hasta el final.Cada iteración de foreach se ejecuta en un grupo de subprocesos.Pero en realidad sucede de forma secuencial.

¿Tengo razón o no en mi entendimiento?¿Cómo puedo cambiar este código para enviar el número a todos los bloques de forma asíncrona?

Para asegurarme de que si uno de los bloques no está listo para recibir el número en este momento, no esperaré y todos los demás que estén listos recibirán el número.Y que todos los bloques puedan funcionar en paralelo.Y garantizar la entrega.

¿Fue útil?

Solución

Suponiendo que desea manejar un elemento a la vez por el broadcaster mientras permite que los bloques de destino reciban ese elemento al mismo tiempo, debe cambiar el broadcaster ofrecer el número a todos los bloques al mismo tiempo y luego esperar asincrónicamente a todos ellos juntos para aceptarlo antes de pasar al siguiente número:

var broadcaster = new ActionBlock<int>(async num => 
{
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    await Task.WhenAll(tasks);
}, execopt);

Ahora, en este caso en el que no tienes trabajo después de la espera, puedes optimizar ligeramente y al mismo tiempo devolver una tarea en espera:

ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top