تبدو الامتدادات التفاعلية بطيئة للغاية - هل أفعل شيئًا خاطئًا؟

StackOverflow https://stackoverflow.com/questions/4272354

  •  28-09-2019
  •  | 
  •  

سؤال

أقوم بتقييم RX لمشروع منصة تداول سيحتاج إلى معالجة الآلاف من الرسائل في الثانية. يحتوي النظام الأساسي الحالي على نظام توجيه الأحداث المعقد (مندوبون متعدد البث) يستجيب لهذه الرسائل ويؤدي الكثير من المعالجة اللاحقة.

لقد نظرت في الامتدادات التفاعلية للحصول على الفوائد الواضحة ، لكنني لاحظت أنها أبطأ إلى حد ما ، ومعتاد 100 مرة أبطأ.

لقد قمت بإنشاء اختبار الوحدة لإظهار ذلك الذي يدير زيادة بسيطة بقيمة مليون مرة ، باستخدام مختلف نكهات RX واختبار "التحكم" المباشر ".

ها هي النتائج:

Delegate                                 - (1000000) - 00:00:00.0410000
Observable.Range()                       - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread          - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate          - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher         - (1000000) - 00:00:03.0360000

كما ترون ، فإن جميع أساليب RX أبطأ حوالي 100 مرة من ما يعادل المندوب. من الواضح أن RX تقوم بالكثير من الأغطية التي ستستخدم في مثال أكثر تعقيدًا ، ولكن هذا يبدو بطيئًا بشكل لا يصدق.

هل هذا طبيعي أم أن افتراضات الاختبار الخاصة بي غير صالحة؟ رمز NUNIT للأقل أدناه -

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;

namespace RxTests
{
    [TestFixture]
    class ReactiveExtensionsBenchmark_Tests
    {
        private int counter = 0;

        [Test]
        public void ReactiveExtensionsPerformanceComparisons()
        {
            int iterations = 1000000;

            Action<int> a = (i) => { counter++; };

            DelegateSmokeTest(iterations, a);
            ObservableRangeTest(iterations, a);
            SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
            SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
            SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
            SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
            SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
        }

        public void ObservableRangeTest(int iterations, Action<int> action)
        {
            counter = 0;

            long start = DateTime.Now.Ticks;

            Observable.Range(0, iterations).Subscribe(action);

            OutputTestDuration("Observable.Range()", start);
        }


        public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
        {
            counter = 0;

            var eventSubject = new Subject<int>();
            var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
            events.Subscribe(action);

            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => eventSubject.OnNext(1)
                );

            OutputTestDuration("Subject.Subscribe() - " + mode, start);
        }

        public void DelegateSmokeTest(int iterations, Action<int> action)
        {
            counter = 0;
            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => action(1)
                );

            OutputTestDuration("Delegate", start);
        }


        /// <summary>
        /// Output helper
        /// </summary>
        /// <param name="test"></param>
        /// <param name="duration"></param>
        public void OutputTestDuration(string test, long duration)
        {
            Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
        }

        /// <summary>
        /// Test timing helper
        /// </summary>
        /// <param name="elapsedTicks"></param>
        /// <returns></returns>
        public string ElapsedDuration(long elapsedTicks)
        {
            return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
        }

    }
}
هل كانت مفيدة؟

المحلول

أظن أن فريق RX يركز على بناء الوظيفة أولاً ولا يهتم بتحسين الأداء حتى الآن.

استخدم profiler لتحديد الاختناقات واستبدال فئات RX البطيئة بالإصدارات المحسنة الخاصة بك.

فيما يلي مثالان.

نتائج:

Delegate                                 - (1000000) - 00:00:00.0368748

Simple - NewThread                       - (1000000) - 00:00:00.0207676
Simple - CurrentThread                   - (1000000) - 00:00:00.0214599
Simple - Immediate                       - (1000000) - 00:00:00.0162026
Simple - ThreadPool                      - (1000000) - 00:00:00.0169848

FastSubject.Subscribe() - NewThread      - (1000000) - 00:00:00.0588149
FastSubject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0508842
FastSubject.Subscribe() - Immediate      - (1000000) - 00:00:00.0513911
FastSubject.Subscribe() - ThreadPool     - (1000000) - 00:00:00.0529137

بادئ ذي بدء ، يبدو أنه يهم كثيرًا كيف يتم تنفيذ الملاحظة. إليك ملاحظات لا يمكن إلغاؤها ، لكنها سريعة:

private IObservable<int> CreateFastObservable(int iterations)
{
    return Observable.Create<int>(observer =>
    {
        new Thread(_ =>
        {
            for (int i = 0; i < iterations; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
        }).Start();
        return () => { };
    });
}

اختبار:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = CreateFastObservable(iterations);

    observable.SubscribeOn(scheduler).Run(action);

    OutputTestDuration("Simple - " + mode, start);
}

الموضوعات تضيف الكثير من النفقات العامة. إليك موضوعًا تم تجريده من الكثير من الوظائف المتوقعة من موضوع ما ، لكنه سريع:

class FastSubject<T> : ISubject<T>
{
    private event Action onCompleted;
    private event Action<Exception> onError;
    private event Action<T> onNext;

    public FastSubject()
    {
        onCompleted += () => { };
        onError += error => { };
        onNext += value => { };
    }

    public void OnCompleted()
    {
        this.onCompleted();
    }

    public void OnError(Exception error)
    {
        this.onError(error);
    }

    public void OnNext(T value)
    {
        this.onNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        this.onCompleted += observer.OnCompleted;
        this.onError += observer.OnError;
        this.onNext += observer.OnNext;

        return Disposable.Create(() =>
        {
            this.onCompleted -= observer.OnCompleted;
            this.onError -= observer.OnError;
            this.onNext -= observer.OnNext;
        });
    }
}

اختبار:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();

    observable.SubscribeOn(scheduler).Run(action);

    OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}

نصائح أخرى

تحديث لـ RX 2.0: أخذت الرمز من المنشور الأصلي مع (تقريبًا) أحدث LINQPAD BETA 4.42.04 (حسنًا ، هناك 06 ، ولكن على أي حال):Rx Main assemblies

... وتعديله قليلاً لاستخدام بناء جملة RX V2 الجديد:

        public void ReactiveExtensionsPerformanceComparisons()
    {
        int iterations = 1000000;

        Action<int> a = (i) => { counter++; };

        DelegateSmokeTest(iterations, a);
        ObservableRangeTest(iterations, a);
        SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread");
        SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread");
        SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate");
        SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool");
        // I *think* this is the same as the ThreadPool scheduler in my case
        SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");                
        // doesn't work, as LinqPad has no Dispatcher attched to the Gui thread, maybe there's a workaround; the Instance property on it is obsolete
        //SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "ThreadPool");
    }

ملحوظة: تختلف النتائج بشكل كبير ، في حالات نادرة ، يتفوق Threadpool على NewThread ، ولكن في معظم الحالات ، يكون لـ NewThread ميزة طفيفة فوق المجدولين الموجودين أدناه في القائمة:

Delegate                                 - (1000000) - 00:00:00.0440025
Observable.Range()                       - (1000000) - 00:00:01.9251101
Subject.Subscribe() - NewThread          - (1000000) - 00:00:00.0400023
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:00.0530030
Subject.Subscribe() - Immediate          - (1000000) - 00:00:00.0490028
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:00.0490028
Subject.Subscribe() - Default            - (1000000) - 00:00:00.0480028

لذلك يبدو أنهم عملوا بجد على الأداء ..

تذكر أن مندوبتك لا يضمن أي سلامة مؤشر ترابط - فهو يدعو حرفيًا المندوب من أي خيط يطلق عليه من أي موضوع ، بينما عند الاتصال بالملاحظة. ما تعتقد أنه يفعل.

لذلك ، قد يتحرك المندوبون بسرعة فائقة ، ولكن إذا كنت ترغب في بناء شيء عملي باستخدامه ، فسوف ينتهي بك الأمر إلى بناء مزامنة باليدين مما سيؤدي إلى إبطائك. ومع ذلك ، فإن RX ، تمامًا مثل LINQ ، عبارة عن تجريد - إذا كنت بحاجة إلى أن تكون سريعًا يبعث على السخرية ، فيجب عليك البدء في كتابة التعليمات البرمجية القبيحة.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top