From 75356d4daafd4dbaa85ffc57c37d74c01100189f Mon Sep 17 00:00:00 2001 From: microrain Date: Mon, 4 Mar 2024 08:54:17 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E7=BD=91=E5=85=B3?= =?UTF-8?q?=E5=AD=90=E8=AE=BE=E5=A4=87=E7=8A=B6=E6=80=81=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 8 +- go.sum | 16 ++-- manifest/config/config.example.yaml | 32 +++++-- .../logic/model/up/property/batch/batch.go | 94 ++++++++++--------- 4 files changed, 82 insertions(+), 68 deletions(-) diff --git a/go.mod b/go.mod index 415c35e..f9b9cda 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ require ( github.com/Knetic/govaluate v3.0.0+incompatible github.com/arl/statsviz v0.6.0 github.com/eclipse/paho.mqtt.golang v1.4.3 - github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.1 - github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.1 - github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.1 - github.com/gogf/gf/v2 v2.6.1 + github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.2 + github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.2 + github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.2 + github.com/gogf/gf/v2 v2.6.2 github.com/golang-jwt/jwt/v5 v5.0.0 github.com/golang-module/carbon/v2 v2.2.8 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 diff --git a/go.sum b/go.sum index 12be006..0be840a 100644 --- a/go.sum +++ b/go.sum @@ -45,14 +45,14 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= -github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.1 h1:5VW1vlaFNSHHhMliRkGTcDshMeA52Il8T+gffJJaVMc= -github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.1/go.mod h1:jxCa1WV/W+q0F4ILebakUsqRrl7iL3qvP+Uci0eXAew= -github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.1 h1:5NWx7rZa8CbPNw1vbLzIXQFEMbKvoJVQM0GyReBRvJ8= -github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.1/go.mod h1:iy1Dwp5xWfGfuWixCgGQ06ZX6lp+d9onbmSWWzi111A= -github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.1 h1:d3/8lWFWmaQ/8mzJ5GxyRpO4racPpZ3yZ8kCuejhhiY= -github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.1/go.mod h1:O0nzQLfNJtRApGHJluraTy41jc3LIvTsSkR8WAHb4f0= -github.com/gogf/gf/v2 v2.6.1 h1:n/cfXM506WjhPa6Z1CEDuHNM1XZ7C8JzSDPn2AfuxgQ= -github.com/gogf/gf/v2 v2.6.1/go.mod h1:x2XONYcI4hRQ/4gMNbWHmZrNzSEIg20s2NULbzom5k0= +github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.2 h1:iCUoR8je08TehU633pj+vmNdQ/qmWLTpHYQx7yERTv8= +github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.2/go.mod h1:bPYIZ56MyKvLp1P+EWFpkyJ+wofFF9yxChgr/iScP8A= +github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.2 h1:aAELEWXp8HPHIqHMqrLvy+mjk8cGpV5JovFOkGEvavI= +github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.2/go.mod h1:LNURndyCdDl0kwrCNtjmzAJmABVQd9Mp0w0AAP+u/BU= +github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.2 h1:MChoSivlMZS1Cw6is6u+kBmjdV3G/N5TkHqP/pjC17M= +github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.2/go.mod h1:yI+aJpR6kU1ATLaRaCzI1o7fY/HNDDCfR5RS3mPNr0s= +github.com/gogf/gf/v2 v2.6.2 h1:TvI1UEH2RDbgFVlJJjkc/6ct6+5zjbOS5MiJ2ESG8qg= +github.com/gogf/gf/v2 v2.6.2/go.mod h1:x2XONYcI4hRQ/4gMNbWHmZrNzSEIg20s2NULbzom5k0= github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-module/carbon/v2 v2.2.8 h1:a1VxHHKAR7fc1ho7sYXhS1s5S4x7+oqAf2EY5p8C46A= diff --git a/manifest/config/config.example.yaml b/manifest/config/config.example.yaml index efb9197..197b013 100644 --- a/manifest/config/config.example.yaml +++ b/manifest/config/config.example.yaml @@ -9,7 +9,7 @@ system: isDemo: false # 是否为演示系统 isCluster: false # 是否为集群部署,默认为false。开启集群必须配置redis deviceCacheData: - poolSize: 300 #设备数据缓存池连接数 + poolSize: 50 #设备数据缓存池连接数 recordDuration: "20m" #设备数据缓存时长,超过时长的数据将被清除。默认为10分钟 recordLimit: 1000 #设备数据缓存条数限制,超过条数的数据将被清除。默认为1000条 pluginsPath: "./plugins/built" @@ -78,14 +78,13 @@ database: stdout: false ctxKeys: [ "RequestId" ] default: - # link: "mysql:root:DbyTYGu3s4WuAF4TTq7@tcp(127.0.0.1:3307)/zhgy_sagoo_cn?loc=Local&parseTime=true" - link: "mysql:root:DbyTYGu3s4WuAF4TTq7@tcp(127.0.0.1:3307)/sagooiot2024?loc=Local&parseTime=true" + link: "mysql:root:DbyTYGu3s4WuAF4TTq7@tcp(127.0.0.1:3307)/sagooiot-community?loc=Local&parseTime=true" debug: true #开启调试模式 charset: "utf8mb4" #数据库编码(如: utf8/gbk/gb2312),一般设置为utf8 dryRun: false #ORM空跑(只读不写) - maxIdle: 50 #连接池最大闲置的连接数 + maxIdle: 30 #连接池最大闲置的连接数 maxOpen: 100 #连接池最大打开的连接数 - maxLifetime: 60 #(单位秒)连接对象可重复使用的时间长度 + maxLifetime: 120 #(单位秒)连接对象可重复使用的时间长度 # 这个mqtt客户端主要是服务端内部处理消息使用的通道 mqtt: @@ -100,15 +99,27 @@ mqtt: # 时序数据库配置 tsd: - database: "TdEngine" + database: "Influxdb" #可选择 TdEngine、Influxdb tdengine: # type: "taosRestful" #http连接方式,端口是6041 # dsn: "root:taosdata@http(127.0.0.1:6041)/" type: "taosWS" #http连接方式,端口是6041 dsn: "root:taosdata@ws(127.0.0.1:6041)/" dbName: "sagoo_iot" - maxOpenConns: 1000 - maxIdleConns: 50 + maxOpenConns: 2000 + maxIdleConns: 80 + influxdb: + addr: "http://localhost:8086" + org: "sagoo" + dbName: "sagoo_iot" + token: "ez4BQ5QQCUpcAp1FDhhdY9jfcvxq2Z9OLkQSuQG_IPOzE9GvGRHfRm_YYwfuHtCaS7TVefxhEnzCOHi_nGtsCw==" + userName: + passWord: + aggregator: #聚合器配置,用来配置批量写入时的聚合器 + batchSize: 1000 #批处理数 + workers: 100 #工作线程数 + channelBufferSize: 50000 #通道缓冲数 + lingerTime: 100 # 防止数据积压时间,单位毫秒 # Redis 配置示例 redis: @@ -118,9 +129,10 @@ redis: db: 0 pass: minIdle: 50 #最小空闲连接数 - maxIdle: 100 #允许闲置的最大连接数 + maxIdle: 20 #允许闲置的最大连接数 maxActive: 16 #最大活跃连接数 - idleTimeout: "30m" #连接最大空闲时间,使用时间字符串例如30s/2m/2d + idleTimeout: "30s" #连接最大空闲时间,使用时间字符串例如30s/2m/2d + ClientName: "SystemCache" #消息队列 queue: diff --git a/network/core/logic/model/up/property/batch/batch.go b/network/core/logic/model/up/property/batch/batch.go index f2bb623..6bf1af2 100644 --- a/network/core/logic/model/up/property/batch/batch.go +++ b/network/core/logic/model/up/property/batch/batch.go @@ -5,11 +5,15 @@ import ( "encoding/json" "fmt" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/guid" "sagooiot/internal/consts" + "sagooiot/internal/model" "sagooiot/internal/mqtt" "sagooiot/internal/service" "sagooiot/network/core" + "sagooiot/network/core/logic/baseLogic" "sagooiot/network/core/tunnel/base" + "sagooiot/pkg/dcache" "sagooiot/pkg/iotModel" "sagooiot/pkg/iotModel/sagooProtocol" "sagooiot/pkg/iotModel/sagooProtocol/north" @@ -38,29 +42,22 @@ func GatewayBatchReportProperty(ctx context.Context, data topicModel.TopicHandle return logError(ctx, "parse data error", err, data) } - //网关属性处理 - if len(gatewayBatchReport.Params.Properties) > 0 { - if err := handleProperties(ctx, data, gatewayBatchReport.Params.Properties); err != nil { - return err - } - } + //网关子设备处理 + for _, sub := range gatewayBatchReport.Params.SubDevices { - //网关事件处理 - if len(gatewayBatchReport.Params.Events) > 0 { - if err := handleEvents(ctx, data, gatewayBatchReport.Params.Events, ""); err != nil { - return err + subDevice, err := dcache.GetDeviceDetailInfo(sub.Identity.DeviceKey) + if err != nil { + continue } - } + dcache.UpdateStatus(ctx, subDevice) //更新子设备状态 - //网关子设备处理 - for _, sub := range gatewayBatchReport.Params.SubDevices { if len(sub.Properties) > 0 { - if err := handleProperties(ctx, data, sub.Properties); err != nil { + if err := handleProperties(ctx, data, subDevice, sub.Properties); err != nil { return err } } if len(sub.Events) > 0 { - if err := handleEvents(ctx, data, sub.Events, sub.Identity.DeviceKey); err != nil { + if err := handleEvents(ctx, data, subDevice, sub.Events); err != nil { return err } } @@ -77,24 +74,33 @@ func logError(ctx context.Context, message string, err error, data topicModel.To } // handleProperties 处理属性上报 -func handleProperties(ctx context.Context, data topicModel.TopicHandlerData, properties map[string]interface{}) error { - reportDataInfo, err := service.DevTSLParse().HandleProperties(ctx, data.DeviceDetail, properties) +func handleProperties(ctx context.Context, data topicModel.TopicHandlerData, subDevice *model.DeviceOutput, properties map[string]interface{}) error { + reportDataInfo, err := service.DevTSLParse().HandleProperties(ctx, subDevice, properties) if err != nil { return logError(ctx, "parse property error", err, data) } // 上报处理结果 if len(reportDataInfo) > 0 { - //if err := service.DevDataReport().Property(ctx, data.DeviceKey, reportDataInfo, subDeviceKey); err != nil { - // return logError(ctx, "report property error", err, data) - //} - north.WriteMessage(ctx, north.PropertyReportMessageTopic, nil, data.ProductKey, data.DeviceKey, iotModel.PropertyReportMessage{ + + var reportData = new(sagooProtocol.ReportPropertyReq) + reportData.Id = guid.S() + reportData.Version = "1.0" + reportData.Sys = sagooProtocol.SysInfo{ + Ack: 0, + } + reportData.Params = properties + reportData.Method = "thing.event.property.post" + // 上报数据存入日志库 + go baseLogic.InertTdLog(ctx, consts.MsgTypePropertyReport, subDevice.Key, reportData) + + north.WriteMessage(ctx, north.PropertyReportMessageTopic, nil, subDevice.ProductKey, subDevice.Key, iotModel.PropertyReportMessage{ Properties: reportDataInfo, }) // 检查报警规则 - if err := service.AlarmRule().Check(ctx, data.DeviceKey, data.DeviceKey, consts.AlarmTriggerTypeProperty, reportDataInfo); err != nil { - return logError(ctx, "alarm check error", err, data) + if err := service.AlarmRule().Check(ctx, subDevice.ProductKey, subDevice.Key, consts.AlarmTriggerTypeProperty, reportDataInfo); err != nil { + return logError(ctx, "handleProperties alarm check error", err, data) } } @@ -102,30 +108,26 @@ func handleProperties(ctx context.Context, data topicModel.TopicHandlerData, pro } // handleEvents 处理事件上报 -func handleEvents(ctx context.Context, data topicModel.TopicHandlerData, events map[string]sagooProtocol.EventNode, subDeviceKey string) error { - resList, err := service.DevTSLParse().HandleEvents(ctx, data.DeviceDetail, events) - if err != nil { - return logError(ctx, "parse event error", err, data) - } - - for _, e := range resList { - // 上报事件 - if len(e.Param.Value) > 0 { - //if err := service.DevDataReport().Event(ctx, data.DeviceKey, reportEventData, subDeviceKey); err != nil { - // return logError(ctx, "report event error", err, data) - //} - - north.WriteMessage(ctx, north.EventReportMessageTopic, nil, data.ProductKey, data.DeviceDetail.Key, iotModel.EventReportMessage{ - EventId: e.Key, - Events: e.Param.Value, - Timestamp: time.Now().UnixMilli(), - }) - - // 检查报警规则 - if err := service.AlarmRule().Check(ctx, data.DeviceKey, data.ProductKey, consts.AlarmTriggerTypeProperty, e); err != nil { - return logError(ctx, "alarm check error", err, data) - } +func handleEvents(ctx context.Context, data topicModel.TopicHandlerData, subDevice *model.DeviceOutput, events map[string]sagooProtocol.EventNode) error { + for eventName, eventData := range events { + var reportData = new(sagooProtocol.ReportPropertyReq) + reportData.Id = guid.S() + reportData.Version = "1.0" + reportData.Sys = sagooProtocol.SysInfo{ + Ack: 0, } + reportData.Params = eventData.Value + reportData.Method = "thing.event." + eventName + ".post" + + // 上报数据存入日志库 + go baseLogic.InertTdLog(ctx, consts.MsgTypeEvent, subDevice.Key, reportData) + + north.WriteMessage(ctx, north.EventReportMessageTopic, nil, subDevice.ProductKey, subDevice.Key, iotModel.EventReportMessage{ + EventId: eventName, + Events: eventData.Value, + Timestamp: time.Now().UnixMilli(), + }) + } return nil }