Question

I followed celery docs to define 2 queues on my dev machine.

My celery settings:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

I opened two terminal windows in my project's virtualenv and ran the following commands:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n default_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

What I get is that all tasks are being processed by both queues.

My goal is to have one queue to process only the one task defined in CELERY_ROUTES and default queue to process all other tasks.

I also followed this SO question: rabbitmqctl list_queues returns celery 0, and rabbitmqctl list_bindings returns exchange celery queue celery [] twice. Restarting the RabbitMQ server didn't change anything.


Solution

OK, so I figured it out. Below is my whole setup, the settings and how to run Celery, for anyone wondering about the same thing.

Settings

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup (Queue and Exchange are imported from kombu)
from kombu import Exchange, Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}

How to run celery?

terminal - tab 1:

celery -A proj worker -Q default -l debug -n default_worker

This will start the first worker, which consumes tasks from the default queue. NOTE! -n default_worker is not required for the first worker, but it is required if you have any other Celery instances up and running. -n is the short form of --hostname, so you could also write something like --hostname=default@%h.

terminal - tab 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

This will start the second worker, which consumes tasks from the feeds queue. Notice -n feeds_worker; if you are running with -l debug (log level = debug), you will see that both workers are syncing with each other.

terminal - tab 3:

celery -A proj beat -l debug

This will start beat, which executes tasks according to the schedule in your CELERYBEAT_SCHEDULE. I didn't have to change the task or the CELERYBEAT_SCHEDULE.

For example, this is how my CELERYBEAT_SCHEDULE looks for the task that should go to the feeds queue:

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}

As you can see, there is no need to add 'options': {'routing_key': 'long_tasks'} or to specify which queue the task should go to. Also, if you were wondering why Update is capitalized, it's because it's a custom task, and custom tasks are defined as subclasses of celery.Task.
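
The schedule entry needs no routing options because the task name is looked up in CELERY_ROUTES when the task is sent. Here is a simplified, pure-Python sketch of that lookup. Note that resolve_queue is a hypothetical helper written only for illustration, and arena.social.tasks.SomethingElse is a made-up task name; Celery's real router also deals with exchanges, routing keys and pattern matching.

```python
# Simplified illustration of name-based routing; NOT Celery's actual router.
CELERY_DEFAULT_QUEUE = "default"
CELERY_ROUTES = {
    "arena.social.tasks.Update": {"queue": "feeds", "routing_key": "long_tasks"},
}


def resolve_queue(task_name, routes=CELERY_ROUTES, default=CELERY_DEFAULT_QUEUE):
    """Return the queue name a task would be sent to (hypothetical helper)."""
    route = routes.get(task_name)
    if route is None:
        # No explicit route: fall back to the default queue.
        return default
    return route.get("queue", default)


print(resolve_queue("arena.social.tasks.Update"))         # feeds
print(resolve_queue("arena.social.tasks.SomethingElse"))  # default
```

So only the routed task lands in feeds, and everything else falls through to default, which is the behaviour the question asked for.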

Update Celery 5.0+

Celery has made a few changes since version 5; here is an updated setup for task routing.

How to create the queues?

Celery can create the queues automatically. This works well for simple cases where Celery's default routing values are sufficient.

Set task_create_missing_queues=True or, if you're using Django settings and namespacing all Celery configs under the CELERY_ key, CELERY_TASK_CREATE_MISSING_QUEUES=True. Note that it is on by default.

Automatic scheduled task routing

After configuring the Celery app:

celery_app.conf.beat_schedule = {
  "some_scheduled_task": {
    "task": "module.path.some_task",
    "schedule": crontab(minute="*/10"),
    "options": {"queue": "queue1"}
  }
}

Automatic task routing

The Celery app still has to be configured first, and then:

app.conf.task_routes = {
  "module.path.task2": {"queue": "queue2"},
}
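
For reference, the two-queue setup from the original answer might look roughly like this with the lowercase Celery 5 setting names. This is a sketch under assumptions: the app name "proj" is taken from the worker commands earlier in the answer, no broker URL is shown, and the feeds queue is left to be created automatically rather than declared with explicit exchanges.

```python
from celery import Celery

# "proj" matches the -A proj argument used in the worker commands;
# replace it with your own project name.
app = Celery("proj")

# Tasks without an explicit route go to the "default" queue.
app.conf.task_default_queue = "default"

# Only this task is sent to the "feeds" queue. The queue does not need to be
# declared by hand because task_create_missing_queues is on by default.
app.conf.task_routes = {
    "arena.social.tasks.Update": {"queue": "feeds"},
}
```

The workers are then started the same way as before, one with -Q default and one with -Q feeds.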

Manual routing of tasks

If you want to route tasks dynamically, specify the queue when sending the task:

from module import task

def do_work():
  # do some work and launch the task
  task.apply_async(args=(arg1, arg2), queue="queue3")

More details on routing can be found here: https://docs.celeryproject.org/en/stable/userguide/routing.html

And regarding calling tasks here: https://docs.celeryproject.org/en/stable/userguide/calling.html

OTHER TIPS

In addition to the accepted answer, if anyone comes here still wondering why their settings aren't working (as I did just moments ago), here's why: the Celery documentation isn't listing setting names properly.

For Celery 5.0.5, the settings CELERY_DEFAULT_QUEUE, CELERY_QUEUES and CELERY_ROUTES should be named CELERY_TASK_DEFAULT_QUEUE, CELERY_TASK_QUEUES and CELERY_TASK_ROUTES instead. These are the settings that I've tested, but my guess is that the same rule applies to the exchange and routing key settings as well.
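
The renaming can be sketched as a small lookup. The lowercase names are the ones listed in the Celery configuration reference. The uppercase form is derived from them on the assumption that the app loads Django settings with app.config_from_object("django.conf:settings", namespace="CELERY"). The helper django_setting_name is hypothetical and exists only for this illustration.

```python
# Old uppercase setting names -> current lowercase names.
OLD_TO_NEW = {
    "CELERY_DEFAULT_QUEUE": "task_default_queue",
    "CELERY_DEFAULT_EXCHANGE_TYPE": "task_default_exchange_type",
    "CELERY_DEFAULT_ROUTING_KEY": "task_default_routing_key",
    "CELERY_QUEUES": "task_queues",
    "CELERY_ROUTES": "task_routes",
}


def django_setting_name(old_name, namespace="CELERY"):
    """Return the namespaced Django setting name (hypothetical helper)."""
    new_name = OLD_TO_NEW[old_name]
    # With a namespace, the setting is the prefix plus the uppercased new name.
    return f"{namespace}_{new_name.upper()}"


for old in OLD_TO_NEW:
    print(old, "->", django_setting_name(old))
```

Under that assumption, the exchange type and routing key settings follow the same pattern, for example CELERY_TASK_DEFAULT_ROUTING_KEY.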

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow