معالجة ملف مسطح في أجزاء باستخدام مؤشرات ترابط متعددة باستخدام نمط المنتج/المستهلك و sqlbulkcopy في SQL Server DB

StackOverflow https://stackoverflow.com/questions/2066503

سؤال

أتمنى أن تتحمل معي. أردت تقديم أكبر قدر ممكن من المعلومات. المشكلة الرئيسية هي كيفية إنشاء بنية (مثل المكدس) التي سيتم استخدامها بواسطة مؤشرات ترابط متعددة من شأنها أن تبرز قيمة واستخدامها لمعالجة ملف مسطح كبير وربما يركب ركوب الدراجات مرارًا وتكرارًا حتى تتم معالجة الملف بأكمله. إذا كان لدى الملف 100.000 سجل يمكن معالجته بواسطة 5 مؤشرات ترابط باستخدام قطع الصفوف 2.000 ، فسيحصل كل مؤشر ترابط على 10 قطع للمعالجة.

هدفي هو نقل البيانات في ملف مسطح (مع رأس ... subheader ... التفاصيل ، التفاصيل ، التفاصيل ، ... التفاصيل ، sub -ooter ، subheader ... التفاصيل ، التفاصيل ، التفاصيل ، ... التفاصيل ، sub -ooter subheader ... التفاصيل ، التفاصيل ، التفاصيل ، ... التفاصيل ، sub -ooter ، بنية تذييل) في OLTP DB التي لديها وضع الاسترداد إلى بسيط (ممكن الكامل) إلى 3 جداول: 1st تمثل المفتاح الفريد من subheader الحاضر في الصف الفرعي ، 2nd an an an an an Table SubheaderGroup ، الذي يمثل تجميع صفوف التفاصيل في قطع 2000 سجلات (يحتاج إلى أن يكون لهوية Subheader PK كـ FK والثالث الذي يمثل صفوفًا تفصيلية مع FK يشير إلى PK Subheader.

أقوم بإدارة المعاملات اليدوية لأنني يمكن أن أحصل على عشرات الآلاف من صفوف التفاصيل وأستخدم حقلًا خاصًا يتم تعيينه على 0 في جداول الوجهة أثناء الحمل ثم في نهاية معالجة الملفات ، أقوم بعمل معاملات تغيير هذا قيمة إلى 1 والتي يمكن أن تشير إلى تطبيق آخر ينتهي التحميل.

أرغب في تقطيع هذا الملف المسطح إلى قطع متعددة متساوية (نفس عدد الصفوف) التي يمكن معالجتها باستخدام مؤشرات ترابط متعددة واستيرادها باستخدام sqlbulkcopy باستخدام idatareader الذي يتم إنشاؤه من بيانات تعريف جدول الوجهة).

أرغب في استخدام نمط المنتج/المستهلك (كما هو موضح في الرابط أدناه - تحليل PDF وعينة الكود) لاستخدام sqlbulkcopy مع خيار sqlbulkcopyoptions.tablelock.http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspxيمكّن هذا النمط إنشاء منتجين متعددين ويحتاج العدد المكافئ للمستهلكين إلى الاشتراك في المنتجين لاستهلاك الصف.

في مشروع TestSqlBulkCopy ، ملف dataProducer.cs هناك طريقة تحاكي إنتاج الآلاف من السجلات.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

سيتم تنفيذ هذه الطريقة في سياق موضوع جديد. أريد أن يقرأ هذا الموضوع الجديد جزءًا فريدًا من الملف المسطح الأصلي وسيقوم مؤشر ترابط آخر بمعالجة الجزء التالي. يقوم المستهلكون بعد ذلك بنقل البيانات (التي يتم ضخها إليهم) إلى SQL Server DB باستخدام فئة SQLBULKCOPY ADO.NET.

لذا فإن السؤال هنا يدور حول البرنامج الرئيسي الذي يملي ما الذي يجب معالجته من قبل كل مؤشر ترابط ، وأعتقد أنه يجب أن يحدث ذلك أثناء إنشاء الخيط. من المحتمل أن يكون الحل الثاني هو مؤشرات الترابط لمشاركة بعض الهيكل واستخدام شيء فريد لهم (مثل رقم مؤشر الترابط أو رقم التسلسل) للبحث عن بنية مشتركة (ربما مكدس وينبث قيمة (قفل مكدس أثناء القيام بذلك) ثم التقاط القيمة التالية. سيختار البرنامج الرئيسي في الملف المسطح وتحديد حجم القطع وإنشاء المكدس.

لذا ، هل يمكن لشخص ما تقديم بعض قصاصات التعليمات البرمجية ، COD Pseudo حول كيفية معالجة مؤشرات الترابط المتعددة ملفًا واحصل على جزء فريد من هذا الملف فقط؟

شكرا يا راد

هل كانت مفيدة؟

المحلول

ما هو جيد بالنسبة لي هو استخدام قائمة انتظار لعقد عمل غير معالج وقاموس لتتبع العمل على متن الطائرة:

  1. قم بإنشاء فئة عامل يأخذ اسم الملف ، وخط البدء ، وعدد الأسطر وله طريقة تحديث تقوم بإدراج قاعدة البيانات. تمرير طريقة رد الاتصال الذي يستخدمه العامل للإشارة عند القيام به.
  2. قم بتحميل قائمة انتظار مع مثيلات من فئة العمال ، واحدة لكل قطعة.
  3. تفرخ مؤشر ترابط المرسل الذي يزيل مثيل العامل ، ويطلق طريقة التحديث الخاصة به ، ويضيف مثيل العامل إلى قاموس ، مفتاح بواسطة مؤشر ترابطه. قم بذلك حتى يتم الوصول إلى الحد الأقصى لعدد الخيط المسموح به ، كما لاحظ القاموس. ينتظر المرسل حتى ينتهي الخيط ثم يطلق آخر. هناك عدة طرق للانتظار.
  4. مع انتهاء كل مؤشر ترابط ، يزيل رد الاتصال الخاص به من القاموس. إذا ترك مؤشر الترابط بسبب خطأ (مثل مهلة الاتصال) ، فيمكن رد الاتصال على العامل في قائمة الانتظار. هذا مكان جيد لتحديث واجهة المستخدم الخاصة بك.
  5. يمكن أن تعرض واجهة المستخدم المواضيع النشطة ، والتقدم الكلي ، والوقت لكل قطعة. يمكن أن يسمح للمستخدم بضبط عدد مؤشرات الترابط النشطة أو معالجة الإيقاف المؤقت أو إظهار الأخطاء أو التوقف مبكرًا.
  6. عندما تكون قائمة الانتظار والقاموس فارغة ، فأنت تنتهي.

الرمز التجريبي كتطبيق وحدة تحكم:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top