سؤال

لفترة من الوقت في شركتي ، استخدمنا منزلًا ObjectPool<T> التنفيذ الذي يوفر حظر الوصول إلى محتوياته. إنه واضح ومباشر: أ Queue<T>, ، و object لقفل ، و AutoResetEvent للإشارة إلى مؤشر ترابط "الاقتراض" عند إضافة عنصر.

لحم الفصل هو حقا هاتين الطريقتين:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

لقد استخدمنا هذا وعدد قليل من فئات التجميع الأخرى بدلاً من تلك المقدمة System.Collections.Concurrent لأننا نستخدم .NET 3.5 ، وليس 4.0. لكن في الآونة الأخيرة اكتشفنا ذلك نظرًا لأننا نستخدم الامتدادات التفاعلية, ، نحن في الواقع فعل لديك Concurrent مساحة الاسم المتاحة لنا (في System.Threading.dll).

بطبيعة الحال ، اعتقدت ذلك منذ ذلك الحين BlockingCollection<T> هي واحدة من الفئات الأساسية في Concurrent مساحة الاسم ، من المحتمل أن توفر أداءً أفضل من أي شيء كتبته أنا أو زملائي في الفريق.

لذلك حاولت كتابة تطبيق جديد يعمل ببساطة:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

لدهشتي ، وفقًا لبعض الاختبارات البسيطة (الاقتراض/العودة إلى المسبح عدة آلاف مرة من خيوط متعددة) ، تنفيذنا الأصلي يدق بشكل كبير BlockingCollection<T> من حيث الأداء. كلاهما يبدو أنه يعمل بشكل صحيح; ؛ يبدو أن تنفيذنا الأصلي أسرع بكثير.

سؤالي:

  1. لماذا سيكون هذا؟ هل هو بسبب BlockingCollection<T> يوفر مرونة أكبر (أفهم أنه يعمل عن طريق لف IProducerConsumerCollection<T>) ، والتي تقدم بالضرورة الأداء النفقات العامة؟
  2. هل هذا مجرد استخدام مضلل من BlockingCollection<T> صف دراسي؟
  3. إذا كان هذا استخدامًا مناسبًا لـ BlockingCollection<T>, ، هل أنا فقط لا أستخدم بشكل صحيح؟ على سبيل المثال ، هو Take/Add النهج المبسط بشكل مفرط ، وهناك طريقة أفضل أداء للحصول على نفس الوظيفة؟

ما لم يكن لدى أي شخص بعض الأفكار لتقديمها ردًا على هذا السؤال الثالث ، يبدو أننا سنلتزم بتنفيذنا الأصلي في الوقت الحالي.

هل كانت مفيدة؟

المحلول

هناك بعض الاحتمالات المحتملة ، هنا.

أولاً، BlockingCollection<T> في الامتدادات التفاعلية ، هناك مُخلف ، وليس بالضبط نفس الإصدار النهائي .NET 4. لن أتفاجأ إذا كان أداء هذا الخلفية يختلف عن .NET 4 RTM (على الرغم من أنني لم أقم بتعيين هذه المجموعة على وجه التحديد). يؤدي الكثير من TPL بشكل أفضل في .NET 4 مما كان عليه في .NET 3.5 Backport.

ومع ذلك ، أظن أن تنفيذي سوف يتفوق على BlockingCollection<T> إذا كان لديك موضوع منتج واحد وخيط مستهلك واحد. مع وجود منتج واحد ومستهلك واحد ، سيكون لقفلك تأثير أصغر على الأداء الكلي ، وحدث إعادة التعيين وسيلة فعالة للغاية للانتظار على جانب المستهلك.

لكن، BlockingCollection<T> تم تصميمه للسماح للعديد من مؤشرات الترابط المنتجين بـ "enqueue" بشكل جيد للغاية. لن يؤدي هذا جيدًا مع تنفيذك ، حيث سيبدأ خلاف القفل في أن يصبح مشكلة إلى حد ما.

ومع ذلك ، أود أيضًا أن أشير إلى اعتقاد خاطئ واحد هنا:

... من المحتمل أن توفر أداءً أفضل من أي شيء كتبته أنا أو زملائي في الفريق.

هذا في كثير من الأحيان ليس صحيحا. عادة ما تؤدي فئات جمع الإطارات ممتاز, ، ولكن في كثير من الأحيان ليس الخيار الأكثر أداء لسيناريو معين. ومع ذلك ، فإنهم يميلون إلى الأداء بشكل جيد مع كونهم مرنين للغاية وقوي للغاية. غالبًا ما يميلون إلى التوسع جيدًا. غالبًا ما تتفوق فصول التجميع "المكتوبة بالمنزل" على مجموعات إطار في سيناريوهات محددة ، ولكنها تميل إلى أن تكون مشكلة عند استخدامها في السيناريوهات خارج المجموعة التي تم تصميمها على وجه التحديد. أظن أن هذا واحد من تلك المواقف.

نصائح أخرى

حاولت BlockingCollection ضد أ ConurrentQueue/AutoResetEvent التحرير والسرد (على غرار حل OP ، ولكن lockless) في .NET 4 ، والسرد الأخير كان لذا أسرع بكثير لحالة الاستخدام الخاصة بي ، التي تخلت عن الانتقادات. لسوء الحظ ، كان هذا قبل عام تقريبًا ولم أتمكن من العثور على النتائج القياسية.

استخدام Autoresetevent منفصل لا يجعل الأمور أكثر تعقيدًا. في الواقع ، يمكن للمرء حتى تجريده بعيدًا ، مرة واحدة وإلى الأبد ، إلى أ BlockingCollectionSlim....

يعتمد BlockingCollection داخليًا على concurrentqueue أيضًا ، ولكن يفعل بعض التوفيق الإضافي مع الإشارات النحيفة و الرموز الإلغاء, الذي ينتج عن ميزات إضافية ، ولكن بتكلفة ، حتى عند عدم استخدامه. تجدر الإشارة أيضًا إلى أن الانتقالات ليست متزوجة من ConcurrentQueue ، ولكن يمكن استخدامها مع المنفذيين الآخرين IProducerConsumerCollection بدلا من ذلك كذلك.


تنفيذ العظام غير المقيدة ، عارية جميلة التنفيذ:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);

صادفت نفس مشكلات الأداء مع BlockingCollection في .NET 4.7.2 ووجدت هذا المنشور. حالتي هي أجهزة الاستثمار المتعددة ، ولا سيما أن أجزاء البيانات الصغيرة تتم قراءة من العديد من المصادر ويجب معالجتها من قبل العديد من المرشحات. تم استخدام العديد من (Env.ProcessorCount) لانتقادات وانتهى بي الأمر مع profiler الأداء يخبرني بذلك BlockingCollection.GetConsumingEnumerable.MoveNext() يأكل المزيد من وقت وحدة المعالجة المركزية من التصفية الفعلية!

شكرا لك ، eugene beresovsky ، على الرمز الخاص بك. لمعلوماتك: في بيئتي كان أبطأ ما يقرب من مرتين من الانشقاق. لذا ، إليك مثلي:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

وبالنسبة للرمز الناقص للأداء ، أود أن أقترح تجنبه readonly المعدل الحقل. يضيف فحصًا على كل وصول ميداني في IL. مع رمز الاختبار التالي

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

على Core i5-3210m مع 2 النوى وتمكين HT لقد حصلت على الإخراج التالي:

BlockingCollection     0.0006ms
BlockingCollectionSlim 0.0010ms (Eugene Beresovsky implementation)
BlockingCollectionSpin 0.0003ms

لذلك ، النسخة المدونة أسرع مرتين من .NET BlockingCollection. لكنني أقترح استخدامه فقط! إذا كنت تفضل حقًا الأداء مقابل Code Simplicity (والصيانة).

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top