Skip to content

Commit

Permalink
Merge pull request #38 from maciejmrozinski/queue-index-alias-support
Browse files Browse the repository at this point in the history
Support index alias when populating and consuming
  • Loading branch information
makasim authored Oct 27, 2022
2 parents 0103ddc + 417cd10 commit 3be2638
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
2 changes: 2 additions & 0 deletions DependencyInjection/EnqueueElasticaExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public function load(array $configs, ContainerBuilder $container)
$container->register('enqueue_elastica.populate_processor', PopulateProcessor::class)
->addArgument(new Reference('fos_elastica.pager_provider_registry'))
->addArgument(new Reference('fos_elastica.pager_persister_registry'))
->addArgument(new Reference('fos_elastica.index_manager'))

->addTag('enqueue.command_subscriber', ['client' => $transport])
->addTag('enqueue.transport.processor', ['transport' => $transport])
Expand All @@ -49,6 +50,7 @@ public function load(array $configs, ContainerBuilder $container)
->addArgument(new Reference('enqueue_elastica.context'))
->addArgument(new Reference('fos_elastica.persister_registry'))
->addArgument(new Reference('event_dispatcher'))
->addArgument(new Reference('fos_elastica.index_manager'))

->addTag('fos_elastica.pager_persister', ['persisterName' => 'queue'])
->setPublic(true)
Expand Down
27 changes: 22 additions & 5 deletions Persister/QueuePagerPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use Enqueue\ElasticaBundle\Queue\Commands;
use Enqueue\Util\JSON;
use FOS\ElasticaBundle\Index\IndexManager;
use FOS\ElasticaBundle\Persister\Event\PostAsyncInsertObjectsEvent;
use FOS\ElasticaBundle\Persister\Event\PostPersistEvent;
use FOS\ElasticaBundle\Persister\Event\PrePersistEvent;
Expand All @@ -28,11 +29,21 @@ final class QueuePagerPersister implements PagerPersisterInterface
*/
private $dispatcher;

public function __construct(Context $context, PersisterRegistry $registry, EventDispatcherInterface $dispatcher)
{
/**
* @var IndexManager
*/
private $indexManager;

public function __construct(
Context $context,
PersisterRegistry $registry,
EventDispatcherInterface $dispatcher,
IndexManager $indexManager
) {
$this->context = $context;
$this->dispatcher = $dispatcher;
$this->registry = $registry;
$this->indexManager = $indexManager;
}

/**
Expand All @@ -42,15 +53,21 @@ public function insert(PagerInterface $pager, array $options = array())
{
$pager->setMaxPerPage(empty($options['max_per_page']) ? 100 : $options['max_per_page']);

$options = array_replace([
$defaultOptions = [
'max_per_page' => $pager->getMaxPerPage(),
'first_page' => $pager->getCurrentPage(),
'last_page' => $pager->getNbPages(),
'populate_queue' => Commands::POPULATE,
'populate_reply_queue' => null,
'reply_receive_timeout' => 5000, // ms
'limit_overall_reply_time' => 180, // sec
], $options);
'limit_overall_reply_time' => 180 // sec
];
$index = $this->indexManager->getIndex($options['indexName']);
if ($index->getName() !== $index->getOriginalName()) {
$defaultOptions['realIndexName'] = $index->getName();
}

$options = array_replace($defaultOptions, $options);

$pager->setCurrentPage($options['first_page']);

Expand Down
11 changes: 10 additions & 1 deletion Queue/PopulateProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Consumption\QueueSubscriberInterface;
use Enqueue\Consumption\Result;
use FOS\ElasticaBundle\Index\IndexManager;
use FOS\ElasticaBundle\Persister\InPlacePagerPersister;
use FOS\ElasticaBundle\Persister\PagerPersisterRegistry;
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
Expand All @@ -18,12 +19,16 @@ final class PopulateProcessor implements Processor, CommandSubscriberInterface,

private $pagerPersisterRegistry;

private $indexManager;

public function __construct(
PagerProviderRegistry $pagerProviderRegistry,
PagerPersisterRegistry $pagerPersisterRegistry
PagerPersisterRegistry $pagerPersisterRegistry,
IndexManager $indexManager
) {
$this->pagerPersisterRegistry = $pagerPersisterRegistry;
$this->pagerProviderRegistry = $pagerProviderRegistry;
$this->indexManager = $indexManager;
}

public function process(Message $message, Context $context): Result
Expand Down Expand Up @@ -53,6 +58,10 @@ public function process(Message $message, Context $context): Result
$options['first_page'] = $data['page'];
$options['last_page'] = $data['page'];

if (isset($options['realIndexName'])) {
$this->indexManager->getIndex($options['indexName'])->overrideName($options['realIndexName']);
}

$provider = $this->pagerProviderRegistry->getProvider($options['indexName']);
$pager = $provider->provide($options);
$pager->setMaxPerPage($options['max_per_page']);
Expand Down

0 comments on commit 3be2638

Please sign in to comment.