Pregunta

Tengo una colección observable que quiero proceso en paralelo, entonces, observar los valores procesados mientras que el filtrado y finalmente suscribir un controlador que recibe los valores filtrados.

Mi ejemplo es sintácticamente correcta, y se compila correctamente, y cuando ejecuto el código, el Where declaración de hacer el filtrado se evalúa.Pero no hay datos viene a través de la suscripción.Si puedo quitar AsParallel de modo que el proceso se realiza a través de una regular IEnumerable, datos vienen a través de y todo funciona como se espera.

Aquí está mi ejemplo, haciendo algún tipo de procesamiento de cadenas de caracteres:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

La siguiente cosa rara es que si puedo usar el TakeWhile el operador, que en mi mente es conceptualmente similar a Donde, observando la ParallelQuery funciona como se esperaba:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

La adición de algunas código de registro para la suscripción muestra que los datos se reciben hasta el ToObservable la conversión, pero no después de:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

Un punto de interrupción en la lambda en la línea 4 es golpeado mientras que un punto de interrupción en la lambda en la línea 6 es nunca de golpe.

¿Por qué será TakeWhile los datos vienen a través del suscriptor al Where no?

Si es de importancia, puedo desarrollar mi código en Visual Studio 2010 RC con un proyecto de orientación .Net 4.0 Framework Client Profile.

Actualización:basado en @Sergeys respuesta He rehecho la colocación de la Where el filtro.El siguiente código funciona como se esperaba:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

Todavía se siente un poco incómodo tener que convertir primero la inicial observable processedStrings en un enumerable para paralelizar, y, a continuación, volver a convertir un observable para suscribir el resultado final.

¿Fue útil?

Solución

A partir de la C# 4.0 en una cáscara de Nuez:


Actualmente existen algunas limitaciones prácticas en lo que PLINQ puede paralelizar.Estos las limitaciones pueden aflojar con los service pack posteriores y las versiones de Framework.La siguiente consulta a los operadores de prevenir una consulta de ser paralelizado, a menos que el origen de los elementos están en su idioma original posición:

  • Tomar, y takewhile, Saltar, y SkipWhile
  • El indexado versiones de Seleccionar, SelectMany, y ElementAt

La mayoría de los operadores de consulta cambiar la indexación de la posición de los elementos (incluyendo aquellos que eliminar elementos, tales como Dónde).Esto significa que si usted desea utilizar el anterior los operadores, que se necesita para estar en el inicio de la consulta


Así que, de hecho, el uso y takewhile impide la .AsParallel() de la paralelización.Es difícil decir por qué Donde mata a la subscriptiion, pero antes de someterlo a AsParallel podría solucionar el problema.

Otros consejos

TakeWhile no es conceptualmente equivalente a Where, porque depende de pedidos.Sospecho que la consulta es en realidad ejecutar de forma secuencial (ver este blog).Trate de llamar .WithExecutionMode(ParallelExecutionMode.ForceParallelism) en su TakeWhile ejemplo, y sospecho que va a ver el mismo resultado.

No sé por qué no está funcionando en paralelo caso, aunque...puedo sugerir que se ponga en algún registro para ver hasta qué punto los datos alcanza?Puede realizar útil de registro con un Select que devuelve el elemento original después de iniciar sesión, por ejemplo.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top