Question

I've defined a Celery app in a module, and now I want to start the worker from that module's __main__ block, i.e. by running the module with python -m instead of using the celery command. I tried this:

app = Celery('project', include=['project.tasks'])

# do all kind of project-specific configuration
# that should occur whenever this module is imported

if __name__ == '__main__':
    # log stuff about the configuration
    app.start(['worker', '-A', 'project.tasks'])

but now Celery thinks I'm running the worker without arguments:

Usage: worker <command> [options] 

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)
[snip]

The usage message is the one you get from celery --help, as if it didn't get a command. I've also tried

app.worker_main(['-A', 'project.tasks'])

but that complains about the -A not being recognized.

So how do I do this? Or alternatively, how do I pass a callback to the worker to have it log information about its configuration?

Solution 3

Based on code from the django-celery module (written for Celery releases before 5), you could try something like this:

from __future__ import absolute_import, unicode_literals

from celery import current_app
from celery.bin import worker


if __name__ == '__main__':
    app = current_app._get_current_object()

    worker = worker.worker(app=app)

    options = {
        'broker': 'amqp://guest:guest@localhost:5672//',
        'loglevel': 'INFO',
        'traceback': True,
    }

    worker.run(**options)

OTHER TIPS

Using the app.worker_main method (Celery v3.1.12):

$ cat start_celery.py
#!/usr/bin/python

from myapp import app


if __name__ == "__main__":
    argv = [
        'worker',
        '--loglevel=DEBUG',
    ]
    app.worker_main(argv)

Things changed in Celery 5.

Calling worker_main now fails with:

AttributeError: 'Celery' object has no attribute 'worker_main'

For Celery 5, do the following:

import celery

app = celery.Celery(
    'project',
    include=['project.tasks']
)

if __name__ == '__main__':
    worker = app.Worker(
        include=['project.tasks']
    )
    worker.start()

See celery.apps.worker and celery.worker.WorkController.setup_defaults for details (hopefully this will be documented better in the future).

worker_main was restored in Celery 5.0.3; see https://github.com/celery/celery/pull/6481

This worked for me on 5.0.4:

self.app.worker_main(argv = ['worker', '--loglevel=info', '--concurrency={}'.format(os.environ['CELERY_CONCURRENCY']), '--without-gossip'])
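
The one-liner above raises KeyError when CELERY_CONCURRENCY is unset. As a minimal sketch, the argument list can be built in a small helper first. The name build_worker_argv and the fallback value of "2" are assumptions for illustration, not part of Celery:

```python
import os


def build_worker_argv(env=None, default_concurrency="2"):
    """Return an argv list suitable for app.worker_main().

    CELERY_CONCURRENCY is the variable used in the snippet above;
    default_concurrency is a hypothetical fallback for when it is unset.
    """
    env = os.environ if env is None else env
    concurrency = env.get("CELERY_CONCURRENCY", default_concurrency)
    return [
        "worker",
        "--loglevel=info",
        "--concurrency={}".format(concurrency),
        "--without-gossip",
    ]


# Usage in the module that defines `app` (blocks until the worker exits):
# app.worker_main(argv=build_worker_argv())
```

Keeping the list in a function also makes it easy to log the exact arguments before the worker starts.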

I think you just need to pass the full argument list, including the program name, so that Celery can parse it:

queue = Celery('blah', include=['blah'])
queue.start(argv=['celery', 'worker', '-l', 'info'])

I would like to expand on Tomasz Hławiczka's answer for Celery 5+. As mentioned in that answer, the following works:

from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    worker = app.Worker()
    worker.start()

However, I found this answer while trying to run the Celery worker alongside a Flask app started with the Flask-SocketIO run method. I tried to set up my worker and then run my Flask app, but worker.start() blocks, so the Flask app never ran. To solve this, I used the following:

import subprocess
from celery import Celery
from flask import Flask
from flask_socketio import SocketIO

cel = Celery() # args and kwargs as needed
app = Flask(__name__)
socketio = SocketIO(app, message_queue=<redis or rabbitmq here>)

if __name__ == "__main__":
    cmd = "celery -A project.cel worker -Q specific_queue".split(' ')
    subprocess.Popen(cmd)

    socketio.run(app)

Or more generally:

import subprocess
from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    cmd = "celery -A project.app worker -Q specific_queue".split(' ')
    subprocess.Popen(cmd)

    # more code you need to run after starting worker
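
A possible variation on the subprocess approach, offered as a sketch rather than a definitive recipe: build the command as a list, launch Celery through the current interpreter so the child uses the same virtualenv, and terminate the child when the parent exits. The helper names worker_command and start_worker are hypothetical; project.app and specific_queue are the placeholders from the examples above.

```python
import atexit
import subprocess
import sys


def worker_command(app_path, queue):
    """Build the worker command as a list, so no shell parsing is involved."""
    # "python -m celery" runs Celery with the same interpreter as the parent.
    return [sys.executable, "-m", "celery", "-A", app_path, "worker", "-Q", queue]


def start_worker(app_path, queue):
    """Start the worker in a child process without blocking the caller."""
    proc = subprocess.Popen(worker_command(app_path, queue))
    # Ask the worker to shut down when the parent interpreter exits.
    atexit.register(proc.terminate)
    return proc


# Usage:
# start_worker("project.app", "specific_queue")
```

Without the atexit hook, the worker started by Popen keeps running after the parent script ends, which is how leftover workers accumulate during development.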

In addition, previously created workers are sometimes still running, and you don't want to start more. This is particularly common during development. To check for other workers before starting another, you can do the following:

import subprocess
from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    # active() returns None when no worker replies
    active_workers = app.control.inspect().active()

    if active_workers is None:
        cmd = "celery -A project.app worker -Q specific_queue".split(' ')
        subprocess.Popen(cmd)

    # more code you need to run after starting worker

If you are doing all this across multiple hosts and you need to check for a specific worker, do the following:

import subprocess
from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    active_workers = app.control.inspect().active()

    if active_workers is None or "celery@hostname" not in active_workers:
        cmd = "celery -A project.app worker -Q specific_queue".split(' ')
        subprocess.Popen(cmd)

    # more code you need to run after starting worker
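
The two checks above can be folded into one small function that is easy to test without a broker. This is only a sketch: needs_worker is a hypothetical helper name, and it assumes, as the examples above do, that inspect().active() returns None when no worker replies and otherwise a dict keyed by node name.

```python
def needs_worker(active_workers, node_name=None):
    """Decide whether a new worker should be started.

    active_workers: the value returned by app.control.inspect().active().
    node_name: optional node to look for, e.g. "celery@hostname".
    """
    if not active_workers:
        # No worker replied at all.
        return True
    if node_name is None:
        # Any running worker is good enough.
        return False
    # Start one only if the specific node is missing.
    return node_name not in active_workers


# Usage:
# if needs_worker(app.control.inspect().active(), "celery@hostname"):
#     subprocess.Popen(cmd)
```

Note that a missing reply is not proof that no worker exists; a busy worker may simply fail to answer before the inspect timeout expires.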

If blocking is acceptable and you would rather not use the subprocess library, but you still want your worker to listen to a specific queue, do the following:

from celery import Celery

app = Celery() # args and kwargs as needed

if __name__ == "__main__":
    worker = app.Worker(queues=["specific_queue"])
    worker.start()

Of course, if you want multiple workers, each listening to its own queue, you have to use the subprocess method, because the first start() call blocks and the code that would start a second worker is never reached.
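
For the multiple-worker case, a minimal sketch of the subprocess method might look like the following. The helper names and the queue names in the usage comment are hypothetical; -Q and -n are the standard celery worker options for the queue list and the node name, and giving each worker its own node name avoids duplicate-name warnings on one host.

```python
import subprocess
import sys


def worker_commands(app_path, queues):
    """Return one worker command per queue, each with a unique node name."""
    commands = []
    for queue in queues:
        commands.append([
            sys.executable, "-m", "celery", "-A", app_path, "worker",
            "-Q", queue,
            "-n", "{}@%h".format(queue),  # Celery expands %h to the host name
        ])
    return commands


def start_workers(app_path, queues):
    """Launch every worker without blocking and return the Popen handles."""
    return [subprocess.Popen(cmd) for cmd in worker_commands(app_path, queues)]


# Usage:
# workers = start_workers("project.app", ["emails", "reports"])
```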

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow