TPL 与反应式框架
-
23-09-2019 - |
题
人们什么时候会选择使用 Rx 而不是 TPL,或者这两个框架是正交的?
据我了解,Rx 主要旨在提供事件的抽象并允许组合,但它也允许提供异步操作的抽象。使用 Createxx 重载和 Fromxxx 重载,并通过处理返回的 IDisposable 取消。
TPL 还通过任务和取消功能提供操作的抽象。
我的困境是何时使用哪个以及用于什么场景?
解决方案
Rx的主要目的不是为了提供对事件的抽象。这仅仅是其成果之一。其主要目的是提供一种用于收集一可组合的推模型。
的反应性框架(Rx)的基于IObservable<T>
作为数学双IEnumerable<T>
的。因此,而不是使用IEnumerable<T>
从一个集合的“拉”的项目,我们可以通过IObservable<T>
有对象“推”给我们。
当然,当我们真正去寻找可观测源之类的事件与异步操作的最佳候选。
的反应性框架自然需要多线程模型,以便能够观看观察数据的来源和用于管理查询和预订。 RX实际上使得大量使用TPL的做到这一点。
所以,如果你使用的Rx你都隐含使用TPL。
您将使用TPL直接,如果你想在你的任务直接控制。
但如果你有,你要观察的数据源,并执行对随后查询我全力推荐的反应框架。
其他提示
我喜欢遵循的一些准则:
- 我是否正在处理并非由我原创的数据?数据随心所欲地到达?然后是RX。
- 我是否发起计算并需要管理并发?然后是TPL。
- 我是否管理多个结果,并且需要根据时间从中进行选择?然后是RX。
我喜欢 Scott W 的要点。举几个更具体的例子 Rx 与
- 消费流
- 执行非阻塞异步工作,例如 Web 请求。
- 流事件(鼠标移动等 .net 事件或服务总线消息类型事件)
- 将事件“流”组合在一起
- Linq 风格的操作
- 从公共 API 公开数据流
TPL 似乎很好地映射到
- 工作的内部并行化
- 执行非阻塞异步工作,例如 Web 请求
- 执行工作流程和延续
我注意到 IObservable (Rx) 的一件事是它变得普遍。一旦进入您的代码库,因为它无疑会通过其他接口公开,所以它最终将出现在您的应用程序中。我想一开始这可能会很可怕,但团队中的大多数人现在都对 Rx 非常满意,并且喜欢它为我们节省的大量工作。
恕我直言,Rx 将成为 TPL 上的主导库,因为它已经在 .NET 3.5、4.0、Silverlight 3、Silverlight 4 和 Javascript 中得到支持。这意味着您实际上必须学习一种风格,并且它适用于许多平台。
编辑: :我改变了关于 Rx 优于 TPL 的看法。他们解决不同的问题,所以不应该这样比较。在 .NET 4.5/C# 5.0 中,async/await 关键字将进一步将我们与 TPL 联系起来(这很好)。有关 Rx、事件、TPL 等的深入讨论。看看 第一章 我的在线书 IntroToRx.com
2016 年 12 月更新: 如果你有 30 分钟的时间,我建议你阅读 Joe Duffy 的第一手资料,而不是我的猜测。我认为我的分析站得住脚,但如果您发现了这个问题,我强烈建议您查看博客文章而不是这些答案,因为除了 TPL 与 Rx.NET 之外,他还涵盖了 MS 研究项目(Midori、Cosmos)。
http://joeduffyblog.com/2016/11/30/15-years-of-concurrency/
我认为微软在.NET 2.0出来后犯了一个过度纠正的大错误。他们同时从公司的不同部门引入了许多不同的并发管理 API。
- Steven Toub 极力推动线程安全原语取代 Event(最初是
Future<T>
并变成了Task<T>
) - MS Research 有 MIN-LINQ 和反应性扩展 (Rx)
- 硬件/嵌入式有 机器人实验室 (CCR)
与此同时,许多托管 API 团队正在尝试接受 APM,并且 Threadpool.QueueUserWorkItem()
, ,不知道 Toub 是否会赢得发货之战 Future<T>
/Task<T>
在 mscorlib.dll 中。最后看起来他们是对冲的,并且都发货了 Task<T>
和 IObservable<T>
在 mscorlib 中,但不允许任何其他 Rx API(甚至不允许 ISubject<T>
) 在 mscorlib.我认为这种对冲最终导致了大量的重复(稍后会详细介绍),并浪费了公司内部和外部的精力。
对于复制,请参阅: Task
与 IObservable<Unit>
, Task<T>
与 AsyncSubject<T>
, Task.Run()
与 Observable.Start()
. 。这只是冰山一角。但在更高的层面上考虑:
- StreamInsight - SQL 事件流,本机代码优化,但使用 LINQ 语法定义事件查询
- TPL Dataflow - 基于 TPL 构建,与 Rx 并行构建,针对调整线程并行性进行了优化,不擅长编写查询
- Rx - 惊人的表现力,但充满危险。将“热”流与
IEnumerable
-风格的扩展方法,这意味着你很容易永远阻塞(调用First()
热门流永远不会返回)。调度限制(限制并行性)是通过相当奇怪的方式完成的SubscribeOn()
扩展方法,这是奇怪的隐式并且很难正确执行。如果开始学习 Rx,请预留很长的时间来学习所有要避免的陷阱。但如果编写复杂的事件流或者需要复杂的过滤/查询,Rx 实际上是唯一的选择。
在 MS 发布之前,我认为 Rx 没有被广泛采用的机会 ISubject<T>
在 mscorlib 中。这是可悲的,因为 Rx 包含一些非常有用的具体(通用)类型,例如 TimeInterval<T>
和 Timestamped<T>
, ,我认为应该在 Core/mscorlib 中 Nullable<T>
. 。还, System.Reactive.EventPattern<TEventArgs>
.
我要说的是TPL数据流罩中的Rx专业的功能子集。数据流是可以采取的时间可测量的量的数据处理,而R x是事件,诸如鼠标位置,错误状态等,其中处理时间是可以忽略不计。
例如:您的“订阅”的处理程序是异步的,你不想要的时候超过1个执行人。其中Rx你有块,在它周围有没有其他办法,因为RX是异步无关,在许多地方并不以一种特殊的方式威胁异步。
.Subscribe(myAsyncHandler().Result)
如果您没有阻止,随后,RX将考虑在仍然异步执行处理程序操作完成。
您可能会认为,如果你这样做
.ObserveOn(Scheduler.EventLoopSchedule)
比问题得到解决。但是,这将打破你的.Complete()的工作流程,因为Rx将认为这是只要它计划执行,你会退出你的应用程序,而无需等待异步操作完成完成。
如果你想允许不超过4个并行异步任务比的Rx不提供任何开箱。也许你可以通过实现自己的调度破解的东西,缓冲等。
TPL数据流提供在ActionBlock非常好的解决方案。它可以同时节流行动,以一定数量和它理解异步操作,因此调用完成(),并等待已完成将不正是你所期望的:等待所有正在进行的异步任务来完成
另一个特征TPL具有为“背压”。比方说,你在你的处理程序,并需要重计算上月的数据发现错误。如果你订阅源使用Rx和您的管道包含无限的缓冲区,或ObserveOn,比你跑的几秒钟事记忆,因为源将继续读快于处理可以处理。即使你实现拦截消费者,您的来源可以来自阻塞调用受到影响,例如,如果源是异步的。在TPL可以实现源作为
while(...)
await actionBlock.SendAsync(msg)
不挡住源还同时处理超载将等待。
总的来说,我发现,R x是适合使用它们的时间和计算光动作。如果处理时间变显着的,你在奇怪的副作用和深奥调试世界。
好消息是,TPL数据流玩积木很不错,其中Rx。他们有AsObserver / AsObservable适配器并在需要时您可以在管道的Rx的中间把它们粘。但是RX具有更多的模式和使用情况。所以我的经验法则是,开始与Rx和添加TPL数据流需要。