Domanda

I am developing a Qt5 server application and I am using the QAMQP library. What I want to do is the following:

  • Another server should send a message whenever something about a user should change
  • My server, which is distributed among multiple machines and has multiple processes per machine needs to be notified about these updates

The thing is, I am not sure about the architecture that I should build. I just know that whenever something about some user changes, the server needs to send a message to the RabbitMQ broker and all my processes that are interested in updates for that particular user should get the message. But should I create one queue per process, and bind it with a separate exchange for each user? Or maybe create in each process a separate queue for each user and bind that somehow to some exchange. Fanout exchanges come to mind, and one queue per process, I am just not sure about the queue-exchange relations even though I've spent quiet some time trying to figure it out.

Update, in order to clarify things and write about the progress

I have a distributed application that needs to be notified for product changes. Those changes happen often and are tracked by another platform. I want to get those updates in my application. In order to achieve that, each one of my application instances creates it's own queue. Then, whenever an instance is interested in updates for a particular product it creates an exchange for that product and binds it to the queue, like this:

Exchange type : 'direct'
Exchange name : 'product_update'
Routing key   : 'PRODUCT_CODE'

Where PRODUCT_CODE is a string that represents the code of the product. In the platform that track the changes, I just publish messages with the corresponding exchanges.

The problem comes when i need to unsubscribe for a product update. I am using the QAMQP library, and in the destructor of the QAMQP::Exchange there's an unconditional remove() call. When that function is called I am getting error in the RabbitMQ log, which looks like this:

=ERROR REPORT==== 28-Jan-2014::08:41:35 ===
connection <0.937.0>, channel 7 - soft error:
{amqp_error,precondition_failed,
            "exchange 'product_update' in vhost 'test-app' in use",
            'exchange.delete'}

I am not sure how to properly unsubscribe. I know from the RabbitMQ web interface that I have only one exchange ('product_update') which has bindings to multiple queues with difference routing keys. I can see that the call to remove() in QAMQP tries to delete the exchange, but since it's used by my other processes, it's still in use and cannot be removed, which I beleive is ok. But what should I do to delete the exchange object that I created? Should I first unbind it from the queue? I believe that i should be able to delete the object without calling remove(), but I may be mistaken or I may doing it wrong.

Also, if there's a better pattern for what I am trying to accomplish, please advice.

Here's some sample code, per request.

ProductUpdater::ProductUpdater(QObject* parent) : QObject(parent)
{
    mClient = new QAMQP::Client(this);
    mClient->setAutoReconnect(true);
    mClient->open(mConnStr);
    connect(mClient, SIGNAL(connected()), this, SLOT(amqp_connected()));
}

void ProductUpdater::amqp_connected()
{
    mQueue = mClient->createQueue();

    connect(mQueue, SIGNAL(declared()),   this, SLOT(amqp_queue_declared()));
    connect(mQueue, SIGNAL(messageReceived(QAMQP::Queue*)),
               this, SLOT(message_received(QAMQP::Queue*)));

    mQueue->setNoAck(false);
    mQueue->declare(QString(), QAMQP::Queue::QueueOptions(QAMQP::Queue::AutoDelete));
}

void ProductUpdater::amqp_queue_declared()
{
    mQueue->consume();
}

void ProductUpdater::amqp_exchange_declared()
{
    QAMQP::Exchange* exchange = qobject_cast<QAMQP::Exchange*>(sender());
    if (mKeys.contains(exchange))
        mQueue->bind(exchange, mKeys.value(exchange));
}

void ProductUpdater::message_received(QAMQP::Queue* queue)
{
    while (queue->hasMessage())
    {
        const QAMQP::MessagePtr message = queue->getMessage();
        processMessage(message);

        if (!queue->noAck())
            queue->ack(message);
    }
}

bool ProductUpdater::subscribe(const QString& productId)
{
    if (!mClient)
        return false;

    foreach (const QString& id, mSubscriptions) {
        if (id == productId)
            return true; // already subscribed
    }

    QAMQP::Exchange* exchange = mClient->createExchange("product_update");
    mSubscriptions.insert(productId, exchange);
    connect(exchange, SIGNAL(declared()), this, SLOT(amqp_exchange_declared()));
    exchange->declare(QStringLiteral("direct"));

    return true;
}

void ProductUpdater::unsubscribe(const QString& productId)
{
    if (!mSubscriptions.contains(productId))
        return;

    QAMQP::Exchange* exchange = mSubscriptions.take(productId);

    if (exchange) {
        // This may even be unnecessary...?
        mQueue->unbind(exchange, productId);

        // This will produce an error in the RabbitMQ log
        // But if exchange isn't destroyed, we have a memory leak
        // if we do exchange->deleteLater(); it'll also produce an error...
        // exchange->remove();
    }
}
È stato utile?

Soluzione

Amy,

I think your doubt is related to the message distribution style (or patterns) and the exchange types available for RabbitMQ. So, I'll try to cover them all with a short explanation and you can decide which will fit best for your scenario (RabbitMQ tutorials explained in another way).

Work Queue

One queue, multiple consumers - They race for the messages

Using the default exchange and a binding key you can post messages directly yo a queue. Once a message arrives for a queue, the consumers "compete" to grab the message, it means a message is not delivered to more than one consumer. If there are multiple consumers listening to a single queue, the messages will be delivered in a round-robin fashion.

Use this approach when you have work to do and you want to scale across multiple servers/processes easily.

Publish/Subscribe

One queue per consumer, one message is replicated for all consumers listening

In this model, one single sent message may reach many consumers listening on their queues. For this scenario, where you must unselectively dispatch messages to all consumers, you can use a fanout exchange. These exchanges are "dumb" and acts just like their names imply: like a fan. One thing enters and is replicated without any intelligence to all queues that are bound to the exchange. You could as well use direct exchanges, but only if you need to do any filtering or routing on the messages.

Use this scenario when you have something like an event and you may need multiple servers, processes and consumers to handle that event, each one doing a task of different nature to handle the event. If you do not need any filter/routing, use fanout exchange for this scenario.

Routing / Topic

enter image description here

A particular case of the Publish/Subscribe model, where you can have queues "listen" on the exchange using filters, that may have pattern matching (topics) or not (just route).

If you need pattern matching, use topic exchange type. If you don't, use direct. When a queue "listens" to an exchange, a binding is used. In this binding, you may specify a binding key.

To deliver the message to the correct queues, the exchange examines the message's routing key. If it matches the binding key, the message is forwarded to that queue. The match strategy depends on wether you are using topic or direct exchange, as said before.

TL;DR:

For your scenario, if each process do something different with the User change event, use a single exchange with fanout type. Each class of handler declares the same queue name bound to that exchange. This relates to the Publish/Subscribe model above. You can distribute work to among consumers of the same class listening on the same queue name, even if they don't reside on the same process.

However, if all the consumers that are interested in the event perform the same task when handling, use the work queue model.

Hope this helps,

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top