Question

I previously asked about publishing from amqplib to EasyNetQ, and got it working with help from the author of EasyNetQ.

Now, I'm having trouble going the other way.

It did briefly "work", but then I went back and cleaned out all the queues I'd created, and now it won't work (publishing from amqplib to ENQ still works, but ENQ to amqplib doesn't).

If I have this code:

Bus.SubscribeAsync<BusManifestHolla>(HollaSubID_1,
    msg => Task.Factory.StartNew(() => {
        Console.WriteLine("LOOK===> Received Manifest Holla ID {0}", msg.ManifestID.ToString());
        Console.WriteLine("LOOK===> Responding with Manifest Yo ID {0}", HollaSubID_1);
        Bus.PublishAsync(new BusManifestYo { ManifestID = msg.ManifestID, ServiceName = HollaSubID_1 });
    })
);

What do I need to add to the Node/amqplib code below to subscribe to and consume what it publishes?

Play.AMPQ.then((connection) => {
    return connection.createChannel().then((channel) => {
        return channel.assertExchange(dto.BusManifestYo.Type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
            return channel.assertQueue(dto.BusManifestYo.Type).then((ok) => {
                return channel.consume(ok.queue, (msg) => {
                    console.log(util.format('Received message: %s', msg.content.toString()));
                    var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());

                    channel.ack(msg);
                });
            });
        });
    });
});

UPDATE

If I have EasyNetQ first create the Queue (that it will publish to), and then remove the 'assertQueue' call in Node (so it doesn't bork the queue), and then follow the naming conventions - it works. Of course, this is not a real solution, but maybe it'll help someone point to a solution?

UPDATE #2

Well, apparently I needed to bind the queue to the exchange. Here's the new working code:

Play.AMPQ.then((connection) => {
    return connection.createChannel().then((channel) => {
        channel.on('error', Play.handleChannelError);

        return channel.assertQueue(dto.BusManifestYo.Type + '_Node', { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
            return channel.assertExchange(dto.BusManifestYo.Type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
                return channel.bindQueue(dto.BusManifestYo.Type + '_Node', dto.BusManifestYo.Type, '#').then((okBindReply) => {
                    return channel.consume(dto.BusManifestYo.Type + '_Node', (msg) => {
                        console.log(util.format('Received message: %s', msg.content.toString()));
                        var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());

                        channel.ack(msg);
                    });
                });
            });
        });
    });
});

One thing that's not clear to me is the part where I set the bind pattern to '#'. It's working, but I only put that because I saw ENQ use it, and other values don't seem to work, so...


Solution

EasyNetQ has its own exchange-binding-queue conventions. When it subscribes, it does as you've discovered:

  1. Creates a topic exchange named after the message type.
  2. Creates a queue named after the message type plus subscription id.
  3. Binds them with the '#' binding key.
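
The three steps above could be mirrored on the amqplib side roughly as follows. This is only a sketch: `ChannelLike`, `MessageLike` and `subscribeEasyNetQStyle` are hypothetical names introduced for illustration, and the `_` separator in the queue name is an assumption borrowed from the `'_Node'` suffix in the question. The interface lists only the amqplib channel methods the question already uses (`assertExchange`, `assertQueue`, `bindQueue`, `consume`, `ack`), so a real channel should fit where `ChannelLike` is expected, though that has not been verified against the amqplib typings here.

```typescript
// Hypothetical structural types covering only the channel methods used below.
interface MessageLike { content: { toString(): string } }

interface ChannelLike {
    assertExchange(exchange: string, type: string, options?: object): Promise<unknown>;
    assertQueue(queue: string, options?: object): Promise<unknown>;
    bindQueue(queue: string, source: string, pattern: string): Promise<unknown>;
    consume(queue: string, onMessage: (msg: MessageLike | null) => void): Promise<unknown>;
    ack(message: MessageLike): void;
}

// Hypothetical helper that follows the exchange/queue/binding steps listed above.
async function subscribeEasyNetQStyle(
    channel: ChannelLike,
    messageType: string,     // e.g. dto.BusManifestYo.Type from the question
    subscriptionId: string,  // e.g. 'Node'
    handler: (body: unknown) => void
): Promise<string> {
    // Assumed naming: message type plus subscription id, joined with '_'.
    const queue = `${messageType}_${subscriptionId}`;

    // 1. Topic exchange named after the message type.
    await channel.assertExchange(messageType, 'topic', { durable: true, autoDelete: false });
    // 2. Queue named after the message type plus subscription id.
    await channel.assertQueue(queue, { durable: true, exclusive: false, autoDelete: false });
    // 3. Bind the queue to the exchange with the '#' binding key.
    await channel.bindQueue(queue, messageType, '#');

    await channel.consume(queue, (msg) => {
        if (msg === null) return; // null signals that the consumer was cancelled
        handler(JSON.parse(msg.content.toString()));
        channel.ack(msg);
    });
    return queue;
}
```

Compared with the nested `.then` chain in the question, `async`/`await` keeps the three declarations in a flat sequence, which makes it easier to see that the bind step is present.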

It uses a topic exchange (rather than a direct one) to support topic routing, which is why we bind with '#', the "give me everything" binding key. If you don't publish with a topic, the message is published with a blank routing key, and that will only get routed to '#' bindings.
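
To see why a blank routing key reaches a '#' binding but not, say, a '*' binding, here is a simplified matcher for topic binding keys. It is an illustrative sketch, not RabbitMQ's implementation: `topicMatches` is a hypothetical function, and it assumes the usual AMQP topic rules, where keys are dot-separated words, '*' stands for exactly one word, '#' stands for zero or more words, and a blank key has no words at all.

```typescript
// Simplified topic matching: '*' = exactly one word, '#' = zero or more words.
function topicMatches(bindingKey: string, routingKey: string): boolean {
    // A blank key is treated as having no words (assumption stated above).
    const split = (key: string): string[] => (key === '' ? [] : key.split('.'));
    const pattern = split(bindingKey);
    const words = split(routingKey);

    const match = (p: number, w: number): boolean => {
        // Pattern exhausted: succeed only if every word was consumed too.
        if (p === pattern.length) return w === words.length;
        if (pattern[p] === '#') {
            // '#' either absorbs nothing (advance the pattern)
            // or absorbs one more word (advance the routing key).
            return match(p + 1, w) || (w < words.length && match(p, w + 1));
        }
        // Any other pattern word needs a routing-key word to compare against.
        if (w === words.length) return false;
        return (pattern[p] === '*' || pattern[p] === words[w]) && match(p + 1, w + 1);
    };
    return match(0, 0);
}
```

Under these rules, `topicMatches('#', '')` is true while `topicMatches('*', '')` is false, which is consistent with the observation in the question that other bind patterns didn't seem to work for messages published without a topic.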

I hope this helps.

Licensed under: CC-BY-SA with attribution