Newbie - Is a Consumer Queue necessary in order for Publishing to work in MassTransit

StackOverflow https://stackoverflow.com/questions/15485317

  •  24-03-2022
  •  | 
  •  

Question

I am trying to get my head around MassTransit and RabbitMQ and queuing (day 1)

The question I have is whether a "Consumer is necessary in order for queuing to work in MT. The reason I ask is because I first created Domain and Producer but I didn't see any queued item in RabbitMQ management window."

When Consumer queue is created then I can see message being queued.

Based on my understanding, Producer is never aware of consumer, so why MassTransit required consumer queue to start message publishing?

The Producer

using MassTransit;

namespace Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            Bus.Initialize(sbc =>
            {
                sbc.UseRabbitMq();         //1
                sbc.UseControlBus();
                sbc.EnableMessageTracing();
                sbc.EnableRemoteIntrospection();
                sbc.ReceiveFrom("rabbitmq://localhost/MT.Producer");
                sbc.UseControlBus();
            });

            Bus.Instance.Publish(new NewOrderMessage { OrderName = "Hello World" });
        }
    }
}

The Application

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MassTransit;
using Topshelf;

namespace Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            Bus.Initialize(sbc =>
            {
                sbc.UseRabbitMq();
                sbc.UseRabbitMqRouting();
                sbc.ReceiveFrom("rabbitmq://localhost/MT.ConsumerService");
            });

            var cfg = HostFactory.New(c =>
            {
                c.SetServiceName("MT.ConsumerService");
                c.SetDisplayName("MT.ConsumerService");
                c.SetDescription("MT.ConsumerService");

                //c.BeforeStartingServices(s => {});

                c.Service<ConsumerService>(a =>
                {
                    a.ConstructUsing(service => new ConsumerService());
                    a.WhenStarted(o => o.Start());
                    a.WhenStopped(o => o.Stop());
                });

            });

            try
            {
                cfg.Run();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
                throw;
            }
        }
    }
}

The Message

namespace Domain
{
    public class NewOrderMessage
    {
        public NewOrderMessage()
        {
            OrderId = Guid.NewGuid();
        }
        public Guid OrderId { get; set; }
        public string OrderName { get; set; }
    }
}

The Consumer Service

namespace Consumer
{
    class ConsumerService
    {
        readonly IServiceBus _bus;

        public ConsumerService()
        {
            _bus = Bus.Instance;
        }

        public void Start()
        {
            _bus.SubscribeHandler<NewOrderMessage>(CreateOrder);

            Console.WriteLine("Starting....");
        }

        public void Stop()
        {    
            Console.WriteLine("Stopping....");
        }

        public void CreateOrder(NewOrderMessage command)
        {
            Console.WriteLine("Creating Order: {0}  with Id: {1}", command.OrderName, command.OrderId);
        }
    }
}

code was created using examples on web.

Edit Would also like to add that all namespaces are different projects Domain Producer Consumer

Regards,

Mar

Was it helpful?

Solution

The following answers on masstransit-discus really helped me.

From Google Group masstransit-discuss

... The thing with MassTransit is that you don't actually publish to a queue, you publish to an exchange that is then set up to pass that message on to other queues that have subscribed to messages on that exchange. Since you don't have any consumers, no-one has expressed any interest in your message so it will simply be ignored.

So simply set up a consumer and have it listen to "rabbitmq://localhost/B". First time you run it, it will create the necessary exchanges and links between them, and your message will be passed to a queue named B.

Anders

A bus is the collection of all the exchanges, queues, and services all together. When you publish on a bus, all consumers registered on that bus will receive it.

Exchanges are the RabbitMQ implementation of making this work.

Travis

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top