Question

This is the thing.

I am reading results queue from Rabbitmq using PHP AMQP in order to process vital information on every email sent. After this is done, i need to delete or mark that message as written so the next time i read the queue i don't get the messages already processed.

As the Rabbitmq server is sending over 10.000 emails an hour, every time i read the queue to process result sendings, the script can be running for at lease 5 minutes in order to process all messages in the queue, so after it is done, several hundred of new messages were places during those 5 minutes. That makes impossible for me to purge the queue after script finishes because it would delete messages places during the script running which where not processed.

That leaves me with only one choice. Mark or delete a message just after being processed or read by my AMQP script.

Is there a way to do that? (here is the script)

<?php
/**
 *  result.php
 *  Script that connects to RabbitMQ, and takes the result message from
 *  the result message queue.
 */

// include the settings
 require_once('settings.php');

// try to set up a connection to the RabbitMQ server
try
{
    // construct the connection to the RabbitMQ server
    $connection = new AMQPConnection(array(
        'host'      =>  $hostname,
        'login'     =>  $username,
        'password'  =>  $password,
        'vhost'     =>  $vhost
    ));

    // connect to the RabbitMQ server
    $connection->connect();
}
catch (AMQPException $exception)
{
    echo "Could not establish a connection to the RabbitMQ server.\n";
}

// try to create the channel
try
{
    // open the channel
    $channel = new AMQPChannel($connection);
}
catch (AMQPConnectionException $exception)
{
    echo "Connection to the broker was lost (creating channel).\n";
}

// try to create the queue
try
{
    // create the queue and bind the exchange
    $queue   = new AMQPQueue($channel);
    $queue->setName($resultbox);
    $queue->setFlags(AMQP_DURABLE);
    $queue->bind('exchange1', 'key1');
    $queue->declare();
}
catch (AMQPQueueException $exception)
{
    echo "Channel is not connected to a broker (creating queue).\n";
}
catch (AMQPConnectionException $exception)
{
    echo "Connection to the broker was lost. (creating queue)/\n";
}

// Get the message from the queue. 
while ($envelope = $queue->get()) {
    //Function that processes the message
    process_message($envelope->getBody());
}
    $queue->purge();

// done, close the connection to RabbitMQ
$connection->disconnect();
?>
Was it helpful?

Solution

Acknowledge message(s) $queue->ack() after successful processing or even consume/get them with AMQP_AUTOACK flag.

UPD:

Based on your code:

1. Ack'ing message
while ($envelope = $queue->get()) {
    //Function that processes the message
    process_message($envelope->getBody());
    $queue->ack($envelope->getDeliveryTag());
}
2. Getting it with AMQP_AUTOACK flag:
while ($envelope = $queue->get(AMQP_AUTOACK)) {
    //Function that processes the message
    process_message($envelope->getBody());
}

P.S.:

Check AMQPQueue::consume documentation, looks like it is more suitable here.

3. You can consume and ack message after it been processed:
$queue->consume(function ($envelope, $queue) {
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
});
4. or consume with AMQP_AUTOACK flag, but when procesing fails you won't be able to process message again:
$queue->consume(function ($envelope, $queue) {
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
}, AMQP_AUTOACK);

Conclusion: I would recommend to use #3 solution, but it's up to you.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top