Question

Les extensions réactives viennent avec beaucoup de méthodes d'aide pour transformer les événements et les opérations asynchrones existantes en mais comment observables vous mettre en œuvre un IObservable à partir de zéro?

IEnumerable a le mot-clé joli rendement pour le rendre très simple à mettre en œuvre.

Quelle est la bonne façon de mettre en œuvre IObservable ?

Ai-je besoin à vous soucier de la sécurité de fil?

Je sais qu'il ya un soutien pour favoriser rappelé un contexte de synchronisation spécifique mais est-ce quelque chose que je IObservable comme auteur besoin de s'inquiéter ou cela d'une certaine manière intégrée?

Mise à jour:

Voici ma version C # de la solution F # Brian

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

modifier Ne pas jeter ObjectDisposedException si Dispose est appelé deux fois

Était-ce utile?

La solution

Honnêtement, je ne suis pas sûr de savoir comment « droit » est tout cela, mais si se sent assez bien après mon expérience jusqu'à présent. Il est F # code, mais nous espérons que vous obtenez un sens de la saveur. Il vous permet de « nouvelle up » un objet source, que vous pouvez ensuite appeler suivant / Terminé / Erreur sur, et il gère les abonnements et tente de Affirmez lorsque la source ou les clients font de mauvaises choses.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Je serai intéressé par des idées sur ce qui est bon ou mauvais ici; Je ne l'ai pas eu l'occasion de regarder toutes les nouveautés Rx devlabs encore ...

Mes expériences suggèrent que:

  • Ceux qui souscrivent à ne devrait jamais jeter observables des souscriptions. Il n'y a rien de raisonnable observable peut faire lorsqu'un abonné jette. (Ceci est similaire à des événements.) Très probablement l'exception sera juste bulles jusqu'à un fourre-tout gestionnaire de haut niveau ou un plantage de l'application.
  • Sources devrait probablement être « fileté logiquement unique ». Je pense qu'il peut être plus difficile à écrire des clients qui peuvent réagir aux appels OnNext simultanés; même si chaque appel provient d'un autre thread, il est utile pour éviter les appels simultanés.
  • Il est certainement utile d'avoir une classe de base / aide qui impose des « contrats ».

Je suis très curieux de savoir si les gens peuvent montrer des conseils plus concrets dans ce sens.

Autres conseils

Le documentation officielle deprecates les utilisateurs eux-mêmes la mise en œuvre IObservable. , Les utilisateurs sont plutôt censés utiliser la méthode de l'usine Observable.Create

  

Lorsque cela est possible, la mise en œuvre de nouveaux opérateurs en composant les opérateurs existants. Dans le cas contraire de mettre en œuvre les opérateurs personnalisés à l'aide Observable.Create

Il arrive que Observable.Create est un wrapper trivial autour AnonymousObservable de classe interne réactive:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

Je ne sais pas pourquoi ils ne faisaient pas leur public la mise en œuvre, mais bon, peu importe.

  1. Crack réflecteur ouvert et un coup d'oeil.

  2. Regardez quelques vidéos C9 - cette on montre comment vous pouvez 'derive' Select 'Combinator'

  3. Le secret est de créer des classes AnonymousObservable, AnonymousObserver et AnonymousDisposable, (qui sont fonctionnent tout contournements du fait que vous ne pouvez pas instancier interfaces). Ils contiennent la mise en œuvre de zéro, comme vous repasserez avec les actions et funcs.

Par exemple:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

Je vais vous laisser travailler le reste ... il est un très bon exercice dans la compréhension.

Il y a un joli petit fil de plus en plus avec connexes questions.

juste une remarque au sujet de cette mise en œuvre:

après des collections simultanées introduites dans .net fw 4, il est probablement préférable d'utiliser ConcurrentDictioary au lieu d'un simple dictionnaire.

il enregistre la manipulation des verrous sur la collection.

adi.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top