Question

I'm developing a system that will build a huge n-gram model for an AI project.
My pipeline is the following:
Resource input --> Fetch data --> Parsers --> Trainer
The resource input (basically URLs that have to be parsed) is not constant: I can introduce a bulk of thousands of resources at a time, later on another bulk of dozens, and so on.

My idea is to implement each step of the pipeline as a Celery task and deploy it on the cloud (for instance, using Heroku's worker dynos). But I'm new to Celery and I'm having doubts about how to queue these tasks so that my worker(s) are working at 100% while maintaining the system's integrity.
The straightforward approach is to start queueing tasks as soon as the previous one is finished, so for example if I get a resource input of 1000 items, I'd schedule 1000 "fetch data" tasks, each of which would queue a "parse" task when finished, and so on. But this will lead to a huge queue, because more resources will come in before these tasks are finished, and I know it will take months to build the model (if it ever completes!).
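
To make this concrete, the chained version I'm picturing would look roughly like this (task names are only illustrative, I haven't written the real tasks yet):

from celery import chain

# for each incoming URL: fetch the data, parse it, then feed the trainer
for url in resource_urls:
    chain(fetch.s(url), parse.s(), train.s()).apply_async()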

So I'm not sure if Celery can handle all of that without running into memory issues (Heroku has its limits) or other problems I can't foresee right now. Or maybe I should use a more complicated technique, like scheduling chunks of tasks every X minutes, storing partial results in the database, etc., which might avoid some of these problems but wouldn't keep my workers busy 100% of the time.

Any thoughts?
Thanks!


EDIT

The answer to my question is actually in the comments on the accepted answer.

Solution

By having a separate queue for each task type and running a dedicated worker for each queue, you can ensure that your system uses its resources fully while giving equal attention to each task. Additionally, you can add workers per queue to balance processing based on the tasks' runtimes.

For example, defining the tasks:

import urllib.request
from celery import Celery

# the broker URL is only an example; point it at whatever broker you use
celery = Celery('tasks', broker='amqp://')

@celery.task
def fetch(url):
    # download the page and return the HTML (urllib is just a stand-in fetcher)
    return urllib.request.urlopen(url).read().decode('utf-8', 'replace')

@celery.task
def parse(html):
    # extract from the HTML whatever the trainer needs
    pass

And configuring automatic routing:

CELERY_ROUTES = {
    'tasks.fetch': {'queue': 'fetch_queue'},
    'tasks.parse': {'queue': 'parse_queue'},
}

And running workers:

$ celery -A tasks worker -Q fetch_queue

$ celery -A tasks worker -Q parse_queue

You will have a separate worker for each task type.

Using callbacks, you can easily chain parsing after fetching:

fetch.apply_async((url,), link=parse.subtask())
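
For a whole batch of incoming URLs, a minimal dispatch loop would look like this (parse.s() is just the shorthand for parse.subtask()):

for url in urls:
    # each fetch result is handed to parse as soon as the fetch completes
    fetch.apply_async((url,), link=parse.s())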

P.S. For the fetching worker you can use the Eventlet pool to take advantage of asynchronous I/O.
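
For example (assuming the eventlet package is installed), the fetch worker can be started with the eventlet pool and a high concurrency so that many downloads are in flight at once:

$ pip install eventlet
$ celery -A tasks worker -Q fetch_queue -P eventlet -c 100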

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow