Skip to content

Commit

Permalink
优化代码 (#667)
Browse files Browse the repository at this point in the history
* 修复 Cron 在某些场景抛出异常

* 优化 Db::select() 默认使用从库

* 格式化代码
  • Loading branch information
Yurunsoft authored Dec 21, 2023
1 parent f8dff84 commit f28f988
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 45 deletions.
91 changes: 49 additions & 42 deletions src/Components/swoole/src/Cron/Scheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,49 +49,56 @@ public function __construct()
*/
public function run(ITaskParam $param)
{
/** @var \Imi\Cron\CronTask $task */
$task = $param->getData();
/** @var \Imi\Cron\CronManager $cronManager */
$cronManager = App::getBean('CronManager');
switch ($type = $task->getType())
try
{
case CronTaskType::RANDOM_WORKER:
$swooleServer = ServerManager::getServer('main', ISwooleServer::class)->getSwooleServer();
$taskClass = $task->getTask();
$swooleServer->sendMessage(json_encode([
'action' => 'cronTask',
'id' => $task->getId(),
'data' => $task->getData(),
'task' => \is_callable($taskClass) ? null : $taskClass,
'type' => $type,
], \JSON_THROW_ON_ERROR | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE), random_int(0, $swooleServer->setting['worker_num'] - 1));
break;
case CronTaskType::ALL_WORKER:
$swooleServer = ServerManager::getServer('main', ISwooleServer::class)->getSwooleServer();
$taskClass = $task->getTask();
$message = json_encode([
'action' => 'cronTask',
'id' => $task->getId(),
'data' => $task->getData(),
'task' => \is_callable($taskClass) ? null : $taskClass,
'type' => $type,
], \JSON_THROW_ON_ERROR | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE);
for ($i = 0; $i < $swooleServer->setting['worker_num']; ++$i)
{
$swooleServer->sendMessage($message, $i);
}
break;
case CronTaskType::TASK:
$cronManager->getTaskCallable($task->getId(), $task->getTask(), $type)($task->getId(), $task->getData());
break;
case CronTaskType::PROCESS:
$cronManager->getTaskCallable($task->getTask(), $task->getTask(), $type)($task->getId(), $task->getData());
break;
case CronTaskType::CRON_PROCESS:
/** @var \Imi\Cron\CronWorker $cronWorker */
$cronWorker = App::getBean('CronWorker');
$cronWorker->exec($task->getId(), $task->getData(), $task->getTask(), $type);
break;
/** @var \Imi\Cron\CronTask $task */
$task = $param->getData();
/** @var \Imi\Cron\CronManager $cronManager */
$cronManager = App::getBean('CronManager');
switch ($type = $task->getType())
{
case CronTaskType::RANDOM_WORKER:
$swooleServer = ServerManager::getServer('main', ISwooleServer::class)->getSwooleServer();
$taskClass = $task->getTask();
$swooleServer->sendMessage(json_encode([
'action' => 'cronTask',
'id' => $task->getId(),
'data' => $task->getData(),
'task' => \is_callable($taskClass) ? null : $taskClass,
'type' => $type,
], \JSON_THROW_ON_ERROR | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE), random_int(0, $swooleServer->setting['worker_num'] - 1));
break;
case CronTaskType::ALL_WORKER:
$swooleServer = ServerManager::getServer('main', ISwooleServer::class)->getSwooleServer();
$taskClass = $task->getTask();
$message = json_encode([
'action' => 'cronTask',
'id' => $task->getId(),
'data' => $task->getData(),
'task' => \is_callable($taskClass) ? null : $taskClass,
'type' => $type,
], \JSON_THROW_ON_ERROR | \JSON_UNESCAPED_SLASHES | \JSON_UNESCAPED_UNICODE);
for ($i = 0; $i < $swooleServer->setting['worker_num']; ++$i)
{
$swooleServer->sendMessage($message, $i);
}
break;
case CronTaskType::TASK:
$cronManager->getTaskCallable($task->getId(), $task->getTask(), $type)($task->getId(), $task->getData());
break;
case CronTaskType::PROCESS:
$cronManager->getTaskCallable($task->getTask(), $task->getTask(), $type)($task->getId(), $task->getData());
break;
case CronTaskType::CRON_PROCESS:
/** @var \Imi\Cron\CronWorker $cronWorker */
$cronWorker = App::getBean('CronWorker');
$cronWorker->exec($task->getId(), $task->getData(), $task->getTask(), $type);
break;
}
}
catch (\Throwable $th)
{
Log::error($th);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Db/Db.php
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public static function exec(string $sql, array $bindValues = [], ?string $poolNa
/**
* 执行 SQL 返回结果.
*/
public static function select(string $sql, array $bindValues = [], ?string $poolName = null, int $queryType = QueryType::WRITE): ?IResult
public static function select(string $sql, array $bindValues = [], ?string $poolName = null, int $queryType = QueryType::READ): ?IResult
{
$db = self::getInstance($poolName, $queryType);
if ($bindValues)
Expand Down
3 changes: 1 addition & 2 deletions src/Model/Model.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Imi\Db\Db;
use Imi\Db\Query\Interfaces\IQuery;
use Imi\Db\Query\Interfaces\IResult;
use Imi\Db\Query\QueryType;
use Imi\Db\Query\Raw;
use Imi\Db\Query\Result;
use Imi\Event\Event;
Expand Down Expand Up @@ -227,7 +226,7 @@ public static function exists(...$ids): bool
}
}

return (bool) Db::select('select exists(' . $query->buildSelectSql() . ')', $query->getBinds(), static::__getMeta(static::__getRealClassName())->getDbPoolName(), QueryType::READ)->getScalar();
return (bool) Db::select('select exists(' . $query->buildSelectSql() . ')', $query->getBinds(), static::__getMeta(static::__getRealClassName())->getDbPoolName())->getScalar();
}

/**
Expand Down

0 comments on commit f28f988

Please sign in to comment.