Почему ParallelQuery<T>.Where не работает при преобразовании в Observable?

StackOverflow https://stackoverflow.com/questions/2287963

Вопрос

У меня есть наблюдаемая коллекция, которую я хочу обрабатывать параллельно, затем наблюдать за обработанными значениями во время фильтрации и, наконец, подписаться на обработчик, который получает отфильтрованные значения.

Мой пример синтаксически корректен и компилируется просто отлично, и когда я запускаю код, Where вычисляется оператор, выполняющий фильтрацию.Но никакие данные не поступают в подписку.Если я удалю AsParallel таким образом, чтобы обработка производилась в течение регулярного IEnumerable, данные поступают, и все работает, как ожидалось.

Вот мой пример, выполняющий некоторую обработку строк:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

Следующая странная вещь заключается в том, что если я использую TakeWhile оператор, который, на мой взгляд, концептуально похож на Where, наблюдая, что ParallelQuery работает так, как ожидалось:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Добавление некоторого кода ведения журнала в подписку показывает, что данные принимаются до ToObservable обращение, но не после:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

Точка останова в лямбда-выражении в строке 4 достигнута, в то время как точка останова в лямбда-выражении в строке 6 никогда не достигнута.

Почему будет TakeWhile передавать данные абоненту, в то время как Where не делает?

Если это важно, я разрабатываю свой код в Visual Studio 2010 RC с проектом, ориентированным на профиль клиента .Net 4.0 Framework.

Обновить:основанный на @Sergeys ответить Я переработал размещение Where Фильтр.Следующий код работает так, как ожидалось:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

Все еще кажется немного неловким необходимость сначала преобразовать начальный наблюдаемый processedStrings в перечислимый, чтобы распараллелить его, а затем преобразовать обратно в наблюдаемый, чтобы подписаться на конечный результат.

Это было полезно?

Решение

Из C # 4.0 в двух словах:


В настоящее время существуют некоторые практические ограничения на то, что PLINQ может распараллеливать.Эти ограничения могут быть ослаблены с последующими пакетами обновления и версиями Framework.Следующие операторы запроса предотвращают распараллеливание запроса, если только исходные элементы не находятся в исходном положении индексации:

  • Брать, Брать пока, Пропускать и пропускать пока
  • Индексированные версии Select, SelectMany и ElementAt

Большинство операторов запросов изменяют положение элементов в индексе (включая те, которые удаляют элементы, такие как Where).Это означает, что если вы хотите использовать предыдущие операторы, они обычно должны находиться в начале запроса


Таким образом, фактически, использование takeWhile предотвращает .AsParallel() от распараллеливания.Это трудно сказать почему Где убивает подписку, но помещает ее перед AsParallel мог бы устраните проблему.

Другие советы

TakeWhile не является концептуально эквивалентным Where, потому что это зависит от заказа.Я подозреваю, что запрос является на самом деле выполняется последовательно (см. это сообщение в блоге).Попробуйте позвонить .WithExecutionMode(ParallelExecutionMode.ForceParallelism) в вашем TakeWhile пример, и я подозреваю, что вы увидите тот же результат.

Хотя я не знаю, почему это не работает в параллельном случае...могу ли я предложить вам ввести некоторые протоколирования, чтобы увидеть, как далеко доходят данные?Вы можете выполнить полезное протоколирование с помощью Select, который возвращает исходный элемент, например, после его протоколирования.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top