Domanda

Stavo sperimentando iteratori CCR come soluzione a un'attività che richiede un'elaborazione parallela di tonnellate di feed di dati, in cui i dati di ciascun feed devono essere elaborati in ordine. Nessuno dei feed dipende l'uno dall'altro, quindi l'elaborazione in ordine può essere parallela a feed.

Di seguito è riportato un modello rapido e sporco con un feed intero, che semplicemente spinge i numeri interi in una porta a una velocità di circa 1,5k/secondo, quindi li estrae usando un iteratore CCR per mantenere la garanzia di elaborazione in ordine.

class Program
{
    static Dispatcher dispatcher = new Dispatcher();
    static DispatcherQueue dispatcherQueue = 
       new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
    static Port<int> intPort = new Port<int>();

    static void Main(string[] args)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));

        int counter = 0;
        Timer t = new Timer( (x) => 
            { for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
              , null, 0, 1000);

        Console.ReadKey();
    }

    public static IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue;
            if( (currentValue = intPort) % 1000 == 0)
            {
                Console.WriteLine("{0}, Current Items In Queue:{1}", 
                  currentValue, intPort.ItemCount);
            }
        }
    }
}

Ciò che mi ha sorpreso molto è stato che il CCR non poteva tenere il passo in una scatola Corei7, con la dimensione della coda che cresceva senza limiti. In un altro test per misurare la latenza dal post () alla ricezione () sotto un carico o ~ 100 post/sec., La latenza tra il primo post () e la ricezione () in ciascun lotto era di circa 1 ms.

C'è qualcosa che non va nel mio modello? In tal caso, qual è un modo migliore per farlo usando CCR?

È stato utile?

Soluzione

Sì, sono d'accordo, questo sembra davvero strano. Il tuo codice sembra inizialmente funzionare senza intoppi, ma dopo alcune migliaia di articoli, l'utilizzo del processore aumenta al punto in cui le prestazioni sono davvero poco brillanti. Questo mi disturba e suggerisce un problema nel framework. Dopo un gioco con il tuo codice, non riesco davvero a identificare perché questo è il caso. Suggerirei di portare questo problema al Forum di Microsoft Robotics E vedere se riesci a ottenere George Chrysanthakopoulos (o uno degli altri cervelli CCR) per dirti qual è il problema. Posso comunque supporre che il tuo codice così com'è sia terribilmente inefficiente.

Il modo in cui hai a che fare con gli articoli "scoppiettanti" dalla porta è molto inefficiente. Essenzialmente l'iteratore viene svegliato ogni volta che c'è un messaggio nella porta e si occupa di un solo messaggio (nonostante il fatto che potrebbero esserci diverse centinaia di altre nella porta), quindi è appeso al yield mentre il controllo viene passato al framework. Nel punto che il ricevitore ceduto provoca un altro "risveglio" dell'iteratore, molti messaggi hanno riempito la porta. Tirare un filo dal dispatcher per affrontare un solo oggetto (quando molti si sono accumulati nel frattempo) non è quasi certamente il modo migliore per ottenere un buon throughput.

Ho modificato il tuo codice in modo tale che dopo il rendimento, controlliamo la porta per vedere se ci sono ulteriori messaggi messi in coda e gestiamo anche loro, svuotando così completamente la porta prima di tornare al framework. Ho anche rifatto il tuo codice in qualche modo da usare CcrServiceBase che semplifica la sintassi di alcuni dei compiti che stai svolgendo:

internal class Test:CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;
    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest() {
        SpawnIterator(ProcessInts);
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 1500; ++i)
                                  intPort.Post(counter++);
                          }
                          ,
                          null,
                          0,
                          1000);
    }

    public IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue = intPort;
            ReportCurrent(currentValue);
            while(intPort.Test(out currentValue))
            {
                ReportCurrent(currentValue);
            }
        }
    }

    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

In alternativa, potresti eliminare completamente l'iteratore, in quanto non è davvero ben usato nel tuo esempio (anche se non sono del tutto sicuro di quale effetto abbia sull'ordine di elaborazione):

internal class Test : CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;

    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest()
    {
        Activate(
            Arbiter.Receive(true,
                            intPort,
                            i =>
                            {
                                ReportCurrent(i);
                                int currentValue;
                                while (intPort.Test(out currentValue))
                                {
                                    ReportCurrent(currentValue);
                                }
                            }));
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 500000; ++i)
                              {
                                  intPort.Post(counter++);
                              }
                          }
                          ,
                          null,
                          0,
                          1000);
    }



    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

Entrambi questi esempi aumentano significativamente la produttività per ordini di grandezza. Spero che sia di aiuto.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top