Question

I'm writing something that loads records from SQL Server onto an Azure queue. The number of items in the select result might be very large, so I would like to start queuing while data is still being retrieved.

I'm trying to leverage EF6 (async), all async methods, and TPL for parallel enqueueing. So I have:

        // This defines the queue that the Generator will publish to and
        // that the QueueManager will read from. More info:
        // http://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx
        var queue = new BufferBlock<ProcessQueueItem>();

        // Configure queue listener first
        var result = this.ReceiveAndEnqueue(queue);

        // Start generation process
        // Generate() returns void and pushes its items into the queue
        generator.Generate(batchId, queue);

The ReceiveAndEnqueue is simple:

    private async Task ReceiveAndEnqueue(ISourceBlock<ProcessQueueItem> queue)
    {
        while (await queue.OutputAvailableAsync())
        {
            var processQueueItem = await queue.ReceiveAsync();
            await this.queueManager.Enqueue(processQueueItem);
            this.tasksEnqueued++;
        }
    }
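
The consumer loop above only ends once the block has been marked complete and drained. A minimal, self-contained sketch of that pattern follows, assuming `int` payloads instead of `ProcessQueueItem`; the class and method names (`BufferBlockSketch`, `ConsumeAsync`, `RunAsync`) are hypothetical and not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical illustration; not the original QueueManager code.
public static class BufferBlockSketch
{
    // Drains the block until it is completed and empty, mirroring ReceiveAndEnqueue.
    public static async Task<List<int>> ConsumeAsync(ISourceBlock<int> source)
    {
        var received = new List<int>();
        while (await source.OutputAvailableAsync())
        {
            received.Add(await source.ReceiveAsync());
        }
        return received;
    }

    public static async Task<List<int>> RunAsync()
    {
        var queue = new BufferBlock<int>();

        // Start the listener first, as in the question.
        var consumer = ConsumeAsync(queue);

        // Producer side: push a few items while the consumer is already running.
        for (var i = 0; i < 5; i++)
        {
            await queue.SendAsync(i);
        }

        // Signal that no more items will arrive; without this call
        // OutputAvailableAsync never returns false and the consumer never finishes.
        queue.Complete();

        return await consumer;
    }
}
```

Whoever produces the items is the natural place to call `Complete()`, since only the producer knows when the last item has been sent.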

The generator's Generate() signature is as follows:

    public void Generate(Guid someId, ITargetBlock<ProcessQueueItem> target)

It calls the SendAsync() method on the target to place new items. What I'm doing right now is dividing the total number of results into 'batches', loading them in, and sending them asynchronously until all is done:

    public void Generate(Guid batchId, ITargetBlock<ProcessQueueItem> target)
    {
        // Blocks until the whole result set is loaded; .Result waits for completion by itself
        var accounts = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString()).Result;

        // Batch configuration
        var itemCount = accounts.Count();
        var numBatches = (int)Math.Ceiling((double)itemCount / this.batchSize);
        Debug.WriteLine("Found {0} items that will be put in {1} batches of {2}", itemCount, numBatches, this.batchSize);

        // Index of the first item of the current batch
        var currentIndex = 0;

        for (int i = 0; i < numBatches; i++)
        {
            var itemsToTake = Math.Min(this.batchSize, itemCount - currentIndex);
            Debug.WriteLine("Running batch - skip {0} and take {1}", currentIndex, itemsToTake);

            // Take a subset of the items and place them onto the queue
            var batch = accounts.Skip(currentIndex).Take(itemsToTake);

            // Generate a list of tasks to enqueue the items
            var taskList = new List<Task>(itemsToTake);
            taskList.AddRange(batch.Select(account => target.SendAsync(account.AsProcessQueueItem(batchId))));

            // Return control once all items of this batch have been sent
            Task.WaitAll(taskList.ToArray());

            currentIndex = currentIndex + this.batchSize;
        }
    }
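
The skip/take arithmetic in the loop above can be checked in isolation. The sketch below is a hypothetical helper (`BatchMath.Ranges` does not exist in the original code) that yields the same (skip, take) pairs the loop would log for a given item count and batch size.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; reproduces the batch boundaries computed in Generate().
public static class BatchMath
{
    public static IEnumerable<(int Skip, int Take)> Ranges(int itemCount, int batchSize)
    {
        // Because this is an iterator, the check runs on first enumeration, not at call time.
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        for (var skip = 0; skip < itemCount; skip += batchSize)
        {
            // The last batch may be smaller than batchSize.
            yield return (skip, Math.Min(batchSize, itemCount - skip));
        }
    }
}
```

For 10 items and a batch size of 4 this yields (0, 4), (4, 4), (8, 2), which is three batches and matches `Math.Ceiling(10 / 4.0)`.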

This works. However, my colleague remarked: 'Can't we make the interface of Generate() simpler, like so:

    public Task<IEnumerable<ProcessQueueItem>> Generate(Guid someId)

A lot cleaner, and the Generate method no longer depends on TPL Dataflow. I totally agree; I'm just afraid that if I do that, I'm going to have to call

    var result = Generate(batchId).Result;

at some point, before enqueuing all the items. That will make me wait until ALL the data is loaded and in memory.

So what my question comes down to is: how can I start using EF query results as soon as they 'drip in' from a select? As if EF would run a 'yield' over the results, if you catch my drift.

EDIT: I think I made a thinking mistake. EF loads items lazily by default, so I can just return all the results as IQueryable<>; that doesn't mean they're actually loaded from the DB yet. I'll then iterate over them and enqueue them.

EDIT 2: Nope, that doesn't work, since I need to transform the object from the database in the Generate() method...

Solution

OK, this is what I ended up with:

    public IEnumerable<ProcessQueueItem> Generate(Guid batchId)
    {
        var accounts = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());

        foreach (var accountStatus in accounts)
        {
            // Each transformed item is handed to the caller before the next one is read
            yield return accountStatus.AsProcessQueueItem(batchId);
        }
    }

The repository returns an IEnumerable that is just DataContext.Stuff.Where(...), so nothing is materialized up front. The generator uses the extension method to transform each entity into the domain model (ProcessQueueItem). Because of yield, each item is handed to the caller as soon as it is produced, and the caller can pass it to the QueueManager for queueing straight away.
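
To see the interleaving without a database, here is a minimal sketch. It assumes the repository returns a deferred sequence rather than a materialized list. `AccountStatus`, the in-memory `GetAccounts` stand-in, the log list, and `Task.Yield()` in place of `queueManager.Enqueue` are all hypothetical substitutes for the real types.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for the entity and domain model.
public sealed class AccountStatus { public int Id { get; set; } }
public sealed class ProcessQueueItem { public int AccountId { get; set; } public Guid BatchId { get; set; } }

public static class StreamingSketch
{
    // Stand-in for the repository: produces rows one at a time and logs each "load".
    public static IEnumerable<AccountStatus> GetAccounts(int count, List<string> log)
    {
        for (var i = 1; i <= count; i++)
        {
            log.Add("load " + i);
            yield return new AccountStatus { Id = i };
        }
    }

    // Same shape as the Generate() above: transform and yield each item immediately.
    public static IEnumerable<ProcessQueueItem> Generate(Guid batchId, int count, List<string> log)
    {
        foreach (var account in GetAccounts(count, log))
        {
            yield return new ProcessQueueItem { AccountId = account.Id, BatchId = batchId };
        }
    }

    // Consumer: awaits a placeholder for the real enqueue call per item.
    public static async Task<List<string>> RunAsync(int count)
    {
        var log = new List<string>();
        foreach (var item in Generate(Guid.NewGuid(), count, log))
        {
            await Task.Yield(); // placeholder for: await queueManager.Enqueue(item)
            log.Add("enqueue " + item.AccountId);
        }
        return log;
    }
}
```

With `count` set to 2 the log reads `load 1`, `enqueue 1`, `load 2`, `enqueue 2`: each item is enqueued before the next one is read, which is the 'drip in' behaviour the question asks for. If the repository called `ToList()` internally, all loads would happen before the first enqueue.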

Licensed under: CC-BY-SA with attribution