プロデューサ/コンシューマ パターンと SQL Server DB への SqlBulkCopy を使用した複数のスレッドを使用したチャンク内のフラット ファイルの処理
-
20-09-2019 - |
質問
ご了承いただければ幸いです。できる限り多くの情報を提供したいと思いました。主な問題は、値をポップし、それを使用して 1 つの大きなフラット ファイルを処理し、場合によってはファイル全体が処理されるまで繰り返し循環する複数のスレッドで使用される構造 (スタックのような) を作成する方法です。ファイルには、2.000行のチャンクを使用して5つのスレッドで処理できる100.000のレコードがある場合、各スレッドは10個のチャンクを処理します。
私の目標は、データをフラットファイルに移動することです(ヘッダー...サブヘッダー...詳細、詳細、詳細、...サブフッター、サブヘッダー...詳細、ディテール、...ディテール、サブフッター、 Subheader ...ディテール、ディテール、ディテール、...ディテール、サブフッター、フッター構造、フッター構造)は、3つのテーブルにリカバリモードを単純に(可能性のあるフル)に備えています。1 番目はサブヘッダー行に存在するサブヘッダーの一意のキーを表し、2 番目は中間テーブル SubheaderGroup で、2000 レコードのチャンク内の詳細行のグループ化を表します (サブヘッダーの ID PK を FK として持つ必要があります。3 番目はサブヘッダー PK を指す FK を持つ詳細行を表します)。
私は数万の詳細行を持つことができるので手動トランザクション管理を行っています。ロード中に宛先テーブルで0に設定され、ファイル処理の最後に、これを変更するトランザクションアップターを実行しています荷重が終了したことを他のアプリケーションに信号することができる1の値。
このフラット ファイルを複数の等しい部分 (同じ行数) に分割し、複数のスレッドで処理し、宛先テーブルのメタデータから作成された IDataReader を使用して SqlBulkCopy を使用してインポートできるようにしたいと考えています。
SqlBulkCopyOptions.TableLock オプションで SqlBulkCopy を使用するために、プロデューサー/コンシューマー パターン (以下のリンク - PDF 分析とコード サンプルで説明されている) を使用したいと考えています。http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspxこのパターンでは、複数のプロデューサーを作成でき、行を消費するには同数のコンシューマーがプロデューサーにサブスクライブする必要があります。
TestSqlBulkCopy プロジェクトの DataProducer.cs ファイルには、数千のレコードの生成をシミュレートするメソッドがあります。
public void Produce (DataConsumer consumer, int numberOfRows) {
int bufferSize = 100000;
int numberOfBuffers = numberOfRows / bufferSize;
for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
DataTable buffer = consumer.GetBufferDataTable ();
for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
object[] values = GetRandomRow (consumer);
buffer.Rows.Add (values);
}
consumer.AddBufferDataTable (buffer);
}
}
このメソッドは、新しいスレッドのコンテキストで実行されます。この新しいスレッドが元のフラット ファイルの一意のチャンクのみを読み取り、別のスレッドが次のチャンクの処理を開始するようにしたいと考えています。次に、コンシューマーは、SqlBulkCopy ADO.NET クラスを使用して、(ポンプで送られる) データを SQL Server DB に移動します。
したがって、ここでの質問は、各スレッドでどの lineFrom から lineTo までを処理するかを決定するメインプログラムについてであり、それはスレッドの作成中に発生するはずだと思います。おそらく 2 番目の解決策は、スレッドが何らかの構造を共有し、スレッドに固有のもの (スレッド番号やシーケンス番号など) を使用して共有構造 (おそらくスタック) を検索し、値をポップし (実行中にスタックをロックする)、次のスレッドが次に、次の値を取得します。メイン プログラムはフラット ファイルを選択し、チャンクのサイズを決定し、スタックを作成します。
それでは、複数のスレッドが 1 つのファイルを処理し、そのファイルの一意の部分のみを取得する方法についてのコード スニペット、つまりコードを誰かが提供してもらえないでしょうか。
ありがとう、ラッド
解決
私にとってうまくいったのは、キューを使用して未処理の作業を保持し、辞書を使用して処理中の作業を追跡することです。
- ファイル名、スタートライン、ラインカウントを取得し、データベース挿入を実行する更新方法を備えたワーカークラスを作成します。ワーカーが完了したときに信号を送信するために使用するコールバック方法に合格します。
- ワーカークラスのインスタンスを使用してキューをロードします。
- ワーカーインスタンスをDequeuesし、更新方法を起動し、スレッドのManagedThreadIDでキーを付けた辞書にワーカーインスタンスを追加するディスパッチャースレッドをスポーンします。dictionary.countに記載されているように、最大許容スレッド数に達するまでこれを行います。ディスパッチャーは、スレッドが終了するまで待機し、別のスレッドを起動します。待つ方法はいくつかあります。
- 各スレッドが終了すると、そのコールバックは辞書からmanagedthreadidを削除します。エラー(接続タイムアウトなど)のためにスレッドが終了すると、コールバックはワーカーをキューに再挿入できます。これはUIを更新するのに適した場所です。
- UI には、アクティブなスレッド、合計進行状況、およびチャンクごとの時間を表示できます。これにより、ユーザーはアクティブなスレッドの数を調整したり、処理を一時停止したり、エラーを表示したり、早期に停止したりすることができます。
- キューとディクショナリが空になったら完了です。
コンソール アプリとしてのデモ コード:
using System;
using System.Collections.Generic;
using System.Threading;
namespace threadtest
{
public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);
class Program
{
static void Main(string[] args)
{
Supervisor supv = new Supervisor();
supv.LoadQueue();
supv.Dispatch();
}
}
public class Supervisor
{
public Queue<Worker> pendingWork = new Queue<Worker>();
public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();
private object pendingLock = new object();
private object activeLock = new object();
private int maxThreads = 200;
public void LoadQueue()
{
for (int i = 0; i < 1000; i++)
{
Worker worker = new Worker();
worker.Callback = new DoneCallbackDelegate(WorkerFinished);
lock (pendingLock)
{
pendingWork.Enqueue(worker);
}
}
}
public void Dispatch()
{
int activeThreadCount;
while (true)
{
lock (activeLock) { activeThreadCount = activeWork.Count; }
while (true)
{
lock (activeLock)
{
if (activeWork.Count == maxThreads) break;
}
lock (pendingWork)
{
if (pendingWork.Count > 0)
{
Worker worker = pendingWork.Dequeue();
Thread thread = new Thread(new ThreadStart(worker.DoWork));
thread.IsBackground = true;
worker.ThreadId = thread.ManagedThreadId;
lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
thread.Start();
}
else
{
break;
}
}
}
Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)
lock (pendingLock)
lock (activeLock)
{
if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
}
}
}
// remove finished threads from activeWork, resubmit if necessary, and update UI
public void WorkerFinished(int idArg, bool successArg, string messageArg)
{
lock (pendingLock)
lock (activeLock)
{
Worker worker = activeWork[idArg];
activeWork.Remove(idArg);
if (!successArg)
{
// check the message or something to see if you should resubmit thread
pendingWork.Enqueue(worker);
}
// update UI
int left = Console.CursorLeft;
int top = Console.CursorTop;
Console.WriteLine(string.Format("pending:{0} active:{1} ", pendingWork.Count, activeWork.Count));
Console.SetCursorPosition(left, top);
}
}
}
public class Worker
{
// this is where you put in your problem-unique stuff
public int ThreadId { get; set; }
DoneCallbackDelegate callback;
public DoneCallbackDelegate Callback { set { callback = value; } }
public void DoWork()
{
try
{
Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
callback(ThreadId, true, null);
}
catch (Exception ex)
{
callback(ThreadId, false, ex.ToString());
}
}
}
}