Как использовать слушать на Basic.return в Python клиент AMQP
-
25-09-2019 - |
Вопрос
Я хотел бы убедиться, что мое сообщение было доставлено в очередь.
Чтобы сделать это, я добавляю обязательный параметр на Basic_Publish. Что еще я должен сделать, чтобы получить basic.return
Сообщение, если мое сообщение не было успешно доставлено?
Я не могу использовать channel.wait()
слушать basic.return
Потому что, когда мое сообщение успешно доставлено wait()
Функция висит навсегда. (Там нет ожидания) с другой стороны. Когда я не звоню channel.wait()
то channel.returned_messages
Будет оставаться пустым, даже если сообщение не доставляется.
я использую py-amqplib
Версия 0.6.
Любое решение приветствуется.
Решение
В настоящее время это невозможно, как basic.return
отправляется асинхронно, когда сообщение упадет в брокер. Когда сообщение было успешно отправлено никаких данных, не сообщается от сервера. Так что Pyamqp не может слушать такие сообщения.
Я читал несколько потоков об этой проблеме. Возможное решение было:
- Используйте TXAMQP, витая версия AMQP, которая обрабатывает Basic. Возрождение
- Используйте Pyamqp с ожиданием время ожидания. (Я не уверен, что это возможно на данный момент)
- Ping Server часто со синхронными командами, так что Pyamqp сможет выбрать
basic.return
сообщения, когда они прибывают.
Поскольку уровень поддержки Pyamqp и Rabbitmq в целом довольно низко, мы решили вообще не использовать брокер AMQP.
Другие советы
Вы пробовали только библиотеку Python AMQP, которая завершена? Это не так широко используется, потому что он не аккуратно упакован.
Шаг 1. Составьте библиотеку C - вам может понадобиться sudo apt-get install autotools-dev autoconf automake libtool
mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install
Шаг 2. Установите библиотеку Python
pip install pylibrabbitmq
Вы не можете сделать это синхронно, так как это асинхронная система. Но вы можете решить эту проблему с помощью потоков.
Основная идея заключается в том, что вы запускаете нить, которая делает ждать на канале, когда он выходит из ожидания, он вызывает функцию Call_Back для любого возвращенного сообщения в очереди возвращенной сообщения. Затем вы можете иметь дело с этим сообщением, однако вы хотите в функции Call_
def registerCallback(channel, call_back): """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange. """ def wait(): try: channel.wait() except Exception, e: print("Problem waiting on publish channel: %s" % str(e)) while not channel.returned_messages.empty(): returnedMessage = channel.returned_messages.get() processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage)) processReturnedMessageThread.start() wait() waiting = Thread(target=wait) waiting.start()