Question

I'm replicating EasyNetQ functionality in NodeJS (so that a Node app can communicate over Rabbit with an EasyNetQ-enabled .NET app). I've replicated EasyNetQ's Publish/Subscribe and Send/Receive, but I'm having some difficulty with EasyNetQ's Request/Response.

Here is my current Node code:

var rqrxID = uuid.v4(); //a GUID
var responseQueue = 'easynetq.response.' + rqrxID;

Q(Play.AMQ.ConfirmChannel.assertQueue(responseQueue, { durable: false, exclusive: true, autoDelete: true }))
.then((okQueueReply) =>
    Play.AMQ.ConfirmChannel.consume(responseQueue, (msg) => {
        //do something here...
        Play.AMQ.ConfirmChannel.ack(msg);
    })
)
.then((okSubscribeReply) => {
    Q(Play.AMQ.ConfirmChannel.assertExchange('easy_net_q_rpc', 'direct', { durable: true, autoDelete: false }))
    .then((okExchangeReply) =>
        Play.AMQ.ConfirmChannel.publish(
            global.AppConfig.amq.rpc.exchange,
            dto.AsyncProcessorCommand.Type,
            Play.ToBuffer(command),
            { type: command.GetType() },
            (err, ok): void => {
                if (err !== null) {
                    console.warn('Message nacked!');
                    responseDeferred.reject(err);
                }
            }
        )
    )
})
.catch((failReason) => {
    console.error(util.format('Error creating response queue: %s', failReason));
    return null;
});

Note that the publish works and the .NET code receives the request. That code then sends a response, but the Node side never receives it. Here's the .NET code:

Bus.Respond<AsyncProcessorCommand, AsyncProcessorCommandResponse>(
    request =>
    {
        Console.WriteLine("Got request: '{0}'", request);
        return new AsyncProcessorCommandResponse()
        {
            ID = Guid.NewGuid(),
            ResponseType = "ENQResp"
        };
    });

I'm sure I'm missing something, but not sure what. Who can help?

UPDATE: I have solved at least part of this. Taking the value of responseQueue and setting it in the publish options as "replyTo" hooks the response up - nice. Now I just have to figure out how to either avoid creating a new queue each time or make the response queue go away...

UPDATE FINAL: Using the channel setup I had and specifying the consumerTag myself (rather than letting one be generated) allowed me to cancel the consumer once the response arrived, and the queue then auto-deleted.
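The cancel-after-first-reply idea from the final update can be sketched in isolation. This is a minimal illustration, not the code from the app: `ReplyChannel`, `ReplyMessage` and `consumeOnce` are hypothetical names, and the interface is only modeled on the `consume`/`ack`/`cancel` calls used in the question so the sketch can run without a broker.

```typescript
// Hypothetical minimal types, declared locally so the sketch is self-contained.
interface ReplyMessage { content: { toString(): string } }

interface ReplyChannel {
    consume(
        queue: string,
        onMessage: (msg: ReplyMessage | null) => void,
        options: { consumerTag: string }
    ): Promise<unknown>;
    ack(msg: ReplyMessage): void;
    cancel(consumerTag: string): Promise<unknown>;
}

// Hypothetical helper: resolves with the first message delivered to `queue`,
// then cancels the consumer. With an autoDelete queue, dropping the last
// consumer is what lets the broker remove the queue.
function consumeOnce(channel: ReplyChannel, queue: string, consumerTag: string): Promise<ReplyMessage> {
    return new Promise<ReplyMessage>((resolve, reject) => {
        channel.consume(queue, (msg) => {
            if (msg === null) {
                // a null delivery is treated here as "consumer cancelled by the broker"
                reject(new Error('consumer cancelled before a reply arrived'));
                return;
            }
            channel.ack(msg);
            // the tag is chosen by the caller, so it is known before consume() replies
            channel.cancel(consumerTag).then(() => resolve(msg), reject);
        }, { consumerTag }).catch(reject);
    });
}
```

Choosing the tag up front avoids having to wait for the consume reply just to learn which tag to cancel.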


Solution

Taking my comments from above to answer this.

There are two pieces to this. First, from the code above, create your response queue so that it auto-deletes (when the consumer count drops to 0):

channel.assertQueue(responseQueue, { durable: false, exclusive: true, autoDelete: true })

Then publish to the exchange the "server" is listening on, making sure to set "replyTo" to the name of the response queue you just created (the type property is another bit of ENQ-needed code):

{ type: command.GetType(), replyTo: responseQueue }
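The key invariant in the two pieces above is that "replyTo" carries exactly the name of the queue that was asserted. A small sketch that derives both from one id, so they cannot drift apart; `buildRpcRequest` and `RpcPublishOptions` are hypothetical names, and the type string in the usage note is a placeholder rather than a real EasyNetQ type name.

```typescript
// Publish options this pattern relies on: `type` tells the responder which
// message type was sent, `replyTo` tells it where to send the response.
interface RpcPublishOptions {
    type: string;
    replyTo: string;
}

// Hypothetical helper: build the response queue name and the matching
// publish options from a single request id (e.g. a GUID).
function buildRpcRequest(
    messageType: string,
    requestId: string
): { responseQueue: string; options: RpcPublishOptions } {
    const responseQueue = 'easynetq.response.' + requestId;
    return {
        responseQueue,
        options: { type: messageType, replyTo: responseQueue },
    };
}
```

In the code from the question this would be called as something like `buildRpcRequest(command.GetType(), uuid.v4())`, with `responseQueue` passed to assertQueue and `options` passed to publish.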

So an entire (currently messy as it's "play" code) method for executing this pattern looks like:

private static Request(command: dto.AsyncProcessorCommand): Q.Promise<dto.interfaces.IAsyncProcessorCommandResponse> {
    var responseDeferred = Q.defer<dto.interfaces.IAsyncProcessorCommandResponse>();

    var consumerTag = uuid.v4();
    var rqrxID = uuid.v4();
    var responseQueue = 'easynetq.response.' + rqrxID;

    var handleResponse = (msg: any): void => {
        var respType = null;
        switch(command.Action) {
            default:
                respType = 'testResp';
        }

        //just sending *something* back, should come from 'msg'
        responseDeferred.resolve(new dto.AsyncProcessorCommandResponse(respType, { xxx: 'yyy', abc: '123' }));
    }

    Q(Play.AMQ.ConfirmChannel.assertQueue(responseQueue, { durable: false, exclusive: true, autoDelete: true }))
        .then((okQueueReply) =>
            Play.AMQ.ConfirmChannel.consume(responseQueue, (msg) => {
                handleResponse(msg);
                Play.AMQ.ConfirmChannel.ack(msg);
                Play.AMQ.ConfirmChannel.cancel(consumerTag);
            },
            { consumerTag: consumerTag })
        )
        .then((okSubscribeReply) =>
            // return the inner chain so its failures reach the catch below
            Q(Play.AMQ.ConfirmChannel.assertExchange('easy_net_q_rpc', 'direct', { durable: true, autoDelete: false }))
            .then((okExchangeReply) =>
                Play.AMQ.ConfirmChannel.publish(
                    'easy_net_q_rpc',
                    dto.AsyncProcessorCommand.Type,
                    Play.ToBuffer(command),
                    { type: command.GetType(), replyTo: responseQueue },
                    (err, ok): void => {
                        if (err !== null) {
                            console.warn('Message nacked!');
                            responseDeferred.reject(err);
                        }
                    }
                )
            )
        )
        .catch((failReason) => {
            console.error(util.format('Error sending request: %s', failReason));
            // reject so the caller is not left waiting on a promise that never settles
            responseDeferred.reject(failReason);
        });

    return responseDeferred.promise
}
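
As the comment in handleResponse says, the real response should come from 'msg' rather than a hard-coded object. A minimal sketch of that step, assuming the responder serializes the body as JSON and keeps the .NET property names (ID, ResponseType) from the question; `parseResponse` and both interfaces are hypothetical names introduced only for illustration.

```typescript
// Hypothetical minimal message shape; a Node Buffer satisfies `content`.
interface ReplyMessage { content: { toString(): string } }

// Mirrors the .NET AsyncProcessorCommandResponse shown in the question.
// That the JSON uses exactly these property names is an assumption.
interface AsyncProcessorCommandResponseDto {
    ID: string;
    ResponseType: string;
}

// Hypothetical helper: decode the message body and check the one field
// the caller depends on before handing the result back.
function parseResponse(msg: ReplyMessage): AsyncProcessorCommandResponseDto {
    const body = JSON.parse(msg.content.toString());
    if (typeof body !== 'object' || body === null || typeof body.ResponseType !== 'string') {
        throw new Error('Unexpected response payload');
    }
    return { ID: String(body.ID), ResponseType: body.ResponseType };
}
```

Inside handleResponse this could replace the hard-coded object, wrapped in a try/catch that rejects responseDeferred when parsing fails.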
Licensed under: CC-BY-SA with attribution