質問

Reactive Extensions には、既存のイベントや非同期操作をオブザーバブルに変換するためのヘルパー メソッドが多数付属していますが、IObservable<T> を最初から実装するにはどうすればよいでしょうか?

IEnumerable には、実装を非常に簡単にする素敵な yield キーワードがあります。

IObservable<T> を実装する適切な方法は何ですか?

スレッドの安全性について心配する必要がありますか?

特定の同期コンテキストでのコールバックのサポートがあることは知っていますが、これは IObservable<T> 作成者として心配する必要があることなのでしょうか、それともこれは何らかの形で組み込まれているのでしょうか?

アップデート:

これは、Brian の F# ソリューションの C# バージョンです。

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

編集: Dispose が 2 回呼び出された場合に ObjectDissolvedException をスローしない

役に立ちましたか?

解決

正直、私などの権利'すべてのことだって感じのか良好に基づく私の経験です。このF#コードでも使うこくのある味になります。できる、新しいアップ、ソースオブジェクトで呼び出して、次/完了/エラーで管理契約数をとろうと主張する場合、またはお客様になっいます。

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

私が興味をもった思いや、なかなかない機会がたく新Rxらdevlabsしかも---

自分の経験によると:

  • 人加わゆる観察可能な要因などが投げから参加できます。あの合理的な視できない場合の加入者です.(これは類似するものとします) その例外でのバブルトップレベルのキャッチオーハンドクラッシュのアプリです。
  • 源うべき"論理的にネジ付きシングル".と思うのですが、硬めの書きすることができるクライアント反応の同時OnNext電話;場合でも個別の通話が違っているスレッドではな兼職の状況ます。
  • レコード店ディスクユニオンの便利なベース/ヘルパークラスと実施を行う一部の契約'.

私は大変な興味を抱き、それまでになります具体的なアドバイス。

他のヒント

公式ドキュメント IObservable を自分で実装するユーザーは非推奨になります。代わりに、ユーザーはファクトリーメソッドを使用することが期待されます Observable.Create

可能であれば、既存の演算子を合成して新しい演算子を実装します。それ以外の場合は、Observable.Create を使用してカスタム演算子を実装します。

Observable.Create は Reactive の内部クラスの簡単なラッパーです。 AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

なぜ実装を公開しなかったのかはわかりませんが、まあ、どうでもいいでしょう。

はい、yieldキーワードは素敵です。多分IObservable(OFT)のために似たようながあるでしょうか? [編集:彼のはエリック・メイジャーの PDC '09話はい、これを見て」と言います観測を生成するための宣言収率スペース」。

"の

(代わりに、独自の圧延の)何かの近くでは、チェックアウト下する (まだ)101 Rxのチームは、件名の使用を示唆しているサンプルの」ウィキ、(T)クラスとしてIObservable(OFT)を実装するための「バックエンド」。ここではその一例です。

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
  1. き裂開放射しています。

  2. ォC9動画でわかりやすく解説 この 一方できる'を選択しcombinator'

  3. その秘密をAnonymousObservable,AnonymousObserverとAnonymousDisposableクラス(仕事だけでaroundsは、ここ半年間の間、できないインスタンスを生成インタフェースの場合).が含まれているゼロを実施しているとの行動とFuncs.

例えば:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

おう仕事を休---でも運動です。

がちょっとしたスレッドの成長 こちらの 関連する質問をします。

この実施に関するひとつの発言ます:

同時コレクションは、.NET FW 4で導入された後、

は、それは簡単な辞書の代わりにConcurrentDictioaryを使用する方が良いでしょう。

これは、コレクションのロックを取り扱う保存します。

ADIます。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top