Как использовать слушать на Basic.return в Python клиент AMQP

StackOverflow https://stackoverflow.com/questions/2191695

Вопрос

Я хотел бы убедиться, что мое сообщение было доставлено в очередь.

Чтобы сделать это, я добавляю обязательный параметр на Basic_Publish. Что еще я должен сделать, чтобы получить basic.return Сообщение, если мое сообщение не было успешно доставлено?

Я не могу использовать channel.wait() слушать basic.return Потому что, когда мое сообщение успешно доставлено wait() Функция висит навсегда. (Там нет ожидания) с другой стороны. Когда я не звоню channel.wait() то channel.returned_messages Будет оставаться пустым, даже если сообщение не доставляется.

я использую py-amqplib Версия 0.6.

Любое решение приветствуется.

Это было полезно?

Решение

В настоящее время это невозможно, как basic.return отправляется асинхронно, когда сообщение упадет в брокер. Когда сообщение было успешно отправлено никаких данных, не сообщается от сервера. Так что Pyamqp не может слушать такие сообщения.

Я читал несколько потоков об этой проблеме. Возможное решение было:

  • Используйте TXAMQP, витая версия AMQP, которая обрабатывает Basic. Возрождение
  • Используйте Pyamqp с ожиданием время ожидания. (Я не уверен, что это возможно на данный момент)
  • Ping Server часто со синхронными командами, так что Pyamqp сможет выбрать basic.return сообщения, когда они прибывают.

Поскольку уровень поддержки Pyamqp и Rabbitmq в целом довольно низко, мы решили вообще не использовать брокер AMQP.

Другие советы

Вы пробовали только библиотеку Python AMQP, которая завершена? Это не так широко используется, потому что он не аккуратно упакован.

Шаг 1. Составьте библиотеку C - вам может понадобиться sudo apt-get install autotools-dev autoconf automake libtool

mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install

Шаг 2. Установите библиотеку Python

pip install pylibrabbitmq

Вы не можете сделать это синхронно, так как это асинхронная система. Но вы можете решить эту проблему с помощью потоков.

Основная идея заключается в том, что вы запускаете нить, которая делает ждать на канале, когда он выходит из ожидания, он вызывает функцию Call_Back для любого возвращенного сообщения в очереди возвращенной сообщения. Затем вы можете иметь дело с этим сообщением, однако вы хотите в функции Call_

def registerCallback(channel, call_back):
    """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange.
    """
    def wait():
        try:
            channel.wait()
        except Exception, e:
            print("Problem waiting on publish channel: %s" % str(e))

        while not channel.returned_messages.empty():
            returnedMessage = channel.returned_messages.get()
            processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage))
            processReturnedMessageThread.start()

        wait()

    waiting = Thread(target=wait) 
    waiting.start()
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top