Pregunta

I'm trying to use Q within NodeJS to create some amqplib wrappers. The wrappers are working correctly (so far), but I have the feeling my usage of Q is... incorrect.

First, there's an initialization method:

private static Startup(): void {
    var sub_YoMsgHandler = (msg: any) => {
        console.log(util.format('Received Yo: %s', msg.content.toString()));
        var bmy: dto.interfaces.IBusManifestYo = JSON.parse(msg.content.toString());
    }

    var sub_TelemetryMsgHandler = (msg: any) => {
        var bmy: dto.interfaces.IAsyncProcessorCommand = JSON.parse(msg.content.toString());
        console.log(util.format('Received Telemetry: %s', msg.content.toString()));
    }

    Play.AMQ.Open.then((connection) => {
        Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
        Play.AMQ.ConfirmChannel.then((confirmChannel) => {
            confirmChannel.on('error', Play.handleChannelError);

            Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
            Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
            Play.ReceiveTelemetry(sub_TelemetryMsgHandler);

            Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
                //track consumer tags in CreateConsumer?
                Play.AMQ.Subscribers.push(consumerTag);
            });
        });
    });
}

Did I mention I'm using TypeScript? That method is connecting, creating a channel, creating two send/receive queues, creating two subscribers - and then saving the connection, channel and queue promises into an object. Then here's one method for creating a subscriber (consumer):

private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
    var qid = type + '_' + name;

    return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
            return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
                return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
                    return confirmChannel.consume(qid, (msg) => {
                        handler(msg);
                        confirmChannel.ack(msg);
                    });
                });
            });
        });
    },
    (failReason) => {
        throw new Error('create consumer issue: ' + failReason);
    });
}

Finally, here's my method for publishing:

 private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
    var ackDeferred = Q.defer<boolean>();

    var handleChannelConfirm = (err, ok): void => {
        if (err !== null) {
            console.warn('Message nacked!');
            ackDeferred.resolve(false);
        }
        else {
            console.log('Message acked');
            ackDeferred.resolve(true);
        }
    }

    // '#' instead of ''?
    Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
            confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
        });
    },
    (failReason) => {
        throw new Error('create consumer issue: ' + failReason);
    });

    return ackDeferred.promise;
}

As I said, there are all working but it feels like I'm not using the promises in the correct or recommended way.

Are there any glaring missteps in here - or am I doing it right? Specifically, I guess, I'm curious about my chaining and error handling (I think the error handling is especially likely to be wrong). Bonus points for showing me the correct way in that publish method for taking the callback style handler and Promise-ifying it...

¿Fue útil?

Solución

If Q follows the promise spec it should work like that

return Play.AMQ.ConfirmChannel
.then(confirmChannel => confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }))
.then(okQueueReply => confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }))
.then(okExchangeReply => confirmChannel.bindQueue(qid, type, ''))
.then(okBindReply => confirmChannel.consume(qid))
.then(msg => {
    handler(msg);
    confirmChannel.ack(msg);
});

And it makes sense to return something from the last then too.

The callback in the then method returns a new promise so you can chain them without nesting callbacks.

Otros consejos

To summerize your idea,

/*
Open() // connecting...
    .then confirmChannel()  //create a channel
        .then function () {
            commandQueue(); //create send queue?
            TelemetryQueue(); // create recv queue?
        }
            .then createConsumer(); //create two subsribers?


.catch(function (err) {
    The most outter error handler.
})
.end() terminate the promise chain
*/

I will go through the functions one by one

Play.AMQ.Open.then((connection) => {

        Play.AMQ.ConfirmChannel = connection.createConfirmChannel();
        return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        //since ConfirmChannel() is a promise, it means channel is working I assume if you can run this callback
        //confirmChannel.on('error', Play.handleChannelError); 

            //No idea following codes are promise or not...
        //Assuming that following 3 lines won't be fail
                Play.AMQ.CommandQueue = confirmChannel.assertQueue('AsyncProcessorCommandQueue', { durable: true, exclusive: false, autoDelete: false });
                Play.AMQ.TelemetryQueue = confirmChannel.assertQueue('AsyncProcessorTelemetryQueue', { durable: true, exclusive: false, autoDelete: false });
                Play.ReceiveTelemetry(sub_TelemetryMsgHandler);

        //Chainning promise....
        //Btw... I am curious why you call a `prviate` method directly
                return Play.CreateConsumer('Node', dto.BusManifestYo.Type, sub_YoMsgHandler).then((consumerTag) => {
                    //track consumer tags in CreateConsumer?
                    Play.AMQ.Subscribers.push(consumerTag);
                });
        });

})
.end(); //Catch any un-handled errors and terminate the Q chain (EG error from confirmChannel())



private static CreateConsumer(name: string, type: string, handler: (msg: any) => void): Q.Promise<string> {
    var qid = type + '_' + name;

    //I suggest you put `confirmChannel` as one of the input arguments since this method will only be called after open() and confirmChannel()
    return Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        return confirmChannel.assertQueue(qid, { durable: true, exclusive: false, autoDelete: false }).then((okQueueReply) => {
            return confirmChannel.assertExchange(type, 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
                return confirmChannel.bindQueue(qid, type, '').then((okBindReply) => {
                    return confirmChannel.consume(qid, (msg) => {
                        handler(msg);
                        confirmChannel.ack(msg);
                    });
                });
            });
        });
    }
    //I am not familiarize syntax on TypeScript, I think below error handler is an input argument of Play.AMQ.ConfirmChannel.then()
    //If so, this error handler can handler error from ConfirmChannel() only.
    ,
    (failReason) => {
    throw new Error('create consumer issue: ' + failReason);
    });

}



/*
I am not sure the intent (or command flow?) of your below function.

1. If ConfirmChannel() fail, I think your program will crash since no one handling below error
    throw new Error('create consumer issue: ' + failReason);

2. No idea to figure out the relationship between 2 promises ackDeferred and Play.AMQ.ConfirmChannel
**/
private static Publish(obj: dto.interfaces.IPublishable): Q.Promise<boolean> {
    var ackDeferred = Q.defer<boolean>();

    var handleChannelConfirm = (err, ok): void => {
    if (err !== null) {
        console.warn('Message nacked!');
        ackDeferred.resolve(false);
    }
    else {
        console.log('Message acked');
        ackDeferred.resolve(true);
    }
    }

    // '#' instead of ''?
    Play.AMQ.ConfirmChannel.then((confirmChannel) => {
        confirmChannel.assertExchange(obj.GetType(), 'topic', { durable: true, autoDelete: false }).then((okExchangeReply) => {
        confirmChannel.publish(obj.GetType(), '', Play.ToBuffer(obj), { type: obj.GetType() }, handleChannelConfirm);
    });
    },
    (failReason) => {
    throw new Error('create consumer issue: ' + failReason);
    });

    return ackDeferred.promise;
}

Hope it helps.

By the way.... How to turn on the syntax highlight :D?

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top