Pergunta

Tenho uma coleção observável que desejo processar em paralelo, depois observar os valores processados ​​durante a filtragem e, finalmente, assinar um manipulador que receba os valores filtrados.

Meu exemplo está sintaticamente correto e compila perfeitamente, e quando executo o código, o Where a instrução que faz a filtragem é avaliada.Mas nenhum dado chega à assinatura.Se eu remover AsParallel para que o processamento seja feito regularmente IEnumerable, os dados chegam e tudo funciona conforme o esperado.

Aqui está meu exemplo, fazendo algum processamento em strings:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

A próxima coisa estranha é que se eu usar o TakeWhile operador, que na minha opinião é conceitualmente semelhante a Where, observando que o ParallelQuery funciona conforme o esperado:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Adicionar algum código de registro à assinatura mostra que os dados são recebidos até o ToObservable conversão, mas não depois:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

Um ponto de interrupção no lambda na linha 4 é atingido, enquanto um ponto de interrupção no lambda na linha 6 nunca é atingido.

Por que TakeWhile fazer com que os dados cheguem ao assinante enquanto Where não?

Se for importante, desenvolvo meu código no Visual Studio 2010 RC com um projeto direcionado ao .Net 4.0 Framework Client Profile.

Atualizar:baseado em Resposta do @Sergey Reformulei o posicionamento do Where filtro.O código a seguir funciona conforme o esperado:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

Ainda parece um pouco estranho ter que primeiro converter o observável inicial processedStrings em um enumerável para paralelizá-lo e, em seguida, convertê-lo novamente em um observável para subscrever o resultado final.

Foi útil?

Solução

De C# 4.0 em poucas palavras:


Atualmente existem algumas limitações práticas sobre o que o PLINQ pode paralelizar.Essas limitações podem se soltar com os pacotes de serviço subsequentes e as versões da estrutura.Os seguintes operadores de consulta impedem que uma consulta seja paralela, a menos que os elementos de origem estejam em sua posição de indexação original:

  • Pegue, TakeWhile, Pular e SkipWhile
  • As versões indexadas de Select, SelectMany e ElementAt

A maioria dos operadores de consulta altera a posição de indexação dos elementos (incluindo aqueles que removem elementos, como onde).Isso significa que, se você quiser usar os operadores anteriores, eles geralmente precisam estar no início da consulta


Então, na verdade, usar TakeWhile evita que .AsParallel() seja paralelizado.É difícil dizer por que Onde mata a assinatura, mas coloca antes do AsParallel poder corrigir o problema.

Outras dicas

TakeWhile não é conceitualmente equivalente a Where, porque depende do pedido.Eu suspeito que a consulta seja na verdade executando sequencialmente (veja esta postagem do blog).Tente ligar .WithExecutionMode(ParallelExecutionMode.ForceParallelism) na tua TakeWhile por exemplo, e suspeito que você verá o mesmo resultado.

Não sei por que não está funcionando no caso paralelo ...posso sugerir que você faça algum registro para ver até onde os dados chegam?Você pode realizar um registro útil com um Select que retorna o item original após registrá-lo, por exemplo.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top