Domanda

Ho una collezione osservabile che voglio elaborare in parallelo, quindi osservare i valori elaborati filtrando ed infine sottoscrivere un gestore che riceve i valori filtrati.

Il mio campione è sintatticamente corretto e compila bene, e quando faccio funzionare il codice, la dichiarazione Where che esegue il filtraggio viene valutato. Ma non ci sono dati passa attraverso la sottoscrizione. Se rimuovo AsParallel in modo che la lavorazione avviene nel corso di un IEnumerable regolare, i dati passa attraverso e tutto funziona come previsto.

Ecco il mio campione, facendo alcune elaborazioni sulle stringhe:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

La prossima cosa strana è che se uso l'operatore TakeWhile, che nella mia mente è concettualmente simile a Dove, osservando la ParallelQuery funziona come previsto:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

L'aggiunta di un po 'di codice di registrazione alla sottoscrizione mostra che i dati vengono ricevuti fino a tardissimo la conversione ToObservable, ma non dopo:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

Un punto di interruzione nella lambda alla linea 4 è colpito, mentre un punto di interruzione nella lambda alla linea 6 non viene mai colpito.

Perché si fanno TakeWhile dati vengono attraverso al sottoscrittore mentre Where non lo fa?

Se è di importanza, ho sviluppato il mio codice in Visual Studio 2010 RC con un progetto mira .Net Framework 4.0 Client Profile.

Aggiorna : sulla base di @ Sergeys rispondere ho rielaborato il posizionamento del filtro Where. Il seguente codice funziona come previsto:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

Si sente ancora un po 'imbarazzante dover prima convertire il processedStrings osservabile iniziale in un enumerabile per parallelizzare, e poi riconvertirlo in un osservabile al fine di sottoscrivere il risultato finale.

È stato utile?

Soluzione

C # 4.0 in a Nutshell :


Al momento non ci sono alcune limitazioni pratiche su ciò che PLINQ può parallelizzare. Questi limitazioni possono allentarsi con Service Pack successivi e versioni Framework. I seguenti operatori di query impediscono una query di essere parallelizzato, a meno che il elementi sorgente sono nella loro posizione originale indicizzazione:

  • Prendere, TakeWhile, Skip, e SkipWhile
  • Le versioni indicizzate del Select, SelectMany, e ElementAt

La maggior parte operatori di query cambiare la posizione di indicizzazione di elementi (compresi quelli che rimuovere elementi, come Dove). Ciò significa che se si desidera utilizzare il precedente operatori, faranno di solito hanno bisogno di essere al via della query


Quindi, in realtà, utilizzando TakeWhile impedisce il .AsParallel () dalla parallelizzazione. E 'difficile dire perché Dove uccide il subscriptiion, ma mettendo prima AsParallel potrebbe risolvere il problema.

Altri suggerimenti

TakeWhile non è concettualmente equivalente a Where, perché dipende ordinazione. Ho il sospetto che la query è effettivamente l'esecuzione in sequenza (si veda questo post del blog ). Prova a chiamare .WithExecutionMode(ParallelExecutionMode.ForceParallelism) nel tuo esempio TakeWhile, e ho il sospetto si vedrà lo stesso risultato.

Non so il motivo per cui non funziona nel caso in parallelo anche se ... posso suggerire che si mette in qualche registrazione per vedere fino a che punto i dati raggiungono? È possibile eseguire la registrazione utile con un Select che restituisce l'elemento originale dopo l'accesso, per esempio.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top