Skip to content

hectorqin/php-resque

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

18 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

PHP-Resque

chrisboulton/php-resque 的基础上进行了如下改造:

  • 采用psr-4自动加载规范
  • 合并 php-resque-scheduler
  • 新增自定义处理方法worker
  • 支持自定义worker
  • 支持redis扩展及Predis扩展
  • 支持Timer定时器功能
  • 支持Crontab定时任务功能
  • 支持ThinkPHP5/6命令行使用
  • 支持THinkPHP3.2控制器CLI模式使用
  • 提供Controller trait便于其它框架启动worker
  • 提供MagicCall trait便于提交队列任务

安装

composer require hectorqin/php-resque

配置

TP5/6 默认获取 resque 配置文件, TP3.2 默认获取 RESQUE_CONFIG 配置

<?php

use Psr\Log\LogLevel;
use Resque\Listener\StatsListener;
use Resque\WorkerManager;

return [
    // 是否守护进程
    'daemonize'       => false,
    // redis 数据库信息
    'redis_backend'   => '',
    // redis 数据库编号
    'redis_database'  => '',
    // worker消费间隔
    'interval'        => 5,
    // worker组
    'worker_group'    => [
        [
            "type"  => "Worker",
            "queue" => "deault",
            "nums"  => 1,
        ],
        [
            "type"  => "SchedulerWorker",
            "queue" => "default",
            "nums"  => 1,
        ],
        [
            "type"     => "CustomWorker",
            "queue"    => "dump_date",
            'interval' => 5,
            "nums"     => 2,
            // 可使用闭包
            "handler"  => function () {
                WorkerManager::log("current datetime" . date("Y-m-d H:i:s"));
            },
        ],
        [
            "type"        => "CrontabWorker",
            "queue"       => "crontab",
            "nums"        => 2,
            "workerQueue" => "default",
        ],
    ],

    // 动态加载文件,使用 glob 函数
    'app_include'     => '',

    // redis存储前缀,默认为 resque
    'prefix'          => '',

    // manager pid文件
    'pidfile'         => './resque.pid',

    // 日志文件
    'log_file'        => './resque.log',

    // status临时文件
    'statistics_file' => './resque.status',

    // 是否不调用pcntl_fork,不调用时只支持一个workerGroup,且只有一个进程
    'no_fork'         => false,

    // 是否记录info日志
    'verbose'         => false,
    // 是否记录debug日志
    'vverbose'        => false,
    // 日志记录级别
    'log_level'       => [
        LogLevel::EMERGENCY,
        LogLevel::ALERT,
        LogLevel::CRITICAL,
        LogLevel::ERROR,
        LogLevel::WARNING,
        LogLevel::NOTICE,
        // LogLevel::INFO,
    ],

    // worker事件监听器
    'listener' => [
        StatsListener::class,
    ],

    // manager进程定时器 ["interval"=>1, "handler"=>callable, "params"=>[], "persistent"=>true]
    // interval 时间间隔,handler 回调函数(可使用闭包),params 回调参数,persistent 是否周期性任务
    'manager_timer' => [

    ],

    // worker 上的 秒级 crontab ["name"=>'mycrontab',"rule"=>"0 * * * * *","handler"=>callable,"params"=>[]]
    // name 名称,rule 定时规则,handler 回调函数(不能使用闭包,闭包可以用opis/closure包装一下),params 回调参数
    // 必须启动 CrontabWorker 和 SchedulerWorker
    'worker_crontab' => [

    ],
];

使用

启动worker

# 测试
./vendor/bin/resque

# ThinkPHP使用
php think resque start
# 守护进程
php think resque start -d

投递任务

投递任务之前需要设置 ResqueRedis 的配置, Resque::setBackend($redis_backend);,TP5 / TP6 默认自动使用resque配置文件。TP3.2需要手动配置 Resque::setBackend(C('RESQUE_CONFIG.redis_backend'));

简单任务

handler为任务执行函数

use Resque\Job\SimpleJob;

// 投递到 default 队列
SimpleJob::default([
    'handler' => ['\\app\\index\\controller\\Index', 'job'],
    'params'  => [
        'hello' => 0
    ], // ThinkPHP 支持参数绑定,其他框架请按照参数顺序传值
], true);

// 投递到 task 队列,延迟10秒执行
SimpleJob::task([
    'handler' => ['\\app\\index\\controller\\Index', 'job'],
    'params'  => [
        'hello' => 0
    ], // ThinkPHP 支持参数绑定,其他框架请按照参数顺序传值
], true, 10);

// 投递到 task 队列,定时执行
SimpleJob::task([
    'handler' => ['\\app\\index\\controller\\Index', 'job'],
    'params'  => [
        'hello' => 0
    ], // ThinkPHP 支持参数绑定,其他框架请按照参数顺序传值
], true, strtotime("2020-12-01"));
<?php
namespace app\index\controller;

class Index
{
    function job($hello) {
        echo $hello;
    }
}

自定义任务

编写自定义任务类,继承 SimpleJob,在 execute 方法内编写任务逻辑

use Resque\Job\SimpleJob;
use Resque\Job;

/**
 * 自定义任务
 * @property array $args 任务参数
 * @property Job $job 任务对象
 * @property string $queue 队列名称
 */
class MyJob extends SimpleJob
{
    /**
     * 任务执行逻辑
     *
     * TP支持参数绑定,其余框架仅支持数组参数
     *
     * @return mixed
     * @throws ThinkException
     * @throws InvalidArgumentException
     * @throws ClassNotFoundException
     */
    public function execute($hello)
    {
        // 自定义任务逻辑
    }

    /**
     * 成功回调
     *
     * @return void
     */
    public function onSuccess()
    {
    }

    /**
     * 失败回调
     * @return void
     */
    public function onError()
    {
    }
}

// 投递任务
MyJob::enqueue([
    'hello' => 1
]);

// 延迟3秒执行任务
MyJob::enqueueAt([
    'hello' => 1
], 3);

// 一个小时后执行任务
MyJob::enqueueAt([
    'hello' => 1
], time() + 3600);

(new MyJob())->delay(5)->executeAsync($param1, $param2);

监听事件

// Worker、SchedulerWorker、CustomWorker、CrontabWorker 启动事件
Event::listen('onWorkerStart', [$this, 'onWorkerStart']);
// Worker 创建job进程前事件
Event::listen('beforeForkExecutor', [$this, 'beforeForkExecutor']);
// Worker 创建job进程后事件
Event::listen('afterForkExecutor', [$this, 'afterForkExecutor']);
// 任务执行前事件
Event::listen('beforePerformJob', [$this, 'beforePerformJob']);
// 任务执行后事件
Event::listen('afterPerformJob', [$this, 'afterPerformJob']);
// 任务执行失败事件
Event::listen('onJobFailed', [$this, 'onJobFailed']);
// Worker、SchedulerWorker、CustomWorker、CrontabWorker 关闭事件
Event::listen('onWorkerStop', [$this, 'onWorkerStop']);
// 任务入队前事件
Event::listen('beforeEnqueue', [$this, 'beforeEnqueue']);
// 任务入队后事件
Event::listen('afterEnqueue', [$this, 'afterEnqueue']);
// 延迟/定时任务进入定时队列后事件
Event::listen('afterSchedule', [$this, 'afterSchedule']);
// 延迟/定时任务进入执行队列前事件
Event::listen('beforeDelayedEnqueue', [$this, 'beforeDelayedEnqueue']);

快捷投递任务

Class Test
{
    use \Resque\Concern\MagicCall;

    /**
     * 任务执行 handler
     */
    function testResque($name)
    {
        trace('Hello ' . $name);
    }

    function test()
    {
        // 设置默认同步执行,不使用队列,可用于调试
        // $this->setSync(true);
        $this->setDefaultQueue('default');
        // 任务 handler方法名 + Async 即可投递任务,参数顺序保持一致,方便调试
        $this->testResqueAsync('hector');
        $this->delay(5)->testResqueAsync('hector');
        $this->at(time() + 10)->testResqueAsync('hector');
        $this->at(time() + 15)->handler([\get_class(), 'testResque'], [
            'hector'
        ])->send();

        // 同步执行,不使用队列,调试用
        $this->sync(true)->testResqueAsync('hector');
    }
}

$test = new Test();
$test->test();