سؤال

تأتي الامتدادات التفاعلية مع الكثير من أساليب المساعد لتحويل الأحداث الحالية والعمليات غير المتزامنة إلى Observables ولكن كيف يمكنك تنفيذ iobservableu003CT> من الصفر؟

يحتوي iEnumerable على الكلمة الرئيسية العائد الجميلة لجعل التنفيذ بسيطًا جدًا.

ما هي الطريقة الصحيحة لتنفيذ iobservableu003CT> ؟

هل أحتاج إلى القلق بشأن سلامة الموضوع؟

أعلم أن هناك دعمًا للاستدعاء مرة أخرى في سياق التزامن محدد ، لكن هذا شيء أنا كأصنعة قابلة للرسومu003CT> المؤلف بحاجة إلى القلق أو هذا مدمج بطريقة أو بأخرى؟

تحديث:

ها هو إصدار C# الخاص بي من حل Brian's F#

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

تعديل: لا ترمي ObjectDisposedException إذا تم استدعاء التخلص مرتين

هل كانت مفيدة؟

المحلول

بصراحة ، لست متأكدًا من مدى "الصواب" لكل هذا ، ولكن إذا كان الأمر جيدًا استنادًا إلى تجربتي حتى الآن. إنه رمز F# ، ولكن نأمل أن تشعر بالنكهة. يتيح لك "جديدًا جديدًا" كائنًا مصدرًا ، يمكنك الاتصال به بعد ذلك/إكمال/خطأ ، ويدير الاشتراكات ويحاول تأكيد عندما يقوم المصدر أو العملاء بأشياء سيئة.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

سأكون مهتمًا بأي أفكار حول ما هو جيد أو سيء هنا ؛ لم تتح لي الفرصة للنظر في جميع الأشياء الجديدة RX من Devlabs حتى الآن ...

تجربتي الخاصة تشير إلى:

  • أولئك الذين يشتركون في Observables يجب ألا يرميوا من الاشتراكات. لا يوجد شيء معقول يمكن أن يفعله عندما يرمي المشترك. (هذا مشابه للأحداث.) على الأرجح ، سيقوم الاستثناء فقط بتقاطع معالج كل شيء على مستوى أعلى أو تحطم التطبيق.
  • ربما يجب أن تكون المصادر "خيوط واحدة منطقية". أعتقد أنه قد يكون من الصعب كتابة عملاء يمكنهم الرد على مكالمات onnext المتزامنة ؛ حتى إذا جاءت كل مكالمة فردية من موضوع مختلف ، فمن المفيد تجنب المكالمات المتزامنة.
  • من المفيد بالتأكيد أن يكون لديك فئة قاعدة/مساعد يفرض بعض "العقود".

أنا فضولي للغاية إذا كان بإمكان الناس إظهار المزيد من النصائح الملموسة على هذا المنوال.

نصائح أخرى

ال الوثائق الرسمية يقلل المستخدمين من تطبيق iobservable أنفسهم. بدلاً من ذلك ، من المتوقع أن يستخدم المستخدمون طريقة المصنع Observable.Create

عندما يكون ذلك ممكنًا ، قم بتنفيذ المشغلين الجدد عن طريق تكوين المشغلين الحاليين. بخلاف ذلك ، قم بتنفيذ المشغلين المخصصين باستخدام الملاحظة

يحدث أن يكون ذلك يمكن ملاحظته AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

لا أعرف لماذا لم يصنعوا تنفيذهم على الملأ ، ولكن مهلا ، أيا كان.

نعم ، الكلمة الرئيسية العائد جميلة ؛ ربما سيكون هناك شيء مشابه لـ iobservable (oft)؟ [تحرير: في إريك ميجيرز PDC '09 الحديث يقول "نعم ، شاهد هذه المساحة" إلى عائد تصريح لتوليد الملاحظات.

لشيء قريب (بدلاً من لفك) ، تحقق من القاع التابع "(ليس بعد) 101 عينات RX"ويكي ، حيث يقترح الفريق استخدام فئة الموضوع (T) باعتبارها" الواجهة الخلفية "لتنفيذ iobservable (OFT). إليك مثالهم:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
  1. صدع عاكس مفتوح وإلقاء نظرة.

  2. شاهد بعض مقاطع الفيديو C9 - هذه يوضح المرء كيف يمكنك "استخلاص" SELECT "combinator"

  3. السر هو إنشاء فصول مجهول الهوية ، مجهول الهوية ، مجهول الهوية ، (والتي هي مجرد عمل من أجل حقيقة أنه لا يمكنك إنشاء واجهات). أنها تحتوي على تنفيذ صفر ، أثناء مرور ذلك بالإجراءات والفوانيس.

علي سبيل المثال:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

سأدعك تعمل الباقي ... إنه تمرين جيد جدًا في الفهم.

هناك خيط صغير لطيف ينمو هنا مع الأسئلة ذات الصلة.

ملاحظة واحدة فقط بخصوص هذا التنفيذ:

بعد إدخال المجموعات المتزامنة في .NET FW 4 ، من الأفضل استخدام ConversItioary بدلاً من قاموس بسيط.

يحفظ التعامل مع الأقفال على المجموعة.

عدي.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top