I would like to expand on Tomasz Hławiczka's answer for Celery 5+. As mentioned in that answer, the following works:
```python
from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    worker = app.Worker()
    worker.start()
```
However, I stumbled upon this answer while trying to run a Celery worker alongside a Flask app via Flask-SocketIO's run method. Specifically, I tried to set up my worker and then run my Flask app; however, worker.start() blocks, so the Flask app never ran. To solve this, I used the following:
```python
import subprocess

from celery import Celery
from flask import Flask
from flask_socketio import SocketIO

cel = Celery()  # args and kwargs as needed
app = Flask(__name__)
socketio = SocketIO(app, message_queue=<redis or rabbitmq here>)

if __name__ == "__main__":
    cmd = "celery -A project.cel worker -Q specific_queue".split(" ")
    subprocess.Popen(cmd)
    socketio.run(app)
```
Or more generally:
```python
import subprocess

from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    cmd = "celery -A project.app worker -Q specific_queue".split(" ")
    subprocess.Popen(cmd)
    # more code you need to run after starting the worker
```
In addition, there are times when previously created workers are still running and you don't want to start more. This is particularly common during development. To check for other workers before starting another, you can do the following:
```python
import subprocess

from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    active_workers = app.control.inspect().active()
    if active_workers is None:
        cmd = "celery -A project.app worker -Q specific_queue".split(" ")
        subprocess.Popen(cmd)
    # more code you need to run after starting the worker
```
If you are doing all this across multiple hosts and you need to check for a specific worker, do the following:
```python
import subprocess

from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    active_workers = app.control.inspect().active()
    if active_workers is None or "celery@hostname" not in active_workers:
        cmd = "celery -A project.app worker -Q specific_queue".split(" ")
        subprocess.Popen(cmd)
    # more code you need to run after starting the worker
```
If you don't care about blocking and don't want to use the subprocess library, but you still want your worker to listen to a specific queue, do the following:
```python
from celery import Celery

app = Celery()  # args and kwargs as needed

if __name__ == "__main__":
    worker = app.Worker(queues=["specific_queue"])
    worker.start()
```
Of course, if you want multiple workers, each listening to its own specific queue, you'd have to use the subprocess method, as you cannot start another worker after the first start() call blocks.
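That multi-worker case can be sketched as follows. The queue names (specific_queue, other_queue) and the project.app module path are placeholders for your own; the -n flag gives each worker a distinct node name so several workers on the same host don't collide:

```python
import subprocess

def worker_command(app_path, queue):
    # CLI invocation for one worker; -n assigns a unique node name
    # per worker (%h expands to the hostname).
    return ["celery", "-A", app_path, "worker", "-Q", queue,
            "-n", f"{queue}_worker@%h"]

# Placeholder queue names; substitute your own.
queues = ["specific_queue", "other_queue"]
commands = [worker_command("project.app", q) for q in queues]

def start_workers(commands):
    # Popen returns immediately, so starting several workers
    # does not block the rest of the program.
    return [subprocess.Popen(cmd) for cmd in commands]

if __name__ == "__main__":
    workers = start_workers(commands)
    # more code you need to run after starting the workers
```

You could then reuse the active-worker check from above before each Popen call if you want to avoid duplicates.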