Question

I am trying to access the exchange name in my exchange_declare callback. I use Pika's SelectConnection adapter for asynchronous requests.

My idea is to first build lists of exchanges, queues and bindings in my program. I then dispatch multiple exchange and queue declarations to Pika, and create each binding as soon as the queues and exchanges it needs have received their DeclareOk responses.

This means I need to know the names of exchanges and queues in my callbacks so that I can correlate them with the bindings to create.

Something like this:

def on_exchange(response):
    # How do I know if this was exchangeA or exchangeB?
    print("Exchange declared")
    print(response)

print("Create A")
channel.exchange_declare(callback=on_exchange, exchange="exchangeA")
print("Create B")
channel.exchange_declare(callback=on_exchange, exchange="exchangeB")

This gives the following output, which clearly shows that the exchanges are declared first and callbacks triggered later (as expected):

Create A
Create B
Exchange declared
<METHOD(['method=<Exchange.DeclareOk>', 'channel_number=1', 'frame_type=1'])>
Exchange declared
<METHOD(['method=<Exchange.DeclareOk>', 'channel_number=1', 'frame_type=1'])>

I have figured out that response is of type pika.frame.Method, but where do I go from there? Are the exchanges perhaps guaranteed to be declared in order?

Solution

A closure lets you capture the exchange name for each callback while keeping the code asynchronous:

def declare_exchanges(self):
    exchange = "exchangeA"
    callback = self.on_exchange(exchange)
    channel.exchange_declare(callback=callback, exchange=exchange)

    exchange = "exchangeB"
    callback = self.on_exchange(exchange)
    channel.exchange_declare(callback=callback, exchange=exchange)

def on_exchange(self, exchange):
    def callback(response):
        print(exchange)

    return callback

PS: I opened an issue upstream, https://github.com/pika/pika/issues/898
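
The same idea can also be written with functools.partial instead of a hand-written closure. The following is only a minimal sketch: FakeChannel is a hypothetical stand-in (not part of Pika) that records callbacks and replays them, so the example runs without a broker. It assumes, as in the question, that the channel calls the callback with a single response argument.

```python
from functools import partial


class FakeChannel:
    """Hypothetical stand-in for a Pika channel; it only records and replays callbacks."""

    def __init__(self):
        self._pending = []

    def exchange_declare(self, exchange, callback):
        # A real channel would send Exchange.Declare to the broker here.
        self._pending.append(callback)

    def flush(self):
        # Simulate the broker answering every outstanding declaration in order.
        for callback in self._pending:
            callback("<Exchange.DeclareOk>")  # placeholder for the method frame
        self._pending.clear()


declared = []


def on_exchange(exchange, response):
    # `exchange` is pre-bound by partial(); `response` is supplied by the channel.
    declared.append(exchange)
    print("Declared", exchange, response)


channel = FakeChannel()
for name in ("exchangeA", "exchangeB"):
    channel.exchange_declare(exchange=name, callback=partial(on_exchange, name))
channel.flush()
```

With a real channel you would drop FakeChannel and pass partial(on_exchange, name) as the callback argument in the same way.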

OTHER TIPS

You could chain your callbacks to control the setup process: declare one exchange, and declare the next one from its callback. Building your exchanges and queues step by step like this is more verbose, but you stay in control of the order.

import pika


class ChatServer(object):
    def __init__(self):
        self.channel_in = None
        self.channel_out = None
        cred = pika.PlainCredentials('guest', 'guest')
        param = pika.ConnectionParameters(host='localhost',
                                          port=5672,
                                          virtual_host='/',
                                          credentials=cred)
        self.connection = pika.SelectConnection(param, self.on_connected)

    def on_connected(self, connection):
        self.connection.channel(self.on_channel_out_open)

    def on_channel_out_open(self, channel):
        self.channel_out = channel
        # Note: newer Pika releases name this parameter exchange_type.
        self.channel_out.exchange_declare(exchange='chatserver_out',
                                          type='direct',
                                          auto_delete=False,
                                          callback=self.on_exchange_declare_out)

    def on_exchange_declare_out(self, method_frame):
        # The first exchange is declared; declare the next one.
        self.channel_out.exchange_declare(exchange='cmd',
                                          type='direct',
                                          auto_delete=False,
                                          callback=self.on_exchange_declare_cmd_out)

    def on_exchange_declare_cmd_out(self, method_frame):
        # now all exchanges are defined.
        # Let's create the queues.
        # ...
        pass
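
If writing one method per exchange gets too verbose, the chain can be driven from a list. This is a rough sketch under assumptions: declare_in_sequence and FakeChannel are hypothetical names introduced here, and FakeChannel replies immediately so the example runs without a broker. With a real Pika channel the callback would fire later, when the DeclareOk frame arrives.

```python
class FakeChannel:
    """Hypothetical stand-in for a Pika channel that replies immediately."""

    def __init__(self):
        self.declared = []

    def exchange_declare(self, exchange, callback):
        self.declared.append(exchange)
        callback(None)  # a real channel would pass the DeclareOk method frame later


def declare_in_sequence(channel, exchanges, on_done):
    """Declare exchanges one at a time; call on_done() after the last one."""
    remaining = list(exchanges)

    def declare_next(_frame=None):
        if not remaining:
            on_done()
            return
        # The name being declared is known here, because we just popped it.
        channel.exchange_declare(exchange=remaining.pop(0), callback=declare_next)

    declare_next()


events = []
channel = FakeChannel()
declare_in_sequence(channel, ["chatserver_out", "cmd"], lambda: events.append("done"))
```

The trade-off is the same as with the hand-written chain: only one declaration is in flight at a time, so setup takes one round trip per exchange.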

It seems to be impossible to read the exchange name in the exchange_declare callback. Instead, keep a counter so that you know when all your exchanges have been created:

class ...:
  def __init__(self):
    self.exchangestocreate = 2

  #...

  def declare_exchanges(self):
    channel.exchange_declare(callback=self.on_exchange, exchange="exchangeA")
    channel.exchange_declare(callback=self.on_exchange, exchange="exchangeB")

  def on_exchange(self, response):
    self.exchangestocreate -= 1
    if self.exchangestocreate == 0:
      # Declare bindings here
      pass

This effectively turns the exchange declaration step into a synchronization point: no binding is created until every exchange has been declared.
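
If you would rather create each binding as soon as its own exchange and queue are ready, as the question describes, the counter can be replaced by a small tracker that hands out one closure per declaration. This is only a sketch: BindingTracker, callback_for and the queue name queue1 are hypothetical, and the DeclareOk replies are simulated by calling the closures directly.

```python
class BindingTracker:
    """Fires each binding once both its exchange and its queue are declared."""

    def __init__(self, bindings, bind):
        self.pending = list(bindings)   # (exchange, queue) pairs still to bind
        self.declared = set()           # ("exchange" | "queue", name) seen so far
        self.bind = bind                # called as bind(exchange, queue)

    def callback_for(self, kind, name):
        # Closure that remembers which declaration it belongs to.
        def on_declared(_frame):
            self.declared.add((kind, name))
            self._flush()
        return on_declared

    def _flush(self):
        ready = [b for b in self.pending
                 if ("exchange", b[0]) in self.declared
                 and ("queue", b[1]) in self.declared]
        for binding in ready:
            self.pending.remove(binding)
            self.bind(*binding)


bound = []
tracker = BindingTracker([("exchangeA", "queue1"), ("exchangeB", "queue1")],
                         lambda exchange, queue: bound.append((exchange, queue)))

# Simulate DeclareOk replies arriving; with Pika these closures would be
# passed as callback= to exchange_declare() and queue_declare().
tracker.callback_for("exchange", "exchangeB")(None)
tracker.callback_for("queue", "queue1")(None)
tracker.callback_for("exchange", "exchangeA")(None)
```

Here the binding for exchangeB is created as soon as queue1 is declared, without waiting for exchangeA.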

Licensed under: CC-BY-SA with attribution