我有一个可观察的集合,我想并行处理,然后在过滤时观察处理后的值,最后订阅接收过滤后的值的处理程序。

我的示例在语法上是正确的并且编译得很好,当我运行代码时, Where 执行过滤的语句被评估。但订阅中没有任何数据。如果我删除 AsParallel 这样处理就可以在常规的情况下完成 IEnumerable, ,数据通过并且一切都按预期进行。

这是我的示例,对字符串进行一些处理:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

下一个奇怪的事情是,如果我使用 TakeWhile 运算符,在我看来,它在概念上类似于Where,观察ParallelQuery按预期工作:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

向订阅添加一些日志代码显示数据已收到,直到 ToObservable 转换,但不在以下之后:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

第 4 行的 lambda 中的断点被击中,而第 6 行的 lambda 中的断点从未被击中。

为什么会 TakeWhile 使数据到达订阅者,同时 Where 才不是?

如果它很重要,我会在 Visual Studio 2010 RC 中使用一个针对 .Net 4.0 Framework Client Profile 的项目来开发代码。

更新:基于 @Sergeys 回答 我重新设计了 Where 筛选。以下代码按预期工作:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

必须先转换初始可观察值仍然感觉有点尴尬 processedStrings 转换为可枚举以使其并行化,然后将其转换回可观察以订阅最终结果。

有帮助吗?

解决方案

来自 C# 4.0 简而言之:


目前,PLINQ 的并行化功能存在一些实际限制。这些限制可能会随后的服务包和框架版本而放松。以下查询操作员可以阻止查询并行化,除非源元素处于其原始索引位置:

  • Take、TakeWhile、Skip 和 SkipWhile
  • Select、SelectMany 和 ElementAt 的索引版本

大多数查询操作员会更改元素的索引位置(包括删除元素的索引位置,例如在哪里)。这意味着,如果您想使用前面的操作员,他们通常需要在查询开始时


因此,事实上,使用 TakeWhile 会阻止 .AsParallel() 并行化。这很难说 为什么 Where 杀死订阅,但将其放在 AsParallel 之前 可能 解决问题。

其他提示

TakeWhile 在概念上不等于 Where, ,因为这取决于订购。我怀疑查询是 实际上 按顺序执行(参见 这篇博文)。尝试打电话 .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 在你的 TakeWhile 例如,我怀疑您会看到相同的结果。

我不知道为什么它在并行情况下不起作用......我可以建议您进行一些日志记录以查看数据达到了多远?例如,您可以使用 Select 执行有用的日志记录,该 Select 会在记录后返回原始项目。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top