我正在编写一个从SQL Server加载到Azure队列的记录。事情是,选择结果中的项目数量可能非常大,因此我想在数据仍在检索数据时启动排队。

我正在尝试利用EF6(异步),所有异步方法和 tpl 用于并行延时。所以我有:

        // This defines queue that Generator will publsh to and 
        // QueueManager wil read from. More info:
        // http://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx
        var queue = new BufferBlock<ProcessQueueItem>();

        // Configure queue listener first
        var result = this.ReceiveAndEnqueue(queue);

        // Start generation process
        var tasks = generator.Generate(batchId);
.

收款人简单:

    private async Task ReceiveAndEnqueue(ISourceBlock<ProcessQueueItem> queue)
    {
        while (await queue.OutputAvailableAsync())
        {
            var processQueueItem = await queue.ReceiveAsync();
            await this.queueManager.Enqueue(processQueueItem);
            this.tasksEnqueued++;
        }
    }
.

生成()签名如下:

public void Generate(Guid someId, ITargetBlock<ProcessQueueItem> target)
.

调用目标上的sendaSync()方法以放置新项目。我现在正在做的是将结果的总数分为“批次”,加载它们,并将其发送异步,直到所有完成:

    public void Generate(Guid batchId, ITargetBlock<ProcessQueueItem> target)
    {
        var accountPromise = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());
        accountPromise.Wait();
        var accounts = accountPromise.Result;

        // Batch configuration
        var itemCount = accounts.Count();
        var numBatches = (int)Math.Ceiling((double)itemCount / this.batchSize);
        Debug.WriteLine("Found {0} items what will be put in {1} batches of {2}", itemCount, numBatches, this.batchSize); 


        for (int i = 0; i < numBatches; i++)
        {
            var itemsToTake = Math.Min(this.batchSize, itemCount - currentIndex);
            Debug.WriteLine("Running batch - skip {0} and take {1}", currentIndex, itemsToTake);

            // Take a subset of the items and place them onto the queue
            var batch = accounts.Skip(currentIndex).Take(itemsToTake);

            // Generate a list of tasks to enqueue the items
            var taskList = new List<Task>(itemsToTake);
            taskList.AddRange(batch.Select(account => target.SendAsync(account.AsProcessQueueItem(batchId))));

            // Return the control when all tasks have been enqueued
            Task.WaitAll(taskList.ToArray());

            currentIndex = currentIndex + this.batchSize;
        } 
. 然而,这适用于,我的同事们说 - '不能让界面更简单,并让viaentate()并使界面如下:

public Task<IEnumerable<ProcessQueueItem> Generate(Guid someId)
.

很多清洁器,并且没有生成方法的依赖性在TPL库上。我完全同意,我只是吓倒了,如果我这样做,我要打电话给

var result = Generate().Wait().Result;
. 在某个点,在enqueuinig之前所有的物品。这将使我等待直到所有的东西都加载并在内存中。

所以我的问题归结为:我如何在从选择中开始使用EF查询结果?如果您抓住我的漂移,EF会在结果上运行“收益率”。

编辑 我想我犯了一个思考。默认情况下,ef加载项目懒惰。所以我可以返回所有结果作为IQueryable <>,但这并不意味着它们实际上从DB加载。然后我会迭代它们并留下它们。

编辑2 nope,这不起作用,因为我需要在生成()方法中的数据库中转换对象...

有帮助吗?

解决方案

好的,这是我最终的内容:

    public IEnumerable<ProcessQueueItem> Generate(Guid batchId)
    {
        var accounts = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());

        foreach (var accountStatuse in accounts)
        {
            yield return accountStatuse.AsProcessQueueItem(batchId);
        }
    }
.

存储库只是一些datacontext.stuff.stuff.where(...)返回一个IEnumerable。生成器使用扩展方法将实体转换为域模型(ProcessQueueItem),该域模型(ProcessQueueItem)将立即发送到该方法的呼叫者的产量,这将开始调用QueueManager开始排队。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top