Skip to content

Commit

Permalink
修复定时任务指定 force=true 后,启动服务会执行两次 (#474)
Browse files Browse the repository at this point in the history
* 修复定时任务指定 force=true 后,启动服务会执行两次

* 优化代码

* 修复
# Conflicts:
#	src/Components/swoole/src/Cron/Scheduler.php
#	src/Cron/Scheduler.php
  • Loading branch information
Yurunsoft committed Mar 3, 2023
1 parent db2c3d5 commit 32e54c6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 113 deletions.
108 changes: 3 additions & 105 deletions src/Components/swoole/src/Cron/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@

namespace Imi\Swoole\Cron;

use Imi\Aop\Annotation\Inject;
use Imi\App;
use Imi\Bean\Annotation\Bean;
use Imi\Cron\Contract\ICronManager;
use Imi\Cron\Contract\IScheduler;
use Imi\Cron\CronCalculator;
use Imi\Cron\CronLock;
use Imi\Cron\CronTask;
use Imi\Cron\Message\Result;
use Imi\Log\Log;
use Imi\Server\ServerManager;
use Imi\Swoole\Cron\Consts\CronTaskType;
Expand All @@ -26,23 +20,8 @@
*
* 定时任务调度器
*/
class Scheduler implements IScheduler
class Scheduler extends \Imi\Cron\Scheduler
{
/**
* @Inject("CronManager")
*/
protected ICronManager $cronManager;

/**
* @Inject("CronCalculator")
*/
protected CronCalculator $cronCalculator;

/**
* @Inject("CronLock")
*/
protected CronLock $cronLock;

/**
* 协程工作池的协程数量.
*/
Expand All @@ -58,25 +37,9 @@ class Scheduler implements IScheduler
*/
private CoPool $coPool;

/**
* 下次执行时间集合.
*/
private array $nextTickTimeMap = [];

/**
* 正在执行的任务列表.
*
* @var \Imi\Cron\CronTask[]
*/
private array $runningTasks = [];

/**
* 首次执行记录集合.
*/
private array $firstRunMap = [];

public function __construct()
{
parent::__construct();
$this->coPool = $coPool = new CoPool($this->poolCoCount, $this->poolQueueLength,
// 定义任务匿名类,当然你也可以定义成普通类,传入完整类名
// @phpstan-ignore-next-line
Expand Down Expand Up @@ -146,50 +109,10 @@ public function run(ITaskParam $param)
*/
public function close(): void
{
parent::close();
$this->coPool->stop();
}

/**
* {@inheritDoc}
*/
public function schedule(): \Generator
{
$now = time();
$runningTasks = &$this->runningTasks;
$nextTickTimeMap = &$this->nextTickTimeMap;
$cronCalculator = $this->cronCalculator;
$firstRunMap = &$this->firstRunMap;
foreach ($this->cronManager->getRealTasks() as $task)
{
$id = $task->getId();
if (isset($runningTasks[$id]))
{
if (time() < $task->getLastRunTime() + $task->getMaxExecutionTime())
{
continue;
}
else
{
unset($runningTasks[$id]);
}
}
$nextTickTimeMap[$id] ??= $cronCalculator->getNextTickTime($task->getLastRunTime(), $task->getCronRules());
$firstRun = !isset($firstRunMap[$id]) && $task->getForce();
if ($firstRun || $now >= $nextTickTimeMap[$id])
{
if ($firstRun)
{
$firstRunMap[$id] = true;
}
else
{
unset($nextTickTimeMap[$id]);
}
yield $task;
}
}
}

/**
* {@inheritDoc}
*/
Expand All @@ -205,29 +128,4 @@ public function runTask(CronTask $task): void
$this->runningTasks[$task->getId()] = $task;
$this->coPool->addTaskAsync($task);
}

/**
* {@inheritDoc}
*/
public function completeTask(Result $result): void
{
$runningTasks = &$this->runningTasks;
$resultId = $result->id;
if (isset($runningTasks[$resultId]))
{
if (!$this->cronLock->unlock($runningTasks[$resultId]))
{
Log::error(sprintf('Task %s unlock failed', $resultId));
}
unset($runningTasks[$resultId]);
}
if ($result->success)
{
Log::info(sprintf('Task: %s, Process: %s#%s, Success', $resultId, $result->processType, $result->processId));
}
else
{
Log::error(sprintf('Task: %s, Process: %s#%s, %s', $resultId, $result->processType, $result->processId, $result->message));
}
}
}
17 changes: 9 additions & 8 deletions src/Cron/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,23 @@ class Scheduler implements IScheduler
/**
* 下次执行时间集合.
*/
private array $nextTickTimeMap = [];
protected array $nextTickTimeMap = [];

/**
* 正在执行的任务列表.
*
* @var \Imi\Cron\CronTask[]
*/
private array $runningTasks = [];
protected array $runningTasks = [];

/**
* 首次执行记录集合.
*/
private array $firstRunMap = [];
protected array $firstRunMap = [];

public function __construct()
{
}

/**
* {@inheritDoc}
Expand Down Expand Up @@ -89,16 +93,13 @@ public function schedule(): \Generator
}
$nextTickTimeMap[$id] ??= $cronCalculator->getNextTickTime($task->getLastRunTime(), $task->getCronRules());
$firstRun = !isset($firstRunMap[$id]) && $task->getForce();
if ($firstRun || $now >= $nextTickTimeMap[$id])
if ($firstRun || (null !== $nextTickTimeMap[$id] && $now >= $nextTickTimeMap[$id]))
{
if ($firstRun)
{
$firstRunMap[$id] = true;
}
else
{
unset($nextTickTimeMap[$id]);
}
unset($nextTickTimeMap[$id]);
yield $task;
}
}
Expand Down

0 comments on commit 32e54c6

Please sign in to comment.