Domanda

Ho un programma che ha un sacco di sensori che producono dati a una velocità abbastanza elevata e consumatori che devono consumarli.I consumatori consumano a ritmi molto diversi.

Dato che utilizzo IObserver/IObservable, la soluzione banale era semplicemente creare un'attività per ogni evento e racchiudere la chiamata OnNext() e i dati in un lamda.Ha funzionato molto bene, sono rimasto sorpreso da quanto poco sovraccarico ci fosse sulle chiamate grezze.

Il problema è che alcuni di questi consumatori hanno bisogno che l'ordine degli eventi venga rigorosamente applicato e non possono perdersi alcun evento."PerferFairness" non è abbastanza buono.

La migliore soluzione che ho trovato è invece di racchiudere la coppia evento/OnNext(), per racchiudere un Insert in un ParallelQueue, una coda per consumatore, e avere un thread all'altra estremità della Queue per creare OnNext() chiamate.

Tre problemi immediati con questo approccio.È molto, molto più lento della soluzione di wrapper Task/OnNext().Non esiste un Blocking Dequeue (o esiste?) per ParallelQueue, quindi l'implementazione è un po' complicata.Il terzo è che questo sembra un problema così comune che non riesco a immaginare che non ci sia un modo per far rispettare l'ordine che mi sono perso, forse qualcosa come più fabbriche di attività che condividono un pool sottostante, ciascuna fabbrica con qualche impostazione che le fa rispettare rigorosamente ordine.

Qualcuno conosce il modo corretto per ottenere ciò che sto cercando di fare?

MODIFICARE :Qualsiasi soluzione che implichi un thread per consumatore o produttore non funziona.I produttori/consumatori formano lunghe catene, ce ne sono centinaia ciascuna.

È stato utile?

Soluzione

IL Libreria TPL DataFlow potrebbe essere una buona soluzione per la tua applicazione.Estende TPL con un paradigma del flusso di dati che consente di configurare un grafico di elaborazione ed eseguirlo in un ambiente di esecuzione ad alte prestazioni.

TPL Dataflow è disponibile come libreria su .NET 4 e dovrebbe essere fornito come parte di .NET 4.5.

Altri suggerimenti

Nessun commento sulla migliore astrazione per imporre un ordinamento parziale, ma se usi a BlockingCollection<T> involucro attorno a a ConcurrentQueue<T>, questo ti darà un blocco Take operazione per rimuovere dalla coda gli elementi.per esempio.:

// the default is ConcurrentQueue, so you don't have to specify, but if you
// wanted different behavior you could use e.g. ConcurrentStack

var coll = new BlockingCollection<int>(new ConcurrentQueue<int>());

coll.Add(5); // blocks if the collection is at max capacity

int five = coll.Take(); // blocks if the collection is empty
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top