Domanda

Le estensioni reattivi sono dotati di un sacco di metodi di supporto per trasformare gli eventi e le operazioni asincrone in osservabili, ma come si potrebbe implementare un IObservable da zero?

IEnumerable ha la parola resa bella di rendere molto semplice da implementare.

Qual è il modo corretto di attuare IObservable ?

Ho bisogno di preoccuparsi per la sicurezza dei thread?

So che c'è il supporto per ottenere richiamato in un contesto di sincronizzazione specifica, ma è questa una cosa che come IObservable autore è necessario preoccuparsi di questo in qualche modo o built-in?

Aggiornamento:

Ecco la mia C # versione di F # soluzione di Brian

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

modifica Non gettare ObjectDisposedException se Dispose viene chiamato due volte

È stato utile?

Soluzione

Onestamente, io non sono sicuro di come 'diritto' tutto questo è, ma se si sente abbastanza bene in base alla mia esperienza finora. E 'il codice F #, ma si spera che si ottiene un senso del sapore. Esso consente di 'nuova up' un oggetto di origine, che è quindi possibile chiamare Avanti / Completata / Errore, e gestisce le sottoscrizioni e cerca di far valere quando la fonte o clienti fanno cose cattive.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Sarò interessato a qualche idea su ciò che è buono o cattivo qui; Non ho avuto la possibilità di guardare tutte le cose nuove Rx da devlabs ancora ...

Le mie esperienze suggeriscono che:

  • Coloro che sottoscrivono osservabili non dovrebbe mai buttare dagli abbonamenti. Non c'è nulla di ragionevole un'osservabile può fare quando un abbonato getta. (Questo è simile agli eventi.) Molto probabilmente l'eccezione sarà solo bolla fino ad un livello superiore catch-all handler o mandare in crash l'applicazione.
  • Fonti probabilmente dovrebbe essere "filettato logicamente single". Penso che potrebbe essere più difficile di scrivere i clienti che possono reagire alle chiamate simultanee OnNext; anche se ogni singola chiamata proviene da un thread diverso, è utile per evitare chiamate simultanee.
  • E 'sicuramente utile avere una classe base / supporto che fa rispettare alcuni 'contratti'.

Sono molto curioso di sapere se le persone possono mostrare consiglio più concreto in questa direzione.

Altri suggerimenti

Il documentazione ufficiale depreca gli utenti di attuazione IObservable stessi. Invece, gli utenti sono tenuti a utilizzare il metodo factory Observable.Create

  

Quando possibile, implementare nuovi operatori componendo operatori esistenti. In caso contrario, implementare operatori personalizzati utilizzando Observable.Create

Succede che Observable.Create è un wrapper banale intorno AnonymousObservable classe interna del reattivo:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

Non so perché non hanno fatto la loro attuazione pubblico, ma hey, qualunque sia.

Sì, la parola resa è incantevole; forse ci sarà qualcosa di simile per IObservable (OFT)? [Edit: In PDC '09 parlare dice "sì, guarda questo spazio" per una resa dichiarativa per generare osservabili.]

Per qualcosa di simile (invece di tirare il proprio), controlla la fondo della " (non ancora) 101 Rx campioni " wiki, in cui il team suggerisce l'uso del soggetto (T) di classe come un "back-end" per l'attuazione di un IObservable (OFT). Ecco il loro esempio:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
  1. Crack riflettore aperto e dare un'occhiata.

  2. guardare alcuni video C9 - questo una mostra come si può 'derivare' Select 'combinatore'

  3. Il segreto è quello di creare classi AnonymousObservable, AnonymousObserver e AnonymousDisposable, (che sono solo il lavoro arounds per il fatto che non è possibile creare un'istanza interfacce). Essi contengono implementazione pari a zero, come si passa che con azioni e funcs.

Ad esempio:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

ti lascio lavorare il resto ... è un ottimo esercizio di comprensione.

C'è un bel po 'di filo in crescita qui con relativo domande.

solo un'osservazione riguardo a questa implementazione:

dopo collezioni concorrenti in fase di introduzione in .net fw 4 è probabilmente meglio utilizzare ConcurrentDictioary invece di un semplice dizionario.

si salva la manipolazione serrature sulla raccolta.

adi.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top