Skip to content

Commit

Permalink
Upgrade to Enqueue 0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Dec 17, 2018
1 parent c195787 commit 838c3a4
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 156 deletions.
2 changes: 1 addition & 1 deletion DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public function getConfigTreeBuilder()
$rootNode
->children()
->booleanNode('enabled')->defaultValue(true)->end()
->scalarNode('context')->defaultValue('enqueue.transport.context')->cannotBeEmpty()->end()
->scalarNode('transport')->defaultValue('%enqueue.default_transport%')->cannotBeEmpty()->isRequired()->end()
->arrayNode('doctrine')
->children()
->scalarNode('driver')->defaultValue('orm')->cannotBeEmpty()
Expand Down
63 changes: 41 additions & 22 deletions DependencyInjection/EnqueueElasticaExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

namespace Enqueue\ElasticaBundle\DependencyInjection;

use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor;
use Enqueue\ElasticaBundle\Doctrine\SyncIndexWithObjectChangeListener;
use Symfony\Component\Config\FileLocator;
use Enqueue\ElasticaBundle\Persister\Listener\PurgePopulateQueueListener;
use Enqueue\ElasticaBundle\Persister\QueuePagerPersister;
use Enqueue\ElasticaBundle\Queue\PopulateProcessor;
use Enqueue\Symfony\DependencyInjection\TransportFactory;
use Enqueue\Symfony\DiUtils;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;

Expand All @@ -22,13 +26,44 @@ public function load(array $configs, ContainerBuilder $container)
return;
}

$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
$loader->load('services.yml');
$transport = $container->getParameterBag()->resolveValue($config['transport']);

$container->setAlias('enqueue_elastica.context', $config['context']);
$diUtils = new DiUtils(TransportFactory::MODULE, $transport);
$container->setAlias('enqueue_elastica.context', $diUtils->format('context'));

$container->register('enqueue_elastica.populate_processor', PopulateProcessor::class)
->addArgument(new Reference('fos_elastica.pager_provider_registry'))
->addArgument(new Reference('fos_elastica.pager_persister_registry'))

->addTag('enqueue.command_subscriber', ['client' => $transport])
->addTag('enqueue.transport.processor', ['transport' => $transport])
;

$container->register('enqueue_elastica.purge_populate_queue_listener', PurgePopulateQueueListener::class)
->addArgument(new Reference('enqueue_elastica.context'))

->addTag('kernel.event_subscriber')
;

$container->register('enqueue_elastica.queue_pager_perister', QueuePagerPersister::class)
->addArgument(new Reference('enqueue_elastica.context'))
->addArgument(new Reference('fos_elastica.persister_registry'))
->addArgument(new Reference('event_dispatcher'))

->addTag('fos_elastica.pager_persister', ['persisterName' => 'queue'])
;

$doctrineDriver = $config['doctrine']['driver'];
if (false == empty($config['doctrine']['queue_listeners'])) {
$doctrineDriver = $config['doctrine']['driver'];

$container->register('enqueue_elastica.doctrine.sync_index_with_object_change_processor', SyncIndexWithObjectChangeProcessor::class)
->addArgument(new Reference($this->getManagerRegistry($doctrineDriver)))
->addArgument(new Reference('fos_elastica.persister_registry'))
->addArgument(new Reference('fos_elastica.indexable'))
->addTag('enqueue.command_subscriber', ['client' => $transport])
->addTag('enqueue.transport.processor', ['transport' => $transport])
;

foreach ($config['doctrine']['queue_listeners'] as $listenerConfig) {
$listenerId = sprintf(
'enqueue_elastica.doctrine_queue_listener.%s.%s',
Expand All @@ -45,19 +80,8 @@ public function load(array $configs, ContainerBuilder $container)
;
}
}

$serviceId = 'enqueue_elastica.doctrine.sync_index_with_object_change_processor';
$managerRegistry = $this->getManagerRegistry($doctrineDriver);
$container
->getDefinition($serviceId)
->replaceArgument(0, new Reference($managerRegistry));
}

/**
* @param string $driver
*
* @return string
*/
private function getManagerRegistry(string $driver): string
{
switch ($driver) {
Expand All @@ -70,11 +94,6 @@ private function getManagerRegistry(string $driver): string
}
}

/**
* @param string $driver
*
* @return string
*/
private function getEventSubscriber(string $driver): string
{
switch ($driver) {
Expand Down
44 changes: 13 additions & 31 deletions Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,23 @@
use Enqueue\Util\JSON;
use FOS\ElasticaBundle\Persister\PersisterRegistry;
use FOS\ElasticaBundle\Provider\IndexableInterface;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Doctrine\Common\Persistence\ManagerRegistry;

final class SyncIndexWithObjectChangeProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
final class SyncIndexWithObjectChangeProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface
{
const INSERT_ACTION = 'insert';

const UPDATE_ACTION = 'update';

const REMOVE_ACTION = 'remove';

/**
* @var PersisterRegistry
*/
private $persisterRegistry;

/**
* @var IndexableInterface
*/
private $indexable;

/**
* @var ManagerRegistry
*/
private $doctrine;

public function __construct(ManagerRegistry $doctrine, PersisterRegistry $persisterRegistry, IndexableInterface $indexable)
Expand All @@ -42,10 +33,7 @@ public function __construct(ManagerRegistry $doctrine, PersisterRegistry $persis
$this->doctrine = $doctrine;
}

/**
* {@inheritdoc}
*/
public function process(PsrMessage $message, PsrContext $context)
public function process(Message $message, Context $context): Result
{
$data = JSON::decode($message->getBody());

Expand Down Expand Up @@ -94,7 +82,7 @@ public function process(PsrMessage $message, PsrContext $context)
}
}

return self::ACK;
return Result::ack();
case self::INSERT_ACTION:
if (false == $object = $repository->{$repositoryMethod}($id)) {
$persister->deleteById($id);
Expand All @@ -106,33 +94,27 @@ public function process(PsrMessage $message, PsrContext $context)
$persister->insertOne($object);
}

return self::ACK;
return Result::ack();
case self::REMOVE_ACTION:
$persister->deleteById($id);

return self::ACK;
return Result::ack();
default:
return Result::reject(sprintf('The action "%s" is not supported', $action));
}
}

/**
* {@inheritdoc}
*/
public static function getSubscribedCommand()
public static function getSubscribedCommand(): array
{
return [
'processorName' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE,
'queueName' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE,
'queueNameHardcoded' => true,
'command' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE,
'queue' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE,
'prefix_queue' => false,
'exclusive' => true,
];
}

/**
* {@inheritdoc}
*/
public static function getSubscribedQueues()
public static function getSubscribedQueues(): array
{
return [Commands::SYNC_INDEX_WITH_OBJECT_CHANGE];
}
Expand Down
7 changes: 2 additions & 5 deletions Doctrine/SyncIndexWithObjectChangeListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
use Enqueue\ElasticaBundle\Doctrine\Queue\Commands;
use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor as SyncProcessor;
use Enqueue\Util\JSON;
use Interop\Queue\PsrContext;
use Interop\Queue\Context;
use Doctrine\Common\EventSubscriber;

final class SyncIndexWithObjectChangeListener implements EventSubscriber
{
/**
* @var PsrContext
*/
private $context;

/**
Expand All @@ -25,7 +22,7 @@ final class SyncIndexWithObjectChangeListener implements EventSubscriber
*/
private $config;

public function __construct(PsrContext $context, $modelClass, array $config)
public function __construct(Context $context, $modelClass, array $config)
{
$this->context = $context;
$this->modelClass = $modelClass;
Expand Down
10 changes: 2 additions & 8 deletions Persister/Listener/PurgePopulateQueueListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,15 @@
namespace Enqueue\ElasticaBundle\Persister\Listener;

use FOS\ElasticaBundle\Persister\Event\PrePersistEvent;
use Interop\Queue\PsrContext;
use Interop\Queue\Context;
use FOS\ElasticaBundle\Persister\Event\Events;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class PurgePopulateQueueListener implements EventSubscriberInterface
{
/**
* @var PsrContext
*/
private $context;

/**
* @param PsrContext $context
*/
public function __construct(PsrContext $context)
public function __construct(Context $context)
{
$this->context = $context;
}
Expand Down
7 changes: 2 additions & 5 deletions Persister/QueuePagerPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@
use FOS\ElasticaBundle\Persister\PagerPersisterInterface;
use FOS\ElasticaBundle\Persister\PersisterRegistry;
use FOS\ElasticaBundle\Provider\PagerInterface;
use Interop\Queue\PsrContext;
use Interop\Queue\Context;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

final class QueuePagerPersister implements PagerPersisterInterface
{
const NAME = 'queue';

/**
* @var PsrContext
*/
private $context;

/**
Expand All @@ -32,7 +29,7 @@ final class QueuePagerPersister implements PagerPersisterInterface
*/
private $dispatcher;

public function __construct(PsrContext $context, PersisterRegistry $registry, EventDispatcherInterface $dispatcher)
public function __construct(Context $context, PersisterRegistry $registry, EventDispatcherInterface $dispatcher)
{
$this->context = $context;
$this->dispatcher = $dispatcher;
Expand Down
54 changes: 12 additions & 42 deletions Queue/PopulateProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,15 @@
use FOS\ElasticaBundle\Persister\InPlacePagerPersister;
use FOS\ElasticaBundle\Persister\PagerPersisterRegistry;
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Enqueue\Util\JSON;

final class PopulateProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
final class PopulateProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface
{
/**
* @var PagerProviderRegistry
*/
private $pagerProviderRegistry;

/**
* @var PagerPersisterRegistry
*/
private $pagerPersisterRegistry;

public function __construct(
Expand All @@ -32,10 +26,7 @@ public function __construct(
$this->pagerProviderRegistry = $pagerProviderRegistry;
}

/**
* {@inheritdoc}
*/
public function process(PsrMessage $message, PsrContext $context)
public function process(Message $message, Context $context): Result
{
if ($message->isRedelivered()) {
$replyMessage = $this->createReplyMessage($context, $message, 0,'The message was redelivered. Chances are that something has gone wrong.');
Expand Down Expand Up @@ -81,14 +72,7 @@ public function process(PsrMessage $message, PsrContext $context)
}
}

/**
* @param PsrContext $context
* @param PsrMessage $message
* @param int $objectsCount
* @param \Throwable $e
* @return PsrMessage
*/
private function createExceptionReplyMessage(PsrContext $context, PsrMessage $message, $objectsCount, \Throwable $e)
private function createExceptionReplyMessage(Context $context, Message $message, int $objectsCount, \Throwable $e): Message
{
$errorMessage = sprintf(
'<error>The queue processor has failed to process the message with exception: </error><comment>%s: %s in file %s at line %s.</comment>',
Expand All @@ -101,15 +85,7 @@ private function createExceptionReplyMessage(PsrContext $context, PsrMessage $me
return $this->createReplyMessage($context, $message, $errorMessage);
}

/**
* @param PsrContext $context
* @param PsrMessage $message
* @param int $objectsCount
* @param string|null $error
*
* @return PsrMessage
*/
private function createReplyMessage(PsrContext $context, PsrMessage $message, $objectsCount, $error = null)
private function createReplyMessage(Context $context, Message $message, int $objectsCount, string $error = null): Message
{
$replyMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders());
$replyMessage->setProperty('fos-populate-objects-count', $objectsCount);
Expand All @@ -121,23 +97,17 @@ private function createReplyMessage(PsrContext $context, PsrMessage $message, $o
return $replyMessage;
}

/**
* {@inheritdoc}
*/
public static function getSubscribedCommand()
public static function getSubscribedCommand(): array
{
return [
'processorName' => Commands::POPULATE,
'queueName' => Commands::POPULATE,
'queueNameHardcoded' => true,
'command' => Commands::POPULATE,
'queue' => Commands::POPULATE,
'prefix_queue' => false,
'exclusive' => true,
];
}

/**
* {@inheritdoc}
*/
public static function getSubscribedQueues()
public static function getSubscribedQueues(): array
{
return [Commands::POPULATE];
}
Expand Down
Loading

0 comments on commit 838c3a4

Please sign in to comment.