Question

I have an ASP.NET MVC action that verifies that a connection string can be used to connect to a RabbitMQ server. I do this by creating a queue, subscribing to it, and then immediately publishing a message to it. I expect the message to reach my subscriber within a few seconds at most, but it doesn't. The subscriber is only notified the first time I call the action (right after I delete the queue from RabbitMQ using its web-based manager). Once the queue already exists, the subscriber is never called for any message I publish afterwards. Please take a look at my code and let me know if you spot something I'm missing. Thanks in advance.

    //This is just the action called on a POST request.
    //Here is where the test is done.
    [HttpPost]
    public void DoConnectionVerificationAsync()
    {
        const string paramName = "queueSuccessful";
        try
        {
            //This is my routing key
            const string routingKey = "management.verify";

            //Here I declare the queue and the exchange and bind them using the routing key.
            var queue = Queue.DeclareDurable(routingKey);
            var exchange = Exchange.DeclareDirect("Orchard");
            queue.BindTo(exchange, routingKey);

            //Here I just generate a random int to send in the test message to the queue
            Random random = new Random();
            int randomInt = random.Next();
            //Instantiate the actual message with the random integer.
            var message = new Message<VerifyMessage>(new VerifyMessage
            {
                Content = randomInt.ToString(CultureInfo.InvariantCulture)
            });
            message.Properties.AppId = "CRM";

            //Because this is an asynchronous action, here I hint the beginning of the async operation.
            AsyncManager.OutstandingOperations.Increment();

            //Here I have my subscriber. The subscriber gets called only the first time, when the queue hasn't been created yet. That's a problem, it should be called every time I publish, which is in the following lines of code.
            _bus.Subscribe<VerifyMessage>(queue, (response, messageReceivedInfo) => Task.Factory.StartNew(() =>
            {
                VerifyMessage receivedMessage = response.Body;
                string content = receivedMessage.Content;
                int integer = int.Parse(content);
                //I expect the int received from the queue is the same as the one I sent
                bool success = integer == randomInt;
                AsyncManager.Parameters[paramName] = success;
                AsyncManager.OutstandingOperations.Decrement();
            }));

            //And here I publish the message. This always works, I can see the message stored in the queue using the web based RabbitMQ Manager
            //The problem is that the message gets stuck in the queue and never gets sent to the subscriber defined above
            using (var publishChannel = _bus.OpenPublishChannel(x => x.WithPublisherConfirms()))
            {
                publishChannel.Publish(exchange, routingKey, message, t =>
                    t.OnSuccess(() =>
                        {
                            // If we successfully publish, then there's nothing we really need to do. So this function stays empty.
                        })
                        .OnFailure(() =>
                            {
                                AsyncManager.Parameters[paramName] = false;
                                AsyncManager.OutstandingOperations.Decrement();
                            }));
            }
        }
        catch (EasyNetQException)
        {
            AsyncManager.Parameters[paramName] = false;
            AsyncManager.OutstandingOperations.Decrement();
        }
    }

    //These functions down here don't really matter, but I'm including them just in case so that you can see it all.
    public ActionResult DoConnectionVerificationCompleted(bool queueSuccessful)
    {
        return RedirectToAction("VerifyQueueConnectionResult", new { queueSuccessful });
    }

    public ActionResult VerifyQueueConnectionResult(bool queueSuccessful)
    {
        VerifyQueueConnectionResultModel model = new VerifyQueueConnectionResultModel();
        model.Succeded = queueSuccessful;
        return View(model);
    }

Solution

Each call to bus.Subscribe creates a new consumer. So the first time it works: there is just one consumer on your queue, and when you publish a message it is routed to that consumer.

The second time you call bus.Subscribe, a second consumer is attached to the same queue. When a single queue has multiple consumers, RabbitMQ round-robins messages between them. It's a feature: that's how it does work-sharing out of the box. Your message is routed to the first consumer, not to the consumer you just created. The same goes for the 3rd, 4th, etc. consumers, so it appears that the message has not arrived.
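To make the round-robin effect concrete, here is a minimal toy model in plain C#. It does not use EasyNetQ or RabbitMQ at all: `ToyQueue` and `RoundRobinDemo` are hypothetical names, and the sketch assumes the broker simply rotates through consumers in the order they subscribed. Each simulated request adds one more consumer to the same queue and then publishes one message, as the action in the question does.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: a single queue that rotates messages across its consumers.
// ToyQueue is a hypothetical name, not an EasyNetQ or RabbitMQ type.
public class ToyQueue
{
    private readonly Queue<Action<string>> _consumers = new Queue<Action<string>>();

    // A new consumer joins at the back of the rotation.
    public void Subscribe(Action<string> consumer)
    {
        _consumers.Enqueue(consumer);
    }

    // The consumer at the front handles the message, then moves to the back.
    public void Publish(string message)
    {
        if (_consumers.Count == 0) return; // a real broker would keep the message queued
        Action<string> consumer = _consumers.Dequeue();
        consumer(message);
        _consumers.Enqueue(consumer);
    }
}

public static class RoundRobinDemo
{
    // Simulates calling the action several times against the same long-lived queue.
    public static List<string> Run(int requests)
    {
        var queue = new ToyQueue();
        var log = new List<string>();
        for (int request = 1; request <= requests; request++)
        {
            int id = request; // copy so each lambda captures its own value
            queue.Subscribe(msg => log.Add("consumer " + id + " got " + msg));
            queue.Publish("message " + request);
        }
        return log;
    }

    public static void Main()
    {
        foreach (string line in Run(3)) Console.WriteLine(line);
    }
}
```

In this model only the first request sees its own message: message 2 goes to consumer 1 again and message 3 goes to consumer 2, so the consumer created by the current request never receives the message that request published.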

OTHER TIPS

Well, I found a simple solution to the problem. I searched through the methods available in the EasyNetQ API and found Queue.SetAsSingleUse(). I simply called it after binding the queue to the exchange, and that pretty much did it. I'm not sure exactly what queue.SetAsSingleUse() does, but judging by its name, it disposes of the queue after the first message is published and delivered to a subscriber. In my case it makes sense to use the queue only once for each test, but someone who wants to keep the queue around for whatever reason might run into trouble.

You could use different queues for every different subscriber, like this:

    bus.Subscribe<MyMessage>("my_subscription_1"...
    bus.Subscribe<MyMessage>("my_subscription_2"...

This way, any message of type MyMessage that you publish will arrive in all of these queues.
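The difference from sharing one queue can be sketched with another toy model, again in plain C# with no EasyNetQ calls. `ToyExchange` and `FanOutDemo` are hypothetical names, and for brevity the sketch assumes exactly one consumer per named queue. Every queue bound to the exchange gets its own copy of each published message.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: an exchange that copies each message to every bound queue.
// ToyExchange is a hypothetical name, not an EasyNetQ or RabbitMQ type.
public class ToyExchange
{
    // Queue name -> the single consumer attached to that queue (simplification).
    private readonly Dictionary<string, Action<string>> _queues =
        new Dictionary<string, Action<string>>();

    public void Subscribe(string queueName, Action<string> consumer)
    {
        // Simplification: reusing a name replaces the consumer in this toy model.
        _queues[queueName] = consumer;
    }

    public void Publish(string message)
    {
        // Each queue receives its own copy, so every subscriber sees the message.
        foreach (Action<string> consumer in _queues.Values)
        {
            consumer(message);
        }
    }
}

public static class FanOutDemo
{
    public static List<string> Run()
    {
        var exchange = new ToyExchange();
        var log = new List<string>();
        exchange.Subscribe("my_subscription_1", m => log.Add("my_subscription_1 got " + m));
        exchange.Subscribe("my_subscription_2", m => log.Add("my_subscription_2 got " + m));
        exchange.Publish("hello");
        return log;
    }

    public static void Main()
    {
        foreach (string line in Run()) Console.WriteLine(line);
    }
}
```

Both subscriptions log the same message here, whereas two consumers on one shared queue would split the messages between them.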

Licensed under: CC-BY-SA with attribution