I'm trying to use Q within NodeJS to create some amqplib wrappers. The wrappers are working correctly (so far), but I have the feeling my usage of Q is... incorrect.
First, there's an initialization method:
private static Startup(): void {
var sub_YoMsgHandler = (msg: any) => {
console.log(util.format('Received Yo: %s', msg.content.toString()));
var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());
}
var sub_TelemetryMsgHandler = (msg: any) => {
var bmy: dto.interfaces.IAsyncProcessorCommand = JSON.parse(msg.content.toString());
console.log(util.format('Received Telemetry: %s', msg.content.toString()));
}
Play.AMQ.Open.then((connection) => {
Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
Play.AMQ.ConfirmChannel.then((confirmChannel) => {
confirmChannel.on('error', Play.handleChannelError);
Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
Play.ReceiveTelemetry(sub_TelemetryMsgHandler);
Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
//track consumer tags in CreateConsumer?
Play.AMQ.Subscribers.push(consumerTag);
});
});
});
}
Did I mention I'm using TypeScript? That method is connecting, creating a channel, creating two send/receive queues, creating two subscribers - and then saving the connection, channel and queue promises into an object. Then here's one method for creating a subscriber (consumer):
private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
var qid = type + '_' + name;
return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
return confirmChannel.consume(qid, (msg) => {
handler(msg);
confirmChannel.ack(msg);
});
});
});
});
},
(failReason) => {
throw new Error('create consumer issue: ' + failReason);
});
}
Finally, here's my method for publishing:
private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
var ackDeferred = Q.defer<boolean>();
var handleChannelConfirm = (err, ok): void => {
if (err !== null) {
console.warn('Message nacked!');
ackDeferred.resolve(false);
}
else {
console.log('Message acked');
ackDeferred.resolve(true);
}
}
// '#' instead of ''?
Play.AMQ.ConfirmChannel.then((confirmChannel) => {
confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
});
},
(failReason) => {
throw new Error('create consumer issue: ' + failReason);
});
return ackDeferred.promise;
}
As I said, there are all working but it feels like I'm not using the promises in the correct or recommended way.
Are there any glaring missteps in here - or am I doing it right? Specifically, I guess, I'm curious about my chaining and error handling (I think the error handling is especially likely to be wrong). Bonus points for showing me the correct way in that publish method for taking the callback style handler and Promise-ifying it...