为什么 ParallelQuery<T>.Where 在转换为 Observable 时不起作用?
-
21-09-2019 - |
题
我有一个可观察的集合,我想并行处理,然后在过滤时观察处理后的值,最后订阅接收过滤后的值的处理程序。
我的示例在语法上是正确的并且编译得很好,当我运行代码时, Where
执行过滤的语句被评估。但订阅中没有任何数据。如果我删除 AsParallel
这样处理就可以在常规的情况下完成 IEnumerable
, ,数据通过并且一切都按预期进行。
这是我的示例,对字符串进行一些处理:
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
下一个奇怪的事情是,如果我使用 TakeWhile
运算符,在我看来,它在概念上类似于Where,观察ParallelQuery按预期工作:
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
向订阅添加一些日志代码显示数据已收到,直到 ToObservable
转换,但不在以下之后:
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
第 4 行的 lambda 中的断点被击中,而第 6 行的 lambda 中的断点从未被击中。
为什么会 TakeWhile
使数据到达订阅者,同时 Where
才不是?
如果它很重要,我会在 Visual Studio 2010 RC 中使用一个针对 .Net 4.0 Framework Client Profile 的项目来开发代码。
更新:基于 @Sergeys 回答 我重新设计了 Where
筛选。以下代码按预期工作:
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
必须先转换初始可观察值仍然感觉有点尴尬 processedStrings
转换为可枚举以使其并行化,然后将其转换回可观察以订阅最终结果。
解决方案
来自 C# 4.0 简而言之:
目前,PLINQ 的并行化功能存在一些实际限制。这些限制可能会随后的服务包和框架版本而放松。以下查询操作员可以阻止查询并行化,除非源元素处于其原始索引位置:
- Take、TakeWhile、Skip 和 SkipWhile
- Select、SelectMany 和 ElementAt 的索引版本
大多数查询操作员会更改元素的索引位置(包括删除元素的索引位置,例如在哪里)。这意味着,如果您想使用前面的操作员,他们通常需要在查询开始时
因此,事实上,使用 TakeWhile 会阻止 .AsParallel() 并行化。这很难说 为什么 Where 杀死订阅,但将其放在 AsParallel 之前 可能 解决问题。
其他提示
TakeWhile
在概念上不等于 Where
, ,因为这取决于订购。我怀疑查询是 实际上 按顺序执行(参见 这篇博文)。尝试打电话 .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
在你的 TakeWhile
例如,我怀疑您会看到相同的结果。
我不知道为什么它在并行情况下不起作用......我可以建议您进行一些日志记录以查看数据达到了多远?例如,您可以使用 Select 执行有用的日志记录,该 Select 会在记录后返回原始项目。