How do I create Celery queues at runtime so that tasks sent to that queue get picked up by workers?

StackOverflow https://stackoverflow.com/questions/21687997

Problem

I'm using Django 1.4, Celery 3.0, and RabbitMQ.

To describe the problem: I have many content networks in the system, and I want a queue for processing the tasks related to each of these networks.

However, content is created on the fly while the system is live, so I need to create queues on the fly and have the existing workers start consuming from them.

I've tried scheduling tasks in the following way (where content is a Django model instance):

queue_name = 'content.{}'.format(content.pk)
# E.g. queue_name = content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
add_content.apply_async(args=[content], queue=queue_name)

This creates a queue named content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba, creates a new exchange and routing key of the same name, and sends a task to that queue.

However, I never see the workers pick up these tasks. The workers I currently have set up are not listening on any specific queues (they were not started with queue names) and pick up tasks sent to the default queue just fine. My Celery settings are:

BROKER_URL = "amqp://test:password@localhost:5672/vhost"
CELERY_TIMEZONE = 'UTC'
CELERY_ALWAYS_EAGER = False

from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_QUEUES = (
    Queue(CELERY_DEFAULT_QUEUE, Exchange(CELERY_DEFAULT_EXCHANGE),
        routing_key=CELERY_DEFAULT_ROUTING_KEY),
)

CELERY_CREATE_MISSING_QUEUES = True
CELERYD_PREFETCH_MULTIPLIER = 1

Any idea how I can get the workers to pick up on tasks sent to this newly created queue?


Solution

You need to tell the workers to start consuming from the new queues; the relevant docs are in the Celery workers guide, under adding consumers.

From the command line:

$ celery control add_consumer content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba

Or from within Python:

>>> app.control.add_consumer('content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba', reply=True)

Both forms accept a destination argument, so you can tell individual workers only about the new queues if required.
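
For example, here is a minimal sketch that targets a single worker; the node name worker1@example.com is an assumption, so substitute your own:

>>> app.control.add_consumer(
...     'content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba',
...     destination=['worker1@example.com'],  # assumed worker node name
...     reply=True)

The command-line form accepts the same restriction through its -d/--destination option.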

Other tips

We can dynamically add queues and attach workers to them.

from celery import current_app as app
from task import celeryconfig  # your celeryconfig module

To dynamically define a queue and route a task to it:

from task import process_data
process_data.apply_async(args=(), kwargs={}, queue='queue_name')
reply = app.control.add_consumer('queue_name', destination=('your-worker-name',), reply=True)

You have to keep the queue names in a persistent data store, such as Redis, so they can be restored after a restart.

redis.sadd('CELERY_QUEUES', 'queue_name')
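
Putting those pieces together, a minimal sketch could look like this. The helper name register_queue is made up for illustration, a local Redis instance is assumed, and app is the import from above:

import redis

# The snippets above treat "redis" as an already-connected client;
# rebind the module name to a client instance to match (a local
# default-port instance is assumed here).
redis = redis.StrictRedis()

def register_queue(queue_name, worker_name):
    # Persist the queue name so it survives restarts, then tell the
    # running worker to start consuming from the new queue.
    redis.sadd('CELERY_QUEUES', queue_name)
    app.control.add_consumer(queue_name, destination=(worker_name,), reply=True)

register_queue('queue_name', 'your-worker-name')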

celeryconfig.py then reads the same set back to remember the queue names:

CELERY_QUEUES = {
    'celery-1': {
        'binding_key': 'celery-1'
    },
    'gateway-1': {
        'binding_key': 'gateway-1'
    },
    'gateway-2': {
        'binding_key': 'gateway-2'
    }
}
for queue in redis.smembers('CELERY_QUEUES'):
    CELERY_QUEUES[queue] = dict(binding_key=queue)
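
If you declare CELERY_QUEUES with kombu Queue objects instead, as in the question's settings, the same restore step can be sketched as a list append; note that newer redis-py clients return bytes, hence the decode:

from kombu import Exchange, Queue

CELERY_QUEUES = [
    Queue('default', Exchange('default'), routing_key='default'),
]
for name in redis.smembers('CELERY_QUEUES'):
    if isinstance(name, bytes):  # redis-py 3.x returns bytes by default
        name = name.decode()
    CELERY_QUEUES.append(Queue(name, routing_key=name))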
License: CC-BY-SA with attribution