diff --git a/src/Command/RetryCommand.php b/src/Command/RetryCommand.php index 947d48f..8b0a8d7 100644 --- a/src/Command/RetryCommand.php +++ b/src/Command/RetryCommand.php @@ -66,7 +66,7 @@ protected function execute(InputInterface $input, OutputInterface $output) $ids = $input->getArgument('id'); - $failedJobs = ($ids === 'all') ? $this->failed->findAll() : $this->failed->findByIds($ids); + $failedJobs = (in_array('all', $ids)) ? $this->failed->findAll() : $this->failed->findByIds($ids); // If entity is found. if (!empty($failedJobs)) { @@ -118,4 +118,4 @@ protected function resetAttempts($payload) return $payload; } -} \ No newline at end of file +} diff --git a/src/Job/AbstractJob.php b/src/Job/AbstractJob.php index 2fca723..a1567ab 100644 --- a/src/Job/AbstractJob.php +++ b/src/Job/AbstractJob.php @@ -201,6 +201,16 @@ public function timeout() return ArrayHelper::get($this->payload(), 'timeout'); } + /** + * Get the timestamp indicating when the job should timeout. + * + * @return int|null + */ + public function timeoutAt() + { + return !empty($this->payload()['timeoutAt']) ? $this->payload()['timeoutAt'] : null; + } + /** * @inheritdoc */ diff --git a/src/Job/JobsInterface.php b/src/Job/JobsInterface.php index df206d3..190b9ed 100644 --- a/src/Job/JobsInterface.php +++ b/src/Job/JobsInterface.php @@ -68,6 +68,13 @@ public function maxTries(); */ public function timeout(); + /** + * Get the timestamp indicating when the job should timeout. + * + * @return int|null + */ + public function timeoutAt(); + /** * Get the name of the queued job class. * diff --git a/src/Type/AbstractQueue.php b/src/Type/AbstractQueue.php index 0d45a87..ac0ca36 100644 --- a/src/Type/AbstractQueue.php +++ b/src/Type/AbstractQueue.php @@ -193,6 +193,8 @@ protected function createStringPayload($job, $data) $payload['timeout'] = null; } + $payload['timeoutAt'] = $this->getJobExpiration($job); + return $payload; } @@ -233,4 +235,49 @@ public function setContainer(ContainerInterface $container) return $this; } + + /** + * Get the expiration timestamp for an object-based queue handler. + * + * @param mixed $job + * + * @return mixed + */ + public function getJobExpiration($job) + { + if (isset($job->timeoutAt)) { + + $timeoutAt = $job->timeoutAt; + + } elseif (method_exists($job, 'getTimeoutAt')) { + + $timeoutAt = $job->getTimeoutAt(); + + } else { + + $timeoutAt = null; + } + + if (isset($job->retryUntil)) { + + $retryUntil = $job->retryUntil; + + } elseif (method_exists($job, 'getRetryUntil')) { + + $retryUntil = $job->getRetryUntil(); + + } else { + + $retryUntil = null; + } + + if (empty($retryUntil) && empty($timeoutAt)) { + + return; + } + + $expiration = (!empty($timeoutAt)) ? $timeoutAt : $retryUntil; + + return $expiration instanceof \DateTimeInterface ? $expiration->getTimestamp() : $expiration; + } } diff --git a/src/Util/CacheAdapter.php b/src/Util/CacheAdapter.php index 3982dee..5fcf024 100644 --- a/src/Util/CacheAdapter.php +++ b/src/Util/CacheAdapter.php @@ -51,7 +51,7 @@ public function __construct($cache) */ public function get($key, $default = null) { - $this->cache->get($key, $default); + return $this->cache->get($key, $default); } /** diff --git a/src/Worker.php b/src/Worker.php index 0266dc0..7cf41e9 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -146,8 +146,9 @@ protected function registerTimeoutHandler($job, WorkerOptions $options) $this->kill(1); }); - $timeout = $this->timeoutForJob($job, $options); - pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0); + pcntl_alarm( + max($this->timeoutForJob($job, $options), 0) + ); } } @@ -171,11 +172,11 @@ protected function timeoutForJob($job, WorkerOptions $options) * * @return bool */ - protected function daemonShouldRun(WorkerOptions $options) + public function daemonShouldRun(WorkerOptions $options) { return ! (($this->manager->isDownForMaintenance() && ! $options->force) || - $this->paused || - $this->until() === false); + $this->isPaused() || + $this->until() !== false); } /** @@ -260,11 +261,11 @@ protected function getNextJob($connection, $queue) } catch (\Exception $e) { $this->exceptions->report($e); + $this->stopWorkerIfLostConnection($e); } catch (\Throwable $e) { $this->exceptions->report($e = new FatalThrowableError($e)); - $this->stopWorkerIfLostConnection($e); } } @@ -417,8 +418,14 @@ protected function handleJobException($connectionName, $job, WorkerOptions $opti protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) { $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + $timeoutAt = $job->timeout(); + + if ($timeoutAt && (new \DateTimeImmutable)->getTimestamp() <= $timeoutAt) { + + return; + } - if ($maxTries === 0 || $job->attempts() <= $maxTries) { + if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) { return; } @@ -440,7 +447,12 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $ */ protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) { - $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + + if ($job->timeoutAt() && $job->timeoutAt() <= (new \DateTimeImmutable)->getTimestamp()) { + + $this->failJob($connectionName, $job, $e); + } if ($maxTries > 0 && $job->attempts() >= $maxTries) { @@ -606,6 +618,8 @@ public function stop($status = 0) */ public function kill($status = 0) { + $this->events->dispatch(EventsList::WORKER_STOPPING, new Event\WorkerStopping); + if (extension_loaded('posix')) { posix_kill(getmypid(), SIGKILL); @@ -662,6 +676,14 @@ protected function until() return $this->events->dispatch(EventsList::LOOPING, new Event\Looping)->isPropagationStopped(); } + /** + * @return bool + */ + public function isPaused() + { + return $this->paused; + } + /** * Raise the failed queue job event. * diff --git a/tests/Type/BeanstalkdQueueTest.php b/tests/Type/BeanstalkdQueueTest.php index 2bacdff..866f1ee 100644 --- a/tests/Type/BeanstalkdQueueTest.php +++ b/tests/Type/BeanstalkdQueueTest.php @@ -28,7 +28,7 @@ public function testPushProperlyPushesJobOntoBeanstalkd() $pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk); $pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk); - $pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => null, 'timeout' => null]), 1024, 0, 60); + $pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => null, 'timeout' => null, 'timeoutAt' => null]), 1024, 0, 60); $queue->push('foo', ['data'], 'stack'); $queue->push('foo', ['data']); } @@ -43,7 +43,7 @@ public function testDelayedPushProperlyPushesJobOntoBeanstalkd() $pheanstalk = $queue->getPheanstalk(); $pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk); $pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk); - $pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => 1, 'timeout' => 1]), \Pheanstalk\Pheanstalk::DEFAULT_PRIORITY, 5, \Pheanstalk\Pheanstalk::DEFAULT_TTR); + $pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => 1, 'timeout' => 1, 'timeoutAt' => null]), \Pheanstalk\Pheanstalk::DEFAULT_PRIORITY, 5, \Pheanstalk\Pheanstalk::DEFAULT_TTR); $queue->later(5, 'foo', ['data'], 'stack'); $queue->later(5, 'foo', ['data']); } diff --git a/tests/Type/RedisQueueTest.php b/tests/Type/RedisQueueTest.php index b74ccd8..b7ece98 100644 --- a/tests/Type/RedisQueueTest.php +++ b/tests/Type/RedisQueueTest.php @@ -21,7 +21,7 @@ public function testPushProperlyPushesJobOntoRedis() $queue->setContainer($this->getContainer()); $queue->expects($this->once())->method('getRandomId')->will($this->returnValue('foo')); - $redis->shouldReceive('rpush')->once()->with('queues:default', '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}'); + $redis->shouldReceive('rpush')->once()->with('queues:default', '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}'); $id = $queue->push('foo', ['data']); $this->assertEquals('foo', $id); } @@ -35,7 +35,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis() $redis->shouldReceive('zadd')->once()->with( 'queues:default:delayed', 2, - '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}' + '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}' ); $id = $queue->later(1, 'foo', ['data']); $this->assertEquals('foo', $id); @@ -51,7 +51,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis() $redis->shouldReceive('zadd')->once()->with( 'queues:default:delayed', 2, - '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}' + '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}' ); $queue->later($date, 'foo', ['data']); } diff --git a/tests/Util/CacheAdapterTest.php b/tests/Util/CacheAdapterTest.php new file mode 100644 index 0000000..0f8b84c --- /dev/null +++ b/tests/Util/CacheAdapterTest.php @@ -0,0 +1,31 @@ +shouldReceive('get') + ->once() + ->andReturn('test') + ->getMock(); + + $class = new CacheAdapter($cacheMock); + + + $this->assertEquals('test', $class->get('test')); + } + +} \ No newline at end of file diff --git a/tests/WorkerTest.php b/tests/WorkerTest.php index 495ecd1..531c906 100644 --- a/tests/WorkerTest.php +++ b/tests/WorkerTest.php @@ -2,6 +2,7 @@ namespace IdeasBucket\QueueBundle; +use IdeasBucket\QueueBundle\Event\EventsList; use IdeasBucket\QueueBundle\Event\JobExceptionOccurred; use IdeasBucket\QueueBundle\Event\JobFailed; use IdeasBucket\QueueBundle\Event\JobProcessed; @@ -9,11 +10,12 @@ use IdeasBucket\QueueBundle\Exception\ErrorHandler; use IdeasBucket\QueueBundle\Exception\MaxAttemptsExceededException; use IdeasBucket\QueueBundle\Job\JobsInterface; +use Mockery as m; use PHPUnit\Framework\TestCase; use RuntimeException; -use Mockery as m; +use Symfony\Component\EventDispatcher\Event; use Symfony\Component\EventDispatcher\EventDispatcher; -use IdeasBucket\QueueBundle\Event\EventsList; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; /** * Class WorkerTest @@ -158,6 +160,91 @@ public function testJobBasedMaxRetries() $this->assertNull($job->failedWith); } + public function testDaemonShouldRunMaintenanceAndForce() + { + $manager = m::mock(Manager::class) + ->shouldReceive('isDownForMaintenance') + ->andReturn(true) + ->getMock(); + + $eventMock = m::mock(Event::class) + ->shouldReceive('isPropagationStopped') + ->andReturn(false) + ->getMock(); + + $dispatcher = m::mock(EventDispatcherInterface::class) + ->shouldReceive('dispatch') + ->andReturn($eventMock) + ->getMock(); + + $exceptionHandler = m::mock(ErrorHandler::class)->makePartial(); + + /** @var Worker|m\Mock $worker */ + $worker = m::mock(Worker::class, [$manager, $dispatcher, $exceptionHandler]) + ->makePartial() + ->shouldReceive('isPaused') + ->andReturn(false) + ->getMock(); + + $workerOptions = new WorkerOptions(); + + $bool = $worker->daemonShouldRun($workerOptions); + + $this->assertFalse($bool); + + // Now test forced + $workerOptions = new WorkerOptions(); + $workerOptions->force = true; + + $bool = $worker->daemonShouldRun($workerOptions); + $this->assertTrue($bool); + } + + public function testDaemonShouldRunUntilAndPaused(){ + $manager = m::mock(Manager::class) + ->shouldReceive('isDownForMaintenance') + ->andReturn(false) + ->getMock(); + + $eventMock = m::mock(Event::class) + ->shouldReceive('isPropagationStopped') + ->andReturnValues([true, false]) + ->getMock(); + + $dispatcher = m::mock(EventDispatcherInterface::class) + ->shouldReceive('dispatch') + ->andReturn($eventMock) + ->getMock(); + + $exceptionHandler = m::mock(ErrorHandler::class)->makePartial(); + + /** @var Worker|m\Mock $worker */ + $worker = m::mock(Worker::class, [$manager, $dispatcher, $exceptionHandler]) + ->makePartial() + ->shouldReceive('isPaused') + ->andReturn(false) + ->getMock(); + + $workerOptions = new WorkerOptions(); + + // Run once with isPropagationStopped = false + $bool = $worker->daemonShouldRun($workerOptions); + $this->assertFalse($bool); + + // Run with isPropagationStopped = true + $bool = $worker->daemonShouldRun($workerOptions); + $this->assertTrue($bool); + + $worker = m::mock(Worker::class, [$manager, $dispatcher, $exceptionHandler]) + ->makePartial() + ->shouldReceive('isPaused') + ->andReturn(true) + ->getMock(); + + $bool = $worker->daemonShouldRun($workerOptions); + $this->assertFalse($bool); + } + /** * Helpers... */ @@ -265,6 +352,11 @@ public function __construct($callback = null) }; } + public function timeoutAt() + { + + } + public function fire() { $this->fired = true;