Mise en œuvre IObservable à partir de zéro
-
21-09-2019 - |
Question
Les extensions réactives viennent avec beaucoup de méthodes d'aide pour transformer les événements et les opérations asynchrones existantes en mais comment observables vous mettre en œuvre un IObservable
IEnumerable a le mot-clé joli rendement pour le rendre très simple à mettre en œuvre.
Quelle est la bonne façon de mettre en œuvre IObservable
Ai-je besoin à vous soucier de la sécurité de fil?
Je sais qu'il ya un soutien pour favoriser rappelé un contexte de synchronisation spécifique mais est-ce quelque chose que je IObservable comme
Mise à jour:
Voici ma version C # de la solution F # Brian
using System;
using System.Linq;
using Microsoft.FSharp.Collections;
namespace Jesperll
{
class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
{
private FSharpMap<int, IObserver<T>> subscribers =
FSharpMap<int, IObserver<T>>.Empty;
private readonly object thisLock = new object();
private int key;
private bool isDisposed;
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && !isDisposed)
{
OnCompleted();
isDisposed = true;
}
}
protected void OnNext(T value)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnNext(value);
}
}
protected void OnError(Exception exception)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
if (exception == null)
{
throw new ArgumentNullException("exception");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnError(exception);
}
}
protected void OnCompleted()
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
lock (thisLock)
{
int k = key++;
subscribers = subscribers.Add(k, observer);
return new AnonymousDisposable(() =>
{
lock (thisLock)
{
subscribers = subscribers.Remove(k);
}
});
}
}
}
class AnonymousDisposable : IDisposable
{
Action dispose;
public AnonymousDisposable(Action dispose)
{
this.dispose = dispose;
}
public void Dispose()
{
dispose();
}
}
}
modifier Ne pas jeter ObjectDisposedException si Dispose est appelé deux fois
La solution
Honnêtement, je ne suis pas sûr de savoir comment « droit » est tout cela, mais si se sent assez bien après mon expérience jusqu'à présent. Il est F # code, mais nous espérons que vous obtenez un sens de la saveur. Il vous permet de « nouvelle up » un objet source, que vous pouvez ensuite appeler suivant / Terminé / Erreur sur, et il gère les abonnements et tente de Affirmez lorsque la source ou les clients font de mauvaises choses.
type ObservableSource<'T>() = // '
let protect f =
let mutable ok = false
try
f()
ok <- true
finally
Debug.Assert(ok, "IObserver methods must not throw!")
// TODO crash?
let mutable key = 0
// Why a Map and not a Dictionary? Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
let mutable subscriptions = Map.empty : Map<int,IObserver<'T>> // '
let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
let thisLock = new obj()
let obs =
{ new IObservable<'T> with // '
member this.Subscribe(o) =
let k =
lock thisLock (fun () ->
let k = key
key <- key + 1
subscriptions <- subscriptions.Add(k, o)
k)
{ new IDisposable with
member this.Dispose() =
lock thisLock (fun () ->
subscriptions <- subscriptions.Remove(k)) } }
let mutable finished = false
// The methods below are not thread-safe; the source ought not call these methods concurrently
member this.Next(x) =
Debug.Assert(not finished, "IObserver is already finished")
next x
member this.Completed() =
Debug.Assert(not finished, "IObserver is already finished")
finished <- true
completed()
member this.Error(e) =
Debug.Assert(not finished, "IObserver is already finished")
finished <- true
error e
// The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
member this.Value = obs
Je serai intéressé par des idées sur ce qui est bon ou mauvais ici; Je ne l'ai pas eu l'occasion de regarder toutes les nouveautés Rx devlabs encore ...
Mes expériences suggèrent que:
- Ceux qui souscrivent à ne devrait jamais jeter observables des souscriptions. Il n'y a rien de raisonnable observable peut faire lorsqu'un abonné jette. (Ceci est similaire à des événements.) Très probablement l'exception sera juste bulles jusqu'à un fourre-tout gestionnaire de haut niveau ou un plantage de l'application.
- Sources devrait probablement être « fileté logiquement unique ». Je pense qu'il peut être plus difficile à écrire des clients qui peuvent réagir aux appels OnNext simultanés; même si chaque appel provient d'un autre thread, il est utile pour éviter les appels simultanés.
- Il est certainement utile d'avoir une classe de base / aide qui impose des « contrats ».
Je suis très curieux de savoir si les gens peuvent montrer des conseils plus concrets dans ce sens.
Autres conseils
Le documentation officielle deprecates les utilisateurs eux-mêmes la mise en œuvre IObservable. , Les utilisateurs sont plutôt censés utiliser la méthode de l'usine Observable.Create
Lorsque cela est possible, la mise en œuvre de nouveaux opérateurs en composant les opérateurs existants. Dans le cas contraire de mettre en œuvre les opérateurs personnalisés à l'aide Observable.Create
Il arrive que Observable.Create est un wrapper trivial autour AnonymousObservable
de classe interne réactive:
public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
if (subscribe == null)
{
throw new ArgumentNullException("subscribe");
}
return new AnonymousObservable<TSource>(subscribe);
}
Je ne sais pas pourquoi ils ne faisaient pas leur public la mise en œuvre, mais bon, peu importe.
Oui, le mot-clé yield est belle; peut-être, il y aura quelque chose de similaire pour IObservable (OFT)? [Edit: Eric Meijer href="http://www.microsoftpdc.com/2009/VTL04?type=wmvhigh" PDC '09 il dit « oui, regardez cette l'espace » à un rendement déclarative pour générer des observables.]
Pour quelque chose de proche (au lieu de rouler votre propre), consultez le fond du " (pas encore) 101 Rx « wiki d'échantillons, où l'équipe suggère l'utilisation de la classe Objet (T) un « back-end » pour mettre en œuvre un IObservable (OFT). Voici leur exemple:
public class Order
{
private DateTime? _paidDate;
private readonly Subject<Order> _paidSubj = new Subject<Order>();
public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }
public void MarkPaid(DateTime paidDate)
{
_paidDate = paidDate;
_paidSubj.OnNext(this); // Raise PAID event
}
}
private static void Main()
{
var order = new Order();
order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe
order.MarkPaid(DateTime.Now);
}
-
Crack réflecteur ouvert et un coup d'oeil.
-
Regardez quelques vidéos C9 - cette on montre comment vous pouvez 'derive' Select 'Combinator'
-
Le secret est de créer des classes AnonymousObservable, AnonymousObserver et AnonymousDisposable, (qui sont fonctionnent tout contournements du fait que vous ne pouvez pas instancier interfaces). Ils contiennent la mise en œuvre de zéro, comme vous repasserez avec les actions et funcs.
Par exemple:
public class AnonymousObservable<T> : IObservable<T>
{
private Func<IObserver<T>, IDisposable> _subscribe;
public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
{
_subscribe = subscribe;
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _subscribe(observer);
}
}
Je vais vous laisser travailler le reste ... il est un très bon exercice dans la compréhension.
Il y a un joli petit fil de plus en plus avec connexes questions.
juste une remarque au sujet de cette mise en œuvre:
après des collections simultanées introduites dans .net fw 4, il est probablement préférable d'utiliser ConcurrentDictioary au lieu d'un simple dictionnaire.
il enregistre la manipulation des verrous sur la collection.
adi.