Question

I'm trying to move a system from morbid to RabbitMQ, but I cannot get the same broadcast behaviour that morbid supplied by default. By broadcast I mean that when a message is added to the queue, every consumer receives it. With RabbitMQ, each message is instead handed out round-robin style, so only one of the listeners receives it.

Can anyone tell me how to achieve the same kind of message distribution?
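
To make the difference concrete, here is a toy in-memory model of the two behaviours. It is plain Python with no broker involved, and `round_robin` and `broadcast` are made-up names for illustration only, not part of stomp.py, morbid, or RabbitMQ:

```python
from itertools import cycle


def round_robin(messages, consumers):
    """One shared queue: each message goes to exactly one consumer, in turn."""
    received = {name: [] for name in consumers}
    turn = cycle(consumers)
    for msg in messages:
        received[next(turn)].append(msg)
    return received


def broadcast(messages, consumers):
    """Every consumer gets its own copy of every message."""
    received = {name: [] for name in consumers}
    for msg in messages:
        for name in consumers:
            received[name].append(msg)
    return received


msgs = ["m1", "m2", "m3", "m4"]
print(round_robin(msgs, ["a", "b"]))  # what I see with RabbitMQ
print(broadcast(msgs, ["a", "b"]))    # what I want
```

With two consumers, the first call splits the four messages between `a` and `b`, while the second gives both consumers all four.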

The stomp library used below is http://code.google.com/p/stomppy/

If it can't be done with stomp, even an amqplib example would really help.

My code at present looks like this

The Consumer

import time

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'received an error %s' % message

    def on_message(self, headers, message):
        print 'received a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")

conn.subscribe(destination='/topic/demoqueue', ack='auto')

# Keep the main thread alive while the listener handles incoming messages.
# Sleeping avoids the busy loop, and Ctrl-C now reaches disconnect().
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    conn.disconnect()

And the sender looks like this

import time

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print 'received an error %s' % message

    def on_message(self, headers, message):
        print 'received a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")

conn.subscribe(destination='/topic/demotopic', ack='auto')

# Keep the main thread alive; Ctrl-C disconnects cleanly.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    conn.disconnect()

Solution 2

I finally figured out how to do it by creating an exchange for each "receiving group". I'm not sure how well RabbitMQ will cope with thousands of exchanges, so you might want to test this heavily before trying it in production.

In the sending code:

conn.send(str(i), exchange=exchange, destination='')

The blank destination is required; all I care about is sending to that exchange.

To receive:

import sys
import time

import stomp
from amqplib import client_0_8 as amqp

# read in the exchange name so I can set up multiple receivers for different exchanges to test
exchange = sys.argv[1]
# named amqp_conn so the STOMP connection below does not shadow it
amqp_conn = amqp.Connection(host="localhost:5672", userid="username", password="password",
                            virtual_host="/", insist=False)

chan = amqp_conn.channel()

chan.access_request('/', active=True, write=True, read=True)

# declare my exchange
chan.exchange_declare(exchange, 'topic')
# not passing a queue name means I get a new unique one back
qname, _, _ = chan.queue_declare()
# bind the queue to the exchange
chan.queue_bind(qname, exchange=exchange)

class MyListener(object):
    def on_error(self, headers, message):
        print 'received an error %s' % message

    def on_message(self, headers, message):
        print 'received a message %s' % message

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="username", password="password")

# subscribe to the queue
conn.subscribe(destination=qname, ack='auto')

# keep the main thread alive without spinning the CPU; Ctrl-C disconnects
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    conn.disconnect()
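
If you would rather skip STOMP on the receiving side altogether, the same per-receiver queue can probably be consumed straight from amqplib. This is only a minimal sketch: `bind_private_queue`, `start_consumer`, and `run_receiver` are hypothetical helper names, and it assumes amqplib's `basic_consume` and `wait` channel methods (not used in the code above) behave as I remember them. The exchange type defaults to 'topic' so that it matches the declaration in the receiver above.

```python
def bind_private_queue(chan, exchange, exchange_type='topic'):
    """Declare the exchange, create a server-named queue, and bind it.

    `chan` is expected to be an amqplib client_0_8 channel. Returns the
    generated queue name. The exchange type must match how the exchange
    was first declared.
    """
    chan.exchange_declare(exchange, exchange_type)
    # not passing a queue name means the server hands back a unique one
    qname, _, _ = chan.queue_declare()
    chan.queue_bind(qname, exchange=exchange)
    return qname


def start_consumer(chan, qname, handler):
    """Register `handler(body)` to be called for each message on `qname`."""
    def on_message(msg):
        handler(msg.body)
    # no_ack=True plays the same role as ack='auto' in the STOMP subscribe
    chan.basic_consume(queue=qname, no_ack=True, callback=on_message)


def show(body):
    print("received a message %s" % body)


def run_receiver(exchange, host="localhost:5672",
                 userid="username", password="password"):
    """Connect, bind a private queue, and print messages until Ctrl-C."""
    # imported here so the helpers above can be used without amqplib installed
    from amqplib import client_0_8 as amqp
    conn = amqp.Connection(host=host, userid=userid, password=password,
                           virtual_host="/", insist=False)
    chan = conn.channel()
    try:
        start_consumer(chan, bind_private_queue(chan, exchange), show)
        while True:
            chan.wait()  # blocks until the next delivery arrives
    except KeyboardInterrupt:
        pass
    finally:
        chan.close()
        conn.close()
```

Calling `run_receiver(sys.argv[1])` in several processes with the same exchange name should give each process its own queue, and therefore its own copy of every message sent to that exchange.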

OTHER TIPS

Apparently you can't do this directly with STOMP; there is a mailing list thread that shows all the hoops you have to jump through to get broadcast working with STOMP (it involves some lower-level AMQP stuff).

Licensed under: CC-BY-SA with attribution