Quelle est une méthode efficace pour le traitement dans l'ordre des événements à l'aide de CCR?

StackOverflow https://stackoverflow.com/questions/5399365

  •  28-10-2019
  •  | 
  •  

Question

J'expérimentais les itérateurs CCR comme solution à une tâche qui nécessite le traitement parallèle de tonnes de flux de données, où les données de chaque flux doivent être traitées dans l'ordre.Aucun des flux ne dépend les uns des autres, de sorte que le traitement dans l'ordre peut être mis en parallèle par flux.

Voici une maquette rapide et sale avec un flux d'entiers, qui pousse simplement les entiers dans un port à une vitesse d'environ 1,5 K / seconde, puis les extrait à l'aide d'un itérateur CCR pour conserver la garantie de traitement dans l'ordre.

class Program
{
    static Dispatcher dispatcher = new Dispatcher();
    static DispatcherQueue dispatcherQueue = 
       new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
    static Port<int> intPort = new Port<int>();

    static void Main(string[] args)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));

        int counter = 0;
        Timer t = new Timer( (x) => 
            { for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
              , null, 0, 1000);

        Console.ReadKey();
    }

    public static IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue;
            if( (currentValue = intPort) % 1000 == 0)
            {
                Console.WriteLine("{0}, Current Items In Queue:{1}", 
                  currentValue, intPort.ItemCount);
            }
        }
    }
}

Ce qui m'a beaucoup surpris à ce sujet, c'est que CCR ne pouvait pas suivre le rythme d'un Corei7, la taille de la file d'attente augmentant sans limites.Dans un autre test pour mesurer la latence entre le Post () et le Receive () sous une charge ou ~ 100 Post / sec., La latence entre le premier Post () et Receive () de chaque lot était d'environ 1 ms.

Y a-t-il un problème avec ma maquette?Si tel est le cas, quelle est la meilleure façon de procéder avec CCR?

Était-ce utile?

La solution

Oui, je suis d'accord, cela semble vraiment bizarre. Votre code semble initialement fonctionner correctement, mais après quelques milliers d'éléments, l'utilisation du processeur augmente au point où les performances sont vraiment médiocres. Cela me dérange et suggère un problème dans le cadre. Après avoir joué avec votre code, je ne peux pas vraiment identifier pourquoi c'est le cas. Je vous suggère de soumettre ce problème aux forums Microsoft Robotics et voir si vous pouvez demander à George Chrysanthakopoulos (ou à l'un des autres cerveaux du CCR) de vous dire quel est le problème. Je peux cependant supposer que votre code tel qu'il est est terriblement inefficace.

La façon dont vous gérez les éléments "pop-up" du port est très inefficace. Essentiellement, l'itérateur est réveillé chaque fois qu'il y a un message dans le port et il ne traite qu'un seul message (malgré le fait qu'il pourrait y en avoir plusieurs centaines de plus dans le port), puis se bloque sur le yield pendant que le contrôle est renvoyé au framework . Au moment où le récepteur cédé provoque un autre "réveil" de l'itérateur, de nombreux messages ont rempli le port. Tirer un fil du Dispatcher pour ne traiter qu'un seul élément (alors que beaucoup se sont empilés entre-temps) n'est certainement pas le meilleur moyen d'obtenir un bon débit.

J'ai modifié votre code de telle sorte qu'après le rendement, nous vérifions le port pour voir s'il y a d'autres messages en file d'attente et les traitons également, vidant ainsi complètement le port avant de revenir au framework. J'ai aussi quelque peu remanié votre code pour utiliser CcrServiceBase qui simplifie la syntaxe de certaines des tâches que vous effectuez:

internal class Test:CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;
    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest() {
        SpawnIterator(ProcessInts);
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 1500; ++i)
                                  intPort.Post(counter++);
                          }
                          ,
                          null,
                          0,
                          1000);
    }

    public IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue = intPort;
            ReportCurrent(currentValue);
            while(intPort.Test(out currentValue))
            {
                ReportCurrent(currentValue);
            }
        }
    }

    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

Alternativement, vous pouvez supprimer complètement l'itérateur, car il n'est pas vraiment bien utilisé dans votre exemple (même si je ne suis pas tout à fait sûr de l'effet que cela a sur l'ordre de traitement):

internal class Test : CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;

    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest()
    {
        Activate(
            Arbiter.Receive(true,
                            intPort,
                            i =>
                            {
                                ReportCurrent(i);
                                int currentValue;
                                while (intPort.Test(out currentValue))
                                {
                                    ReportCurrent(currentValue);
                                }
                            }));
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 500000; ++i)
                              {
                                  intPort.Post(counter++);
                              }
                          }
                          ,
                          null,
                          0,
                          1000);
    }



    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

Ces deux exemples augmentent considérablement le débit de plusieurs ordres de grandeur. J'espère que cela vous aidera.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top