I would like to create a queue derivative, where I can use a single QueueWorker class to process multiple queues, but I am not able to achieve it. Any help would be greatly appreciated.

The module structure is the following.

queue_examples/
├── queue_examples.info.yml
├── queue_examples.routing.yml
└── src
    ├── Form
    │   └── QueueDataImportForm.php
    └── Plugin
        ├── Derivative
        │   └── QueueDataProcessorDerivative.php
        └── QueueWorker
            └── QueueDataProcessor.php

The code I am using is the following.

QueueDataImportForm.php

namespace Drupal\queue_examples\Form;

use Drupal\Core\Batch\BatchBuilder;
use Drupal\Core\Form\FormBase;
use Drupal\Core\Form\FormStateInterface;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\ClientInterface;
use Drupal\Core\Url;
use Drupal\Core\Link;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Component\Serialization\Json;
use Drupal\Core\Queue\QueueWorkerManager;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Component\Datetime\Time;
use Drupal\Component\Utility\Unicode;

/**
 * Configure queue_examples settings for this site.
 */
class QueueDataImportForm extends FormBase {

  /**
   * The HTTP client to fetch the feed data with.
   *
   * @var \GuzzleHttp\ClientInterface
   */
  protected $httpClient;

  /**
   * Batch Builder.
   *
   * @var \Drupal\Core\Batch\BatchBuilder
   */
  protected $batchBuilder;

  /**
   * Drupal\Component\Datetime\Time instance.
   *
   * @var \Drupal\Component\Datetime\Time
   */
  public $time;

  /**
   * Drupal\Core\Queue\QueueFactory instance.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * The module handler service.
   *
   * @var \Drupal\Core\Extension\ModuleHandlerInterface
   */
  protected $moduleHandler;

  /**
   * LocationsBatchImportForm constructor.
   */
  public function __construct(ClientInterface $http_client, QueueFactory $queue_factory, QueueWorkerManager $queue_manager, ModuleHandlerInterface $module_handler, Time $time) {

    $this->httpClient = $http_client;
    $this->batchBuilder = new BatchBuilder();
    $this->queue_factory = $queue_factory;
    $this->queue_manager = $queue_manager;
    $this->moduleHandler = $module_handler;
    $this->time = $time;

  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('http_client'),
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker'),
      $container->get('module_handler'),
      $container->get('datetime.time')

    );
  }

  /**
   * {@inheritdoc}
   */
  public function getFormId() {
    return 'queue_examples_batch_form';
  }

  /**
   * {@inheritdoc}
   */
  public function buildForm(array $form, FormStateInterface $form_state) {


    $form['help'] = [
      '#markup' => $this->t('This form is used to get the location data from remote URL and push data to queue in Batches.'),
    ];

    $sourceUrl = 'https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/master/countries.json';

    $link = Link::fromTextAndUrl('here', Url::fromUri($sourceUrl, ['attributes' => ['target' => '_blank']]));

    $locations = [
      'countries' => $this->t('Countries'),
      'states' => $this->t('States'),
      'cities' => $this->t('Cities'),
      'cities_one' => $this->t('Cities One'),
    ];

    $form['locations_list'] = [
      '#title' => $this->t('Select Location Type.'),
      '#type' => 'select',
      '#options' => $locations,
      '#empty_option' => $this->t('Choose a Location'),
      '#required' => TRUE,
    ];

    $form['actions'] = ['#type' => 'actions'];
    $form['actions']['run'] = [
      '#type' => 'submit',
      '#value' => $this->t('Import Locations'),
      '#button_type' => 'primary',
    ];

    return $form;
  }

  /**
   * {@inheritdoc}
   */
  public function validateForm(array &$form, FormStateInterface $form_state) {
    // Check whether the Location Type is valid or not.
    $location = $form_state->getValue('locations_list');
    if (!in_array($location, ['countries', 'states', 'cities'], TRUE)) {
      $form_state->setErrorByName('locations_list', $this->t('The selected location type %location is invalid.', ['%location' => $location]));
    }
  }

  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {

    $data = [];

    $link = Link::fromTextAndUrl('here', Url::fromRoute('queue_examples.queue_data_import_form'));

    $location = $form_state->getValue('locations_list');

    if (!empty($location)) {

      $remoteUrl = 'https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/master/' . $location . '.json';

      $data = $this->fetchData($remoteUrl);

      if (!empty($data)) {

        $decodedData = Json::decode($data);

        $this->batchBuilder
          ->setTitle($this->t('Processing @location Batch...', ['@location' => Unicode::ucfirst($location)]))
          ->setInitMessage($this->t('Initializing Batch...'))
          ->setProgressMessage($this->t('Completed @current of @total. Estimated remaining time: @estimate.'))
          ->setErrorMessage($this->t('An error has occurred.Click @link to return to form', ['@link' => $link->toString()]));
        $this->batchBuilder->addOperation([$this, 'processItems'], [$location, $decodedData]);
        $this->batchBuilder->setFinishCallback([$this, 'finished']);
        batch_set($this->batchBuilder->toArray());
      }
    }

  }

  /**
   * {@inheritdoc}
   */
  public function fetchData(string $url) {
    $data = [];
    try {

      // @todo check whether implementation of 'timeout' => 600
      // is correct or not in httpClient.
      $response = $this->httpClient->get($url, ['headers' => ['Content-Type' => 'application/hal+json']]);
      if ($response->getStatusCode() == 200) {
        $data = (string) $response->getBody();
      }

    }
    catch (RequestException $exception) {

      $this->messenger()->addError('Failed to download JSON data from URL due to an error check logs for more details.');

      $this->logger('queue_examples')->warning('Failed to download JSON data from URL due to "%error".', ['%error' => $exception->getMessage()]);
    }
    return $data;
  }

  /**
   * Processor for batch operations.
   */
  public function processItems($location, $items, array &$context) {
    // Elements per operation.
    $limit = 50;

    // Set default progress values.
    if (empty($context['sandbox']['progress'])) {
      $context['sandbox']['progress'] = 0;
      $context['sandbox']['max'] = count($items);
    }

    // Save items to array which will be changed during processing.
    if (empty($context['sandbox']['items'])) {
      $context['sandbox']['items'] = $items;
    }

    $counter = 0;
    if (!empty($context['sandbox']['items'])) {
      // Remove already processed items.
      if ($context['sandbox']['progress'] != 0) {
        array_splice($context['sandbox']['items'], 0, $limit);
      }

      foreach ($context['sandbox']['items'] as $item) {
        if ($counter != $limit) {
          $this->processItem($location, $item);
          $counter++;
          $context['sandbox']['progress']++;

          $context['message'] = $this->t('Now Processing Data Item :progress of :count', [
            ':progress' => $context['sandbox']['progress'],
            ':count' => $context['sandbox']['max'],
          ]);

          // Increment total processed item values. Will be used in finished
          // callback.
          $context['results']['processed'] = $context['sandbox']['progress'];
        }
      }
    }

    // If not finished
    // If not finished all tasks, we count percentage of process. 1 = 100%.
    if ($context['sandbox']['progress'] != $context['sandbox']['max']) {
      $context['finished'] = $context['sandbox']['progress'] / $context['sandbox']['max'];
    }
  }

  /**
   * Process single item.
   *
   * @param string $location
   *   Location Type.
   * @param array $item
   *   Single Location Data.
   */
  public function processItem(string $location, array $item) {

    if (!empty($item)) {

      // Create new queue item.
      $queue = $this->queue_factory->get('queue_data_processor:' . $location);
      $queue->createItem($item);
      // $queue->deleteItem($data);
      $this->logger('queue_examples')
        ->notice($this->t('@location Data Item Pushed to Queue: @data',
          [
            '@data' => implode(',', $item),
            '@location' => Unicode::ucfirst($location),
          ])
        );
    }

  }

  /**
   * Finished callback for batch.
   */
  public function finished($success, $results, $operations) {
    $message = $this->t('Number of Items processed by batch: @count', [
      '@count' => $results['processed'],
    ]);

    $this->messenger()->addStatus($message);
    $this->logger('queue_examples')->info($message);
  }

}

QueueDataProcessorDerivative.php

namespace Drupal\queue_examples\Plugin\Derivative;

use Drupal\Component\Plugin\Derivative\DeriverBase;

/**
 * The simple derivative example for Queue.
 */
class QueueDataProcessorDerivative extends DeriverBase {

  /**
   * {@inheritdoc}
   */
  public function getDerivativeDefinitions($base_plugin_definition) {

    $locations = ['countries', 'states', 'cities'];

    foreach ($locations as $location) {

      $this->derivatives[$location] = $base_plugin_definition;
      $this->derivatives[$location]['title'] = $location . 'Queue';

    }

    return $this->derivatives;
  }

}

### QueueDataProcessor.php

namespace Drupal\queue_examples\Plugin\QueueWorker;

use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Messenger\MessengerInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\StringTranslation\StringTranslationTrait;

/**
 * Processes Locations Data.
 *
 * @QueueWorker(
 *   id = "queue_data_processor",
 *   title = @Translation("Queue Data Processor."),
 *   cron = {"time" = 10},
 *   deriver = "Drupal\queue_examples\Plugin\Derivative\QueueDataProcessorDerivative",
 * )
 */
class QueueDataProcessor extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  use StringTranslationTrait;

  /**
   * The Messenger service.
   *
   * @var \Drupal\Core\Messenger\MessengerInterface
   */
  protected $messenger;

  /**
   * Logger service.
   *
   * @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
   */
  protected $logger;

  /**
   * Database service.
   *
   * @var \Drupal\Core\Database\Connection
   */

  protected $database;

  /**
   * {@inheritdoc}
   */
  public function __construct(LoggerChannelFactoryInterface $logger, MessengerInterface $messenger, Connection $connection) {
    $this->logger = $logger->get('queue_examples');
    $this->messenger = $messenger;
    $this->database = $connection;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $container->get('logger.factory'),
      $container->get('messenger'),
      $container->get('database')
    );
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {

    $id = $this->getDerivativeId();

    // @todo if id = countries insert data into countries table.
    // if id = states insert data into states table.
    // if id = cities insert data into cities table.
    $this->logger->info($id . 'Queue Processed.');

    if (!empty($data)) {
      // $query = $this->database->insert($id);
      // $query->fields($data);
      // $query->execute();
      $this->logger->info($this->t('Data available to Process.'));

    }
    else {
      $this->logger->warning($this->t('No Data available to Process.'));

    }

  }

}

What's wrong in the code I am using?

I would like to insert countries data into countries table, states data into states table, and cities data into cities table. All using single queue processor file using derivatives.

I know of other way where you can conditionally check data and process the data accordingly, but I would like to achieve it using queue derivative.

I can see 3 Queues in the list when I run drush queue:list.

 ------------------------------------ ------- --------------------------------- 
  Queue                                Items   Class                            
 ------------------------------------ ------- ---------------------------------  
  queue_data_processor:countries       0       Drupal\Core\Queue\DatabaseQueue  
  queue_data_processor:states          0       Drupal\Core\Queue\DatabaseQueue  
  queue_data_processor:cities          0       Drupal\Core\Queue\DatabaseQueue  
 ------------------------------------ ------- ---------------------------------

In case it helps, I got this idea reading Drupal 8: Derivatives — множественные экземпляры плагина.

Edit 1:

The issue I am having is Id is Null in QueueDataProcessor::processItem($data)

有帮助吗?

解决方案

With the Help of Niklan I was able to resolve the issue .

The reason for $Id is NULL because of missing parent constructor call. $Id value is available once parent constructor is called.

Full code of Queue Controller as follows :

<?php

namespace Drupal\queue_examples\Plugin\QueueWorker;

use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Messenger\MessengerInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\Component\Utility\Unicode;


/**
 * Processes Locations Data.
 *
 * @QueueWorker(
 *   id = "queue_data_processor",
 *   title = @Translation("Queue Data Processor."),
 *   cron = {"time" = 10},
 *   deriver = "Drupal\queue_examples\Plugin\Derivative\QueueDataProcessorDerivative",
 * )
 */
class QueueDataProcessor extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  use StringTranslationTrait;

  /**
   * The Messenger service.
   *
   * @var \Drupal\Core\Messenger\MessengerInterface
   */
  protected $messenger;

  /**
   * Logger service.
   *
   * @var \Drupal\Core\Logger\LoggerChannelFactoryInterface
   */
  protected $logger;

  /**
   * Database service.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    $instance = new static($configuration, $plugin_id, $plugin_definition);
    $instance->logger = $container->get('logger.factory')->get('queue_examples');
    $instance->messenger = $container->get('messenger');
    $instance->database = $container->get('database');
    return $instance;
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {

    $plugin_id = $this->getPluginId();
    $id = $this->getDerivativeId();


    if (!empty($data)) {
      // $query = $this->database->insert($id);
      // $query->fields($data);
      // $query->execute();
      $this->logger->notice($this->t('@id Data Inserted : @data',
          [
            '@id' => Unicode::ucfirst($id),
            '@data' => implode(',', $data),
          ]
        ));

    }
    else {
      $this->logger->warning($this->t('No Data available to Process.'));

    }

  }

}
许可以下: CC-BY-SA归因
scroll top