Question

The way of setting up magento queues in M2 EE 2.2.* is different from how it used to be in version prior to 2.2.*. The queue.xml is deprecated and there are 3 new configuration files queue_consumer.xml, queue_publisher.xml and queue_topology.xml.

I've worked on a project to integrate AWS S3 Buckets with Magento. This task uses various cool technologies one of them is RabbitMQ.

So below is an answer which might or might not save some time on setting up message queues in Magento 2.2.*

Was it helpful?

Solution

Pre-requirements

  1. RabbitMQ
  2. Magento EE (If you're in CE you can use this extension, this answer will not help you setting up with that extension.)
  3. Know the basics of the MQ protocol here a useful diagram

How to get started?

Everything starts from the communication.xml file (I assume you're creating a new module), its where you define the topics for the exchanges, topics are used to bind to queues. Here is sample:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd">
    <topic name="productImport.topic" request="string" />
</config>

We now need to tell magento what port and host RabbitMQ is on in env.php.

'queue' => 
  array (
    'amqp' => 
    array (
      'host' => '127.0.0.1',
      'port' => '5672',
      'user' => 'username',
      'password' => 'password',
      'virtualhost' => '/',
      'ssl' => '',
    ),
  ),

Then queue_topology.xml, this file is responsible for creating routing rules, queues and exchanges.

<?xml version="1.0"?>
    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_topology.xsd">
        <exchange name="product-import-exchange1" type="topic" connection="amqp">
            <binding id="productImportBinding1" topic="productImport.topic" destinationType="queue" destination="productImport-queue"/>
        </exchange>
    </config>

Then we have the queue_publisher.xml, this determines which exchange and connection adapter connection to use for a specific topic.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_publisher.xsd">
    <publisher topic="productImport.topic">
        <connection name="amqp" exchange="porduct-import-exchange1" />
    </publisher>
</config>

Up to this point once you do a setup:update, you should be able to see product-import-exchange1 in the exchange list and productImport-queue in the list of queues in rabbitMQ, you don't need to manually setup queue or exchanges in rabbitMQ as Magento EE does this for you already.

Now we need to create a publisher class, this class will be responsible to publish new message to our queue in rabbitMQ. It would look like something like this

class ImagePublisher
{
    const TOPIC_NAME = "productImport.topic";

    /**
     * @var PublisherInterface
     */
    protected $publisher;

    /**
     * ImagePublisher constructor.
     *
     * @param   PublisherInterface  $publisher
     */
    public function __construct(
        PublisherInterface $publisher
    ) {
        $this->publisher = $publisher;
    }

    /**
     * Build and publishes message to RabbitMQ.
     *
     * @param   array   $data
     * @return  void
     */
    public function publish(array $data)
    {
        $this->publisher->publish(self::TOPIC_NAME, json_encode($data));
    }
}

Make sure the data type you are sending matches the data set in communication.xml request attribute. You could do something like this to test if rabbitMQ is receiving any messages.

$count = 1;
while ($count <= 10000) {
   // @var ImagePublisher $publisher
   $publisher->publish(array("id" => $count));
   $count++;
}

The last thing you need to do is to create a consumer or consumers. You define consumers in queue_consumer.xml. Here what it would look like:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue_consumer.xsd">
    <consumer name="productimport.consumer.one" queue="productImport-queue" connection="amqp" handler="CLASS_TO_PROCESS_MESSAGE::METHOD"/>
</config>

Now check that the consumer exists bin/magento queue:consumers:list, also make sure you build your consumer class, which is defined in the handler attribute of the queue_consumer.xml.

To start the consumer you use the following: bin/magento queue:consumers:start CONSUMER_NAME.

References

RabbitMQ Tutorials

Magento 2.2.2 Guide

Magento Migrate 2.1 Queues to 2.2

Getting Started with rabbitMQ and PHP

Also have a look at the modules module-scalable-checkout and module-scalable-inventory.

Licensed under: CC-BY-SA with attribution
Not affiliated with magento.stackexchange
scroll top