Question

I'll explain my situation a bit first. What I want to accomplish is multiple producers and multiple consumers sharing one broker on my server. I want messages to be persistent and queues to be durable, and I want multiple queues that do not interfere with each other. With one queue, one producer and one consumer this already works fine. Current code:

Consumer:

import time

from qpid.connection import Connection
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.util import connect

socket = connect('localhost',5672)
connection = Connection(sock=socket, username='***', password='***')
connection.start()
session = connection.session(str(uuid4()))
local_queue_name = 'my_queue'
queue = session.incoming(local_queue_name)
session.message_subscribe(queue='ProdSixQueue', destination=local_queue_name)
queue.start()
content=''
while content != 'Done':
    message = queue.get(timeout=1000000)
    while(queue.empty() == True):
        time.sleep(1)
    content = message.body
    if(validate(etree.fromstring(content))):
        session.message_accept(RangedSet(message.id)) 
        store(content) #my function to store data..

Producer:

from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.util import connect

def produce(theMsg):
        socket = connect('ser.ver.s.ip',5672)
        connection = Connection(sock=socket,username='***',password='***')
        connection.start()
        session = connection.session(str(uuid4()))
        session.queue_declare(queue='ProdSixQueue')
        session.exchange_bind(exchange='amq.direct',queue='ProdSixQueue',binding_key='routing_key')
        properties = session.delivery_properties(routing_key='routing_key')
        session.message_transfer(destination='amq.direct',message=Message(properties,str(theMsg)))
        session.close(timeout=100)

Of course, this is not the whole program, but it is all the code that talks to the Qpid broker.

The problem is that when I add a new consumer and producer on different devices, carrying different traffic that I want to move from that producer to that consumer, the first consumer steals all the data from the broker, even though I send it to a different queue, like ProdSevenQueue, and the other consumer tries to store it in a different local queue, like "prodseven_local_Queue". Do I have to use the queues differently somehow, or do I misunderstand the whole idea here? I also hear people talking about configuring the broker to redirect certain kinds of traffic somewhere else, yet I find no examples of this. A push in the right direction would be awesome. To confuse you even more, here's a picture of the situation: Link to imgur

In my defense, I've never done anything with AMQP or Apache Qpid before, and I'm still learning Python too. Also, to complain a bit more, I have never seen more incoherent documentation than Apache Qpid's. No love for new players, obviously.


Solution

Your program suggests a misunderstanding of the way Qpid works.

In Qpid (for AMQP protocols 0-8 to 0-10) message producers send messages to Exchanges. The Exchange is then responsible for routing the message to zero or more Queues. The precise details of that routing depend on the exchange type. It is through this mechanism that Qpid supports common messaging topologies (point-to-point, publish/subscribe, fanout etc).
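To make the exchange-to-queue step concrete, here is a toy in-memory model of one exchange type, the exact-match (direct) kind. It is plain Python rather than the Qpid API: the class DirectExchange and the two demo functions are hypothetical names invented for this sketch, and only the queue names and the "routing_key" string come from the question.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory model of a direct exchange (illustration only, not the Qpid API)."""

    def __init__(self):
        # binding key -> names of the queues bound with that key
        self.bindings = defaultdict(list)
        # queue name -> message bodies delivered to that queue
        self.queues = defaultdict(list)

    def bind(self, queue, binding_key):
        self.bindings[binding_key].append(queue)

    def publish(self, routing_key, body):
        # Every queue whose binding key equals the routing key gets a copy.
        matched = list(self.bindings.get(routing_key, []))
        for queue in matched:
            self.queues[queue].append(body)
        return matched


def demo_shared_key():
    # Both queues bound with the same key: one message reaches both.
    exchange = DirectExchange()
    exchange.bind('ProdSixQueue', 'routing_key')
    exchange.bind('ProdSevenQueue', 'routing_key')
    return exchange.publish('routing_key', 'data for seven')


def demo_queue_name_key():
    # Each queue bound with its own name: the message reaches one queue only.
    exchange = DirectExchange()
    exchange.bind('ProdSixQueue', 'ProdSixQueue')
    exchange.bind('ProdSevenQueue', 'ProdSevenQueue')
    return exchange.publish('ProdSevenQueue', 'data for seven')
```

In this model a message published with a key that nothing is bound to matches zero queues and is simply not delivered anywhere.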

Your use-case requires the use of an instance of a direct exchange (such as the built-in amq.direct).

A direct exchange routes messages to queues based on an exact match between the routing key of the message and the binding key used to bind the queue to the exchange. A common convention is to bind the queue to the exchange using the name of the queue itself as the binding key. Your program currently uses the string "routing_key" for this purpose, and I suspect that explains the undesired behaviour you observed: if every queue is bound with that same key, a message sent with that routing key matches all of them.
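A minimal sketch of that convention, reusing only the session calls that already appear in the question's producer (queue_declare, exchange_bind, delivery_properties, message_transfer). The helper names bind_by_queue_name and send_to_queue are hypothetical, and message_class is assumed to be qpid.datatypes.Message, passed in by the caller so the sketch itself imports nothing from qpid.

```python
def bind_by_queue_name(session, queue_name, exchange='amq.direct'):
    """Declare a queue and bind it using its own name as the binding key."""
    session.queue_declare(queue=queue_name)
    session.exchange_bind(exchange=exchange, queue=queue_name,
                          binding_key=queue_name)


def send_to_queue(session, queue_name, body, message_class,
                  exchange='amq.direct'):
    """Publish body with the queue name as routing key.

    message_class is assumed to be qpid.datatypes.Message (or anything
    callable as message_class(properties, body)).
    """
    properties = session.delivery_properties(routing_key=queue_name)
    session.message_transfer(destination=exchange,
                             message=message_class(properties, body))
```

With these, the second producer would call bind_by_queue_name(session, 'ProdSevenQueue') and then send_to_queue(session, 'ProdSevenQueue', str(theMsg), Message), while the second consumer subscribes to 'ProdSevenQueue' in its message_subscribe call.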

You will find more explanation here:

http://qpid.apache.org/releases/qpid-0.24/java-broker/book/Java-Broker-Concepts-Exchanges.html (Qpid Java Broker documentation - but the concept is shared)

https://access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_MRG/1.1/html/Messaging_User_Guide/chap-Messaging_User_Guide-Exchanges.html (Red Hat's MRG docs explain the same concepts with useful diagrams)

The Python Examples (see Qpid website) are a useful reference.

License: CC-BY-SA with attribution
Not affiliated with StackOverflow