Question

I have a consumer that listens for messages. If the flow of messages is more than one consumer can handle, I want to start another instance of this consumer.

But I also want to be able to poll the consumer(s) for information. My thought was that I could use RPC to request this information from the consumers, using a fanout exchange so that all of them receive the RPC call.

My question is, first of all, is this possible, and secondly, is it reasonable?


Solution 2

After some research it seems that this is not possible. If you look at the RPC tutorial on RabbitMQ.com you see that there is an id for the call (the correlation_id) which, as far as I understand, gets consumed: the client only accepts the first reply that carries it.
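For context, the reply handling in the RabbitMQ RPC tutorial looks roughly like the sketch below (paraphrased against the Pika 1.x API, not taken from the code further down): the client remembers a single correlation id and stops waiting after the first matching reply, which is why the pattern does not naturally extend to many repliers.

    import uuid
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Exclusive, auto-named queue for the reply.
    callback_queue = channel.queue_declare(queue='', exclusive=True).method.queue

    corr_id = str(uuid.uuid4())
    response = None

    def on_response(ch, method, props, body):
        global response
        # Only the first reply with the matching correlation_id is kept.
        if props.correlation_id == corr_id:
            response = body

    channel.basic_consume(queue=callback_queue, on_message_callback=on_response,
                          auto_ack=True)
    channel.basic_publish(
        exchange='',
        routing_key='rpc_queue',   # the queue name used in the tutorial
        properties=pika.BasicProperties(reply_to=callback_queue,
                                        correlation_id=corr_id),
        body='my request')

    while response is None:
        connection.process_data_events()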

I've chosen to go another way, which is reading the log files and aggregating the data.
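As a rough illustration of that approach (the file name and log format below are purely hypothetical, since they are not shown here), aggregating a per-consumer count from the logs could look like:

    from collections import Counter

    # Hypothetical log format: one "<consumer_id> <event description>" per line.
    counts = Counter()
    with open('consumers.log') as log:   # hypothetical file name
        for line in log:
            consumer_id, _event = line.split(None, 1)
            counts[consumer_id] += 1

    for consumer_id, handled in counts.most_common():
        print("%s handled %d messages" % (consumer_id, handled))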

Other tips

If the question is "is it possible to send an RPC message to more than one server?" the answer is yes.

When you build an RPC call you attach a temporary queue to the message (usually in header.reply_to, but you can also use internal message fields). This is the queue where the RPC targets will publish their answers.
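The request side could look like the following sketch (Pika 1.x API; the exchange name and message body are made up for illustration): declare a temporary reply queue, then publish the request to a fanout exchange so every consumer instance receives it.

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Fanout exchange: every consumer queue bound to it gets a copy of the request.
    channel.exchange_declare(exchange='rpc_broadcast', exchange_type='fanout')

    # Temporary exclusive queue where all the RPC targets will publish their answers.
    reply_queue = channel.queue_declare(queue='', exclusive=True).method.queue

    channel.basic_publish(
        exchange='rpc_broadcast',
        routing_key='',            # ignored by a fanout exchange
        properties=pika.BasicProperties(reply_to=reply_queue),
        body='report your status')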

Even when you send an RPC to a single server you can receive more than one message on the temporary queue; in general, an RPC answer could be formed by:

  • a single message from a single source
  • more than one message from a single source
  • more than one message from several sources

The problems arising in this scenario are:

  • when do you stop listening? If you know the number of RPC servers you can wait until each of them has sent you an answer; otherwise you have to implement some form of timeout.
  • do you need to track the source of each answer? You can add some special fields to your messages to keep this information, and the same goes for message ordering (see the sketch after this list).
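A minimal sketch of the reply side, assuming a JSON body and made-up field names (source, seq), which this answer does not prescribe, could be:

    import json
    import socket

    def publish_reply(channel, header, payload, seq):
        # Tag each answer with its origin and a sequence number so the client
        # can attribute and order the replies it collects.
        reply = {
            'source': socket.gethostname(),  # hypothetical: identifies this consumer
            'seq': seq,                      # hypothetical: preserves ordering
            'data': payload,
        }
        channel.basic_publish(
            exchange='',
            routing_key=header.reply_to,     # temporary queue attached to the request
            body=json.dumps(reply))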

Here is some code to show how you can do it (Python with the Pika library). Be warned, this is far from perfect: the biggest problem is that you should reset the timeout whenever you get a new answer.

    def consume_rpc(self, queue, result_len=1, callback=None, timeout=None, raise_timeout=False):
        """Collect up to result_len RPC answers from the given reply queue.

        Note: this targets the pre-1.0 Pika API (connection.add_timeout and
        the old basic_consume argument order); in Pika 1.x these became
        call_later and basic_consume(queue, on_message_callback).
        """
        if timeout is None:
            timeout = self.rpc_timeout

        result_list = []

        def _callback(channel, method, header, body):
            # Decode one answer and store its payload.
            print("### Got %s/%s RPC results" % (len(result_list) + 1, result_len))
            msg = self.encoder.decode(body)
            result_dict = {}
            result_dict.update(msg['content']['data'])
            result_list.append(result_dict)

            if callback is not None:
                callback(msg)

            # Stop as soon as the expected number of answers has arrived.
            if len(result_list) == result_len:
                print("### All results are here: stopping RPC")
                channel.stop_consuming()

        def _outoftime():
            # Fired by the connection timer: give up waiting for more answers.
            self.channel.stop_consuming()
            raise TimeoutError

        if timeout != -1:
            print("### Setting timeout %s seconds" % (timeout,))
            self.conn_broker.add_timeout(timeout, _outoftime)

        self.channel.basic_consume(_callback, queue=queue, consumer_tag=queue)

        if raise_timeout is True:
            # Let TimeoutError propagate to the caller.
            print("### Start consuming RPC with raise_timeout")
            self.channel.start_consuming()
        else:
            try:
                print("### Start consuming RPC without raise_timeout")
                self.channel.start_consuming()
            except TimeoutError:
                # Timed out: return whatever answers arrived so far.
                pass

        return result_list
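For completeness, calling it could look like the sketch below; the client object, the reply queue and the counts are hypothetical, since the surrounding class is not shown in this answer.

    # Hypothetical usage: wait for one answer from each of 3 known consumers,
    # or give up after 5 seconds and keep whatever arrived in the meantime.
    results = client.consume_rpc(reply_queue, result_len=3, timeout=5)
    for result in results:
        print(result)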
Licensed under: CC-BY-SA with attribution