diff --git a/src/Type/BeanstalkdQueue.php b/src/Type/BeanstalkdQueue.php index 3f3a992..29d8125 100644 --- a/src/Type/BeanstalkdQueue.php +++ b/src/Type/BeanstalkdQueue.php @@ -55,7 +55,7 @@ public function size($queue = null) { $queue = $this->getQueue($queue); - return (int)$this->pheanstalk->statsTube($queue)->total_jobs; + return (int)$this->pheanstalk->statsTube($queue)->current_jobs_ready; } /** diff --git a/src/Type/RedisQueue.php b/src/Type/RedisQueue.php index 952cd17..87d2b6d 100644 --- a/src/Type/RedisQueue.php +++ b/src/Type/RedisQueue.php @@ -121,9 +121,7 @@ public function later($delay, $job, $data = '', $queue = null) */ protected function laterRaw($delay, $payload, $queue = null) { - $this->redis->zadd( - $this->getQueue($queue) . ':delayed', $this->availableAt($delay), $payload - ); + $this->redis->zadd($this->getQueue($queue) . ':delayed', $this->availableAt($delay), $payload); return ArrayHelper::get(json_decode($payload, true), 'id'); } @@ -134,7 +132,7 @@ protected function laterRaw($delay, $payload, $queue = null) * @param string $job * @param mixed $data * - * @return string + * @return array */ protected function createPayloadArray($job, $data = '') { diff --git a/src/Worker.php b/src/Worker.php index dc0556d..1a11564 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -2,6 +2,7 @@ namespace IdeasBucket\QueueBundle; +use IdeasBucket\Common\Utils\StringHelper; use Psr\SimpleCache\CacheInterface as Cache; use IdeasBucket\QueueBundle\Event\EventsList; use IdeasBucket\QueueBundle\Event\JobFailed; @@ -55,6 +56,13 @@ class Worker */ protected $paused = false; + /** + * Indicates if the worker should exit. + * + * @var bool + */ + public $shouldQuit = false; + /** * Create a new queue worker. * @@ -84,6 +92,16 @@ public function daemon($connectionName, $queue, WorkerOptions $options) while (true) { + // Before reserving any jobs, we will make sure this queue is not paused and + // if it is we will just pause this worker for a given amount of time and + // make sure we do not need to kill this worker process off completely. + if (!$this->daemonShouldRun($options)) { + + $this->pauseWorker($options, $lastRestart); + + continue; + } + // First, we will attempt to get the next job off of the queue. We will also // register the timeout handler and reset the alarm for this job so it is // not stuck in a frozen state forever. Then, we can fire off this job. @@ -94,7 +112,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options) // If the daemon should run (not in maintenance mode, etc.), then we can run // fire off this job for processing. Otherwise, we will need to sleep the // worker so no more jobs are processed until they should be processed. - if ($job && $this->daemonShouldRun($options)) { + if ($job) { $this->runJob($job, $connectionName, $options); @@ -106,79 +124,117 @@ public function daemon($connectionName, $queue, WorkerOptions $options) // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. - if ($this->memoryExceeded($options->memory)) { + $this->stopIfNecessary($options, $lastRestart); + } + } - $this->stop(12); + /** + * Register the worker timeout handler (PHP 7.1+). + * + * @param JobsInterface|null $job + * @param WorkerOptions $options + */ + protected function registerTimeoutHandler($job, WorkerOptions $options) + { + if ($this->supportsAsyncSignals()) { - } elseif ($this->queueShouldRestart($lastRestart)) { + // We will register a signal handler for the alarm signal so that we can kill this + // process if it is running too long because it has frozen. This uses the async + // signals supported in recent versions of PHP to accomplish it conveniently. + pcntl_signal(SIGALRM, function () { - $this->stop(); - } + $this->kill(1); + }); + + $timeout = $this->timeoutForJob($job, $options); + pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0); } } /** - * @return Manager + * Get the appropriate timeout for the given job. + * + * @param JobsInterface|null $job + * @param WorkerOptions $options + * + * @return int */ - public function getManager() + protected function timeoutForJob($job, WorkerOptions $options) { - return $this->manager; + return $job && !is_null($job->timeout()) ? $job->timeout() : $options->timeout; } /** - * @param Manager $manager + * Determine if the daemon should process on this iteration. * - * @return Worker + * @param WorkerOptions $options + * + * @return bool */ - public function setManager($manager) + protected function daemonShouldRun(WorkerOptions $options) { - $this->manager = $manager; + return ! (($this->manager->isDownForMaintenance() && ! $options->force) || + $this->paused || + $this->until() === false); + } - return $this; + /** + * Pause the worker for the current loop. + * + * @param WorkerOptions $options + * @param int $lastRestart + * @return void + */ + protected function pauseWorker(WorkerOptions $options, $lastRestart) + { + $this->sleep($options->sleep > 0 ? $options->sleep : 1); + + $this->stopIfNecessary($options, $lastRestart); } /** - * Enable async signals for the process. + * Stop the process if necessary. + * + * @param WorkerOptions $options + * @param int $lastRestart */ - protected function listenForSignals() + protected function stopIfNecessary(WorkerOptions $options, $lastRestart) { - if ($this->supportsAsyncSignals()) { + if ($this->shouldQuit) { - pcntl_async_signals(true); + $this->kill(); + } - pcntl_signal(SIGUSR2, function () { + if ($this->memoryExceeded($options->memory)) { - $this->paused = true; - }); + $this->stop(12); - pcntl_signal(SIGCONT, function () { + } elseif ($this->queueShouldRestart($lastRestart)) { - $this->paused = false; - }); + $this->stop(); } } /** - * Determine if "async" signals are supported. + * Process the next job on the queue. * - * @return bool + * @param string $connectionName + * @param string $queue + * @param WorkerOptions $options */ - protected function supportsAsyncSignals() + public function runNextJob($connectionName, $queue, WorkerOptions $options) { - return version_compare(PHP_VERSION, '7.1.0') >= 0 && extension_loaded('pcntl'); - } + $job = $this->getNextJob($this->manager->connection($connectionName), $queue); - /** - * Get the last queue restart timestamp, or null. - * - * @return int|null - */ - protected function getTimestampOfLastQueueRestart() - { - if ($this->cache) { + // If we're able to pull a job off of the stack, we will process it and then return + // from this method. If there is no job on the queue, we will "sleep" the worker + // for the specified number of seconds, then keep processing jobs after sleep. + if ($job) { - return $this->cache->get('ideasbucket_queue:restart'); + $this->runJob($job, $connectionName, $options); } + + $this->sleep($options->sleep); } /** @@ -207,124 +263,74 @@ protected function getNextJob($connection, $queue) } catch (\Throwable $e) { - $this->exceptions->report(new FatalThrowableError($e)); + $this->exceptions->report($e = new FatalThrowableError($e)); + + $this->stopWorkerIfLostConnection($e); } } /** - * Register the worker timeout handler (PHP 7.1+). + * Process the given job. * - * @param JobsInterface|null $job - * @param WorkerOptions $options + * @param JobsInterface $job + * @param string $connectionName + * @param WorkerOptions $options */ - protected function registerTimeoutHandler($job, WorkerOptions $options) + protected function runJob($job, $connectionName, WorkerOptions $options) { - if ($options->timeout > 0 && $this->supportsAsyncSignals()) { - - // We will register a signal handler for the alarm signal so that we can kill this - // process if it is running too long because it has frozen. This uses the async - // signals supported in recent versions of PHP to accomplish it conveniently. - pcntl_signal(SIGALRM, function () { + try { + $this->process($connectionName, $job, $options); - $this->kill(1); - }); + } catch (\Exception $e) { - pcntl_alarm($this->timeoutForJob($job, $options) + $options->sleep); - } - } + $this->exceptions->report($e); + $this->stopWorkerIfLostConnection($e); - /** - * Kill the process. - * - * @param int $status - */ - public function kill($status = 0) - { - if (extension_loaded('posix')) { + } catch (\Throwable $e) { - posix_kill(getmypid(), SIGKILL); + $this->exceptions->report($e = new FatalThrowableError($e)); + $this->stopWorkerIfLostConnection($e); } - - exit($status); } /** - * Get the appropriate timeout for the given job. - * - * @param JobsInterface|null $job - * @param WorkerOptions $options + * Stop the worker if we have lost connection to a database. * - * @return int + * @param \Exception $e + * @return void */ - protected function timeoutForJob($job, WorkerOptions $options) + protected function stopWorkerIfLostConnection($e) { - return $job && !is_null($job->timeout()) ? $job->timeout() : $options->timeout; - } + if ($this->causedByLostConnection($e)) { - /** - * Determine if the daemon should process on this iteration. - * - * @param WorkerOptions $options - * - * @return bool - */ - protected function daemonShouldRun(WorkerOptions $options) - { - if (($this->manager->isDownForMaintenance() && !$options->force) || - $this->paused || - $this->until() === false - ) { - - // If the application is down for maintenance or doesn't want the queues to run - // we will sleep for one second just in case the developer has it set to not - // sleep at all. This just prevents CPU from maxing out in this situation. - $this->sleep(1); - - return false; + $this->shouldQuit = true; } - - return true; - } - - /** - * @return bool - */ - protected function until() - { - return $this->events->dispatch(EventsList::LOOPING, new Event\Looping)->isPropagationStopped(); } /** - * Sleep the script for a given number of seconds. + * Determine if the given exception was caused by a lost connection. * - * @param int $seconds - */ - public function sleep($seconds) - { - sleep($seconds); - } - - /** - * Process the given job. + * @param \Exception $e * - * @param JobsInterface $job - * @param string $connectionName - * @param WorkerOptions $options + * @return bool */ - protected function runJob($job, $connectionName, WorkerOptions $options) + protected function causedByLostConnection(\Exception $e) { - try { - - $this->process($connectionName, $job, $options); - - } catch (\Exception $e) { - - $this->exceptions->report($e); - - } catch (\Throwable $e) { - - $this->exceptions->report(new FatalThrowableError($e)); - } + $message = $e->getMessage(); + + return StringHelper::contains($message, [ + 'server has gone away', + 'no connection to the server', + 'Lost connection', + 'is dead or not enabled', + 'Error while sending', + 'decryption failed or bad record mac', + 'server closed the connection unexpectedly', + 'SSL connection has been closed unexpectedly', + 'Error writing data to the connection', + 'Resource deadlock avoided', + 'Transaction() on null', + ]); } /** @@ -365,14 +371,38 @@ public function process($connectionName, $job, WorkerOptions $options) } /** - * Raise the before queue job event. + * Handle an exception that occurred while the job was running. * * @param string $connectionName * @param JobsInterface $job + * @param WorkerOptions $options + * @param \Exception $e + * + * @throws \Exception */ - protected function raiseBeforeJobEvent($connectionName, $job) + protected function handleJobException($connectionName, $job, WorkerOptions $options, $e) { - $this->events->dispatch(EventsList::JOB_PROCESSING, new Event\JobProcessing($connectionName, $job)); + try { + + // First, we will go ahead and mark the job as failed if it will exceed the maximum + // attempts it is allowed to run the next time we process it. If so we will just + // go ahead and mark it as failed now so we do not have to release this again. + $this->markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, (int)$options->maxTries, $e); + + $this->raiseExceptionOccurredJobEvent($connectionName, $job, $e); + + } finally { + + // If we catch an exception, we will attempt to release the job back onto the queue + // so it is not lost entirely. This'll let the job be retried at a later time by + // another listener (or this same one). We will re-throw this exception after. + if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) { + + $job->release($options->delay); + } + } + + throw $e; } /** @@ -393,13 +423,31 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $ return; } - $exception = new MaxAttemptsExceededException; - - $this->failJob($connectionName, $job, $exception); + $this->failJob($connectionName, $job, $exception = new MaxAttemptsExceededException( + 'A queued job has been attempted too many times. The job may have previously timed out.' + )); throw $exception; } + /** + * Mark the given job as failed if it has exceeded the maximum allowed attempts. + * + * @param string $connectionName + * @param JobsInterface $job + * @param int $maxTries + * @param \Exception $e + */ + protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) + { + $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + + if ($maxTries > 0 && $job->attempts() >= $maxTries) { + + $this->failJob($connectionName, $job, $e); + } + } + /** * Mark the given job as failed and raise the relevant event. * @@ -429,6 +477,18 @@ protected function failJob($connectionName, $job, $e) } } + + /** + * Raise the before queue job event. + * + * @param string $connectionName + * @param JobsInterface $job + */ + protected function raiseBeforeJobEvent($connectionName, $job) + { + $this->events->dispatch(EventsList::JOB_PROCESSING, new Event\JobProcessing($connectionName, $job)); + } + /** * Raise the after queue job event. * @@ -441,68 +501,78 @@ protected function raiseAfterJobEvent($connectionName, $job) } /** - * Handle an exception that occurred while the job was running. + * Raise the exception occurred queue job event. * * @param string $connectionName * @param JobsInterface $job - * @param WorkerOptions $options * @param \Exception $e - * - * @throws \Exception */ - protected function handleJobException($connectionName, $job, WorkerOptions $options, $e) + protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e) { - try { + $this->events->dispatch(EventsList::JOB_EXCEPTION_OCCURRED, new Event\JobExceptionOccurred($connectionName, $job, $e)); + } - // First, we will go ahead and mark the job as failed if it will exceed the maximum - // attempts it is allowed to run the next time we process it. If so we will just - // go ahead and mark it as failed now so we do not have to release this again. - $this->markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, (int)$options->maxTries, $e); - $this->raiseExceptionOccurredJobEvent($connectionName, $job, $e); + /** + * Determine if the queue worker should restart. + * + * @param int|null $lastRestart + * + * @return bool + */ + protected function queueShouldRestart($lastRestart) + { + return $this->getTimestampOfLastQueueRestart() != $lastRestart; + } - } finally { - // If we catch an exception, we will attempt to release the job back onto the queue - // so it is not lost entirely. This'll let the job be retried at a later time by - // another listener (or this same one). We will re-throw this exception after. - if (!$job->isDeleted()) { + /** + * Get the last queue restart timestamp, or null. + * + * @return int|null + */ + protected function getTimestampOfLastQueueRestart() + { + if ($this->cache) { - $job->release($options->delay); - } + return $this->cache->get('ideasbucket_queue:restart'); } - - throw $e; } /** - * Mark the given job as failed if it has exceeded the maximum allowed attempts. - * - * @param string $connectionName - * @param JobsInterface $job - * @param int $maxTries - * @param \Exception $e + * Enable async signals for the process. */ - protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e) + protected function listenForSignals() { - $maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries; + if ($this->supportsAsyncSignals()) { - if ($maxTries > 0 && $job->attempts() >= $maxTries) { + pcntl_async_signals(true); - $this->failJob($connectionName, $job, $e); + pcntl_signal(SIGTERM, function () { + + $this->shouldQuit = true; + }); + + pcntl_signal(SIGUSR2, function () { + + $this->paused = true; + }); + + pcntl_signal(SIGCONT, function () { + + $this->paused = false; + }); } } /** - * Raise the exception occurred queue job event. + * Determine if "async" signals are supported. * - * @param string $connectionName - * @param JobsInterface $job - * @param \Exception $e + * @return bool */ - protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e) + protected function supportsAsyncSignals() { - $this->events->dispatch(EventsList::JOB_EXCEPTION_OCCURRED, new Event\JobExceptionOccurred($connectionName, $job, $e)); + return version_compare(PHP_VERSION, '7.1.0') >= 0 && extension_loaded('pcntl'); } /** @@ -530,37 +600,28 @@ public function stop($status = 0) } /** - * Determine if the queue worker should restart. - * - * @param int|null $lastRestart + * Kill the process. * - * @return bool + * @param int $status */ - protected function queueShouldRestart($lastRestart) + public function kill($status = 0) { - return $this->getTimestampOfLastQueueRestart() != $lastRestart; + if (extension_loaded('posix')) { + + posix_kill(getmypid(), SIGKILL); + } + + exit($status); } /** - * Process the next job on the queue. + * Sleep the script for a given number of seconds. * - * @param string $connectionName - * @param string $queue - * @param WorkerOptions $options + * @param int $seconds */ - public function runNextJob($connectionName, $queue, WorkerOptions $options) + public function sleep($seconds) { - $job = $this->getNextJob($this->manager->connection($connectionName), $queue); - - // If we're able to pull a job off of the stack, we will process it and then return - // from this method. If there is no job on the queue, we will "sleep" the worker - // for the specified number of seconds, then keep processing jobs after sleep. - if ($job) { - - $this->runJob($job, $connectionName, $options); - } - - $this->sleep($options->sleep); + sleep($seconds); } /** @@ -573,6 +634,34 @@ public function setCache(Cache $cache) $this->cache = $cache; } + /** + * @return Manager + */ + public function getManager() + { + return $this->manager; + } + + /** + * @param Manager $manager + * + * @return Worker + */ + public function setManager($manager) + { + $this->manager = $manager; + + return $this; + } + + /** + * @return bool + */ + protected function until() + { + return $this->events->dispatch(EventsList::LOOPING, new Event\Looping)->isPropagationStopped(); + } + /** * Raise the failed queue job event. *