Async 'Paged' EFのプロセッシングの実行方法
-
21-12-2019 - |
質問
SQL ServerからAzureキューにレコードをロードするものを書いています。そのことは、選択結果の項目の数が非常に大きくなる可能性があるため、データがまだ取得されている間にキューイングスタッフを起動します。
EF6(ASYNC)、すべての非同期メソッドとhthyf="http://msdn.microsoft.com/en-us/library/dd460717%28v=vs.110%29を活用しようとしています。 ASPX "REL=" NOFOLLOW "> TPL パラレルエンキューの場合。だから私は持っています:
// This defines queue that Generator will publsh to and
// QueueManager wil read from. More info:
// http://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx
var queue = new BufferBlock<ProcessQueueItem>();
// Configure queue listener first
var result = this.ReceiveAndEnqueue(queue);
// Start generation process
var tasks = generator.Generate(batchId);
.
受信者デデークの単純なもの:
private async Task ReceiveAndEnqueue(ISourceBlock<ProcessQueueItem> queue)
{
while (await queue.OutputAvailableAsync())
{
var processQueueItem = await queue.ReceiveAsync();
await this.queueManager.Enqueue(processQueueItem);
this.tasksEnqueued++;
}
}
.
ジェネレータgenerate()署名は次のとおりです。
public void Generate(Guid someId, ITargetBlock<ProcessQueueItem> target)
.
ターゲット上のsendAsync()メソッドを呼び出して新しい項目を配置します。私が今やっているのは、結果の総数を「バッチ」に分けて、それらをロードして非同期を送信しています。
public void Generate(Guid batchId, ITargetBlock<ProcessQueueItem> target)
{
var accountPromise = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());
accountPromise.Wait();
var accounts = accountPromise.Result;
// Batch configuration
var itemCount = accounts.Count();
var numBatches = (int)Math.Ceiling((double)itemCount / this.batchSize);
Debug.WriteLine("Found {0} items what will be put in {1} batches of {2}", itemCount, numBatches, this.batchSize);
for (int i = 0; i < numBatches; i++)
{
var itemsToTake = Math.Min(this.batchSize, itemCount - currentIndex);
Debug.WriteLine("Running batch - skip {0} and take {1}", currentIndex, itemsToTake);
// Take a subset of the items and place them onto the queue
var batch = accounts.Skip(currentIndex).Take(itemsToTake);
// Generate a list of tasks to enqueue the items
var taskList = new List<Task>(itemsToTake);
taskList.AddRange(batch.Select(account => target.SendAsync(account.AsProcessQueueItem(batchId))));
// Return the control when all tasks have been enqueued
Task.WaitAll(taskList.ToArray());
currentIndex = currentIndex + this.batchSize;
}
.
これは私の同僚に述べた - 'インタフェースをより簡単にすることができず、generation()を作成してそのようなインターフェイスを作成させることができません。
public Task<IEnumerable<ProcessQueueItem> Generate(Guid someId)
.
ロットクリーナー、そしてGenerationメソッドのTPLライブラリへの依存関係はありません。私は完全に同意します、私はそれをするならば、私はそれをするならば、私は呼び出す必要があることを私は
var result = Generate().Wait().Result;
.
ある時点で、すべての項目をenqueuinigする前に。それは私がすべてのものがロードされてメモリ内にあるのを待つようになるでしょう。
だから私の質問が降りるのは:選択から「DRIP」を「ドリップ」するとすぐにEFクエリ結果の使用を開始する方法は?あたかも私のドリフトをキャッチした場合、EFが結果の上に「歩留まり」を実行するだろうかのように。
編集 私は思考の間違いをしたと思います。 EFはデフォルトで遅延項目をロードします。だから私はすべての結果をiQueryable <>として返すことができますが、それは実際にDBからロードされているという意味ではありません。それから私は彼らの上に反復してそれらをエンキューします。
編集2 generation()メソッドのデータベースからオブジェクトを変換する必要があるので、うまくいかないNopeは...
解決
OK、これは私が終わったものです:
public IEnumerable<ProcessQueueItem> Generate(Guid batchId)
{
var accounts = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());
foreach (var accountStatuse in accounts)
{
yield return accountStatuse.AsProcessQueueItem(batchId);
}
}
.
リポジトリは、いくつかのDataContext.Stuff.whereのIENUMERABLEを返します(...)。ジェネレータは拡張方法を使用してエンティティを使用して、エンティティを歩留まりの手段によって直ちにメソッドの呼び出し側に送信されます。これにより、キューイングを開始するためにQueueManagerが呼び出します。