Обработка плоского файла по частям с использованием нескольких потоков с использованием шаблона производителя / потребителя и SqlBulkCopy в базе данных SQL Server

StackOverflow https://stackoverflow.com/questions/2066503

Вопрос

Я надеюсь, что вы будете терпеливы ко мне.Я хотел предоставить как можно больше информации.Основная проблема заключается в том, как создать структуру (например, стек), которая будет использоваться несколькими потоками, которые будут извлекать значение и использовать его для обработки одного большого плоского файла и, возможно, выполнять циклирование снова и снова, пока не будет обработан весь файл.Если файл содержит 100.000 записей, которые могут быть обработаны 5 потоками с использованием 2.000 фрагментов строк тогда каждый поток получит для обработки 10 фрагментов.

Моя цель - переместить данные в плоский файл (с заголовком...Подзаголовок...Подробно, Подробно, подробно, ...Подробно, подзаголовок, подзаголовок...Подробно, подробно, подробно, ...Подробно, подзаголовок, Подзаголовок...Подробно, Detail, Detail, ...Подробно, структура подзаголовка, нижнего колонтитула) в базу данных OLTP, которая имеет режим восстановления до простого (возможно полного) в 3 таблицы:1-й представляет уникальный ключ подзаголовка, присутствующий в строке подзаголовка, 2-й - промежуточную группу подзаголовков таблицы, представляющую группировку строк сведений порциями по 2000 записей (в качестве FK должен использоваться идентификатор подзаголовка PK), и 3-й - строки сведений, где FK указывает на подзаголовок PK.

Я занимаюсь ручным управлением транзакциями, поскольку у меня могут быть десятки тысяч подробных строк и я использую специальное поле, для которого в таблицах назначения установлено значение 0 во время загрузки, а затем в конце обработки файла я выполняю транзакционное обновление, изменяя это значение на 1, что может сигнализировать другому приложению о завершении загрузки.

Я хочу разбить этот плоский файл на несколько равных частей (одинаковое количество строк), которые могут быть обработаны несколькими потоками и импортированы с помощью SqlBulkCopy с использованием IDataReader, который создается из метаданных целевой таблицы).

Я хочу использовать шаблон производителя / потребителя (как описано по ссылке ниже - анализ PDF и пример кода), чтобы использовать SqlBulkCopy с SqlBulkCopyOptions.Опция блокировки таблицы.http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Этот шаблон позволяет создать несколько производителей, и эквивалентное количество потребителей должно подписаться на производителей, чтобы использовать строку.

В проекте TestSqlBulkCopy, файле DataProducer.cs, есть метод, который имитирует создание тысяч записей.

public void Produce (DataConsumer consumer, int numberOfRows) {
    int bufferSize = 100000;
    int numberOfBuffers = numberOfRows / bufferSize;

    for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
        DataTable buffer = consumer.GetBufferDataTable ();

        for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
            object[] values = GetRandomRow (consumer);
            buffer.Rows.Add (values);
        }
        consumer.AddBufferDataTable (buffer);
    }
}

Этот метод будет выполнен в контексте нового потока.Я хочу, чтобы этот новый поток читал только уникальный фрагмент исходного плоского файла, а другой поток приступал к обработке следующего фрагмента.Затем потребители будут перемещать данные (которые передаются им) в базу данных SQL Server, используя класс SqlBulkCopy ADO.NET.

Итак, вопрос здесь заключается в том, что основная программа диктует, какая строка от lineTo должна обрабатываться каждым потоком, и я думаю, что это должно произойти во время создания потока.Второе решение, вероятно, заключается в том, чтобы потоки совместно использовали некоторую структуру и использовали что-то уникальное для них (например, номер потока или порядковый номер) для поиска общей структуры (возможно, стека) и ввода значения (при этом блокируя стек), а затем следующий поток получит следующее значение.Основная программа перейдет к плоскому файлу, определит размер фрагментов и создаст стек.

Итак, может ли кто-нибудь предоставить несколько фрагментов кода, псевдо cod о том, как несколько потоков будут обрабатывать один файл и получать только уникальную часть этого файла?

Спасибо, Рад

Это было полезно?

Решение

Что хорошо сработало для меня, так это использование очереди для хранения необработанной работы и словаря для отслеживания работы в полете:

  1. Создайте рабочий класс, который принимает имя файла, начальную строку и количество строк и имеет метод обновления, который выполняет вставки в базу данных.Передайте метод обратного вызова, который worker использует для подачи сигнала о завершении.
  2. Загрузите очередь экземплярами класса worker , по одному для каждого фрагмента.
  3. Создает поток диспетчера, который удаляет из очереди экземпляр worker, запускает его метод update и добавляет экземпляр worker в словарь с ключом ManagedThreadId своего потока.Делайте это до тех пор, пока не будет достигнуто максимально допустимое количество потоков , как указано в Словаре.Считайте.Диспетчер ожидает завершения потока и затем запускает другой.Есть несколько способов заставить это подождать.
  4. Когда каждый поток завершается, его обратный вызов удаляет его ManagedThreadId из Словаря.Если поток завершается из-за ошибки (такой как время ожидания соединения), то обратный вызов может повторно вставить worker в очередь.Это хорошее место для обновления вашего пользовательского интерфейса.
  5. Ваш пользовательский интерфейс может показывать активные потоки, общий прогресс и время на каждый фрагмент.Это может позволить пользователю регулировать количество активных потоков, приостанавливать обработку, показывать ошибки или останавливаться досрочно.
  6. Когда очередь и словарь опустеют, все готово.

Демонстрационный код в виде консольного приложения:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
    public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

    class Program
    {
        static void Main(string[] args)
        {
            Supervisor supv = new Supervisor();
            supv.LoadQueue();
            supv.Dispatch();
        }
    }

    public class Supervisor
    {
        public Queue<Worker> pendingWork = new Queue<Worker>();
        public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

        private object pendingLock = new object();
        private object activeLock = new object();

        private int maxThreads = 200;

        public void LoadQueue()
        {
            for (int i = 0; i < 1000; i++)
            {
                Worker worker = new Worker();
                worker.Callback = new DoneCallbackDelegate(WorkerFinished);
                lock (pendingLock)
                {
                    pendingWork.Enqueue(worker);
                }
            }
        }

        public void Dispatch()
        {
            int activeThreadCount;

            while (true)
            {
                lock (activeLock) { activeThreadCount = activeWork.Count; }
                while (true)
                {
                    lock (activeLock)
                    {
                        if (activeWork.Count == maxThreads) break;
                    }
                    lock (pendingWork)
                    {
                        if (pendingWork.Count > 0)
                        {
                            Worker worker = pendingWork.Dequeue();
                            Thread thread = new Thread(new ThreadStart(worker.DoWork));
                            thread.IsBackground = true;
                            worker.ThreadId = thread.ManagedThreadId;
                            lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
                            thread.Start();
                        }
                        else
                        {
                            break;
                        }
                    }
                }
                Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

                lock (pendingLock)
                    lock (activeLock)
                    {
                        if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
                    }
            }
        }

        // remove finished threads from activeWork, resubmit if necessary, and update UI
        public void WorkerFinished(int idArg, bool successArg, string messageArg)
        {
            lock (pendingLock)
                lock (activeLock)
                {
                    Worker worker = activeWork[idArg];
                    activeWork.Remove(idArg);
                    if (!successArg)
                    {
                        // check the message or something to see if you should resubmit thread
                        pendingWork.Enqueue(worker);
                    }
                    // update UI
                    int left = Console.CursorLeft;
                    int top = Console.CursorTop;
                    Console.WriteLine(string.Format("pending:{0} active:{1}        ", pendingWork.Count, activeWork.Count));
                    Console.SetCursorPosition(left, top);
                }
        }
    }

    public class Worker
    {
        // this is where you put in your problem-unique stuff
        public int ThreadId { get; set; }

        DoneCallbackDelegate callback;
        public DoneCallbackDelegate Callback { set { callback = value; } }

        public void DoWork()
        {
            try
            {
                Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
                callback(ThreadId, true, null);
            }
            catch (Exception ex)
            {
                callback(ThreadId, false, ex.ToString());
            }
        }
    }
}
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top