تطبيق iobservable من الصفر
-
21-09-2019 - |
سؤال
تأتي الامتدادات التفاعلية مع الكثير من أساليب المساعد لتحويل الأحداث الحالية والعمليات غير المتزامنة إلى Observables ولكن كيف يمكنك تنفيذ iobservableu003CT> من الصفر؟
يحتوي iEnumerable على الكلمة الرئيسية العائد الجميلة لجعل التنفيذ بسيطًا جدًا.
ما هي الطريقة الصحيحة لتنفيذ iobservableu003CT> ؟
هل أحتاج إلى القلق بشأن سلامة الموضوع؟
أعلم أن هناك دعمًا للاستدعاء مرة أخرى في سياق التزامن محدد ، لكن هذا شيء أنا كأصنعة قابلة للرسومu003CT> المؤلف بحاجة إلى القلق أو هذا مدمج بطريقة أو بأخرى؟
تحديث:
ها هو إصدار C# الخاص بي من حل Brian's F#
using System;
using System.Linq;
using Microsoft.FSharp.Collections;
namespace Jesperll
{
class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
{
private FSharpMap<int, IObserver<T>> subscribers =
FSharpMap<int, IObserver<T>>.Empty;
private readonly object thisLock = new object();
private int key;
private bool isDisposed;
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && !isDisposed)
{
OnCompleted();
isDisposed = true;
}
}
protected void OnNext(T value)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnNext(value);
}
}
protected void OnError(Exception exception)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
if (exception == null)
{
throw new ArgumentNullException("exception");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnError(exception);
}
}
protected void OnCompleted()
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
lock (thisLock)
{
int k = key++;
subscribers = subscribers.Add(k, observer);
return new AnonymousDisposable(() =>
{
lock (thisLock)
{
subscribers = subscribers.Remove(k);
}
});
}
}
}
class AnonymousDisposable : IDisposable
{
Action dispose;
public AnonymousDisposable(Action dispose)
{
this.dispose = dispose;
}
public void Dispose()
{
dispose();
}
}
}
تعديل: لا ترمي ObjectDisposedException إذا تم استدعاء التخلص مرتين
المحلول
بصراحة ، لست متأكدًا من مدى "الصواب" لكل هذا ، ولكن إذا كان الأمر جيدًا استنادًا إلى تجربتي حتى الآن. إنه رمز F# ، ولكن نأمل أن تشعر بالنكهة. يتيح لك "جديدًا جديدًا" كائنًا مصدرًا ، يمكنك الاتصال به بعد ذلك/إكمال/خطأ ، ويدير الاشتراكات ويحاول تأكيد عندما يقوم المصدر أو العملاء بأشياء سيئة.
type ObservableSource<'T>() = // '
let protect f =
let mutable ok = false
try
f()
ok <- true
finally
Debug.Assert(ok, "IObserver methods must not throw!")
// TODO crash?
let mutable key = 0
// Why a Map and not a Dictionary? Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
let mutable subscriptions = Map.empty : Map<int,IObserver<'T>> // '
let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
let thisLock = new obj()
let obs =
{ new IObservable<'T> with // '
member this.Subscribe(o) =
let k =
lock thisLock (fun () ->
let k = key
key <- key + 1
subscriptions <- subscriptions.Add(k, o)
k)
{ new IDisposable with
member this.Dispose() =
lock thisLock (fun () ->
subscriptions <- subscriptions.Remove(k)) } }
let mutable finished = false
// The methods below are not thread-safe; the source ought not call these methods concurrently
member this.Next(x) =
Debug.Assert(not finished, "IObserver is already finished")
next x
member this.Completed() =
Debug.Assert(not finished, "IObserver is already finished")
finished <- true
completed()
member this.Error(e) =
Debug.Assert(not finished, "IObserver is already finished")
finished <- true
error e
// The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
member this.Value = obs
سأكون مهتمًا بأي أفكار حول ما هو جيد أو سيء هنا ؛ لم تتح لي الفرصة للنظر في جميع الأشياء الجديدة RX من Devlabs حتى الآن ...
تجربتي الخاصة تشير إلى:
- أولئك الذين يشتركون في Observables يجب ألا يرميوا من الاشتراكات. لا يوجد شيء معقول يمكن أن يفعله عندما يرمي المشترك. (هذا مشابه للأحداث.) على الأرجح ، سيقوم الاستثناء فقط بتقاطع معالج كل شيء على مستوى أعلى أو تحطم التطبيق.
- ربما يجب أن تكون المصادر "خيوط واحدة منطقية". أعتقد أنه قد يكون من الصعب كتابة عملاء يمكنهم الرد على مكالمات onnext المتزامنة ؛ حتى إذا جاءت كل مكالمة فردية من موضوع مختلف ، فمن المفيد تجنب المكالمات المتزامنة.
- من المفيد بالتأكيد أن يكون لديك فئة قاعدة/مساعد يفرض بعض "العقود".
أنا فضولي للغاية إذا كان بإمكان الناس إظهار المزيد من النصائح الملموسة على هذا المنوال.
نصائح أخرى
ال الوثائق الرسمية يقلل المستخدمين من تطبيق iobservable أنفسهم. بدلاً من ذلك ، من المتوقع أن يستخدم المستخدمون طريقة المصنع Observable.Create
عندما يكون ذلك ممكنًا ، قم بتنفيذ المشغلين الجدد عن طريق تكوين المشغلين الحاليين. بخلاف ذلك ، قم بتنفيذ المشغلين المخصصين باستخدام الملاحظة
يحدث أن يكون ذلك يمكن ملاحظته AnonymousObservable
:
public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
if (subscribe == null)
{
throw new ArgumentNullException("subscribe");
}
return new AnonymousObservable<TSource>(subscribe);
}
لا أعرف لماذا لم يصنعوا تنفيذهم على الملأ ، ولكن مهلا ، أيا كان.
نعم ، الكلمة الرئيسية العائد جميلة ؛ ربما سيكون هناك شيء مشابه لـ iobservable (oft)؟ [تحرير: في إريك ميجيرز PDC '09 الحديث يقول "نعم ، شاهد هذه المساحة" إلى عائد تصريح لتوليد الملاحظات.
لشيء قريب (بدلاً من لفك) ، تحقق من القاع التابع "(ليس بعد) 101 عينات RX"ويكي ، حيث يقترح الفريق استخدام فئة الموضوع (T) باعتبارها" الواجهة الخلفية "لتنفيذ iobservable (OFT). إليك مثالهم:
public class Order
{
private DateTime? _paidDate;
private readonly Subject<Order> _paidSubj = new Subject<Order>();
public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }
public void MarkPaid(DateTime paidDate)
{
_paidDate = paidDate;
_paidSubj.OnNext(this); // Raise PAID event
}
}
private static void Main()
{
var order = new Order();
order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe
order.MarkPaid(DateTime.Now);
}
صدع عاكس مفتوح وإلقاء نظرة.
شاهد بعض مقاطع الفيديو C9 - هذه يوضح المرء كيف يمكنك "استخلاص" SELECT "combinator"
السر هو إنشاء فصول مجهول الهوية ، مجهول الهوية ، مجهول الهوية ، (والتي هي مجرد عمل من أجل حقيقة أنه لا يمكنك إنشاء واجهات). أنها تحتوي على تنفيذ صفر ، أثناء مرور ذلك بالإجراءات والفوانيس.
علي سبيل المثال:
public class AnonymousObservable<T> : IObservable<T>
{
private Func<IObserver<T>, IDisposable> _subscribe;
public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
{
_subscribe = subscribe;
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _subscribe(observer);
}
}
سأدعك تعمل الباقي ... إنه تمرين جيد جدًا في الفهم.
هناك خيط صغير لطيف ينمو هنا مع الأسئلة ذات الصلة.
ملاحظة واحدة فقط بخصوص هذا التنفيذ:
بعد إدخال المجموعات المتزامنة في .NET FW 4 ، من الأفضل استخدام ConversItioary بدلاً من قاموس بسيط.
يحفظ التعامل مع الأقفال على المجموعة.
عدي.