كيف (وإذا) لكتابة قائمة انتظار مستهلك واحد باستخدام TPL؟

StackOverflow https://stackoverflow.com/questions/2293976

سؤال

لقد سمعت مجموعة من البودكاست مؤخرًا حول TPL في .NET 4.0. يصف معظمهم أنشطة الخلفية مثل تنزيل الصور أو إجراء حساب ، باستخدام المهام بحيث لا يتداخل العمل مع موضوع واجهة المستخدم الرسومية.

تحتوي معظم الكود الذي أعمل عليه على نكهة متعددة المنتجات / المستهلك الواحد ، حيث يجب قائمة العناصر من مصادر متعددة ثم معالجتها بالترتيب. أحد الأمثلة على ذلك هو التسجيل ، حيث يتم تسلسل خطوط السجل من مؤشرات ترابط متعددة إلى قائمة انتظار واحدة للكتابة في نهاية المطاف إلى ملف أو قاعدة بيانات. يجب أن تبقى جميع السجلات من أي مصدر واحد في النظام ، ويجب أن تكون السجلات من نفس اللحظة في الوقت المناسب "قريبة" من بعضها البعض في الإخراج النهائي.

لذا فإن عدة مؤشرات ترابط أو مهام أو أي شيء يستدعيون جميعهم:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

ويقوم مؤشر ترابط العامل المخصص بمعالجة قائمة الانتظار:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

يبدو أنه من المعقول دائمًا تكريس خيط عامل لجانب المستهلك من هذه المهام. هل يجب أن أكتب برامج مستقبلية باستخدام بعض البناء من TPL بدلاً من ذلك؟ أي واحدة؟ لماذا ا؟

هل كانت مفيدة؟

المحلول

يمكنك استخدام مهمة طويلة المدى لمعالجة العناصر من الانتقادات التي اقترحتها Wilka. إليك مثال يفي بمتطلبات التطبيقات الخاصة بك إلى حد كبير. سترى إخراج شيء من هذا القبيل:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

لا يظهر مخرجات من A ، B ، C&D عشوائيًا لأنها تعتمد على وقت بدء الخيوط ولكن يظهر B دائمًا قبل B1.

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}

نصائح أخرى

أعلم أن هذه الإجابة متأخرة حوالي عام ، لكن ألق نظرة عليها MSDN.

مما يوضح كيفية إنشاء محدودة currencyleveltascscheduler من فئة المهام. عن طريق الحد من التزامن مع مهمة واحدة ، يجب بعد ذلك معالجة مهامك بالترتيب لأنها يتم وضعها في قائمة الانتظار عبر:

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

factory.StartNew(()=> 
{
   // your code
});

لست متأكدًا من أن TPL كافية في حالة الاستخدام الخاصة بك. من فهمي ، فإن حالة الاستخدام الرئيسية لـ TPL هي تقسيم مهمة ضخمة إلى عدة مهام أصغر يمكن تشغيلها جنبًا إلى جنب. على سبيل المثال ، إذا كان لديك قائمة كبيرة وتريد تطبيق نفس التحول على كل عنصر. في هذه الحالة ، يمكنك الحصول على عدة مهام لتطبيق التحول على مجموعة فرعية من القائمة.

لا يبدو أن الحالة التي تصفها تتناسب مع هذه الصورة بالنسبة لي. في حالتك ، ليس لديك العديد من المهام التي تفعل الشيء نفسه بالتوازي. لديك العديد من المهام المختلفة التي يقوم بها كل منها هي الوظيفة الخاصة (المنتجين) ومهمة واحدة تستهلكها. ربما يمكن استخدام TPL لجزء المستهلك إذا كنت ترغب في الحصول على العديد من المستهلكين لأنه في هذه الحالة ، يقوم كل مستهلك بنفس المهمة (على افتراض أنك تجد منطقًا لفرض الاتساق الزمني الذي تبحث عنه).

حسنًا ، هذا بالطبع هو مجرد رأيي الشخصي حول هذا الموضوع

عش لفترة طويلة وازدهار

هذا يبدو وكأنه BlockingCollection سيكون مفيدًا لك. لذلك للرمز الخاص بك أعلاه ، يمكنك استخدام شيء مثل (على افتراض _queue هو BlockingCollection نموذج):

// for your producers 
_queue.Add(some_work);

مؤشر ترابط العامل المخصص لمعالجة قائمة الانتظار:

foreach (var some_work in _queue.GetConsumingEnumerable())
{
    deal_with(some_work);
}

ملاحظة: عندما ينتهي جميع المنتجين من إنتاج الأشياء ، ستحتاج إلى الاتصال CompleteAdding() تشغيل _queue وإلا فإن المستهلك الخاص بك سوف يكون عالقًا في انتظار المزيد من العمل.

مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top