题
响应式扩展附带了许多帮助方法,用于将现有事件和异步操作转换为可观察对象,但是如何从头开始实现 IObservable<T> ?
IEnumerable 有可爱的yield 关键字,使其实现起来非常简单。
实现 IObservable<T> 的正确方法是什么?
我需要担心线程安全吗?
我知道支持在特定同步上下文上回调,但这是我作为 IObservable<T> 作者需要担心的事情还是以某种方式内置的?
更新:
这是 Brian 的 F# 解决方案的 C# 版本
using System;
using System.Linq;
using Microsoft.FSharp.Collections;
namespace Jesperll
{
class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
{
private FSharpMap<int, IObserver<T>> subscribers =
FSharpMap<int, IObserver<T>>.Empty;
private readonly object thisLock = new object();
private int key;
private bool isDisposed;
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && !isDisposed)
{
OnCompleted();
isDisposed = true;
}
}
protected void OnNext(T value)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnNext(value);
}
}
protected void OnError(Exception exception)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
if (exception == null)
{
throw new ArgumentNullException("exception");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnError(exception);
}
}
protected void OnCompleted()
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
lock (thisLock)
{
int k = key++;
subscribers = subscribers.Add(k, observer);
return new AnonymousDisposable(() =>
{
lock (thisLock)
{
subscribers = subscribers.Remove(k);
}
});
}
}
}
class AnonymousDisposable : IDisposable
{
Action dispose;
public AnonymousDisposable(Action dispose)
{
this.dispose = dispose;
}
public void Dispose()
{
dispose();
}
}
}
编辑: 如果 Dispose 被调用两次,则不要抛出 ObjectDisposeException
解决方案
老实说,我不确定这一切有多“正确”,但根据我迄今为止的经验,感觉相当不错。这是 F# 代码,但希望您能感受到其中的味道。它允许您“新建”源对象,然后您可以对其调用 Next/Completed/Error,并且它管理订阅并在源或客户端做坏事时尝试断言。
type ObservableSource<'T>() = // '
let protect f =
let mutable ok = false
try
f()
ok <- true
finally
Debug.Assert(ok, "IObserver methods must not throw!")
// TODO crash?
let mutable key = 0
// Why a Map and not a Dictionary? Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
let mutable subscriptions = Map.empty : Map<int,IObserver<'T>> // '
let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
let thisLock = new obj()
let obs =
{ new IObservable<'T> with // '
member this.Subscribe(o) =
let k =
lock thisLock (fun () ->
let k = key
key <- key + 1
subscriptions <- subscriptions.Add(k, o)
k)
{ new IDisposable with
member this.Dispose() =
lock thisLock (fun () ->
subscriptions <- subscriptions.Remove(k)) } }
let mutable finished = false
// The methods below are not thread-safe; the source ought not call these methods concurrently
member this.Next(x) =
Debug.Assert(not finished, "IObserver is already finished")
next x
member this.Completed() =
Debug.Assert(not finished, "IObserver is already finished")
finished <- true
completed()
member this.Error(e) =
Debug.Assert(not finished, "IObserver is already finished")
finished <- true
error e
// The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
member this.Value = obs
我会对这里的任何好坏想法感兴趣;我还没有机会查看 devlabs 中的所有新 Rx 内容......
我自己的经验表明:
- 订阅 observable 的人永远不应该放弃订阅。当订阅者抛出异常时,可观察对象无法做任何合理的事情。(这与事件类似。)很可能异常只会冒泡到顶级捕获所有处理程序或使应用程序崩溃。
- 源可能应该是“逻辑上单线程”。我认为编写能够对并发 OnNext 调用做出反应的客户端可能会更困难;即使每个单独的调用来自不同的线程,也有助于避免并发调用。
- 拥有一个强制执行某些“契约”的基类/辅助类绝对有用。
我很好奇人们是否可以在这些方面提出更具体的建议。
其他提示
这 官方文档 不赞成用户自己实现 IObservable。相反,用户应该使用工厂方法 Observable.Create
如果可能,通过组合现有运算符来实现新的运算符。否则使用 Observable.Create 实现自定义运算符
Observable.Create 恰好是 Reactive 内部类的一个简单包装器 AnonymousObservable
:
public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
if (subscribe == null)
{
throw new ArgumentNullException("subscribe");
}
return new AnonymousObservable<TSource>(subscribe);
}
我不知道他们为什么不公开他们的实施,但是嘿,无论如何。
是,则收率关键字是可爱;也许会有的IObservable(OFT)相似的地方? [编辑:埃里克·梅杰的 PDC '09谈话他说,“是的,看这个空间”的声明性产量,用于产生可观测量。]
对于一些接近(而不是滚动您自己),请的底部“ (目前还没有)101的Rx样品”维基,所在团队建议使用主题(T)类的一个“后台”来实施的IObservable(OFT)。下面是其示例:
public class Order
{
private DateTime? _paidDate;
private readonly Subject<Order> _paidSubj = new Subject<Order>();
public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }
public void MarkPaid(DateTime paidDate)
{
_paidDate = paidDate;
_paidSubj.OnNext(this); // Raise PAID event
}
}
private static void Main()
{
var order = new Order();
order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe
order.MarkPaid(DateTime.Now);
}
打开 Reflector 看看。
观看一些 C9 视频 - 这 其中一个展示了如何“推导”选择“组合器”
秘诀是创建 AnonymousObservable、AnonymousObserver 和 AnonymousDisposable 类(它们只是解决无法实例化接口这一事实的解决方案)。它们包含零实现,因为您通过操作和函数传递它。
例如:
public class AnonymousObservable<T> : IObservable<T>
{
private Func<IObserver<T>, IDisposable> _subscribe;
public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
{
_subscribe = subscribe;
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _subscribe(observer);
}
}
剩下的事我会让你解决...这是一个很好的理解练习。
有一条漂亮的小线在生长 这里 与相关问题。
只是一个关于这个实现的话:
在.NET FW 4被引入并发集合之后它可能是最好使用ConcurrentDictioary而不是简单的字典。
它节省了收集处理锁。
ADI