Question

I would like to know how to delay the delivery of a message with php-amqplib.

I followed this great CoffeeScript tutorial:

https://github.com/jamescarr/rabbitmq-scheduled-delivery

but it doesn't seem to work with php-amqplib.

The message expires as I want, but "x-dead-letter-exchange" does not seem to do its job. In the RabbitMQ management console I can watch every queue being created and deleted live, but my message does not go to the immediate queue after expiring. I use RabbitMQ 3.2.3 and php-amqplib 2.2.*.

Here is my code:

Connection class:

class Connection
{
/**
 * @var $ch
 */
public $ch;

/**
 * @var $consumer_tag
 */
public $consumer_tag;

/**
 * @var $exchange
 */
public $exchange;

/**
 * @var $conn
 */
public $conn;

public function __construct($host, $port, $user, $password, $vhost)
{

    $this->exchange = 'immediate';
    $this->queue = 'right.now.queue';
    $this->consumer_tag = 'consumer';


    $this->conn = new AMQPConnection($host, $port, $user, $password, $vhost);
    $this->ch = $this->conn->channel();

    $this->ch->exchange_declare($this->exchange, 'direct', false, true, false);

    $this->ch->queue_declare($this->queue, false, true, false, false, false);

    $this->ch->queue_bind($this->queue, $this->exchange);


}

public function createDelayedQueue ($name, $delay_seconds) {
    $this->ch->queue_declare($name, false, false, false, true, true, array(
        "x-dead-letter-exchange" => array("S", $this->exchange),
        "x-message-ttl" => array("I", $delay_seconds*1000),
        "x-expires" => array("I", $delay_seconds*1000+1000)
    ));
}
}

Publish code

$name = 'send.later.'.$ts;
$amqp->createDelayedQueue($name, 2);
$msg = new AMQPMessage($msg_body, array('content_type' => 'text/plain', 'delivery_mode' => 2));
$amqp->ch->basic_publish($msg);

Consumer code

$amqp = $this->getContainer()->get('amqp_connexion');

    $amqp->ch->basic_consume($amqp->queue, $amqp->consumer_tag, false, false, false, false, function ($msg) {

        echo $msg->body;
        echo "\n--------\n";
    });

    $output->writeln('Listening '.$amqp->queue.'...');

    // Loop as long as the channel has callbacks registered
    while (count($amqp->ch->callbacks)) {
        $amqp->ch->wait();
    }

Solution

I just wrote a simplified working version for PHP:

/////// simplified ///////

// include the AMQPlib Classes || use an autoloader

// queue/exchange names
$queueRightNow = 'right.now.queue';
$exchangeRightNow = 'right.now.exchange';
$queueDelayed5sec = 'delayed.five.seconds.queue';
$exchangeDelayed5sec = 'delayed.five.seconds.exchange';

$delay = 5; // delay in seconds

// create connection
$AMQPConnection = new \PhpAmqpLib\Connection\AMQPConnection('localhost',5672,'guest','guest');

// create a channel
$channel = $AMQPConnection->channel();

// create the right.now.queue, the exchange for that queue and bind them together
$channel->queue_declare($queueRightNow);
$channel->exchange_declare($exchangeRightNow, 'direct');
$channel->queue_bind($queueRightNow, $exchangeRightNow);

// now create the delayed queue and the exchange
$channel->queue_declare(
        $queueDelayed5sec,
        false,
        false,
        false,
        true,
        true,
        array(
            'x-message-ttl' => array('I', $delay*1000),   // delay in seconds to milliseconds
            "x-expires" => array("I", $delay*1000+1000),
            'x-dead-letter-exchange' => array('S', $exchangeRightNow) // after message expiration in delay queue, move message to the right.now.queue
        )
);
$channel->exchange_declare($exchangeDelayed5sec, 'direct');
$channel->queue_bind($queueDelayed5sec, $exchangeDelayed5sec);

// now create a message and publish it to the delayed exchange
$msg = new \PhpAmqpLib\Message\AMQPMessage(
    time(),
    array(
        'delivery_mode' => 2
    )
);
$channel->basic_publish($msg,$exchangeDelayed5sec);


// consume the delayed message
$consumeCallback = function(\PhpAmqpLib\Message\AMQPMessage $msg) {
    $messagePublishedAt = $msg->body;
    echo 'seconds between publishing and consuming: '
        . (time()-$messagePublishedAt) . PHP_EOL;
};
$channel->basic_consume($queueRightNow, '', false, true, false, false, $consumeCallback);

// start consuming
while (count($channel->callbacks) > 0) {
    $channel->wait();
}
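
The delay in the version above comes entirely from the three arguments passed to queue_declare for the delayed queue. As a minimal sketch, they can be built by a small helper so that the second-to-millisecond conversion lives in one place. The function name delayQueueArguments and the $graceMs parameter are assumptions made for illustration; they are not part of php-amqplib.

```php
<?php
/**
 * Build the arguments for a "delay" queue: messages expire after the TTL
 * and are then dead-lettered to the given exchange.
 *
 * Hypothetical helper, not part of php-amqplib. It returns the same
 * array('I', ...) / array('S', ...) table format used in the code above.
 *
 * @param int|float $delaySeconds       delay before the message is dead-lettered
 * @param string    $deadLetterExchange exchange that receives expired messages
 * @param int       $graceMs            extra idle time before the unused queue is removed
 * @return array
 */
function delayQueueArguments($delaySeconds, $deadLetterExchange, $graceMs = 1000)
{
    if ($delaySeconds <= 0) {
        throw new InvalidArgumentException('Delay must be positive');
    }

    // RabbitMQ expects both x-message-ttl and x-expires in milliseconds
    $ttlMs = (int) round($delaySeconds * 1000);

    return array(
        'x-message-ttl'          => array('I', $ttlMs),
        'x-expires'              => array('I', $ttlMs + $graceMs),
        'x-dead-letter-exchange' => array('S', $deadLetterExchange),
    );
}

// Usage with the variables from the example above:
// $channel->queue_declare($queueDelayed5sec, false, false, false, true, true,
//     delayQueueArguments($delay, $exchangeRightNow));
```

The message still has to be published to an exchange that is bound to the delayed queue, as basic_publish($msg, $exchangeDelayed5sec) does above; otherwise it never enters the queue and is never dead-lettered.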

OTHER TIPS

If you choose an amqp-interop based transport, you won't need to dig into these details at all. There are only a few things to do:

Install the enqueue/amqp-lib transport (you can also use the transports based on the amqp extension or on the bunny library) and enqueue/amqp-tools.

composer require enqueue/amqp-lib enqueue/amqp-tools

Create an AMQP context, add a delay strategy and send delayed messages:

<?php
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
// matches the enqueue/amqp-lib package installed above
use Enqueue\AmqpLib\AmqpConnectionFactory;

$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDlxDelayStrategy());

$queue = $context->createQueue('foo');
$context->declareQueue($queue);

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    ->send($queue, $message)
;

By the way, this is not the only strategy available. There is another one based on the RabbitMQ delay plugin, and it can be used the same way.

Licensed under: CC-BY-SA with attribution