Вопрос

Я пишу то, что загружает записи с SQL Server на очередь Azure. Дело в том, что количество элементов в результате выбора может быть очень большим, поэтому я хотел бы начать вещи в очереди, когда данные все еще получают.

Я пытаюсь использовать EF6 (Async), все асинхронные методы и и TPL для параллельного enqueureing. Так что у меня есть:

        // This defines queue that Generator will publsh to and 
        // QueueManager wil read from. More info:
        // http://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx
        var queue = new BufferBlock<ProcessQueueItem>();

        // Configure queue listener first
        var result = this.ReceiveAndEnqueue(queue);

        // Start generation process
        var tasks = generator.Generate(batchId);
.

Получатель прост:

    private async Task ReceiveAndEnqueue(ISourceBlock<ProcessQueueItem> queue)
    {
        while (await queue.OutputAvailableAsync())
        {
            var processQueueItem = await queue.ReceiveAsync();
            await this.queueManager.Enqueue(processQueueItem);
            this.tasksEnqueued++;
        }
    }
.

Генератор генерирует () Подпись выглядит следующим образом:

public void Generate(Guid someId, ITargetBlock<ProcessQueueItem> target)
.

который вызывает метод SENDASYNC () на цели, чтобы разместить новые предметы. То, что я делаю сейчас, разделяет общее количество результатов в «партии», загружая их и отправляя им ASYNC, пока все сделано:

    public void Generate(Guid batchId, ITargetBlock<ProcessQueueItem> target)
    {
        var accountPromise = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());
        accountPromise.Wait();
        var accounts = accountPromise.Result;

        // Batch configuration
        var itemCount = accounts.Count();
        var numBatches = (int)Math.Ceiling((double)itemCount / this.batchSize);
        Debug.WriteLine("Found {0} items what will be put in {1} batches of {2}", itemCount, numBatches, this.batchSize); 


        for (int i = 0; i < numBatches; i++)
        {
            var itemsToTake = Math.Min(this.batchSize, itemCount - currentIndex);
            Debug.WriteLine("Running batch - skip {0} and take {1}", currentIndex, itemsToTake);

            // Take a subset of the items and place them onto the queue
            var batch = accounts.Skip(currentIndex).Take(itemsToTake);

            // Generate a list of tasks to enqueue the items
            var taskList = new List<Task>(itemsToTake);
            taskList.AddRange(batch.Select(account => target.SendAsync(account.AsProcessQueueItem(batchId))));

            // Return the control when all tasks have been enqueued
            Task.WaitAll(taskList.ToArray());

            currentIndex = currentIndex + this.batchSize;
        } 
.

Это работает, однако, мой коллега заметил - «Разве мы не можем сделать интерфейс проще, и пусть генерируют () и сделайте интерфейс таким образом:

public Task<IEnumerable<ProcessQueueItem> Generate(Guid someId)
.

Очиститель намного, и нет зависимости от генерации метода на библиотеку TPL. Я полностью согласен, я просто добиваюсь, что если я сделаю это, мне придется позвонить

var result = Generate().Wait().Result;
.

В какой-то момент перед Enqueuinig все элементы. Это заставит меня ждать, пока все вещи загружены и в памяти.

Так, на что снижается мой вопрос: как я могу начать использовать результаты запроса EF, как только они «капелься» из выбора? Как будто EF будет запустить «доходность» по результатам, если вы поймаете мой дрейф.

Редактировать Я думаю, что я сделал ошибку думать. EF загружает элементы ленивых по умолчанию. Таким образом, я могу просто вернуть все результаты как iQueryable <>, но это не значит, что они на самом деле загружены из БД. Затем я повторяю их и снимаю их.

Редактировать 2 Нет, это не работает, поскольку мне нужно преобразовать объект из базы данных в методе Generate () ...

Это было полезно?

Решение

Хорошо, это то, что я оказался:

    public IEnumerable<ProcessQueueItem> Generate(Guid batchId)
    {
        var accounts = this.AccountStatusRepository.GetAccountsByBatchId(batchId.ToString());

        foreach (var accountStatuse in accounts)
        {
            yield return accountStatuse.AsProcessQueueItem(batchId);
        }
    }
.

Репозиторий возвращает iEnumerable из некоторого datacontext.stuff.ge (...).Генератор использует метод расширения для преобразования объекта в модель домена (ProcessQueueItem), которую с помощью дохода немедленно отправляется в звонящий метода, который начнет вызов Queuemanager начать очередь.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top