Question

Imagine you have to process each row in a large table.
For every single row you have to download some data from a web service and store it in a different database.

[Image: infrastructure overview]

Loading all rows at once into the console app's memory consumes too many resources and takes too long.

It's important to mention that the table will no longer be modified; only SELECT statements are possible, and the data can be queried ordered by the primary key, which is an integer value.

A possible solution is to load one row after another.
For doing so you need two things.
Firstly, the table must be queryable in an ordered manner (unique index).
Secondly, you need to know the last loaded row to get the next one.

A simple example using IAsyncEnumerable could look like the following.

  async IAsyncEnumerable<Data> GetRowsAsync()
  {
    // primary key starts at 1
    var lastId = 0;
    while(true)
    {
      Data next = await LoadNextRowAsync(lastId); // where id is greater than lastId

      if(next is null)
      {
        break;
      }

      yield return next;

      lastId = next.Id;
    }
  }

But making a network call for each row creates an overhead that makes the app feel like it never ends.

The first way that comes to mind to avoid that overhead is to load n rows at a time rather than one.

  async IAsyncEnumerable<List<Data>> GetRowsAsync()
  {
    var lastId = 0;
    while(true)
    {
      List<Data> nextRows = await LoadNextRowsAsync(lastId: lastId, nElements: 500);

      if(!nextRows.Any())
      {
        break;
      }

      yield return nextRows;

      lastId = nextRows.Last().Id;
    }
  }
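
LoadNextRowsAsync is not shown above. As a rough sketch of the keyset paging it is assumed to perform, the following uses an in-memory list in place of the real table; the Data record, the KeysetPaging class, and PageSizesAsync are hypothetical names introduced only for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical row type; only the integer primary key matters here.
public record Data(int Id, string Payload);

public static class KeysetPaging
{
    // Stand-in for the read-only table (assumption: 1200 rows with ids 1..1200).
    private static readonly List<Data> Table =
        Enumerable.Range(1, 1200).Select(i => new Data(i, $"row {i}")).ToList();

    // Same idea as a query that filters on Id > lastId, orders by Id,
    // and limits the result to nElements rows.
    public static Task<List<Data>> LoadNextRowsAsync(int lastId, int nElements)
    {
        List<Data> page = Table
            .Where(row => row.Id > lastId)
            .OrderBy(row => row.Id)
            .Take(nElements)
            .ToList();
        return Task.FromResult(page);
    }

    // Walks the whole table page by page and records the size of each page.
    public static async Task<List<int>> PageSizesAsync(int nElements)
    {
        var sizes = new List<int>();
        var lastId = 0;
        while (true)
        {
            List<Data> page = await LoadNextRowsAsync(lastId, nElements);
            if (page.Count == 0)
            {
                break;
            }

            sizes.Add(page.Count);
            lastId = page.Last().Id;
        }
        return sizes;
    }
}
```

Because each page is selected by "id greater than lastId" rather than by an offset, the cost of fetching a page does not grow with how far into the table the app already is, provided the key is indexed.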

Great, this makes the app much faster.
But it still takes a while till all rows have been processed.

After some time the app crashed due to a bug and the last id got lost.
So I have to start from scratch again after fixing the bug, and I can't guarantee that there are no more bugs.
This leads to a new requirement.

I want to be able to stop processing rows and continue later without starting from the beginning.

The easiest way to achieve this is by persisting the lastId, e.g. in a file located in the output directory of the app.

  async IAsyncEnumerable<List<Data>> GetRowsAsync()
  {
    while(true)
    {
      // load last id from file in out directory
      var lastId = await GetLastIdOrDefaultAsync();

      List<Data> nextRows = await LoadNextRowsAsync(lastId: lastId, nElements: 500);

      if(!nextRows.Any())
      {
        break;
      }

      yield return nextRows;

      // save last id in file
      await UpdateLastIdAsync(nextRows.Last().Id);
    }
  }

This works fine and fulfils all requirements.
Now to my actual question.

Do I misuse IAsyncEnumerable in my final solution?

In almost all examples I've seen IAsyncEnumerable is used in much simpler ways and I wonder if it is bad practice to do that additional IO for storing ids inside the generator method.


Solution

I think this is less about IAsyncEnumerable and more about good OOP design, mainly the principle of least surprise and high cohesion.

I would never expect a method named GetRowsAsync to do anything like saving to, or reading from, a file. I would also not consider reading from a database and persisting progress about already-read data to be cohesive, i.e. they shouldn't be together.

So yes, you have made a smelly design. But it being IAsyncEnumerable has little to do with it.

OTHER TIPS

I want to be able to stop processing rows and continue later without starting from the beginning.

Given this requirement, I would step away from the GetRowsAsync interface you've currently presented. Since you're pretending (to outside consumers) that this collection is a single collection, your requirements would make the collection behave erratically.

IEnumerable<Foo> myList = GetList();

foreach(var item in myList.Take(5))
{
    Console.Write(item.Name);
}

foreach(var item in myList.Take(5))
{
    Console.Write(item.Name);
}

We're iterating over the same collection twice, and generating the same output. The output should therefore be repeated, i.e.:

A B C D E A B C D E

But with your proposed solution, which only swaps GetList() for await GetRowsAsync(), the first time we'd be printing items 1-5, and the next time we'd be printing items 6-10.

A B C D E F G H I J

This is counterintuitive to how collections operate, and therefore it's not a good implementation.


Nothing you're doing is by itself an issue, but you need to decouple it a bit more.

  • "Loading n rows" is essentially just pagination - albeit in this case you want it to be slightly more refined (which I will get into later).
  • Storing progress is perfectly fine for the reasons mentioned (resuming an interrupted process)
  • The ability to request the next paginated set based on the last known processed value is also fine. The opposite would be forcing pagination to always load from page 1, which makes no sense as an arbitrary rule. It specifically blocks the ability to not load unnecessary data.

Lumping all of this together in a single method/class is not the greatest idea, as it creates counterintuitive collection behavior (as addressed above). But this is easily resolved by applying a bit more abstraction.

Instead of implicitly tracking the progress and resuming quietly behind the scenes, make it obvious to the consumer that this is what is happening.

There are several ways of doing this. I like to keep things simple, both for the sake of my development and the interface that my consumers have to learn; so I tend to favor an "ask one at a time" approach.

By returning each entry individually, even if you are loading them in chunks behind the scenes, your interface is clearly communicating that a subsequent call to the same method will yield a different value, not the same one repeated.

async Task<Data> GetNextDataEntry()
{
    // Initialization logic
    if(myList == null)
        await LoadFirstPage();

    // Fetch next page if the current one has been fully consumed
    if(currentIndex >= myList.Count && myList.Count > 0)
        await LoadNextPage();

    // Return null to indicate end of the list
    if(myList.Count == 0)
        return null;

    return myList[currentIndex++];
}

async Task Reset()
{
    await LoadFirstPage();
}

// Populated lazily by LoadFirstPage(); a field initializer cannot await.
private List<Data> myList;
private int currentIndex;

private async Task LoadFirstPage()
{
    myList = await LoadNextRowsAsync(lastId: null, nElements: 500);
    currentIndex = 0;
}

private async Task LoadNextPage()
{
    myList = await LoadNextRowsAsync(lastId: myList.Last().Id, nElements: 500);
    currentIndex = 0;
}

Note that returning null is an oversimplified example here. There are better implementations, such as exposing a separate bool IsNextValueAvailable() and Data GetNextValue(), but I wanted to keep the example tidy to explain the main algorithm.
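
A minimal sketch of that two-method variant could look like the following. It rests on several assumptions that are not part of the original example: the check is made asynchronous (IsNextValueAvailableAsync) because it may have to fetch a page, the page loader is passed in as a delegate, and the Data record and DataCursor class are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical row type standing in for the question's Data class.
public record Data(int Id);

public class DataCursor
{
    // Assumed loader signature: (lastId, nElements) => rows with Id > lastId, ordered by Id.
    private readonly Func<int?, int, Task<List<Data>>> loadPage;
    private readonly int pageSize;
    private List<Data> page = new List<Data>();
    private int currentIndex;
    private int? lastId;
    private bool exhausted;

    public DataCursor(Func<int?, int, Task<List<Data>>> loadPage, int pageSize = 500)
    {
        this.loadPage = loadPage;
        this.pageSize = pageSize;
    }

    // True if GetNextValue() may be called; fetches the next page
    // when the current one has been fully consumed.
    public async Task<bool> IsNextValueAvailableAsync()
    {
        if (currentIndex < page.Count) return true;
        if (exhausted) return false;

        page = await loadPage(lastId, pageSize);
        currentIndex = 0;
        exhausted = page.Count == 0;
        return !exhausted;
    }

    // Returns the next row; only valid after IsNextValueAvailableAsync() returned true.
    public Data GetNextValue()
    {
        if (currentIndex >= page.Count)
            throw new InvalidOperationException("Call IsNextValueAvailableAsync() first.");

        Data next = page[currentIndex++];
        lastId = next.Id;
        return next;
    }
}
```

A consumer would then loop with `while (await cursor.IsNextValueAvailableAsync()) { var item = cursor.GetNextValue(); }`, which avoids overloading null as the end-of-data signal.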

This gives your consumer the freedom to process the list as they see fit. They could ask for items one at a time:

var firstItem =  await myCollection.GetNextDataEntry();
var secondItem = await myCollection.GetNextDataEntry();
var thirdItem =  await myCollection.GetNextDataEntry();

Or they could just iterate over the entire thing:

var item = await myCollection.GetNextDataEntry();

while(item != null)
{
    //process item

    item = await myCollection.GetNextDataEntry();
}

If they break out of the while for any reason, they can just ask for the next item again by calling GetNextDataEntry.

This is in principle the same as what you wanted, but now it is made clear to the consumer that they will be resuming the enumeration instead of starting from the beginning because the method name in question specifies "get next data".

I would recommend decoupling the saving and reading of the lastId from the IAsyncEnumerable; instead, let the caller manage the lastId as they see fit.

If starting from the beginning, the caller passes a null for lastId.
On resumption from an error, the caller provides the appropriate lastId.

async IAsyncEnumerable<List<Data>> GetRowsAsync(Id? lastId)

This decoupling allows

  • separation of concerns
  • independence of how/where the lastId is stored
  • the ability of multiple lastIds to be stored by different callers
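
As a rough illustration of this split, the sketch below keeps the generator read-only and leaves checkpointing to the caller. It assumes an integer key (int? in place of Id?), and the delegates loadNextRowsAsync, processBatchAsync, and saveLastIdAsync, as well as the RowReader class and ProcessAllAsync method, are hypothetical stand-ins for the database query, the per-batch work, and whatever storage the caller picks.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical row type standing in for the question's Data class.
public record Data(int Id);

public static class RowReader
{
    // The generator only reads; where to start is the caller's decision.
    public static async IAsyncEnumerable<List<Data>> GetRowsAsync(
        int? lastId,
        Func<int?, int, Task<List<Data>>> loadNextRowsAsync,
        int nElements = 500)
    {
        while (true)
        {
            List<Data> nextRows = await loadNextRowsAsync(lastId, nElements);
            if (nextRows.Count == 0)
            {
                yield break;
            }

            yield return nextRows;
            lastId = nextRows.Last().Id;
        }
    }

    // Caller side: process a batch first, then persist the checkpoint,
    // so a crash never records a batch that was not fully processed.
    public static async Task<int?> ProcessAllAsync(
        int? resumeFrom,
        Func<int?, int, Task<List<Data>>> loadNextRowsAsync,
        Func<List<Data>, Task> processBatchAsync,
        Func<int, Task> saveLastIdAsync)
    {
        int? lastId = resumeFrom;
        await foreach (List<Data> batch in GetRowsAsync(resumeFrom, loadNextRowsAsync))
        {
            await processBatchAsync(batch);
            lastId = batch.Last().Id;
            await saveLastIdAsync(lastId.Value);
        }
        return lastId;
    }
}
```

With this shape, enumerating GetRowsAsync twice with the same lastId yields the same batches both times, and the file, database, or other store used for the checkpoint is an implementation detail of the caller.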
Licensed under: CC-BY-SA with attribution