Обработка плоского файла по частям с использованием нескольких потоков с использованием шаблона производителя / потребителя и SqlBulkCopy в базе данных SQL Server
-
20-09-2019 - |
Вопрос
Я надеюсь, что вы будете терпеливы ко мне.Я хотел предоставить как можно больше информации.Основная проблема заключается в том, как создать структуру (например, стек), которая будет использоваться несколькими потоками, которые будут извлекать значение и использовать его для обработки одного большого плоского файла и, возможно, выполнять циклирование снова и снова, пока не будет обработан весь файл.Если файл содержит 100.000 записей, которые могут быть обработаны 5 потоками с использованием 2.000 фрагментов строк тогда каждый поток получит для обработки 10 фрагментов.
Моя цель - переместить данные в плоский файл (с заголовком...Подзаголовок...Подробно, Подробно, подробно, ...Подробно, подзаголовок, подзаголовок...Подробно, подробно, подробно, ...Подробно, подзаголовок, Подзаголовок...Подробно, Detail, Detail, ...Подробно, структура подзаголовка, нижнего колонтитула) в базу данных OLTP, которая имеет режим восстановления до простого (возможно полного) в 3 таблицы:1-й представляет уникальный ключ подзаголовка, присутствующий в строке подзаголовка, 2-й - промежуточную группу подзаголовков таблицы, представляющую группировку строк сведений порциями по 2000 записей (в качестве FK должен использоваться идентификатор подзаголовка PK), и 3-й - строки сведений, где FK указывает на подзаголовок PK.
Я занимаюсь ручным управлением транзакциями, поскольку у меня могут быть десятки тысяч подробных строк и я использую специальное поле, для которого в таблицах назначения установлено значение 0 во время загрузки, а затем в конце обработки файла я выполняю транзакционное обновление, изменяя это значение на 1, что может сигнализировать другому приложению о завершении загрузки.
Я хочу разбить этот плоский файл на несколько равных частей (одинаковое количество строк), которые могут быть обработаны несколькими потоками и импортированы с помощью SqlBulkCopy с использованием IDataReader, который создается из метаданных целевой таблицы).
Я хочу использовать шаблон производителя / потребителя (как описано по ссылке ниже - анализ PDF и пример кода), чтобы использовать SqlBulkCopy с SqlBulkCopyOptions.Опция блокировки таблицы.http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx Этот шаблон позволяет создать несколько производителей, и эквивалентное количество потребителей должно подписаться на производителей, чтобы использовать строку.
В проекте TestSqlBulkCopy, файле DataProducer.cs, есть метод, который имитирует создание тысяч записей.
public void Produce (DataConsumer consumer, int numberOfRows) {
int bufferSize = 100000;
int numberOfBuffers = numberOfRows / bufferSize;
for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
DataTable buffer = consumer.GetBufferDataTable ();
for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
object[] values = GetRandomRow (consumer);
buffer.Rows.Add (values);
}
consumer.AddBufferDataTable (buffer);
}
}
Этот метод будет выполнен в контексте нового потока.Я хочу, чтобы этот новый поток читал только уникальный фрагмент исходного плоского файла, а другой поток приступал к обработке следующего фрагмента.Затем потребители будут перемещать данные (которые передаются им) в базу данных SQL Server, используя класс SqlBulkCopy ADO.NET.
Итак, вопрос здесь заключается в том, что основная программа диктует, какая строка от lineTo должна обрабатываться каждым потоком, и я думаю, что это должно произойти во время создания потока.Второе решение, вероятно, заключается в том, чтобы потоки совместно использовали некоторую структуру и использовали что-то уникальное для них (например, номер потока или порядковый номер) для поиска общей структуры (возможно, стека) и ввода значения (при этом блокируя стек), а затем следующий поток получит следующее значение.Основная программа перейдет к плоскому файлу, определит размер фрагментов и создаст стек.
Итак, может ли кто-нибудь предоставить несколько фрагментов кода, псевдо cod о том, как несколько потоков будут обрабатывать один файл и получать только уникальную часть этого файла?
Спасибо, Рад
Решение
Что хорошо сработало для меня, так это использование очереди для хранения необработанной работы и словаря для отслеживания работы в полете:
- Создайте рабочий класс, который принимает имя файла, начальную строку и количество строк и имеет метод обновления, который выполняет вставки в базу данных.Передайте метод обратного вызова, который worker использует для подачи сигнала о завершении.
- Загрузите очередь экземплярами класса worker , по одному для каждого фрагмента.
- Создает поток диспетчера, который удаляет из очереди экземпляр worker, запускает его метод update и добавляет экземпляр worker в словарь с ключом ManagedThreadId своего потока.Делайте это до тех пор, пока не будет достигнуто максимально допустимое количество потоков , как указано в Словаре.Считайте.Диспетчер ожидает завершения потока и затем запускает другой.Есть несколько способов заставить это подождать.
- Когда каждый поток завершается, его обратный вызов удаляет его ManagedThreadId из Словаря.Если поток завершается из-за ошибки (такой как время ожидания соединения), то обратный вызов может повторно вставить worker в очередь.Это хорошее место для обновления вашего пользовательского интерфейса.
- Ваш пользовательский интерфейс может показывать активные потоки, общий прогресс и время на каждый фрагмент.Это может позволить пользователю регулировать количество активных потоков, приостанавливать обработку, показывать ошибки или останавливаться досрочно.
- Когда очередь и словарь опустеют, все готово.
Демонстрационный код в виде консольного приложения:
using System;
using System.Collections.Generic;
using System.Threading;
namespace threadtest
{
public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);
class Program
{
static void Main(string[] args)
{
Supervisor supv = new Supervisor();
supv.LoadQueue();
supv.Dispatch();
}
}
public class Supervisor
{
public Queue<Worker> pendingWork = new Queue<Worker>();
public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();
private object pendingLock = new object();
private object activeLock = new object();
private int maxThreads = 200;
public void LoadQueue()
{
for (int i = 0; i < 1000; i++)
{
Worker worker = new Worker();
worker.Callback = new DoneCallbackDelegate(WorkerFinished);
lock (pendingLock)
{
pendingWork.Enqueue(worker);
}
}
}
public void Dispatch()
{
int activeThreadCount;
while (true)
{
lock (activeLock) { activeThreadCount = activeWork.Count; }
while (true)
{
lock (activeLock)
{
if (activeWork.Count == maxThreads) break;
}
lock (pendingWork)
{
if (pendingWork.Count > 0)
{
Worker worker = pendingWork.Dequeue();
Thread thread = new Thread(new ThreadStart(worker.DoWork));
thread.IsBackground = true;
worker.ThreadId = thread.ManagedThreadId;
lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
thread.Start();
}
else
{
break;
}
}
}
Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)
lock (pendingLock)
lock (activeLock)
{
if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
}
}
}
// remove finished threads from activeWork, resubmit if necessary, and update UI
public void WorkerFinished(int idArg, bool successArg, string messageArg)
{
lock (pendingLock)
lock (activeLock)
{
Worker worker = activeWork[idArg];
activeWork.Remove(idArg);
if (!successArg)
{
// check the message or something to see if you should resubmit thread
pendingWork.Enqueue(worker);
}
// update UI
int left = Console.CursorLeft;
int top = Console.CursorTop;
Console.WriteLine(string.Format("pending:{0} active:{1} ", pendingWork.Count, activeWork.Count));
Console.SetCursorPosition(left, top);
}
}
}
public class Worker
{
// this is where you put in your problem-unique stuff
public int ThreadId { get; set; }
DoneCallbackDelegate callback;
public DoneCallbackDelegate Callback { set { callback = value; } }
public void DoWork()
{
try
{
Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
callback(ThreadId, true, null);
}
catch (Exception ex)
{
callback(ThreadId, false, ex.ToString());
}
}
}
}