From 28cf2c7385ee32f62f00ead7c5244691784808cd Mon Sep 17 00:00:00 2001 From: Yurun Date: Wed, 18 Oct 2023 17:08:08 +0800 Subject: [PATCH] =?UTF-8?q?[3.0]=20imi-amqp=20=E6=94=B9=E9=80=A0=20(#595)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 废弃 AMQPSwooleConnection * 重构 AMQP 连接实例化 * 废弃 AMQP Connection 注解 * 更新文档 * 废弃 Connection 注解改为废弃连接配置项 * 更新文档 --- doc/SUMMARY.md | 2 +- doc/base/version/2.0-2.1.md | 365 -------------- doc/base/version/2.1-3.0.md | 17 + doc/components/mq/amqp.md | 82 ++-- src/Components/amqp/README.md | 447 ------------------ .../amqp/example/AMQP/Test/TestConsumer.php | 7 +- .../amqp/example/AMQP/Test/TestMessage.php | 34 +- .../amqp/example/AMQP/Test/TestPublisher.php | 7 +- .../amqp/example/AMQP/Test2/TestConsumer2.php | 37 -- .../amqp/example/AMQP/Test2/TestMessage2.php | 103 ---- .../example/AMQP/Test2/TestPublisher2.php | 24 - .../ApiServer/Controller/IndexController.php | 17 +- .../example/Process/SwooleTestProcess.php | 14 +- ...tProcess1.php => WorkermanTestProcess.php} | 7 +- .../example/Process/WorkermanTestProcess2.php | 44 -- src/Components/amqp/example/config/beans.php | 1 - src/Components/amqp/example/config/config.php | 1 - .../amqp/src/Annotation/Connection.php | 41 +- src/Components/amqp/src/Base/Traits/TAMQP.php | 61 +-- .../amqp/src/Pool/AMQPCoroutinePool.php | 16 +- src/Components/amqp/src/Pool/AMQPPool.php | 152 +++++- src/Components/amqp/src/Pool/AMQPResource.php | 4 - .../amqp/src/Swoole/AMQPSwooleConnection.php | 139 ------ src/Components/amqp/src/Swoole/SwooleIO.php | 218 --------- .../amqp/tests/RabbitMQ/RabbitMQTest.php | 6 +- 25 files changed, 263 insertions(+), 1583 deletions(-) delete mode 100644 doc/base/version/2.0-2.1.md create mode 100644 doc/base/version/2.1-3.0.md delete mode 100644 src/Components/amqp/example/AMQP/Test2/TestConsumer2.php delete mode 100644 src/Components/amqp/example/AMQP/Test2/TestMessage2.php delete mode 100644 src/Components/amqp/example/AMQP/Test2/TestPublisher2.php rename src/Components/amqp/example/Process/{WorkermanTestProcess1.php => WorkermanTestProcess.php} (85%) delete mode 100644 src/Components/amqp/example/Process/WorkermanTestProcess2.php delete mode 100644 src/Components/amqp/src/Swoole/AMQPSwooleConnection.php delete mode 100644 src/Components/amqp/src/Swoole/SwooleIO.php diff --git a/doc/SUMMARY.md b/doc/SUMMARY.md index 8420eb40c1..1a1a7f0f0d 100644 --- a/doc/SUMMARY.md +++ b/doc/SUMMARY.md @@ -16,7 +16,7 @@ * [开始一个新项目](base/new.md) * [应用配置](base/config.md) * [常见问题](base/qa.md) -* [v2.0-v2.1 升级指南](base/version/2.0-2.1.md) +* [v2.1-v3.0升级指南](base/version/2.1-3.0.md) ## 框架核心 diff --git a/doc/base/version/2.0-2.1.md b/doc/base/version/2.0-2.1.md deleted file mode 100644 index 5403f42732..0000000000 --- a/doc/base/version/2.0-2.1.md +++ /dev/null @@ -1,365 +0,0 @@ -# v2.0-v2.1 升级指南 - -[toc] - -v2.0 是一个非常成功的 LTS 版本,进行了底层重构,增加了强类型规范化以及功能增强,让我们的项目拥有了更强大的能力。 - -相比于 v2.0 版本,v2.1 版本不会有太大的不兼容性更改,可以在参考本页说明的情况下平滑升级。 - -## 不兼容的更改 - -* Swoole 最低版本从 4.7 升级为 4.8。 - -* `RedisModel`、`MemoryTableModel` 初始化数据时,`@Column` 注解配置的 `type` 将不生效。`json`、`list` 类型字段不会自动处理,此特性仅支持数据库表模型。 - -* 在使用 `\Imi\Swoole\Process\Pool` 调用 `start()` 方法启动进程池后,请使用 `wait()` 方法等待进程池结束。 - -* Swoole CronProcess 的 Unix Socket 文件名已更改。 - -## 新功能 - -### v2.1.53 - -**发布日期:** `2023-09-01` - -* 增强 `whereBrackets`,支持查询条件收集器 ([#580](https://github.com/imiphp/imi/pull/580)) ([文档](https://doc.imiphp.com/v2.1/components/db/index.html#whereBrackets)) - -### v2.1.52 - -**发布日期:** `2023-08-18` - -* 支持在分页查询时指定查询记录数量时的字段 ([#575](https://github.com/imiphp/imi/pull/575)) - -### v2.1.51 - -**发布日期:** `2023-08-05` - -* 模型关联支持跨数据库和跨连接池场景,模型关联注解新增 `poolName` 参数 - -* 模型 `@JsonDecode` 注解新增 `arrayWrap` 参数 ([#569](https://github.com/imiphp/imi/pull/569)) - -### v2.1.49 - -**发布日期:** `2023-07-10` - -* Redis 模型支持[安全删除记录](/v2.1/components/orm/RedisModel.html#%E5%AE%89%E5%85%A8%E5%88%A0%E9%99%A4%E8%AE%B0%E5%BD%95) ([#555](https://github.com/imiphp/imi/pull/555)) - -* Redis 模型 `hash_object` 模式支持设置字段类型(`json`/`list`/`set`) ([文档](/v2.1/components/orm/RedisModel.html#@Column)) ([#560](https://github.com/imiphp/imi/pull/560)) - -* Redis 模型 `hash_object` 模式在不启用 redis 序列化情况下支持强类型字段 ([#560](https://github.com/imiphp/imi/pull/560)) - -* RedisModel::find()、generateKey()、generateMember() 改为参数非必传 ([#560](https://github.com/imiphp/imi/pull/560)) - -### v2.1.48 - -**发布日期:** `2023-07-03` - -* 支持 MySQL 高性能分页查询(大表分页类) ([#542](https://github.com/imiphp/imi/pull/542)) - -* 生成模型相关改进 ([#537](https://github.com/imiphp/imi/pull/537)) - -* 数据库迁移组件([imi-migration](https://github.com/imiphp/imi-migration)),支持一种新的迁移方式 - -### v2.1.47 - -**发布日期:** `2023-06-09` - -* 支持 PostgreSQL 生成模型配置 `bean` 和 `incrUpdate` 参数 ([#524](https://github.com/imiphp/imi/pull/524)) - -* 增强:`Imi\Lock\Lock` 类中的相关方法增加 `$lockId` 参数 ([#520](https://github.com/imiphp/imi/pull/520)) - -* pgsql 模型字段的创建更新时间 `time`、`timetz`、`timestamp`、`timest` 支持设置时间精度 ([#527](https://github.com/imiphp/imi/pull/527)) - -* 模型自动创建时间和更新时间传入值时,不再自动更新时间 ([#525](https://github.com/imiphp/imi/pull/525)) - -* 支持绑定上传文件对象到控制器方法参数 ([#531](https://github.com/imiphp/imi/pull/531)) - -* 数据库查询构建器支持全文搜索(MySQL+PostgreSQL) ([#533](https://github.com/imiphp/imi/pull/533)) - -### v2.1.46 - -**发布日期:** `2023-05-12` - -* 模型查询构建器`Model::query()`、`Model::dbQuery()`支持定义表别名 ([文档](/v2.1/components/orm/RDModel.html)) - -* 新增[使用 Protobuf 的 gRPC HTTP 网关客户端](/v2.1/components/rpc/grpc-proxy.html#%E4%BD%BF%E7%94%A8%20Protobuf%20%E7%9A%84%20gRPC%20HTTP%20%E7%BD%91%E5%85%B3%E5%AE%A2%E6%88%B7%E7%AB%AF) - -* `Imi\Util\Random` 新增 `float()` 和 `bytes()` 方法 - -### v2.1.45 - -**发布日期:** `2023-04-29` - -* 异步执行新增 `@Defer`、`@DeferAsync` 注解 ([文档](/v2.1/components/async/index.html)) - -* 支持优雅的 SSE 服务端推送功能 ([文档](/v2.1/components/httpserver/sse.html)) - -### v2.1.43 - -**发布日期:** `2023-04-07` - -* 增加环境变量 `IMI_SCAN_STATISTICS`,支持关闭输出扫描阶段的日志 ([#488](https://github.com/imiphp/imi/pull/488)) ([文档](/v2.1/core/env.html)) - -* 新增内存缓存驱动 ([#490](https://github.com/imiphp/imi/pull/490)) ([文档](/v2.1/components/cache/memory.html)) - -### v2.1.41 - -**发布日期:** `2023-03-31` - -* 查询构建器 `fieldRaw()`、`joinRaw()`、`whereRaw()`、`orWhereRaw()`、`groupRaw()`、`havingRaw()`、`orderRaw()`、`setFieldExp()`、`setFieldInc()`、`setFieldDec()` 支持传参数绑定 - -### v2.1.40 - -**发布日期:** `2023-03-24` - -* 支持 Phar 构建前后回调配置 ([#478](https://github.com/imiphp/imi/pull/478)) ([文档](/v2.1/components/phar/index.html)) - -* 支持 phar 构建后写出资源文件 ([#478](https://github.com/imiphp/imi/pull/478)) ([文档](/v2.1/components/phar/index.html)) - -* 支持[关闭定时任务执行成功的日志](/v2.1/components/task/cron.html#%E6%B3%A8%E8%A7%A3%E8%AE%BE%E5%AE%9A) ([#477](https://github.com/imiphp/imi/pull/477)) - -### v2.1.39 - -**发布日期:** `2023-03-10` - -* 支持 MySQL 查询构建器 [insert ignore into](/v2.1/components/db/index.html#%E6%8F%92%E5%85%A5%E8%AE%B0%E5%BD%95) ([#476](https://github.com/imiphp/imi/pull/476)) - -### v2.1.36 - -**发布日期:** `2023-02-03` - -* 实现数据库查询构建器的[指定分区操作](/v2.1/components/db/index.html#%E6%8C%87%E5%AE%9A%E5%88%86%E5%8C%BA) ([#466](https://github.com/imiphp/imi/pull/466)) - -* `Imi\Util\System::getCpuCoresNum()` 支持 MacOS 系统 ([#468](https://github.com/imiphp/imi/pull/468)) - -### v2.1.34 - -**发布日期:** `2022-12-16` - -* 支持[模型自定义关联](/v2.1/components/orm/relation/relation.html) - -* 支持[设置 Statement 最大缓存数量](/v2.1/components/db/config.html) - -### v2.1.33 - -**发布日期:** `2022-12-03` - -* `ProtobufUtil::getMessageValue()` 支持可遍历对象,无法处理的原样返回 - -### v2.1.32 - -**发布日期:** `2022-11-25` - -* Listener 注解支持设置事件仅触发一次 ([#458](https://github.com/imiphp/imi/pull/458)) - -### v2.1.31 - -**发布日期:** `2022-11-18` - -* 兼容 PHP 8.2 ([#441](https://github.com/imiphp/imi/pull/441)) - -* 支持 PHP 8.2 交叉类型 - -* 支持 PHP 8.2 只读类 - -### v2.1.30 - -**发布日期:** `2022-10-28` - -* 支持 AOP 切入带有引用返回值的方法 ([#450](https://github.com/imiphp/imi/pull/450)) - -* 新增 `imi-influxdb` 组件 - -* 支持服务指标监控 InfluxDB 驱动 - -* 支持服务指标监控 TDengine InfluxDB 兼容驱动 - -### v2.1.29 - -**发布日期:** `2022-10-14` - -* [服务发现(负载均衡)](/v2.1/components/serviceDiscovery/index.html) - -* 调用链路追踪 [OpenTracing](/v2.1/components/tracing/opentracing.html)(Zipkin、Jaeger) - -* 重构骨架项目:增加模块划分、自动格式化、代码静态分析、自动化测试,更加符合实际项目的开发 - -* `Imi\Grpc\Util\ProtobufUtil::setMessageData()` 增加参数 `$ignoreUnknown`,可以忽略未知字段 - -* 新增 `Imi\Grpc\Util\ProtobufUtil::newMessage()` 和 `Imi\Grpc\Util\ProtobufUtil::newMessageArray()` - -* 增强 `BeanFactory::getObjectClass()` 支持字符串 - -* 增强 `@Column` 注解的 `createTime`、`updateTime` 支持传入 `int` 值,表示时间精度,仅 `bigint`、`int8` 有效 ([#445](https://github.com/imiphp/imi/pull/445)) - -### v2.1.28 - -**发布日期:** `2022-09-23` - -* 新增[数据库迁移](/v2.1/components/orm/migration.html) - -* 支持在非 cli 环境执行 `Imi\Util\Imi::getImiCmd()`、`Imi\Util\Imi::getImiCmdArray()` - -### v2.1.27 - -**发布日期:** `2022-09-16` - -* 实现模型发号器,支持 UUID、雪花算法 ([#430](https://github.com/imiphp/imi/pull/430)) ([文档](/v2.1/components/orm/RDModel/definition.html#@Id)) - -* 新增 RequestParam 注解 ([#432](https://github.com/imiphp/imi/pull/432)) ([文档](/v2.1/components/httpserver/request.html#RequestParam%20%E6%B3%A8%E8%A7%A3)) - -### v2.1.24 - -**发布日期:** `2022-08-27` - -* 支持 WebSocket 二进制协议 - -* [新增支持键值过期的存储对象 `Imi\Util\ExpiredStorage`](/v2.1/utils/ExpiredStorage.html) - -* [请求上下文缓存驱动](/v2.1/components/cache/requestContext.html) - -### v2.1.23 - -**发布日期:** `2022-08-20` - -* 支持模型序列化 ([#412](https://github.com/imiphp/imi/pull/412)) - -### v2.1.22 - -**发布日期:** `2022-08-13` - -* [gRPC 的 HTTP 代理网关](/v2.1/components/rpc/grpc-proxy.html) - -* 新增命令行工具的 `--bootstrap` 参数 - -* 支持在 Swoole BASE 模式下获取所有 Worker 连接数 - -* 支持模型增量更新 - -### v2.1.20 - -**发布日期:** `2022-07-22` - -* PDO 驱动绑定值支持 `resource` 类型使用 `\PDO::PARAM_LOB` - -### v2.1.19 - -**发布日期:** `2022-07-15` - -* `Db`类添加一个工具方法[`Db::debugSql`](/v2.1/components/db/index.html#%E6%B8%B2%E6%9F%93%E9%A2%84%E7%BC%96%E8%AF%91SQL%E8%AF%AD%E5%8F%A5) - -### v2.1.18 - -**发布日期:** `2022-07-02` - -* 模型生成支持 MySQL、PgSQL 虚拟列 - -* [容器增加3个方法:set、newInstance、bindCallable](/v2.1/core/container.html) - -* 请求上下文增加2个方法:remember、unset ([请求上下文](/v2.1/core/requestContext.html)、[连接上下文](/v2.1/components/websocketServer/session.html)) - -### v2.1.17 - -**发布日期:** `2022-06-24` - -* [支持配置启动服务时检测连接是否可用](/v2.1/base/config.html#%E5%85%B1%E6%9C%89%E7%BB%93%E6%9E%84) (`@app.server.checkPoolResource`) - -### v2.1.16 - -**发布日期:** `2022-06-17` - -* [实现 Swoole 用户进程双向通信](/v2.1/core/processCommunication.html#%E7%94%A8%E6%88%B7%E8%BF%9B%E7%A8%8B) - -### v2.1.15 - -**发布日期:** `2022-06-10` - -* 新增 `@EnvValue` 注解 ([文档](/v2.1/annotations/injectValue.html#@EnvValue)) - -### v2.1.11 - -**发布日期:** `2022-05-07` - -* 在 Model 中支持 MySQL 数据类型 `set` - -### v2.1.9 - -**发布日期:** `2022-04-16` - -* [定时任务支持:移除所有任务、检测是否存在任务、获取单个任务、获取所有任务](/v2.1/components/task/cron.html) - -### v2.1.8 - -**发布日期:** `2022-04-08` - -* [模型关联支持在查询结果中,包含被软删除的数据](/v2.1/components/orm/relation/) - -### v2.1.7 - -**发布日期:** `2022-04-02` - -* [新增 `Imi\dump()` 调试输出函数](/v2.1/utils/functions.html#Imi\dump) - -* [单文件运行 imi](/v2.1/core/quickStart.html) - -* [支持验证器动态数组对象验证](/v2.1/components/validation/index.html) - -* [Db 构建器添加 find、value、column 查询方法](/v2.1/components/db/index.html#%E6%9F%A5%E8%AF%A2%E4%B8%80%E8%A1%8C) - -### v2.1.6 - -**发布日期:** `2022-03-20` - -* [支持在 composer.json 中配置项目命名空间](/v2.1/base/config.html#%E5%85%B1%E6%9C%89%E7%BB%93%E6%9E%84) - -* [`Db`的分块查询与游标查询支持](/v2.1/components/db/index.html#%E5%88%86%E5%9D%97%E6%9F%A5%E8%AF%A2) - -### v2.1.5 - -**发布日期:** `2022-03-11` - -* 实现 WebSocket、Tcp、Udp 异常处理器 - -### v2.1.4 - -**发布日期:** `2022-03-04` - -* [支持监听 Swoole WebSocket Server disconnect 事件](/v2.1/container/swoole/events.html) - -* [支持定义命令行名称分割符](/v2.1/dev/tool.html) - -* [Phar 打包支持](/v2.1/components/phar/index.html) - -### v2.1.2 - -**发布日期:** `2022-02-18` - -* [枚举类增加 validate() 和 assert()](/v2.1/components/struct/enum.html) - -* [Query->where() 条件的值支持传原始 SQL](/v2.1/components/db/index.html) - -### v2.1.1 - -**发布日期:** `2022-02-12` - -* [定时任务的最小、最大延迟执行秒数](/v2.1/components/task/cron.html) - -* [Swoole WebSocket Server 支持 `syncConnect`](/v2.1/base/config.html) - -* [支持为数据库连接设置表前缀](/v2.1/components/db/index.html) - -* [模型注解 `JsonEncode` 增强,增加 `JsonDecode` 注解](/v2.1/components/orm/RDModel.html) - -* [支持从 `Query` 构建 SQL 语句](/v2.1/components/db/index.html) - -* [新增 `Model::exists()` 用于判断记录是否存在](/v2.1/components/orm/RDModel.html) - -* [支持设置字段为 `createTime`,插入记录时自动设置时间](/v2.1/components/orm/RDModel.html) - -### v2.1.0 - -**发布日期:** `2022-01-21` - -* [宏定义](/v2.1/components/macro.html) - -* [异步日志](/v2.1/components/log/index.html) diff --git a/doc/base/version/2.1-3.0.md b/doc/base/version/2.1-3.0.md new file mode 100644 index 0000000000..09d42b9751 --- /dev/null +++ b/doc/base/version/2.1-3.0.md @@ -0,0 +1,17 @@ +# v2.1-v3.0 升级指南 + +[toc] + +## 不兼容的更改 + +### 框架核心 + +* `psr/http-message` 版本升级,请求和响应相关类的类型声明有改动 + +### imi-amqp + +* 连接配置项有所增改,参考 [连接配置项](https://doc.imiphp.com/v3.0/components/mq/amqp.html#%E8%BF%9E%E6%8E%A5%E9%85%8D%E7%BD%AE%E9%A1%B9) + +* `Imi\AMQP\Annotation\Connection` 注解类的连接配置全部废弃,只保留 `poolName` + +* 废弃 `Imi\AMQP\Swoole\AMQPSwooleConnection` 客户端类 diff --git a/doc/components/mq/amqp.md b/doc/components/mq/amqp.md index 314a7110ba..f8b60073dc 100644 --- a/doc/components/mq/amqp.md +++ b/doc/components/mq/amqp.md @@ -93,6 +93,45 @@ Github: ] ``` +### 连接配置项 + +| 属性名称 | 说明 | +|-|- +| host | 主机 | +| port | 端口 | +| user | 用户名 | +| vhost | vhost,默认 `/` | +| connectionTimeout | 连接超时 | +| readTimeout | 读超时 | +| writeTimeout | 写超时 | +| channelRpcTimeout | 频道 RPC 超时时间,默认 `0.0` | +| heartbeat | 心跳时间。如果不设置的情况,设置了连接池的心跳,就会设置为该值的 2 倍,否则设为`0` | +| keepalive | keepalive,默认 `false` | +| isSecure | 是否启用加密通信,默认 `false` | +| ioType | io 类型,默认 `stream`,可选:`stream`、`socket` | +| insist | insist | +| loginMethod | 默认 `AMQPLAIN` | +| loginResponse | loginResponse | +| locale | 默认 `en_US` | +| amqpProtocol | AMQP 协议,默认 `0.9.1` | +| protocolStrictFields | 是否使用严格的 AMQP 0.9.1 字段类型。RabbitMQ 不支持这个。默认 `false` | +| sendBufferSize | 发送缓冲区大小,默认 `0` | +| sslCaCert | CA 证书内容 | +| sslCaPath | CA 证书地址 | +| sslCert | SSL 证书 | +| sslKey | SSL 证书密钥 | +| sslVerify | 是否验证 SSL 证书 | +| sslVerifyName | SSL 证书验证名称 | +| sslPassPhrase | SSL 证书密码短语 | +| sslCiphers | SSL 密码 | +| sslSecurityLevel | SSL 安全等级 | +| isLazy | 是否懒加载,默认 `false`,不推荐修改 | +| networkProtocol | 网络协议,默认 `tcp`,不推荐修改 | +| streamContext | 流上下文,默认 `null`,不推荐修改 | +| dispatchSignals | 无用项,默认 `true`,不推荐修改 | +| connectionName | 连接名称,不推荐修改 | +| debugPackets | 输出所有网络数据包以进行调试。,默认 `false`,不推荐修改 | + ### 队列组件支持 本组件额外实现了 [imiphp/imi-queue](https://github.com/imiphp/imi-queue) 的接口,可以用 Queue 组件的 API 进行调用。 @@ -168,25 +207,6 @@ Github: 优点是可以完美利用 AMQP 特性,适合需要个性化定制的用户。 -#### 连接配置项 - -| 属性名称 | 说明 | -|-|- -| host | 主机 | -| port | 端口 | -| user | 用户名 | -| vhost | vhost,默认`/` | -| insist | insist | -| loginMethod | 默认`AMQPLAIN` | -| loginResponse | loginResponse | -| locale | 默认`en_US` | -| connectionTimeout | 连接超时 | -| readWriteTimeout | 读写超时 | -| keepalive | keepalive,默认`false` | -| heartbeat | 心跳时间。如果不设置的情况,设置了连接池的心跳,就会设置为该值的 2 倍,否则设为`0` | -| channelRpcTimeout | 频道 RPC 超时时间,默认`0.0` | -| sslProtocol | ssl 协议,默认`null` | - #### 消息定义 继承 `Imi\AMQP\Message` 类,可在构造方法中对属性修改。 @@ -318,7 +338,7 @@ ticket | ticket | `null` | 可选注解:`@Queue`、`@Exchange`、`@Connection` -不配置 `@Connection` 注解,可以从连接池中获取连接 +不配置 `@Connection` 注解,从默认连接池中获取连接 ```php publish($message); 可选注解:`@Queue`、`@Exchange`、`@Connection` -不配置 `@Connection` 注解,可以从连接池中获取连接 +不配置 `@Connection` 注解,从默认连接池中获取连接 ```php [ - // 引入组件 - 'AMQP' => 'Imi\AMQP', - ], -] -``` - -连接池配置: - -```php -[ - 'pools' => [ - 'rabbit' => [ - 'sync' => [ - 'pool' => [ - 'class' => \Imi\AMQP\Pool\AMQPSyncPool::class, - 'config' => [ - 'maxResources' => 10, - 'minResources' => 0, - ], - ], - 'resource' => [ - 'host' => '127.0.0.1', - 'port' => 5672, - 'user' => 'guest', - 'password' => 'guest', - ] - ], - 'async' => [ - 'pool' => [ - 'class' => \Imi\AMQP\Pool\AMQPCoroutinePool::class, - 'config' => [ - 'maxResources' => 10, - 'minResources' => 1, - ], - ], - 'resource' => [ - 'host' => '127.0.0.1', - 'port' => 5672, - 'user' => 'guest', - 'password' => 'guest', - ] - ], - ], - ] -] -``` - -默认连接池: - -```php -[ - 'beans' => [ - 'AMQP' => [ - 'defaultPoolName' => 'rabbit', - ], - ], -] -``` - -### 连接配置项 - -| 属性名称 | 说明 | -|-|- -| host | 主机 | -| port | 端口 | -| user | 用户名 | -| vhost | vhost,默认`/` | -| insist | insist | -| loginMethod | 默认`AMQPLAIN` | -| loginResponse | loginResponse | -| locale | 默认`en_US` | -| connectionTimeout | 连接超时 | -| readWriteTimeout | 读写超时 | -| keepalive | keepalive,默认`false` | -| heartbeat | 心跳时间,默认`0` | -| channelRpcTimeout | 频道 RPC 超时时间,默认`0.0` | -| sslProtocol | ssl 协议,默认`null` | - -### 消息定义 - -继承 `Imi\AMQP\Message` 类,可在构造方法中对属性修改。 - -根据需要可以覆盖实现`setBodyData`、`getBodyData`方法,实现自定义的消息结构。 - -```php -routingKey = 'imi-2'; - $this->format = \Imi\Util\Format\Json::class; - } - - /** - * 设置主体数据 - * - * @param mixed $data - * @return self - */ - public function setBodyData($data) - { - foreach($data as $k => $v) - { - $this->$k = $v; - } - } - - /** - * 获取主体数据 - * - * @return mixed - */ - public function getBodyData() - { - return [ - 'memberId' => $this->memberId, - 'content' => $this->content, - ]; - } - - /** - * Get 用户ID - * - * @return int - */ - public function getMemberId() - { - return $this->memberId; - } - - /** - * Set 用户ID - * - * @param int $memberId 用户ID - * - * @return self - */ - public function setMemberId(int $memberId) - { - $this->memberId = $memberId; - - return $this; - } - - /** - * Get 内容 - * - * @return string - */ - public function getContent() - { - return $this->content; - } - - /** - * Set 内容 - * - * @param string $content 内容 - * - * @return self - */ - public function setContent(string $content) - { - $this->content = $content; - - return $this; - } - -} -``` - -**属性列表:** - -名称 | 说明 | 默认值 --|-|- -bodyData | 消息主体内容,非字符串 | `null` | -properties | 属性 | `['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,]` | -routingKey | 路由键 | 空字符串 | -format | 如果设置了,发布的消息是编码后的`bodyData`,同理读取时也会解码。实现了`Imi\Util\Format\IFormat`的格式化类。支持`Json`、`PhpSerialize` | `null` | -mandatory | mandatory标志位 | `false` | -immediate | immediate标志位 | `false` | -ticket | ticket | `null` | - -### 发布者 - -必选注解:`@Publisher` - -可选注解:`@Queue`、`@Exchange`、`@Connection` - -不配置 `@Connection` 注解,可以从连接池中获取连接 - -```php -getBody(), get_class($message)); - Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody()); - return ConsumerResult::ACK; - } - -} - -``` - -### 注解说明 - -### @Publisher - -发布者注解 - -| 属性名称 | 说明 | -|-|- -| queue | 队列名称 | -| exchange | 交换机名称 | -| routingKey | 路由键 | - -### @Consumer - -消费者注解 - -| 属性名称 | 说明 | -|-|- -| tag | 消费者标签 | -| queue | 队列名称 | -| exchange | 交换机名称 | -| routingKey | 路由键 | -| message | 消息类名,默认:`Imi\AMQP\Message` | -| mandatory | mandatory标志位 | -| immediate | immediate标志位 | -| ticket | ticket | - -### @Queue - -队列注解 - -| 属性名称 | 说明 | -|-|- -| name | 队列名称 | -| routingKey | 路由键 | -| passive | 被动模式,默认`false` | -| durable | 消息队列持久化,默认`true` | -| exclusive | 独占,默认`false` | -| autoDelete | 自动删除,默认`false` | -| nowait | 是否非阻塞,默认`false` | -| arguments | 参数 | -| ticket | ticket | - -### @Exchange - -交换机注解 - -| 属性名称 | 说明 | -|-|- -| name | 交换机名称 | -| type | 类型可选:`direct`、`fanout`、`topic`、`headers` | -| passive | 被动模式,默认`false` | -| durable | 消息队列持久化,默认`true` | -| autoDelete | 自动删除,默认`false` | -| internal | 设置是否为rabbitmq内部使用, `true`表示是内部使用, `false`表示不是内部使用 | -| nowait | 是否非阻塞,默认`false` | -| arguments | 参数 | -| ticket | ticket | - -### @Connection - -连接注解 - -| 属性名称 | 说明 | -|-|- -| poolName | 不为 `null` 时,无视其他属性,直接用该连接池配置。默认为`null`,如果`host`、`port`、`user`、`password`都未设置,则获取默认的连接池。 | -| host | 主机 | -| port | 端口 | -| user | 用户名 | -| vhost | vhost,默认`/` | -| insist | insist | -| loginMethod | 默认`AMQPLAIN` | -| loginResponse | loginResponse | -| locale | 默认`en_US` | -| connectionTimeout | 连接超时 | -| readWriteTimeout | 读写超时 | -| keepalive | keepalive,默认`false` | -| heartbeat | 心跳时间,默认`0` | -| channelRpcTimeout | 频道 RPC 超时时间,默认`0.0` | -| sslProtocol | ssl 协议,默认`null` | - -### 队列组件支持 - -本组件额外实现了 [imiphp/imi-queue](https://github.com/imiphp/imi-queue) 的接口,可以用 Queue 组件的 API 进行调用。 - -只需要将队列驱动配置为:`AMQPQueueDriver` - -配置示例: - -```php -[ - 'components' => [ - 'AMQP' => 'Imi\AMQP', - ], - 'beans' => [ - 'AutoRunProcessManager' => [ - 'processes' => [ - // 加入队列消费进程,非必须,你也可以自己写进程消费 - 'QueueConsumer', - ], - ], - 'imiQueue' => [ - // 默认队列 - 'default' => 'test1', - // 队列列表 - 'list' => [ - // 队列名称 - 'test1' => [ - // 使用的队列驱动 - 'driver' => 'AMQPQueueDriver', - // 消费协程数量 - 'co' => 1, - // 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准 - 'process' => 1, - // 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效) - 'timespan' => 0.1, - // 进程分组名称 - 'processGroup' => 'a', - // 自动消费 - 'autoConsumer' => true, - // 消费者类 - 'consumer' => 'AConsumer', - // 驱动类所需要的参数数组 - 'config' => [ - // AMQP 连接池名称 - 'poolName' => 'amqp', - // Redis 连接池名称 - 'redisPoolName'=> 'redis', - // Redis 键名前缀 - 'redisPrefix' => 'test1:', - // 可选配置: - // 支持消息删除功能,依赖 Redis - 'supportDelete' => true, - // 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列 - 'supportTimeout' => true, - // 支持消费失败队列功能,自动增加一个队列 - 'supportFail' => true, - // 循环尝试 pop 的时间间隔,单位:秒 - 'timespan' => 0.03, - // 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。 - 'queueLength' => 16, - // 消息类名 - 'message' => \Imi\AMQP\Queue\JsonAMQPMessage::class, - ] - ], - ], - ], - ] -] -``` - -消费者类写法,与`imi-queue`组件用法一致。 - -具体可以参考: - ## 免费技术支持 QQ群:17916227 [![点击加群](https://pub.idqqimg.com/wpa/images/group.png "点击加群")](https://jq.qq.com/?_wv=1027&k=5wXf4Zq),如有问题会有人解答和修复。 diff --git a/src/Components/amqp/example/AMQP/Test/TestConsumer.php b/src/Components/amqp/example/AMQP/Test/TestConsumer.php index bda1c2dcb2..6331d84919 100644 --- a/src/Components/amqp/example/AMQP/Test/TestConsumer.php +++ b/src/Components/amqp/example/AMQP/Test/TestConsumer.php @@ -4,7 +4,6 @@ namespace AMQPApp\AMQP\Test; -use Imi\AMQP\Annotation\Connection; use Imi\AMQP\Annotation\Consumer; use Imi\AMQP\Base\BaseConsumer; use Imi\AMQP\Contract\IMessage; @@ -13,13 +12,11 @@ use Imi\Redis\Redis; /** - * 启动一个新连接消费. + * 使用连接池中的连接消费. * * @Bean("TestConsumer") * - * @Connection(host=AMQP_SERVER_HOST, port=5672, user="guest", password="guest") - * - * @Consumer(tag="tag-imi", queue="queue-imi-1", message=\AMQPApp\AMQP\Test\TestMessage::class) + * @Consumer(tag="tag-imi", queue="queue-imi-2", message=\AMQPApp\AMQP\Test\TestMessage::class) */ class TestConsumer extends BaseConsumer { diff --git a/src/Components/amqp/example/AMQP/Test/TestMessage.php b/src/Components/amqp/example/AMQP/Test/TestMessage.php index c461363334..964a5d7b86 100644 --- a/src/Components/amqp/example/AMQP/Test/TestMessage.php +++ b/src/Components/amqp/example/AMQP/Test/TestMessage.php @@ -15,10 +15,17 @@ class TestMessage extends Message */ private $memberId; + /** + * 内容. + * + * @var string + */ + private $content; + public function __construct() { parent::__construct(); - $this->routingKey = 'imi-1'; + $this->routingKey = 'imi-2'; $this->format = \Imi\Util\Format\Json::class; } @@ -42,6 +49,7 @@ public function getBodyData() { return [ 'memberId' => $this->memberId, + 'content' => $this->content, ]; } @@ -68,4 +76,28 @@ public function setMemberId(int $memberId) return $this; } + + /** + * Get 内容. + * + * @return string + */ + public function getContent() + { + return $this->content; + } + + /** + * Set 内容. + * + * @param string $content 内容 + * + * @return self + */ + public function setContent(string $content) + { + $this->content = $content; + + return $this; + } } diff --git a/src/Components/amqp/example/AMQP/Test/TestPublisher.php b/src/Components/amqp/example/AMQP/Test/TestPublisher.php index 2bc0fbc164..221d55754e 100644 --- a/src/Components/amqp/example/AMQP/Test/TestPublisher.php +++ b/src/Components/amqp/example/AMQP/Test/TestPublisher.php @@ -4,7 +4,6 @@ namespace AMQPApp\AMQP\Test; -use Imi\AMQP\Annotation\Connection; use Imi\AMQP\Annotation\Exchange; use Imi\AMQP\Annotation\Publisher; use Imi\AMQP\Annotation\Queue; @@ -14,11 +13,9 @@ /** * @Bean("TestPublisher") * - * @Connection(host=AMQP_SERVER_HOST, port=5672, user="guest", password="guest") + * @Publisher(tag="tag-imi", queue="queue-imi-2", exchange="exchange-imi", routingKey="imi-2") * - * @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1") - * - * @Queue(name="queue-imi-1", routingKey="imi-1") + * @Queue(name="queue-imi-2", routingKey="imi-2") * * @Exchange(name="exchange-imi") */ diff --git a/src/Components/amqp/example/AMQP/Test2/TestConsumer2.php b/src/Components/amqp/example/AMQP/Test2/TestConsumer2.php deleted file mode 100644 index 85b84fd7ac..0000000000 --- a/src/Components/amqp/example/AMQP/Test2/TestConsumer2.php +++ /dev/null @@ -1,37 +0,0 @@ -getBody(), \get_class($message)); - Redis::set('imi-amqp:consume:2:' . $message->getMemberId(), $message->getBody()); - - return ConsumerResult::ACK; - } -} diff --git a/src/Components/amqp/example/AMQP/Test2/TestMessage2.php b/src/Components/amqp/example/AMQP/Test2/TestMessage2.php deleted file mode 100644 index a89f9ed110..0000000000 --- a/src/Components/amqp/example/AMQP/Test2/TestMessage2.php +++ /dev/null @@ -1,103 +0,0 @@ -routingKey = 'imi-2'; - $this->format = \Imi\Util\Format\Json::class; - } - - /** - * {@inheritDoc} - */ - public function setBodyData($data): self - { - foreach ($data as $k => $v) - { - $this->{$k} = $v; - } - - return $this; - } - - /** - * {@inheritDoc} - */ - public function getBodyData() - { - return [ - 'memberId' => $this->memberId, - 'content' => $this->content, - ]; - } - - /** - * Get 用户ID. - * - * @return int - */ - public function getMemberId() - { - return $this->memberId; - } - - /** - * Set 用户ID. - * - * @param int $memberId 用户ID - * - * @return self - */ - public function setMemberId(int $memberId) - { - $this->memberId = $memberId; - - return $this; - } - - /** - * Get 内容. - * - * @return string - */ - public function getContent() - { - return $this->content; - } - - /** - * Set 内容. - * - * @param string $content 内容 - * - * @return self - */ - public function setContent(string $content) - { - $this->content = $content; - - return $this; - } -} diff --git a/src/Components/amqp/example/AMQP/Test2/TestPublisher2.php b/src/Components/amqp/example/AMQP/Test2/TestPublisher2.php deleted file mode 100644 index a186d49bfb..0000000000 --- a/src/Components/amqp/example/AMQP/Test2/TestPublisher2.php +++ /dev/null @@ -1,24 +0,0 @@ -setMemberId($memberId); + $message->setContent('memberId:' . $memberId); // @phpstan-ignore-next-line $r1 = RequestContext::getBean('TestPublisher')->publish($message); - $message2 = new TestMessage2(); - $message2->setMemberId($memberId); - $message2->setContent('memberId:' . $memberId); - // @phpstan-ignore-next-line - $r2 = RequestContext::getBean('TestPublisher2')->publish($message2); - $queueTestMessage = new QueueTestMessage(); $queueTestMessage->setMemberId($memberId); $message = new Message(); @@ -61,7 +55,6 @@ public function publish(int $memberId = 19260817) return [ 'r1' => $r1, - 'r2' => $r2, ]; } @@ -75,17 +68,15 @@ public function publish(int $memberId = 19260817) public function consume($memberId) { $r1 = Redis::get($key1 = 'imi-amqp:consume:1:' . $memberId); - $r2 = Redis::get($key2 = 'imi-amqp:consume:2:' . $memberId); - $r3 = Redis::get($key3 = 'imi-amqp:consume:QueueTest:' . $memberId); - if (false !== $r1 && false !== $r2 && false !== $r3) + $r2 = Redis::get($key2 = 'imi-amqp:consume:QueueTest:' . $memberId); + if (false !== $r1 && false !== $r2) { - Redis::del($key1, $key2, $key3); + Redis::del($key1, $key2); } return [ 'r1' => $r1, 'r2' => $r2, - 'r3' => $r3, ]; } } diff --git a/src/Components/amqp/example/Process/SwooleTestProcess.php b/src/Components/amqp/example/Process/SwooleTestProcess.php index 7b4c9e1b22..95b995f71c 100644 --- a/src/Components/amqp/example/Process/SwooleTestProcess.php +++ b/src/Components/amqp/example/Process/SwooleTestProcess.php @@ -4,6 +4,7 @@ namespace AMQPApp\Process; +use AMQPApp\AMQP\Test\TestConsumer; use Imi\AMQP\Contract\IConsumer; use Imi\Aop\Annotation\Inject; use Imi\Event\Event; @@ -20,17 +21,8 @@ class SwooleTestProcess extends BaseProcess { /** * @Inject("TestConsumer") - * - * @var \AMQPApp\AMQP\Test\TestConsumer */ - protected $testConsumer; - - /** - * @Inject("TestConsumer2") - * - * @var \AMQPApp\AMQP\Test2\TestConsumer2 - */ - protected $testConsumer2; + protected TestConsumer $testConsumer; private bool $running = false; @@ -38,12 +30,10 @@ public function run(\Swoole\Process $process): void { $this->running = true; $this->runConsumer($this->testConsumer); - $this->runConsumer($this->testConsumer2); $channel = new \Swoole\Coroutine\Channel(); Event::on('IMI.PROCESS.END', function () use ($channel) { $this->running = false; $this->testConsumer->close(); - $this->testConsumer2->close(); $channel->push(1); }, ImiPriority::IMI_MAX); $channel->pop(); diff --git a/src/Components/amqp/example/Process/WorkermanTestProcess1.php b/src/Components/amqp/example/Process/WorkermanTestProcess.php similarity index 85% rename from src/Components/amqp/example/Process/WorkermanTestProcess1.php rename to src/Components/amqp/example/Process/WorkermanTestProcess.php index 486d23e2f5..8d090d1986 100644 --- a/src/Components/amqp/example/Process/WorkermanTestProcess1.php +++ b/src/Components/amqp/example/Process/WorkermanTestProcess.php @@ -4,6 +4,7 @@ namespace AMQPApp\Process; +use AMQPApp\AMQP\Test\TestConsumer; use Imi\AMQP\Contract\IConsumer; use Imi\Aop\Annotation\Inject; use Imi\Log\Log; @@ -14,14 +15,12 @@ /** * @Process(name="TestProcess1") */ -class WorkermanTestProcess1 extends BaseProcess +class WorkermanTestProcess extends BaseProcess { /** * @Inject("TestConsumer") - * - * @var \AMQPApp\AMQP\Test\TestConsumer */ - protected $testConsumer; + protected TestConsumer $testConsumer; public function run(Worker $process): void { diff --git a/src/Components/amqp/example/Process/WorkermanTestProcess2.php b/src/Components/amqp/example/Process/WorkermanTestProcess2.php deleted file mode 100644 index 4a29bbdaaa..0000000000 --- a/src/Components/amqp/example/Process/WorkermanTestProcess2.php +++ /dev/null @@ -1,44 +0,0 @@ -runConsumer($this->testConsumer2); - } - - private function runConsumer(IConsumer $consumer): void - { - try - { - $consumer->run(); - } - catch (\Throwable $th) - { - Log::error($th); - sleep(3); - $this->runConsumer($consumer); - } - } -} diff --git a/src/Components/amqp/example/config/beans.php b/src/Components/amqp/example/config/beans.php index 71f1fb52dd..3e6319a400 100644 --- a/src/Components/amqp/example/config/beans.php +++ b/src/Components/amqp/example/config/beans.php @@ -31,7 +31,6 @@ 'QueueConsumer', ] : [ 'TestProcess1', - 'TestProcess2', 'QueueConsumer', ], ], diff --git a/src/Components/amqp/example/config/config.php b/src/Components/amqp/example/config/config.php index e6473ba94f..f7cca38d96 100644 --- a/src/Components/amqp/example/config/config.php +++ b/src/Components/amqp/example/config/config.php @@ -99,7 +99,6 @@ 'user' => 'guest', 'password' => 'guest', 'keepalive' => false, // 截止 Swoole 4.8 还有兼容问题,所以必须设为 false,不影响使用 - 'connectionClass' => \PhpAmqpLib\Connection\AMQPStreamConnection::class, ], ], ] : [], diff --git a/src/Components/amqp/src/Annotation/Connection.php b/src/Components/amqp/src/Annotation/Connection.php index 486fea8e24..565bf88b9b 100644 --- a/src/Components/amqp/src/Annotation/Connection.php +++ b/src/Components/amqp/src/Annotation/Connection.php @@ -13,23 +13,7 @@ * * @Target({"CLASS"}) * - * @property string|null $poolName 连接池名称 - * @property string $host 主机 - * @property int $port 端口 - * @property string $user 用户名 - * @property string $password 密码 - * @property string $vhost - * @property bool $insist - * @property string $loginMethod - * @property null $loginResponse - * @property string $locale - * @property float $connectionTimeout 连接超时 - * @property float $readWriteTimeout 读写超时 - * @property null $context 上下文 - * @property bool $keepalive - * @property int $heartbeat 心跳时间 - * @property float $channelRpcTimeout 频道 RPC 超时时间 - * @property string|null $sslProtocol ssl 协议 + * @property string|null $poolName 连接池名称 */ #[\Attribute(\Attribute::TARGET_CLASS)] class Connection extends Base @@ -39,30 +23,9 @@ class Connection extends Base */ protected ?string $defaultFieldName = 'poolName'; - /** - * @param null $loginResponse - * @param null $context - */ public function __construct( ?array $__data = null, - ?string $poolName = null, - string $host = '', - int $port = 0, - string $user = '', - #[\SensitiveParameter] - string $password = '', - string $vhost = '/', - bool $insist = false, - string $loginMethod = 'AMQPLAIN', - $loginResponse = null, - string $locale = 'en_US', - float $connectionTimeout = 3.0, - float $readWriteTimeout = 3.0, - $context = null, - bool $keepalive = false, - int $heartbeat = 0, - float $channelRpcTimeout = 0.0, - ?string $sslProtocol = null + ?string $poolName = null ) { parent::__construct(...\func_get_args()); } diff --git a/src/Components/amqp/src/Base/Traits/TAMQP.php b/src/Components/amqp/src/Base/Traits/TAMQP.php index 37eda52274..1000c674e4 100644 --- a/src/Components/amqp/src/Base/Traits/TAMQP.php +++ b/src/Components/amqp/src/Base/Traits/TAMQP.php @@ -17,7 +17,6 @@ use Imi\Log\Log; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; -use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Wire\AMQPTable; trait TAMQP @@ -97,65 +96,7 @@ protected function initConfig(): void */ protected function getConnection(): AbstractConnection { - $poolName = null; - if (null === $this->poolName) - { - $connectionByPool = false; - $connectionAnnotation = $this->connectionAnnotation; - if ($connectionAnnotation) - { - if (null === $connectionAnnotation->poolName) - { - if (!(null !== $connectionAnnotation->host && null !== $connectionAnnotation->port && null !== $connectionAnnotation->user && null !== $connectionAnnotation->password)) - { - $connectionByPool = true; - } - } - else - { - $connectionByPool = true; - } - } - else - { - $connectionByPool = true; - } - if ($connectionByPool) - { - $poolName = $connectionAnnotation->poolName ?? $this->amqp->getDefaultPoolName(); - } - } - else - { - $connectionByPool = true; - $poolName = $this->poolName; - } - if ($connectionByPool || $poolName) - { - return AMQPPool::getInstance($poolName); - } - elseif (isset($connectionAnnotation)) - { - return new AMQPStreamConnection( - $connectionAnnotation->host, - $connectionAnnotation->port, - $connectionAnnotation->user, - $connectionAnnotation->password, - $connectionAnnotation->vhost, - $connectionAnnotation->insist, - $connectionAnnotation->loginMethod, $connectionAnnotation->loginResponse, - $connectionAnnotation->locale, $connectionAnnotation->connectionTimeout, - $connectionAnnotation->readWriteTimeout, - $connectionAnnotation->context, - $connectionAnnotation->keepalive, - $connectionAnnotation->heartbeat, - $connectionAnnotation->channelRpcTimeout - ); - } - else - { - throw new \RuntimeException('Annotation @Connection does not found'); - } + return AMQPPool::getInstance($this->poolName ?? $this->connectionAnnotation->poolName ?? null); } /** diff --git a/src/Components/amqp/src/Pool/AMQPCoroutinePool.php b/src/Components/amqp/src/Pool/AMQPCoroutinePool.php index bc58fb1c2f..984fe307ce 100644 --- a/src/Components/amqp/src/Pool/AMQPCoroutinePool.php +++ b/src/Components/amqp/src/Pool/AMQPCoroutinePool.php @@ -4,7 +4,6 @@ namespace Imi\AMQP\Pool; -use Imi\AMQP\Swoole\AMQPSwooleConnection; use Imi\Bean\BeanFactory; use Imi\Pool\Interfaces\IPoolResource; use Imi\Pool\TUriResourceConfig; @@ -37,20 +36,11 @@ protected function createResource(): IPoolResource public function createNewResource(): IPoolResource { $config = $this->getNextResourceConfig(); - if (isset($config['heartbeat'])) + if (!isset($config['heartbeat']) && ($poolHeartbeatInterval = $this->getConfig()->getHeartbeatInterval()) > 0) { - $heartbeat = (int) $config['heartbeat']; + $config['heartbeat'] = (int) ($poolHeartbeatInterval * 2); } - elseif (($poolHeartbeatInterval = $this->getConfig()->getHeartbeatInterval()) > 0) - { - $heartbeat = (int) ($poolHeartbeatInterval * 2); - } - else - { - $heartbeat = 0; - } - $class = $config['connectionClass'] ?? AMQPSwooleConnection::class; - return BeanFactory::newInstance(AMQPResource::class, $this, new $class($config['host'], (int) $config['port'], $config['user'], $config['password'], $config['vhost'] ?? '/', (bool) ($config['insist'] ?? false), $config['loginMethod'] ?? 'AMQPLAIN', $config['loginResponse'] ?? null, $config['locale'] ?? 'en_US', (float) ($config['connectionTimeout'] ?? 3.0), (float) ($config['readWriteTimeout'] ?? 3.0), $config['context'] ?? null, (bool) ($config['keepalive'] ?? false), $heartbeat, (float) ($config['channelRpcTimeout'] ?? 0.0))); + return BeanFactory::newInstance(AMQPResource::class, $this, AMQPPool::createInstanceFromConfig($config)); } } diff --git a/src/Components/amqp/src/Pool/AMQPPool.php b/src/Components/amqp/src/Pool/AMQPPool.php index 5ee634b649..61d5cb5003 100644 --- a/src/Components/amqp/src/Pool/AMQPPool.php +++ b/src/Components/amqp/src/Pool/AMQPPool.php @@ -9,7 +9,8 @@ use Imi\Pool\PoolManager; use Imi\RequestContext; use PhpAmqpLib\Connection\AbstractConnection; -use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Connection\AMQPConnectionConfig; +use PhpAmqpLib\Connection\AMQPConnectionFactory; /** * AMQP 客户端连接池. @@ -41,8 +42,7 @@ public static function getNewInstance(?string $poolName = null): AbstractConnect throw new \RuntimeException(sprintf('Not found db config %s', $poolName)); } - /** @var AbstractConnection $connection */ - $connection = App::newInstance($config['connectionClass'] ?? AMQPStreamConnection::class, $config['host'], (int) $config['port'], $config['user'], $config['password'], $config['vhost'] ?? '/', (bool) ($config['insist'] ?? false), $config['loginMethod'] ?? 'AMQPLAIN', $config['loginResponse'] ?? null, $config['locale'] ?? 'en_US', (float) ($config['connectionTimeout'] ?? 3.0), (float) ($config['readWriteTimeout'] ?? 3.0), $config['context'] ?? null, (bool) ($config['keepalive'] ?? false), (int) ($config['heartbeat'] ?? 0), (float) ($config['channelRpcTimeout'] ?? 0.0), $config['sslProtocol'] ?? null); + $connection = self::createInstanceFromConfig($config); if (!$connection->isConnected()) { throw new \RuntimeException(sprintf('AMQP %s connection failed', $poolName)); @@ -83,8 +83,7 @@ public static function getInstance(?string $poolName = null): ?AbstractConnectio $connection = App::get($requestContextKey); if (null === $connection || !$connection->isConnected()) { - /** @var AbstractConnection $connection */ - $connection = App::newInstance($config['connectionClass'] ?? AMQPStreamConnection::class, $config['host'], (int) $config['port'], $config['user'], $config['password'], $config['vhost'] ?? '/', (bool) ($config['insist'] ?? false), $config['loginMethod'] ?? 'AMQPLAIN', $config['loginResponse'] ?? null, $config['locale'] ?? 'en_US', (float) ($config['connectionTimeout'] ?? 3.0), (float) ($config['readWriteTimeout'] ?? 3.0), $config['context'] ?? null, (bool) ($config['keepalive'] ?? false), (int) ($config['heartbeat'] ?? 0), (float) ($config['channelRpcTimeout'] ?? 0.0), $config['sslProtocol'] ?? null); + $connection = self::createInstanceFromConfig($config); if (!$connection->isConnected()) { throw new \RuntimeException(sprintf('AMQP %s connection failed', $poolName)); @@ -96,6 +95,149 @@ public static function getInstance(?string $poolName = null): ?AbstractConnectio } } + public static function createInstanceFromConfig(array $configArray): AbstractConnection + { + $config = new AMQPConnectionConfig(); + if (isset($configArray['ioType'])) + { + $config->setIoType($configArray['ioType']); + } + if (isset($configArray['host'])) + { + $config->setHost($configArray['host']); + } + if (isset($configArray['port'])) + { + $config->setPort($configArray['port']); + } + if (isset($configArray['user'])) + { + $config->setUser($configArray['user']); + } + if (isset($configArray['password'])) + { + $config->setPassword($configArray['password']); + } + if (isset($configArray['vhost'])) + { + $config->setVhost($configArray['vhost']); + } + if (isset($configArray['insist'])) + { + $config->setInsist($configArray['insist']); + } + if (isset($configArray['loginMethod'])) + { + $config->setLoginMethod($configArray['loginMethod']); + } + if (isset($configArray['loginResponse'])) + { + $config->setLoginResponse($configArray['loginResponse']); + } + if (isset($configArray['locale'])) + { + $config->setLocale($configArray['locale']); + } + if (isset($configArray['connectionTimeout'])) + { + $config->setConnectionTimeout($configArray['connectionTimeout']); + } + if (isset($configArray['readTimeout'])) + { + $config->setReadTimeout($configArray['readTimeout']); + } + if (isset($configArray['writeTimeout'])) + { + $config->setWriteTimeout($configArray['writeTimeout']); + } + if (isset($configArray['channelRPCTimeout'])) + { + $config->setChannelRpcTimeout($configArray['channelRPCTimeout']); + } + if (isset($configArray['heartbeat'])) + { + $config->setHeartbeat($configArray['heartbeat']); + } + if (isset($configArray['keepalive'])) + { + $config->setKeepalive($configArray['keepalive']); + } + if (isset($configArray['isSecure'])) + { + $config->setIsSecure($configArray['isSecure']); + } + if (isset($configArray['networkProtocol'])) + { + $config->setNetworkProtocol($configArray['networkProtocol']); + } + if (isset($configArray['streamContext'])) + { + $config->setStreamContext($configArray['streamContext']); + } + if (isset($configArray['sendBufferSize'])) + { + $config->setSendBufferSize($configArray['sendBufferSize']); + } + if (isset($configArray['dispatchSignals'])) + { + $config->enableSignalDispatch($configArray['dispatchSignals']); + } + if (isset($configArray['amqpProtocol'])) + { + $config->setAMQPProtocol($configArray['amqpProtocol']); + } + if (isset($configArray['protocolStrictFields'])) + { + $config->setProtocolStrictFields($configArray['protocolStrictFields']); + } + if (isset($configArray['sslCaCert'])) + { + $config->setSslCaCert($configArray['sslCaCert']); + } + if (isset($configArray['sslCaPath'])) + { + $config->setSslCaPath($configArray['sslCaPath']); + } + if (isset($configArray['sslCert'])) + { + $config->setSslCert($configArray['sslCert']); + } + if (isset($configArray['sslKey'])) + { + $config->setSslKey($configArray['sslKey']); + } + if (isset($configArray['sslVerify'])) + { + $config->setSslVerify($configArray['sslVerify']); + } + if (isset($configArray['sslVerifyName'])) + { + $config->setSslVerifyName($configArray['sslVerifyName']); + } + if (isset($configArray['sslPassPhrase'])) + { + $config->setSslPassphrase($configArray['sslPassPhrase']); + } + if (isset($configArray['sslCiphers'])) + { + $config->setSslCiphers($configArray['sslCiphers']); + } + if (isset($configArray['sslSecurityLevel'])) + { + $config->setSslSecurityLevel($configArray['sslSecurityLevel']); + } + if (isset($configArray['connectionName'])) + { + $config->setConnectionName($configArray['connectionName']); + } + if (isset($configArray['debugPackets'])) + { + $config->setDebugPackets($configArray['debugPackets']); + } + + return AMQPConnectionFactory::create($config); + } + /** * 释放连接实例. */ diff --git a/src/Components/amqp/src/Pool/AMQPResource.php b/src/Components/amqp/src/Pool/AMQPResource.php index cbdb6a3ada..83353ad36b 100644 --- a/src/Components/amqp/src/Pool/AMQPResource.php +++ b/src/Components/amqp/src/Pool/AMQPResource.php @@ -79,10 +79,6 @@ public function close(): void { return; } - if ($this->connection instanceof \Imi\AMQP\Swoole\AMQPSwooleConnection) - { - $this->connection->getIO()->close(); - } } /** diff --git a/src/Components/amqp/src/Swoole/AMQPSwooleConnection.php b/src/Components/amqp/src/Swoole/AMQPSwooleConnection.php deleted file mode 100644 index 29e71e3e7c..0000000000 --- a/src/Components/amqp/src/Swoole/AMQPSwooleConnection.php +++ /dev/null @@ -1,139 +0,0 @@ -io) - { - $this->io->close(); - } - } - - /** - * {@inheritDoc} - * - * @return void - */ - protected function connect() - { - parent::connect(); - $this->startHeartbeat(); - } - - /** - * {@inheritDoc} - */ - public function close($reply_code = 0, $reply_text = '', $method_sig = [0, 0]) - { - $this->stopHeartbeat(); - - return parent::close($reply_code, $reply_text, $method_sig); - } - - protected function startHeartbeat(): void - { - if ($this->heartbeat > 0) - { - $this->heartbeatTimerId = Timer::tick($this->heartbeat * 500, function () { - if ($this->isConnected()) - { - $this->write_heartbeat(); - } - }); - } - } - - protected function stopHeartbeat(): void - { - if ($this->heartbeatTimerId) - { - Timer::del($this->heartbeatTimerId); - $this->heartbeatTimerId = null; - } - } - - /** - * Sends a heartbeat message. - */ - protected function write_heartbeat(): void - { - $pkt = new AMQPWriter(); - $pkt->write_octet(8); - $pkt->write_short(0); - $pkt->write_long(0); - $pkt->write_octet(0xCE); - $this->write($pkt->getvalue()); - } -} diff --git a/src/Components/amqp/src/Swoole/SwooleIO.php b/src/Components/amqp/src/Swoole/SwooleIO.php deleted file mode 100644 index 47e64e8272..0000000000 --- a/src/Components/amqp/src/Swoole/SwooleIO.php +++ /dev/null @@ -1,218 +0,0 @@ -host = $host; - $this->port = $port; - $this->connection_timeout = $connection_timeout; - $this->read_write_timeout = $read_write_timeout; - $this->context = $context; - $this->keepalive = $keepalive; - $this->heartbeat = $heartbeat; - $this->initial_heartbeat = $heartbeat; - } - - /** - * Set ups the connection. - * - * @return void - * - * @throws \PhpAmqpLib\Exception\AMQPIOException - * @throws \PhpAmqpLib\Exception\AMQPRuntimeException - */ - public function connect() - { - $sock = new \Swoole\Coroutine\Client(\Imi\Swoole\Util\Swoole::getTcpSockTypeByHost($this->host)); - if (!$sock->connect($this->host, $this->port, $this->connection_timeout)) - { - throw new AMQPRuntimeException(sprintf('Error Connecting to server(%s): %s ', $sock->errCode, swoole_strerror($sock->errCode)), $sock->errCode); - } - $this->sock = $sock; - } - - /** - * Reconnects the socket. - * - * @return void - */ - public function reconnect() - { - $this->close(); - $this->connect(); - } - - /** - * @param int $len - * - * @return string|false - * - * @throws \PhpAmqpLib\Exception\AMQPIOException - * @throws \PhpAmqpLib\Exception\AMQPRuntimeException - * @throws \PhpAmqpLib\Exception\AMQPSocketException - * @throws \PhpAmqpLib\Exception\AMQPTimeoutException - * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException - */ - public function read($len) - { - $this->check_heartbeat(); - - while (true) - { - if ($len <= \strlen($this->buffer)) - { - $data = substr($this->buffer, 0, $len); - $this->buffer = substr($this->buffer, $len); - $this->last_read = microtime(true); - - return $data; - } - - if (!$this->sock || !$this->sock->connected) - { - throw new AMQPConnectionClosedException('Broken pipe or closed connection'); - } - - $read_buffer = $this->sock->recv($this->read_write_timeout ?: -1); - if (false === $read_buffer) - { - if (110 === $this->sock->errCode) - { - throw new AMQPTimeoutException('Error receiving data, errno=' . $this->sock->errCode); - } - else - { - throw new AMQPRuntimeException('Error receiving data, errno=' . $this->sock->errCode); - } - } - - if ('' === $read_buffer) - { - throw new AMQPConnectionClosedException('Broken pipe or closed connection'); - } - - $this->buffer .= $read_buffer; - } - - // @phpstan-ignore-next-line - return false; - } - - /** - * @param string $data - * - * @return void - * - * @throws \PhpAmqpLib\Exception\AMQPIOException - * @throws \PhpAmqpLib\Exception\AMQPSocketException - * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException - * @throws \PhpAmqpLib\Exception\AMQPTimeoutException - */ - public function write($data) - { - $buffer = $this->sock->send($data); - - if (false === $buffer) - { - throw new AMQPConnectionClosedException('Error sending data, errno=' . $this->sock->errCode); - } - - if (0 === $buffer && !$this->sock->connected) - { - throw new AMQPConnectionClosedException('Broken pipe or closed connection'); - } - - $this->last_write = microtime(true); - } - - /** - * @return void - */ - public function close() - { - if ($this->sock) - { - $this->sock->close(); - $this->sock = null; - } - // @phpstan-ignore-next-line - $this->last_read = null; - // @phpstan-ignore-next-line - $this->last_write = null; - } - - /** - * @return resource - */ - public function getSocket() - { - // @phpstan-ignore-next-line - return $this->sock; - } - - /** - * @param int $sec - * @param int $usec - * - * @return int|mixed - */ - protected function do_select($sec, $usec) - { - return 1; - } - - /** - * Heartbeat logic: check connection health here. - * - * @return void - * - * @throws \PhpAmqpLib\Exception\AMQPRuntimeException - */ - public function check_heartbeat() - { - } -} diff --git a/src/Components/amqp/tests/RabbitMQ/RabbitMQTest.php b/src/Components/amqp/tests/RabbitMQ/RabbitMQTest.php index 43c3fdd236..9174f46224 100644 --- a/src/Components/amqp/tests/RabbitMQ/RabbitMQTest.php +++ b/src/Components/amqp/tests/RabbitMQ/RabbitMQTest.php @@ -19,7 +19,6 @@ public function testPublish(): void $response = $http->get($this->host . 'publish?memberId=20180621'); $this->assertEquals([ 'r1' => true, - 'r2' => true, ], $response->json(true)); } @@ -27,9 +26,8 @@ public function testConsume(): void { $http = new HttpRequest(); $excepted = [ - 'r1' => '{"memberId":20180621}', - 'r2' => '{"memberId":20180621,"content":"memberId:20180621"}', - 'r3' => '{"memberId":20180621}', + 'r1' => '{"memberId":20180621,"content":"memberId:20180621"}', + 'r2' => '{"memberId":20180621}', ]; for ($i = 0; $i < 10; ++$i) {