¿Cuál es un método eficiente para el procesamiento de eventos en orden utilizando CCR?

StackOverflow https://stackoverflow.com/questions/5399365

  •  28-10-2019
  •  | 
  •  

Pregunta

Estaba experimentando con los iteradores de CCR como una solución a una tarea que requiere el procesamiento paralelo de toneladas de alimentos de datos, donde los datos de cada alimento deben procesarse en orden. Ninguno de los feeds depende entre sí, por lo que el procesamiento en orden puede ser paralelo por alimentación.

A continuación se muestra una maqueta rápida y sucia con un alimento entero, que simplemente empuja enteros a un puerto a una velocidad de aproximadamente 1.5k/segundo, y luego los saca con un iterador CCR para mantener la garantía de procesamiento en orden.

class Program
{
    static Dispatcher dispatcher = new Dispatcher();
    static DispatcherQueue dispatcherQueue = 
       new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
    static Port<int> intPort = new Port<int>();

    static void Main(string[] args)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));

        int counter = 0;
        Timer t = new Timer( (x) => 
            { for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
              , null, 0, 1000);

        Console.ReadKey();
    }

    public static IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue;
            if( (currentValue = intPort) % 1000 == 0)
            {
                Console.WriteLine("{0}, Current Items In Queue:{1}", 
                  currentValue, intPort.ItemCount);
            }
        }
    }
}

Lo que me sorprendió mucho de esto fue que CCR no podía mantenerse al día en una caja CoreI7, con el tamaño de la cola creciendo sin límites. En otra prueba para medir la latencia desde el post () al recibir () bajo una carga o ~ 100 post/seg., La latencia entre el primer post () y recibir () en cada lote era de alrededor de 1 m.

¿Hay algo malo en mi maqueta? Si es así, ¿cuál es la mejor manera de hacer esto usando CCR?

¿Fue útil?

Solución

Sí, estoy de acuerdo, esto de hecho parece extraño. Su código parece inicialmente funcionar sin problemas, pero después de unos pocos miles de elementos, el uso del procesador aumenta hasta el punto de que el rendimiento es realmente mediocre. Esto me perturba y sugiere un problema en el marco. Después de una obra de teatro con su código, realmente no puedo identificar por qué este es el caso. Sugeriría llevar este problema al Foros de Robótica de Microsoft Y ver si puedes conseguir que George Chrysanthakopoulos (o uno de los otros cerebros de CCR) te diga cuál es el problema. Sin embargo, puedo suponer que su código tal como está es terriblemente ineficiente.

La forma en que está tratando con los elementos de "estallar" desde el puerto es muy ineficiente. Esencialmente, se despierta el iterador cada vez que hay un mensaje en el puerto y trata solo un mensaje (a pesar del hecho de que puede haber varios cientos más en el puerto), luego cuelga del yield Mientras el control se vuelve al marco. En el momento en que el receptor certificado causa otro "despertar" del iterador, muchos mensajes han llenado el puerto. Mientras tanto, extraer un hilo del despachador para lidiar con un solo elemento (cuando muchos se han acumulado) es casi seguro que no es la mejor manera de obtener un buen rendimiento.

He modificado su código de tal manera que después del rendimiento, verificamos el puerto para ver si hay más mensajes en cola y tratarlos también, vaciando por completo el puerto antes de volver al marco. También he refactorado tu código un poco para usar CcrServiceBase que simplifica la sintaxis de algunas de las tareas que está haciendo:

internal class Test:CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;
    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest() {
        SpawnIterator(ProcessInts);
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 1500; ++i)
                                  intPort.Post(counter++);
                          }
                          ,
                          null,
                          0,
                          1000);
    }

    public IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue = intPort;
            ReportCurrent(currentValue);
            while(intPort.Test(out currentValue))
            {
                ReportCurrent(currentValue);
            }
        }
    }

    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

Alternativamente, podría eliminar el iterador por completo, ya que no se usa realmente bien en su ejemplo (aunque no estoy completamente seguro de qué efecto tiene esto en el orden de procesamiento):

internal class Test : CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;

    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest()
    {
        Activate(
            Arbiter.Receive(true,
                            intPort,
                            i =>
                            {
                                ReportCurrent(i);
                                int currentValue;
                                while (intPort.Test(out currentValue))
                                {
                                    ReportCurrent(currentValue);
                                }
                            }));
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 500000; ++i)
                              {
                                  intPort.Post(counter++);
                              }
                          }
                          ,
                          null,
                          0,
                          1000);
    }



    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

Ambos ejemplos aumentan significativamente el rendimiento por órdenes de magnitud. Espero que esto ayude.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top