Frage

Die Reactive Extensions kommen mit vielen Hilfsmethoden zum Drehen bestehende Ereignisse und asynchrone Operationen in Observablen aber wie würden Sie ein IObservable von Grunde auf neu implementieren?

IEnumerable hat das schöne Ausbeute Schlüsselwort es sehr einfach zu machen zu implementieren.

Was ist die richtige Art und Weise der Umsetzung IObservable ?

muss ich Sorge um die Thread-Sicherheit?

Ich weiß, es ist die Unterstützung wieder auf einem bestimmten Synchronisationskontext immer genannt, aber das ist etwas, was ich als IObservable Autor Grund zur Sorge über oder dieses irgendwie built-in?

Update:

Hier ist meine C # Version von Brian F # Lösung

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

Bearbeiten Werfen ObjectDisposedException nicht, wenn Dispose wird zweimal aufgerufen

War es hilfreich?

Lösung

Ehrlich gesagt, bin ich nicht sicher, wie ‚richtig‘ das alles ist, aber wenn fühlt sich ziemlich gut auf Grund meiner bisherigen Erfahrungen. Es ist F # -Code, aber hoffentlich bekommen Sie ein Gefühl für den Geschmack. Damit können Sie ‚neue up‘ ein Quellobjekt, mit dem Sie dann auf Weiter anrufen / Abgeschlossen / Fehler auf, und es verwaltet Abonnements und versucht zu behaupten, wenn die Quelle oder Kunden schlechte Dinge tun.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Ich werde in irgendwelchen Gedanken über interessiert sein, was gut oder schlecht ist hier; Ich hatte keine Chance bei allen neuen Rx Sachen zu sehen von DevLabs noch ...

Meine eigenen Erfahrungen legen nahe, dass:

  • Diejenigen, die Observablen abonnieren sollten nie aus den Abonnements werfen. Es gibt nichts, vernünftig eine beobachtbare tun kann, wenn ein Teilnehmer wirft. (Dies ist vergleichbar mit Ereignissen.) Die meisten wahrscheinlich die Ausnahme wird nur Blase bis zu einem Top-Level-catch-all-Handler oder die App zum Absturz bringen.
  • Quellen sollten wahrscheinlich "logisch single threaded" sein. Ich denke, es kann schwieriger zu schreiben Clients, die gleichzeitig OnNext Anrufe reagieren können; selbst wenn jeder einzelne Anruf von einem anderen Thread kommt, ist es hilfreich gleichzeitige Anrufe zu vermeiden.
  • Es ist auf jeden Fall sinnvoll, eine Basis / Hilfsklasse zu haben, die einige ‚Verträge‘ erzwingt.

Ich bin sehr gespannt, ob die Menschen in diese Richtung mehr konkrete Ratschläge zeigen.

Andere Tipps

Die offizielle Dokumentation Benutzer Umsetzung IObservable sich deprecates. Stattdessen werden Benutzer erwartet, dass die Factory-Methode Observable.Create

  

Wenn möglich, implementieren neue Betreiber von bestehenden Betreiber zu komponieren. Ansonsten implementieren mithilfe von benutzerdefinierten Operatoren Observable.Create

Es kommt vor, dass Observable.Create eine triviale Wrapper um Reactive interne Klasse AnonymousObservable ist:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

Ich weiß nicht, warum sie nicht ihre Umsetzung öffentlich machen, aber hey, was auch immer.

Ja, die Ausbeute Schlüsselwort ist schön; vielleicht gibt es etwas ähnliches für IObservable (OFT) sein? [Edit: In Eric Meijer PDC '09 Gespräch er sagt: „Ja, diese Uhr Raum“zu einer deklarativen Ausbeute für Observablen zu erzeugen.]

Für etwas in der Nähe (anstatt Ihre eigene Walz) Besuche den Boden der " (noch nicht) 101 Rx Proben “Wiki, wo das Team der Verwendung des Subjekts (T) Klasse schlägt als ein „Back-End“, um eine IObservable (oFT) zu implementieren. Hier ist ihr Beispiel:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
  1. Riss offen Reflektor und einen Blick haben.

  2. Sehen Sie einige C9 Videos - dieses eine zeigt, wie man 'derive' das auswählen 'combinator'

  3. Das Geheimnis ist AnonymousObservable, AnonymousObserver und AnonymousDisposable Klassen zu erstellen, (die nur Workarounds für die Tatsache sind, dass Sie nicht instantiate Schnittstellen). Sie enthalten Null Implementierung, wie Sie passieren, dass in mit Aktionen und funcs.

Zum Beispiel:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

Ich lasse Sie den Rest trainieren ... es ist eine sehr gute Übung zu verstehen.

Es ist ein schöner kleiner Faden wächst hier mit verwandten Fragen.

nur eine Bemerkung bezüglich dieser Umsetzung:

nach gleichzeitige Sammlungen in .net fw 4 eingeführt ist es wahrscheinlich besser ConcurrentDictioary anstelle eines einfachen Wörterbuch zu verwenden.

spart Sperren für die Sammlung der Handhabung.

adi.

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top