Processing a flat file in chunks using multiple threads using producer/consumer pattern and SqlBulkCopy into SQL Server DB

StackOverflow https://stackoverflow.com/questions/2066503

Question

I hope you will bear with me; I wanted to provide as much information as I can. The main problem is how to create a structure (like a stack) shared by multiple threads, each of which pops a value and uses it to process part of one big flat file, cycling again and again until the whole file is processed. If a file has 100,000 records and is processed by 5 threads in 2,000-row chunks, there are 50 chunks in total, so each thread will get 10 chunks to process.
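To make the arithmetic concrete, here is a minimal sketch of splitting a known record count into fixed-size chunks. The names `Chunk` and `ChunkPlanner` are hypothetical and not part of any library; the sketch also assumes the total number of records is already known (for example, from a first pass over the file).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: describes one contiguous slice of the file.
public sealed class Chunk
{
    public int StartLine { get; }   // zero-based index of the first record
    public int LineCount { get; }   // number of records in this chunk

    public Chunk(int startLine, int lineCount)
    {
        StartLine = startLine;
        LineCount = lineCount;
    }
}

public static class ChunkPlanner
{
    // Splits totalLines into chunks of at most chunkSize lines each.
    // The last chunk is shorter when totalLines is not a multiple of chunkSize.
    public static List<Chunk> Plan(int totalLines, int chunkSize)
    {
        if (totalLines < 0) throw new ArgumentOutOfRangeException(nameof(totalLines));
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var chunks = new List<Chunk>();
        for (int start = 0; start < totalLines; start += chunkSize)
        {
            int count = Math.Min(chunkSize, totalLines - start);
            chunks.Add(new Chunk(start, count));
        }
        return chunks;
    }

    public static void Main()
    {
        List<Chunk> chunks = Plan(100000, 2000);
        Console.WriteLine(chunks.Count);      // 50 chunks in total
        Console.WriteLine(chunks.Count / 5);  // 10 chunks per thread with 5 threads
    }
}
```

Each `Chunk` carries everything a thread needs to find its slice, so the list can be handed out at thread creation or loaded into a shared structure.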

My goal is to move data from a flat file (with a Header...Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Footer structure) into an OLTP DB whose recovery model is set to Simple (possibly Full). The data goes into 3 tables: the 1st represents the Subheader's unique key present in the Subheader row; the 2nd is an intermediate table, SubheaderGroup, representing the grouping of Detail rows in chunks of 2,000 records (it needs to have the Subheader's Identity PK as its FK); and the 3rd represents Detail rows with an FK pointing to the Subheader PK.

I am doing manual transaction management since I can have tens of thousands of Detail rows. I use a special field that is set to 0 in the destination tables during the load; at the end of file processing I do a transactional update changing this value to 1, which signals to other applications that the loading has finished.

I want to chop this flat file into multiple equal pieces (same number of rows) that can be processed by multiple threads and imported using SqlBulkCopy, with an IDataReader that is created from the destination table metadata.

I want to use the producer/consumer pattern (as explained in the link below, which has a PDF analysis and code sample) to use SqlBulkCopy with the SqlBulkCopyOptions.TableLock option. http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx This pattern enables creating multiple producers, and an equivalent number of consumers need to subscribe to the producers to consume the rows.

In the TestSqlBulkCopy project, in the DataProducer.cs file, there is a method that simulates production of thousands of records.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

This method will be executed in the context of a new thread. I want this new thread to read only a unique chunk of the original flat file while another thread starts processing the next chunk. Consumers would then move the data (that is pumped to them) to the SQL Server DB using the SqlBulkCopy ADO.NET class.
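As an illustration of a thread reading only its own slice, here is a minimal sketch that pulls a given line range out of a text file. `ChunkReader` and `ReadChunk` are hypothetical names, and the sketch assumes one record per line; it is not taken from the linked sample.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class ChunkReader
{
    // Returns lineCount lines starting at the zero-based startLine.
    // File.ReadLines streams the file lazily, so the whole file is never
    // held in memory, but the skipped lines are still read and discarded.
    public static List<string> ReadChunk(string path, int startLine, int lineCount)
    {
        return File.ReadLines(path).Skip(startLine).Take(lineCount).ToList();
    }

    public static void Main()
    {
        // Build a small throwaway file so the example is self-contained.
        string path = Path.GetTempFileName();
        File.WriteAllLines(path, Enumerable.Range(0, 10).Select(i => "row " + i));

        foreach (string line in ReadChunk(path, 4, 3))
            Console.WriteLine(line);   // prints row 4, row 5, row 6

        File.Delete(path);
    }
}
```

Because every call rescans the file from the beginning, chunks near the end cost more to reach than chunks near the start. If that matters, a single producer that reads the file once and hands buffers to consumers avoids the repeated scanning.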

So the question here is about the main program dictating which range (lineFrom to lineTo) should be processed by each thread, and I think that should happen during thread creation. A second solution is probably for the threads to share some structure and use something unique to them (like a thread number or sequence number) to look up a shared structure, possibly a stack, and pop a value (locking the stack while doing it); the next thread will then pick up the next value. The main program will peek into the flat file, determine the size of the chunks, and create the stack.
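The second approach could be sketched as follows. This is an assumption-laden illustration, not the method from the linked article: it swaps the manually locked stack for `ConcurrentQueue<T>` from `System.Collections.Concurrent` (available from .NET 4), identifies chunks by index only, and replaces the real read-and-bulk-copy work with a counter. `SharedQueueDemo` and `ProcessAll` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class SharedQueueDemo
{
    // Runs threadCount threads that each repeatedly take the next unclaimed
    // chunk index from a shared queue until the queue is empty.
    // Returns, per chunk index, the number of times it was processed.
    public static int[] ProcessAll(int chunkCount, int threadCount)
    {
        var pending = new ConcurrentQueue<int>();
        for (int i = 0; i < chunkCount; i++) pending.Enqueue(i);

        var timesProcessed = new int[chunkCount];
        var threads = new List<Thread>();

        for (int t = 0; t < threadCount; t++)
        {
            var thread = new Thread(() =>
            {
                int chunkIndex;
                // TryDequeue is atomic, so no two threads receive the same index.
                while (pending.TryDequeue(out chunkIndex))
                {
                    // Placeholder for the real work: read the chunk and bulk copy it.
                    Interlocked.Increment(ref timesProcessed[chunkIndex]);
                }
            });
            threads.Add(thread);
            thread.Start();
        }

        foreach (Thread thread in threads) thread.Join();
        return timesProcessed;
    }

    public static void Main()
    {
        int[] counts = ProcessAll(50, 5);
        Console.WriteLine(Array.TrueForAll(counts, c => c == 1)); // True
    }
}
```

With a shared queue, a fast thread simply takes more chunks than a slow one, so the work balances itself without the main program assigning fixed ranges up front.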

So can somebody provide some code snippets or pseudocode showing how multiple threads would process one file, with each thread getting only a unique portion of that file?

Thanks, Rad


Solution

What's worked well for me is to use a queue to hold unprocessed work and a dictionary to keep track of work in-flight:

  1. Create a worker class that takes the filename, start line, and line count and has an update method that does the database inserts. Pass a callback method that the worker uses to signal when it's done.
  2. Load a Queue with instances of the worker class, one for each chunk.
  3. Spawn a dispatcher thread that dequeues a worker instance, launches its update method, and adds the worker instance to a Dictionary, keyed by its thread's ManagedThreadId. Do this until your maximum allowable thread count is reached, as indicated by Dictionary.Count. The dispatcher waits until a thread finishes and then launches another. There are several ways for it to wait.
  4. As each thread finishes, its callback removes its ManagedThreadId from the Dictionary. If the thread quits because of an error (such as connection timeout) then the callback can reinsert the worker into the Queue. This is a good place to update your UI.
  5. Your UI can show active threads, total progress, and time per chunk. It can let the user adjust the number of active threads, pause processing, show errors, or stop early.
  6. When the Queue and Dictionary are empty, you're done.

Demo code as a console app:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            while (true)
            {
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count >= maxThreads) break; // >= also holds if the limit is lowered while running
                    }
                    lock (pendingLock) // same lock object that WorkerFinished uses to guard pendingWork
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
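
The `Thread.Sleep(200)` polling in `Dispatch` is only one of the ways to wait for a free slot. As one possible alternative, the sketch below uses `SemaphoreSlim` (available from .NET 4) so that the dispatcher blocks until a worker actually finishes. `ThrottledDispatcher` and `RunAll` are hypothetical names, the work items are plain `Action` delegates rather than the `Worker` class above, and the retry-on-failure step is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ThrottledDispatcher
{
    // Runs every work item on its own thread, with at most maxThreads running
    // at once. Returns the highest number of items observed running together.
    // Work items are expected to catch their own exceptions, as Worker.DoWork does.
    public static int RunAll(IEnumerable<Action> workItems, int maxThreads)
    {
        var slots = new SemaphoreSlim(maxThreads, maxThreads);
        var threads = new List<Thread>();
        int running = 0;
        int peak = 0;
        object peakLock = new object();

        foreach (Action work in workItems)
        {
            slots.Wait();        // blocks until a running worker releases its slot
            Action item = work;  // copy for the closure below
            var thread = new Thread(() =>
            {
                try
                {
                    int now = Interlocked.Increment(ref running);
                    lock (peakLock) { if (now > peak) peak = now; }
                    item();
                }
                finally
                {
                    Interlocked.Decrement(ref running);
                    slots.Release();
                }
            });
            thread.IsBackground = true;
            threads.Add(thread);
            thread.Start();
        }

        foreach (Thread thread in threads) thread.Join();
        return peak;
    }

    public static void Main()
    {
        var work = new List<Action>();
        for (int i = 0; i < 20; i++) work.Add(() => Thread.Sleep(50));
        Console.WriteLine(RunAll(work, 4) <= 4); // True: never more than 4 at once
    }
}
```

The trade-off is that the semaphore version reacts as soon as a slot frees up, while the polling loop is easier to extend with pause, resume, and a user-adjustable thread limit.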
Licensed under: CC-BY-SA with attribution