Question

I have a simple publisher done in MassTransit. I’m sending the message in an interval and am able to receive it from .NET client using MassTransit. But when I try to observe something from Python, it is silent. Is there a way to consume MassTransit from Python or other languages? Examples appreciated.

Publisher:

builder.Register(c => ServiceBusFactory.New(sbc => {
    sbc.UseRabbitMq();
    sbc.UseBsonSerializer();
    sbc.UseLog4Net();

    sbc.ReceiveFrom("rabbitmq://localhost/masstransit");
});

.NET client:

public void Execute(IJobExecutionContext context) {
   using (var scope = ServiceLocator.Current.GetInstance<ILifetimeScope>().BeginLifetimeScope()) {
       var log = scope.Resolve<ILog>();
       log.Debug("Sending queue message");

       var bus = scope.Resolve<IServiceBus>();
       bus.Publish(new SimpleTextMessage{Text = "some text"});
   }
}

Python client:

import pika
print('Stating consumer')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='python_consumer_1')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='python_consumer_1')
channel.start_consuming()

The trace from C# app:

Configuration Result:
[Success] Name MyApp
[Success] ServiceName MyApp
Topshelf v3.1.122.0, .NET Framework v4.0.30319.34003
INFO (MassTransit.BusConfigurators.ServiceBusConfiguratorImpl) 209  - MassTransit v2.9.2/v2.9.0.0, .NET Framework v4.0.30319.34003
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 245  - CreatingRabbitMQ connection: rabbitmq://localhost/groups_error
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 246  - Using default configurator for connection: rabbitmq://localhost/groups_error
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 251  - RabbitMQconnection created: localhost:5672//
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 921  - Creating RabbitMQ connection: rabbitmq://localhost/groups
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 922  - Using default configurator for connection: rabbitmq://localhost/groups
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 924  - RabbitMQconnection created: localhost:5672//
DEBUG(MassTransit.ServiceContainer) 1056 - Starting bus service: MassTransit.Subscriptions.Coordinator.SubscriptionRouterService
DEBUG(MassTransit.ServiceContainer) 1062 - Starting bus service: MassTransit.Subscriptions.SubscriptionBusService
DEBUG(MassTransit.Threading.ThreadPoolConsumerPool) 1080 - Starting Consumer Pool for rabbitmq://localhost/groups
[Topshelf.Quartz] Scheduled Job: DEFAULT.ea637337-950a-4281-99c0-f10b842814c9
[Topshelf.Quartz] Job Schedule: Trigger 'DEFAULT.8a1d0b7c-d670-440b-974f-31ec8be6f294':  triggerClass: 'Quartz.Impl.Triggers.SimpleTriggerImpl calendar: '' misfireInstruction: 0 nextFireTime: 01/28/2014 07:12:35 +00:00 - Next Fire Time (local): 28.01.2014 9:12:35 +02:00
[Topshelf.Quartz] Scheduler started...
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1248 - CreatingRabbitMQ connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage_error
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1250 - Using default configurator for connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage_error
DEBUG(Global) 1254 - Sending queue message
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1254 - RabbitMQconnection created: localhost:5672//
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1272 - CreatingRabbitMQ connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1273 - Using default configurator for connection: rabbitmq://localhost/MyApp.Transit:SimpleTextMessage
DEBUG(MassTransit.Transports.RabbitMq.RabbitMqTransportFactory) 1277 - RabbitMQconnection created: localhost:5672//
DEBUG(MassTransit.Messages) 1439 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-961c-08d0ea0f744a:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(MassTransit.Messages) 1441 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-8965-08d0ea0f744b:MyApp.Transit.SimpleTextMessage, MyApp
The MyApp service is now running, press Control+C to exit.

DEBUG(Global) 21212 - Sending queue message
DEBUG(MassTransit.Messages) 21214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-d4fb-08d0ea0f77c4:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 41213 - Sending queue message
DEBUG(MassTransit.Messages) 41214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-077b-08d0ea0f7b40:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 61212 - Sending queue message
DEBUG(MassTransit.Messages) 61214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-2bed-08d0ea0f7ebb:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 81213 - Sending queue message
DEBUG(MassTransit.Messages) 81215 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-5f44-08d0ea0f8236:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 101212 - Sending queue message
DEBUG(MassTransit.Messages) 101214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-80f8-08d0ea0f85b1:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 121212 - Sending queue message
DEBUG(MassTransit.Messages) 121213 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-a971-08d0ea0f892c:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 141212 - Sending queue message
DEBUG(MassTransit.Messages) 141214 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-d53e-08d0ea0f8ca7:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 161212 - Sending queue message
DEBUG(MassTransit.Messages) 172109 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-7504-08d0ea0f9208:MyApp.Transit.SimpleTextMessage, MyApp
DEBUG(Global) 181212 - Sending queue message
DEBUG(MassTransit.Messages) 193461 - SEND:rabbitmq://localhost/MyApp.Transit:SimpleTextMessage:935a0000-5d93-0015-dd26-08d0ea0f95bf:MyApp.Transit.SimpleTextMessage, MyApp
Was it helpful?

Solution

It seems, that the easiest way is to bind the python queue to the exchange in RabbitMq management. After doing it I've sucessfully recieved the messages.

PyhonConsumer now looks the following way:

import pika

print('Stating consumer')

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare('python_consumer_1')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.queue_bind(queue='python_consumer_1', exchange='MyApp.Transit:SimpleTextMessage')
channel.basic_consume(callback, queue='python_consumer_1')
channel.start_consuming()

OTHER TIPS

If you're going to consume messages from another language, you need to look at how the exchanges are created when MassTransit publishes a message. Then, you will need to bind those exchanges to your queues so that messages are delivered to your subscribers.

For your Python code, you need to

exchange_bind(".....:SimpleTextMessage", "phython_consumer_1")

Once you've done that, the messages will be delivered to your queue. You're using BSON, why not use JSON or something that python works with easily? Honestly I'm not sure if Python supports BSON or not, just trying to offer other suggestions.

You could check existing rabbitmq exchanges, queues using rabbitmqctl admin utility, and then play from it's results.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top