Question

I ai une collection observable que je veux traiter en parallèle, puis observer les valeurs traitées tout en filtrant et finalement souscrire un gestionnaire qui reçoit les valeurs filtrées.

Mon échantillon est syntaxiquement correct et compile très bien, et quand je lance le code, la déclaration de Where faire le filtrage est évalué. Mais aucune donnée ne passe par l'abonnement. Si je retire AsParallel de sorte que le traitement est effectué sur une IEnumerable régulière, les données et passe par tout fonctionne comme prévu.

Voici mon exemple, effectuer des traitements sur les chaînes:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

La prochaine chose étrange est que si j'utilise l'opérateur TakeWhile, qui dans mon esprit est conceptuellement semblable à où, en observant le ParallelQuery fonctionne comme prévu:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Ajout d'un code d'enregistrement à la souscription montre que les données sont reçues jusqu'à jusqu'à la conversion de ToObservable, mais pas après:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

Un point d'arrêt dans le lambda à la ligne 4 est touché en un point d'arrêt dans le lambda à la ligne 6 est jamais frappé.

Pourquoi va TakeWhile rendre les données passent par l'abonné pendant Where ne fonctionne pas?

S'il est important, je développe mon code dans Visual Studio 2010 RC avec un projet de ciblage .Net 4.0 Framework Client Profile.

Mise à jour : basé sur @ Sergeys répondre . Le code suivant fonctionne comme prévu:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

Il se sent toujours un peu gênant d'avoir à d'abord convertir le processedStrings observable initial en un dénombrable afin de paralléliser, puis la reconvertir en une observable afin de souscrire au résultat final.

Était-ce utile?

La solution

De la C # 4.0 dans un Nutshell:


Il existe actuellement des limites pratiques sur ce PLINQ peut paralléliser. Celles-ci limitations peuvent se desserrer avec les service packs ultérieurs et versions cadres. Les opérateurs de requête suivants empêchent une requête de parallélisation, à moins que le éléments de source sont dans leur position d'indexation d'origine:

  • Prenez, TakeWhile, Passer et SkipWhile
  • Les versions indexées de Select, SelectMany et ElementAt

La plupart des opérateurs de requête modifier la position d'indexation des éléments (y compris ceux qui supprimer des éléments tels que Si). Cela signifie que si vous souhaitez utiliser la précédente les opérateurs, ils doivent généralement être au début de la requête


Donc, en fait, en utilisant TakeWhile empêche la .AsParallel () de parallélisation. Il est difficile de dire pourquoi Où tue le subscriptiion, mais avant de le mettre AsParallel peut résoudre le problème.

Autres conseils

TakeWhile n'est pas sur le plan conceptuel équivalent à Where, car cela dépend de la commande. Je soupçonne que la requête est en fait exécution séquentielle (voir ce billet de blog ). Essayez d'appeler .WithExecutionMode(ParallelExecutionMode.ForceParallelism) dans votre exemple TakeWhile, et je pense que vous verrez le même résultat.

Je ne sais pas pourquoi cela ne fonctionne pas dans le cas parallèle si ... puis-je suggérer que vous mettez dans une exploitation forestière pour voir dans quelle mesure les données atteint? Vous pouvez effectuer une journalisation utile avec un Select qui retourne l'élément d'origine après la connexion, par exemple.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top