处理一个平面文件,在大块多使用生产/消费模式和SqlBulkCopy到SQL服务器数据库
-
20-09-2019 - |
题
我希望你会原谅我.我想要提供尽可能多的信息,我可以。主要的问题是如何创建一个结构(如一堆),将用于通过多线程,将出现一个价值,并使用它来处理一个大型平板文件和可能的做骑自行车一次又一次直到整个文件处理。如果一个文件,有100,000个记录可以处理通过5线使用2.000行块 然后每个线程会得到10块来处理。
我的目标是将数据在一个平面文件(带头...副标题...细节,详细说明,详细说明,...细节,子页脚,副标题...细节,详细说明,详细说明,...细节,子页脚, 副标题...细节,详细说明,详细说明,...细节,子页脚,页脚结构)成只读DB已恢复的模式以简单(可能满)到3表:1代表副标题的唯一关键本在副标题排,第2中间表SubheaderGroup代表组的详细行块,2000年记录(需要有副标题的身份PK作为其FK和第3代表详细行FK指向副标题PK。
我做的手册交易管理,因为我可以有数以万计的详细行 和我使用一个特殊的领域,是定向0在目的地表过负荷然后在文件结尾处理我做的事务更新改变这个价值为1,其可信号的其他应用程序,装载完成。
我想砍这个平面文件分成多个平等件(数量相同的行)就可以处理与多线程和进口使用SqlBulkCopy使用通过创建目的地表的元数据)。
我想要使用生产/消费模式(如下面的链接-pdf分析和代码样本)使用SqlBulkCopy与SqlBulkCopyOptions.TableLock的选择。http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx 这一模式能够创造多生产商和同等数量的消费者需订阅的生产者到消费行。
在TestSqlBulkCopy项目,DataProducer.cs文件有一个方法,模拟生产的成千上万的记录。
public void Produce (DataConsumer consumer, int numberOfRows) {
int bufferSize = 100000;
int numberOfBuffers = numberOfRows / bufferSize;
for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
DataTable buffer = consumer.GetBufferDataTable ();
for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
object[] values = GetRandomRow (consumer);
buffer.Rows.Add (values);
}
consumer.AddBufferDataTable (buffer);
}
}
这种方法将要执行的上下文中的一个新的螺纹。我想这个新的螺纹阅读仅是一个独特的大块的原始文件和另外一个线程将启处理的下一区块。消费者会然后将数据(这被泵送到他们)到SQL服务器数据库使用SqlBulkCopy ADO.NET 类。
因此,这里的问题是关于主要程序规定是什么lineFrom到lineTo应该处理的每一个线程和我认为应该发生在线创建。第二个方案是可能的线分享的一些结构和使用的一些独特的东西来它们(像程编号或序列编号)以查阅的共用结构(可能一堆和弹出一个值(锁定一叠,而这样做),然后下一个线程将然后摄取的下一个价值。主要的程序将回到平面文件,并确定大小的区块,并创造了堆。
因此有人可以提供一些代码段、伪鳕鱼如何多线程的会过程中的一个文件,而只得到一个独特的部分,文件?
谢谢, Rad
解决方案
什么工作以及对我来说是使用排队持有未加工的工作和字典,以跟踪工作在飞行:
- 创建工作人员类需要的 文件名,启动线路,线数 并具有更新方法, 不会的数据库插入。通过一个回调的方法, 工作人员使用的信号时,它的完成。
- 负载的队列有实例的工作人员 类,每个区块。
- 产生一个调度螺纹,弹出一个 工作人员实例,推出更新 方法,并增加了工作人员的实例进一字典,为键的其线的低开销方案使用.做这个 直到你的允许的最大线 数达到,正如所指出的 词典。数。调度员 等待直到一个线程完成 然后启动另一个。有几种方法对于它等待。
- 因为每个线程的完成,其回调 除其低开销方案使用从 词典。如果该线退出 因为一个错误(如 连接的超时),那么 回调可以重新插入的工作 入队列中。这是一个好地方 来更新你的用户界面。
- 你的用户界面可以显示出积极的螺纹,总的进度,时间和每块。它能让用户调整数量的活动线,暂停处理,显示错误,或停止早。
- 当队和字典是空的,你完成。
演示码作为一个控制台应用程序:
using System;
using System.Collections.Generic;
using System.Threading;
namespace threadtest
{
public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);
class Program
{
static void Main(string[] args)
{
Supervisor supv = new Supervisor();
supv.LoadQueue();
supv.Dispatch();
}
}
public class Supervisor
{
public Queue<Worker> pendingWork = new Queue<Worker>();
public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();
private object pendingLock = new object();
private object activeLock = new object();
private int maxThreads = 200;
public void LoadQueue()
{
for (int i = 0; i < 1000; i++)
{
Worker worker = new Worker();
worker.Callback = new DoneCallbackDelegate(WorkerFinished);
lock (pendingLock)
{
pendingWork.Enqueue(worker);
}
}
}
public void Dispatch()
{
int activeThreadCount;
while (true)
{
lock (activeLock) { activeThreadCount = activeWork.Count; }
while (true)
{
lock (activeLock)
{
if (activeWork.Count == maxThreads) break;
}
lock (pendingWork)
{
if (pendingWork.Count > 0)
{
Worker worker = pendingWork.Dequeue();
Thread thread = new Thread(new ThreadStart(worker.DoWork));
thread.IsBackground = true;
worker.ThreadId = thread.ManagedThreadId;
lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
thread.Start();
}
else
{
break;
}
}
}
Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)
lock (pendingLock)
lock (activeLock)
{
if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
}
}
}
// remove finished threads from activeWork, resubmit if necessary, and update UI
public void WorkerFinished(int idArg, bool successArg, string messageArg)
{
lock (pendingLock)
lock (activeLock)
{
Worker worker = activeWork[idArg];
activeWork.Remove(idArg);
if (!successArg)
{
// check the message or something to see if you should resubmit thread
pendingWork.Enqueue(worker);
}
// update UI
int left = Console.CursorLeft;
int top = Console.CursorTop;
Console.WriteLine(string.Format("pending:{0} active:{1} ", pendingWork.Count, activeWork.Count));
Console.SetCursorPosition(left, top);
}
}
}
public class Worker
{
// this is where you put in your problem-unique stuff
public int ThreadId { get; set; }
DoneCallbackDelegate callback;
public DoneCallbackDelegate Callback { set { callback = value; } }
public void DoWork()
{
try
{
Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
callback(ThreadId, true, null);
}
catch (Exception ex)
{
callback(ThreadId, false, ex.ToString());
}
}
}
}