We can dynamically add queues and attach workers to them.
from celery import current_app as app
from task import celeryconfig  # your celeryconfig module
To dynamically define a queue and route a task into it:
from task import process_data
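# args is the tuple of positional arguments for the task
process_data.apply_async(args, kwargs={}, queue='queue_name')
# Tell the running worker to start consuming from the new queue
reply = app.control.add_consumer('queue_name', destination=('your-worker-name',), reply=True)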
You have to keep the queue names in a persistent data store such as Redis so that they can be restored when the worker restarts.
redis.sadd('CELERY_QUEUES','queue_name')
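The two steps above (attach a worker, then remember the queue name) could be wrapped in one helper. This is only a minimal sketch: `register_queue` and its parameter names are hypothetical, `app` is assumed to be a Celery app, and `store` is assumed to be a redis-py style client.

```python
def register_queue(app, store, queue_name, worker_name, set_key="CELERY_QUEUES"):
    """Attach a running worker to queue_name and remember the name.

    app:   assumed to be a Celery app (anything exposing control.add_consumer)
    store: assumed to be a redis-py style client (anything exposing sadd)
    """
    # Ask only the named worker to start consuming; reply=True waits for its answer.
    reply = app.control.add_consumer(
        queue_name, destination=[worker_name], reply=True
    )
    # Persist the name so the config module can re-declare the queue after a restart.
    store.sadd(set_key, queue_name)
    return reply
```

Passing `app` and `store` in as arguments keeps the helper independent of how the Celery app and the Redis connection are created in your project.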
celeryconfig.py reads the same Redis set to restore the queue names:
CELERY_QUEUES = {
    'celery-1': {
        'binding_key': 'celery-1'
    },
    'gateway-1': {
        'binding_key': 'gateway-1'
    },
    'gateway-2': {
        'binding_key': 'gateway-2'
    }
}
for queue in redis.smembers('CELERY_QUEUES'):
    CELERY_QUEUES[queue] = dict(binding_key=queue)
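One detail to watch: under Python 3, redis-py returns set members as `bytes` unless the client was created with `decode_responses=True`, so the restored keys may not match the string queue names. A hedged sketch of the merge step that handles both cases (the helper name `merge_queues` is hypothetical, not part of Celery or redis-py):

```python
def merge_queues(static_queues, stored_names):
    """Return a new queue dict: the static entries plus one entry per stored name."""
    queues = dict(static_queues)  # copy so the static definition is not mutated
    for name in stored_names:
        # redis-py yields bytes unless decode_responses=True was set on the client
        if isinstance(name, bytes):
            name = name.decode("utf-8")
        queues[name] = {"binding_key": name}
    return queues
```

In celeryconfig.py this would be used as `CELERY_QUEUES = merge_queues(CELERY_QUEUES, redis.smembers('CELERY_QUEUES'))`.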