Perché ParallelQuery .Dove non funziona durante la conversione in osservabile?
-
21-09-2019 - |
Domanda
Ho una collezione osservabile che voglio elaborare in parallelo, quindi osservare i valori elaborati filtrando ed infine sottoscrivere un gestore che riceve i valori filtrati.
Il mio campione è sintatticamente corretto e compila bene, e quando faccio funzionare il codice, la dichiarazione Where
che esegue il filtraggio viene valutato. Ma non ci sono dati passa attraverso la sottoscrizione. Se rimuovo AsParallel
in modo che la lavorazione avviene nel corso di un IEnumerable
regolare, i dati passa attraverso e tutto funziona come previsto.
Ecco il mio campione, facendo alcune elaborazioni sulle stringhe:
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
La prossima cosa strana è che se uso l'operatore TakeWhile
, che nella mia mente è concettualmente simile a Dove, osservando la ParallelQuery funziona come previsto:
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
L'aggiunta di un po 'di codice di registrazione alla sottoscrizione mostra che i dati vengono ricevuti fino a tardissimo la conversione ToObservable
, ma non dopo:
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
Un punto di interruzione nella lambda alla linea 4 è colpito, mentre un punto di interruzione nella lambda alla linea 6 non viene mai colpito.
Perché si fanno TakeWhile
dati vengono attraverso al sottoscrittore mentre Where
non lo fa?
Se è di importanza, ho sviluppato il mio codice in Visual Studio 2010 RC con un progetto mira .Net Framework 4.0 Client Profile.
Aggiorna : sulla base di @ Sergeys rispondere ho rielaborato il posizionamento del filtro Where
. Il seguente codice funziona come previsto:
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
Si sente ancora un po 'imbarazzante dover prima convertire il processedStrings
osservabile iniziale in un enumerabile per parallelizzare, e poi riconvertirlo in un osservabile al fine di sottoscrivere il risultato finale.
Soluzione
Al momento non ci sono alcune limitazioni pratiche su ciò che PLINQ può parallelizzare. Questi limitazioni possono allentarsi con Service Pack successivi e versioni Framework. I seguenti operatori di query impediscono una query di essere parallelizzato, a meno che il elementi sorgente sono nella loro posizione originale indicizzazione:
- Prendere, TakeWhile, Skip, e SkipWhile
- Le versioni indicizzate del Select, SelectMany, e ElementAt
La maggior parte operatori di query cambiare la posizione di indicizzazione di elementi (compresi quelli che rimuovere elementi, come Dove). Ciò significa che se si desidera utilizzare il precedente operatori, faranno di solito hanno bisogno di essere al via della query
Quindi, in realtà, utilizzando TakeWhile impedisce il .AsParallel () dalla parallelizzazione. E 'difficile dire perché Dove uccide il subscriptiion, ma mettendo prima AsParallel potrebbe risolvere il problema.
Altri suggerimenti
TakeWhile
non è concettualmente equivalente a Where
, perché dipende ordinazione. Ho il sospetto che la query è effettivamente l'esecuzione in sequenza (si veda questo post del blog ). Prova a chiamare .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
nel tuo esempio TakeWhile
, e ho il sospetto si vedrà lo stesso risultato.
Non so il motivo per cui non funziona nel caso in parallelo anche se ... posso suggerire che si mette in qualche registrazione per vedere fino a che punto i dati raggiungono? È possibile eseguire la registrazione utile con un Select che restituisce l'elemento originale dopo l'accesso, per esempio.