How do I create a derivative?
-
24-02-2021 - |
问题
I would like to create a queue derivative, where I can use a single QueueWorker
class to process multiple queues, but I am not able to achieve it. Any help would be greatly appreciated.
The module structure is the following.
queue_examples/
├── queue_examples.info.yml
├── queue_examples.routing.yml
└── src
├── Form
│ └── QueueDataImportForm.php
└── Plugin
├── Derivative
│ └── QueueDataProcessorDerivative.php
└── QueueWorker
└── QueueDataProcessor.php
The code I am using is the following.
QueueDataImportForm.php
namespace Drupal\queue_examples\Form;
use Drupal\Core\Batch\BatchBuilder;
use Drupal\Core\Form\FormBase;
use Drupal\Core\Form\FormStateInterface;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\ClientInterface;
use Drupal\Core\Url;
use Drupal\Core\Link;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Component\Serialization\Json;
use Drupal\Core\Queue\QueueWorkerManager;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Component\Datetime\Time;
use Drupal\Component\Utility\Unicode;
/**
 * Downloads location data and pushes it to per-location queues in batches.
 *
 * The user picks a location type; the matching JSON file is fetched from a
 * remote URL and every decoded item is added to the queue named
 * "queue_data_processor:<location type>" through a batch operation.
 */
class QueueDataImportForm extends FormBase {

  /**
   * The HTTP client to fetch the feed data with.
   *
   * @var \GuzzleHttp\ClientInterface
   */
  protected $httpClient;

  /**
   * Batch Builder.
   *
   * @var \Drupal\Core\Batch\BatchBuilder
   */
  protected $batchBuilder;

  /**
   * Drupal\Component\Datetime\Time instance.
   *
   * @var \Drupal\Component\Datetime\Time
   */
  public $time;

  /**
   * Drupal\Core\Queue\QueueFactory instance.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * The queue worker plugin manager.
   *
   * @var \Drupal\Core\Queue\QueueWorkerManager
   */
  protected $queueManager;

  /**
   * The module handler service.
   *
   * @var \Drupal\Core\Extension\ModuleHandlerInterface
   */
  protected $moduleHandler;

  /**
   * QueueDataImportForm constructor.
   *
   * @param \GuzzleHttp\ClientInterface $http_client
   *   The HTTP client used to download the JSON data.
   * @param \Drupal\Core\Queue\QueueFactory $queue_factory
   *   The queue factory used to obtain the per-location queues.
   * @param \Drupal\Core\Queue\QueueWorkerManager $queue_manager
   *   The queue worker plugin manager.
   * @param \Drupal\Core\Extension\ModuleHandlerInterface $module_handler
   *   The module handler service.
   * @param \Drupal\Component\Datetime\Time $time
   *   The time service.
   */
  public function __construct(ClientInterface $http_client, QueueFactory $queue_factory, QueueWorkerManager $queue_manager, ModuleHandlerInterface $module_handler, Time $time) {
    $this->httpClient = $http_client;
    $this->batchBuilder = new BatchBuilder();
    // Assign to the declared properties; the previous snake_case names created
    // undeclared dynamic properties and left $queueFactory unset.
    $this->queueFactory = $queue_factory;
    $this->queueManager = $queue_manager;
    $this->moduleHandler = $module_handler;
    $this->time = $time;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('http_client'),
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker'),
      $container->get('module_handler'),
      $container->get('datetime.time')
    );
  }

  /**
   * {@inheritdoc}
   */
  public function getFormId() {
    return 'queue_examples_batch_form';
  }

  /**
   * Returns the importable location types.
   *
   * Single source of truth for both the select options and the validation, so
   * the form never offers a type that validation rejects.
   *
   * @return array
   *   Translated labels keyed by location type machine name.
   */
  protected function getLocationOptions() {
    return [
      'countries' => $this->t('Countries'),
      'states' => $this->t('States'),
      'cities' => $this->t('Cities'),
    ];
  }

  /**
   * {@inheritdoc}
   */
  public function buildForm(array $form, FormStateInterface $form_state) {
    $form['help'] = [
      '#markup' => $this->t('This form is used to get the location data from remote URL and push data to queue in Batches.'),
    ];
    $form['locations_list'] = [
      '#title' => $this->t('Select Location Type.'),
      '#type' => 'select',
      '#options' => $this->getLocationOptions(),
      '#empty_option' => $this->t('Choose a Location'),
      '#required' => TRUE,
    ];
    $form['actions'] = ['#type' => 'actions'];
    $form['actions']['run'] = [
      '#type' => 'submit',
      '#value' => $this->t('Import Locations'),
      '#button_type' => 'primary',
    ];
    return $form;
  }

  /**
   * {@inheritdoc}
   */
  public function validateForm(array &$form, FormStateInterface $form_state) {
    // Check whether the Location Type is valid or not.
    $location = $form_state->getValue('locations_list');
    if (!is_string($location) || !array_key_exists($location, $this->getLocationOptions())) {
      $form_state->setErrorByName('locations_list', $this->t('The selected location type %location is invalid.', ['%location' => $location]));
    }
  }

  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {
    $location = $form_state->getValue('locations_list');
    if (empty($location)) {
      return;
    }

    $remoteUrl = 'https://raw.githubusercontent.com/dr5hn/countries-states-cities-database/master/' . $location . '.json';
    $data = $this->fetchData($remoteUrl);
    if (empty($data)) {
      // fetchData() has already reported the failure.
      return;
    }

    // Json::decode() yields NULL for malformed JSON; the batch operation
    // needs a countable list of items.
    $decodedData = Json::decode($data);
    if (!is_array($decodedData)) {
      $this->messenger()->addError($this->t('The data downloaded for %location is not valid JSON.', ['%location' => $location]));
      return;
    }

    $link = Link::fromTextAndUrl($this->t('here'), Url::fromRoute('queue_examples.queue_data_import_form'));
    $this->batchBuilder
      ->setTitle($this->t('Processing @location Batch...', ['@location' => Unicode::ucfirst($location)]))
      ->setInitMessage($this->t('Initializing Batch...'))
      ->setProgressMessage($this->t('Completed @current of @total. Estimated remaining time: @estimate.'))
      ->setErrorMessage($this->t('An error has occurred. Click @link to return to form', ['@link' => $link->toString()]));
    $this->batchBuilder->addOperation([$this, 'processItems'], [$location, $decodedData]);
    $this->batchBuilder->setFinishCallback([$this, 'finished']);
    batch_set($this->batchBuilder->toArray());
  }

  /**
   * Downloads the raw response body of a URL.
   *
   * @param string $url
   *   The URL to request.
   *
   * @return string|array
   *   The response body as a string when the request returns HTTP 200,
   *   otherwise an empty array.
   */
  public function fetchData(string $url) {
    $data = [];
    try {
      // @todo check whether implementation of 'timeout' => 600
      // is correct or not in httpClient.
      $response = $this->httpClient->get($url, ['headers' => ['Content-Type' => 'application/hal+json']]);
      if ($response->getStatusCode() == 200) {
        $data = (string) $response->getBody();
      }
    }
    // GuzzleException is the common interface of all Guzzle exceptions, so
    // connection failures are reported as well as HTTP error responses.
    catch (\GuzzleHttp\Exception\GuzzleException $exception) {
      $this->messenger()->addError($this->t('Failed to download JSON data from URL due to an error check logs for more details.'));
      $this->logger('queue_examples')->warning('Failed to download JSON data from URL due to "%error".', ['%error' => $exception->getMessage()]);
    }
    return $data;
  }

  /**
   * Processor for batch operations.
   *
   * Called repeatedly by the Batch API; each call queues at most 50 items and
   * reports the progress through $context.
   *
   * @param string $location
   *   Location type the items belong to.
   * @param array $items
   *   All decoded items to queue.
   * @param array $context
   *   The batch context (sandbox, results, message, finished).
   */
  public function processItems($location, $items, array &$context) {
    // Elements per operation.
    $limit = 50;

    // First pass: set progress values and keep a working copy of the items
    // that shrinks as chunks are taken from it.
    if (!isset($context['sandbox']['progress'])) {
      $context['sandbox']['progress'] = 0;
      $context['sandbox']['max'] = count($items);
      $context['sandbox']['items'] = $items;
      // Guarantees the key exists for the finished callback even when there
      // is nothing to process.
      $context['results']['processed'] = 0;
    }

    // Take the next chunk off the front of the remaining items, so a pass
    // never walks past its own slice.
    $chunk = array_splice($context['sandbox']['items'], 0, $limit);
    foreach ($chunk as $item) {
      $this->processItem($location, $item);
      $context['sandbox']['progress']++;
    }

    if (!empty($chunk)) {
      $context['message'] = $this->t('Now Processing Data Item @progress of @count', [
        '@progress' => $context['sandbox']['progress'],
        '@count' => $context['sandbox']['max'],
      ]);
      // Total processed items. Will be used in finished callback.
      $context['results']['processed'] = $context['sandbox']['progress'];
    }

    // If not finished all tasks, we count percentage of process. 1 = 100%.
    if ($context['sandbox']['progress'] != $context['sandbox']['max']) {
      $context['finished'] = $context['sandbox']['progress'] / $context['sandbox']['max'];
    }
  }

  /**
   * Process single item.
   *
   * Adds the item to the queue of its location type and logs it.
   *
   * @param string $location
   *   Location Type.
   * @param array $item
   *   Single Location Data.
   */
  public function processItem(string $location, array $item) {
    if (empty($item)) {
      return;
    }

    // Create new queue item.
    $queue = $this->queueFactory->get('queue_data_processor:' . $location);
    $queue->createItem($item);

    // Nested values cannot be joined by implode(); log them as JSON instead.
    $values = array_map(static function ($value) {
      return is_array($value) || is_object($value) ? json_encode($value) : $value;
    }, $item);

    // Log messages are passed untranslated with their placeholders.
    $this->logger('queue_examples')->notice('@location Data Item Pushed to Queue: @data', [
      '@data' => implode(',', $values),
      '@location' => Unicode::ucfirst($location),
    ]);
  }

  /**
   * Finished callback for batch.
   *
   * @param bool $success
   *   Whether the batch completed without errors.
   * @param array $results
   *   Results collected by the operations; 'processed' holds the item count.
   * @param array $operations
   *   Operations that remained unprocessed.
   */
  public function finished($success, $results, $operations) {
    // The key is absent when the batch stopped before the first pass.
    $count = $results['processed'] ?? 0;
    if (!$success) {
      $this->messenger()->addError($this->t('The import batch did not complete successfully.'));
    }
    $this->messenger()->addStatus($this->t('Number of Items processed by batch: @count', [
      '@count' => $count,
    ]));
    $this->logger('queue_examples')->info('Number of Items processed by batch: @count', [
      '@count' => $count,
    ]);
  }

}
QueueDataProcessorDerivative.php
namespace Drupal\queue_examples\Plugin\Derivative;
use Drupal\Component\Plugin\Derivative\DeriverBase;
/**
 * The simple derivative example for Queue.
 *
 * Produces one copy of the base plugin definition per location type, keyed by
 * the location name, so each location gets its own derivative plugin.
 */
class QueueDataProcessorDerivative extends DeriverBase {

  /**
   * {@inheritdoc}
   */
  public function getDerivativeDefinitions($base_plugin_definition) {
    foreach (['countries', 'states', 'cities'] as $location) {
      // Override only the title; everything else comes from the base
      // definition. The space keeps the label readable ("countries Queue").
      $this->derivatives[$location] = ['title' => $location . ' Queue'] + $base_plugin_definition;
    }
    return $this->derivatives;
  }

}
### QueueDataProcessor.php
namespace Drupal\queue_examples\Plugin\QueueWorker;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Messenger\MessengerInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\StringTranslation\StringTranslationTrait;
/**
* Processes Locations Data.
*
* Queue worker whose definitions are multiplied by the deriver named in the
* annotation below. This is the listing the question is about: processItem()
* reports a NULL derivative ID.
*
* @QueueWorker(
* id = "queue_data_processor",
* title = @Translation("Queue Data Processor."),
* cron = {"time" = 10},
* deriver = "Drupal\queue_examples\Plugin\Derivative\QueueDataProcessorDerivative",
* )
*/
class QueueDataProcessor extends QueueWorkerBase implements ContainerFactoryPluginInterface {
use StringTranslationTrait;
/**
* The Messenger service.
*
* @var \Drupal\Core\Messenger\MessengerInterface
*/
protected $messenger;
/**
* Logger channel returned by the logger factory for 'queue_examples'.
*
* @var \Drupal\Core\Logger\LoggerChannelInterface
*/
protected $logger;
/**
* Database service.
*
* @var \Drupal\Core\Database\Connection
*/
protected $database;
/**
* Constructs the worker from its injected services.
*
* NOTE(review): this constructor accepts no $configuration, $plugin_id or
* $plugin_definition and never calls the parent constructor. The post
* attributes the NULL derivative ID in processItem() to exactly this missing
* parent constructor call.
*
* @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $logger
*   Logger factory; only its 'queue_examples' channel is kept.
* @param \Drupal\Core\Messenger\MessengerInterface $messenger
*   The messenger service.
* @param \Drupal\Core\Database\Connection $connection
*   The database connection.
*/
public function __construct(LoggerChannelFactoryInterface $logger, MessengerInterface $messenger, Connection $connection) {
$this->logger = $logger->get('queue_examples');
$this->messenger = $messenger;
$this->database = $connection;
}
/**
* {@inheritdoc}
*
* The $configuration, $plugin_id and $plugin_definition arguments are
* received here but not forwarded to the constructor.
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static(
$container->get('logger.factory'),
$container->get('messenger'),
$container->get('database')
);
}
/**
* {@inheritdoc}
*
* Logs the derivative ID and whether the item carried any data; the actual
* database insert is still commented out.
*/
public function processItem($data) {
// Reported as NULL in the post, see the note on the constructor.
$id = $this->getDerivativeId();
// @todo if id = countries insert data into countries table.
// if id = states insert data into states table.
// if id = cities insert data into cities table.
// The ID and the text are concatenated without a separator.
$this->logger->info($id . 'Queue Processed.');
if (!empty($data)) {
// $query = $this->database->insert($id);
// $query->fields($data);
// $query->execute();
$this->logger->info($this->t('Data available to Process.'));
}
else {
$this->logger->warning($this->t('No Data available to Process.'));
}
}
}
What's wrong in the code I am using?
I would like to insert countries data into the countries table, states data into the states table, and cities data into the cities table, all with a single queue worker class that uses derivatives.
I know of another way, where you conditionally check the data and process it accordingly, but I would like to achieve this using a queue derivative.
I can see 3 queues in the list when I run drush queue:list.
------------------------------------ ------- ---------------------------------
Queue Items Class
------------------------------------ ------- ---------------------------------
queue_data_processor:countries 0 Drupal\Core\Queue\DatabaseQueue
queue_data_processor:states 0 Drupal\Core\Queue\DatabaseQueue
queue_data_processor:cities 0 Drupal\Core\Queue\DatabaseQueue
------------------------------------ ------- ---------------------------------
In case it helps, I got this idea reading Drupal 8: Derivatives — множественные экземпляры плагина.
Edit 1:
The issue I am having is that $id is NULL in QueueDataProcessor::processItem($data).
解决方案
With the help of Niklan I was able to resolve the issue.
$id was NULL because the parent constructor was never called. The derivative ID is only available once the parent constructor has been called with the plugin configuration, ID, and definition.
The full code of the queue worker is as follows:
<?php
namespace Drupal\queue_examples\Plugin\QueueWorker;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Messenger\MessengerInterface;
use Drupal\Core\Database\Connection;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\Component\Utility\Unicode;
/**
 * Processes Locations Data.
 *
 * One worker class serves every derivative produced by the deriver named in
 * the annotation; the derivative ID tells which location type an item
 * belongs to.
 *
 * @QueueWorker(
 *   id = "queue_data_processor",
 *   title = @Translation("Queue Data Processor."),
 *   cron = {"time" = 10},
 *   deriver = "Drupal\queue_examples\Plugin\Derivative\QueueDataProcessorDerivative",
 * )
 */
class QueueDataProcessor extends QueueWorkerBase implements ContainerFactoryPluginInterface {
  use StringTranslationTrait;

  /**
   * The Messenger service.
   *
   * @var \Drupal\Core\Messenger\MessengerInterface
   */
  protected $messenger;

  /**
   * Logger channel for 'queue_examples'.
   *
   * @var \Drupal\Core\Logger\LoggerChannelInterface
   */
  protected $logger;

  /**
   * Database service.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    // Go through the inherited plugin constructor so the plugin ID and
    // definition are stored; getDerivativeId() depends on them. Services are
    // then set on the instance instead of widening the constructor.
    $instance = new static($configuration, $plugin_id, $plugin_definition);
    $instance->logger = $container->get('logger.factory')->get('queue_examples');
    $instance->messenger = $container->get('messenger');
    $instance->database = $container->get('database');
    return $instance;
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    if (empty($data)) {
      $this->logger->warning('No Data available to Process.');
      return;
    }

    // Cast guards against a NULL ID when the plugin is not a derivative.
    $id = (string) $this->getDerivativeId();

    // @todo Insert $data into the table named after the derivative ID
    //   (countries, states or cities) using $this->database.
    // Nested values cannot be joined by implode(); log them as JSON instead.
    $values = array_map(static function ($value) {
      return is_array($value) || is_object($value) ? json_encode($value) : $value;
    }, (array) $data);

    // Log messages are passed untranslated with their placeholders.
    $this->logger->notice('@id Data Inserted : @data', [
      '@id' => Unicode::ucfirst($id),
      '@data' => implode(',', $values),
    ]);
  }

}