Skip to content

Commit

Permalink
fix:修复网关子设备状态的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
microrain authored and microrain committed Mar 4, 2024
1 parent 172573e commit 75356d4
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 68 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ require (
github.com/Knetic/govaluate v3.0.0+incompatible
github.com/arl/statsviz v0.6.0
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.1
github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.1
github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.1
github.com/gogf/gf/v2 v2.6.1
github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.2
github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.2
github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.2
github.com/gogf/gf/v2 v2.6.2
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/golang-module/carbon/v2 v2.2.8
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.1 h1:5VW1vlaFNSHHhMliRkGTcDshMeA52Il8T+gffJJaVMc=
github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.1/go.mod h1:jxCa1WV/W+q0F4ILebakUsqRrl7iL3qvP+Uci0eXAew=
github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.1 h1:5NWx7rZa8CbPNw1vbLzIXQFEMbKvoJVQM0GyReBRvJ8=
github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.1/go.mod h1:iy1Dwp5xWfGfuWixCgGQ06ZX6lp+d9onbmSWWzi111A=
github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.1 h1:d3/8lWFWmaQ/8mzJ5GxyRpO4racPpZ3yZ8kCuejhhiY=
github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.1/go.mod h1:O0nzQLfNJtRApGHJluraTy41jc3LIvTsSkR8WAHb4f0=
github.com/gogf/gf/v2 v2.6.1 h1:n/cfXM506WjhPa6Z1CEDuHNM1XZ7C8JzSDPn2AfuxgQ=
github.com/gogf/gf/v2 v2.6.1/go.mod h1:x2XONYcI4hRQ/4gMNbWHmZrNzSEIg20s2NULbzom5k0=
github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.2 h1:iCUoR8je08TehU633pj+vmNdQ/qmWLTpHYQx7yERTv8=
github.com/gogf/gf/contrib/drivers/mysql/v2 v2.6.2/go.mod h1:bPYIZ56MyKvLp1P+EWFpkyJ+wofFF9yxChgr/iScP8A=
github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.2 h1:aAELEWXp8HPHIqHMqrLvy+mjk8cGpV5JovFOkGEvavI=
github.com/gogf/gf/contrib/nosql/redis/v2 v2.6.2/go.mod h1:LNURndyCdDl0kwrCNtjmzAJmABVQd9Mp0w0AAP+u/BU=
github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.2 h1:MChoSivlMZS1Cw6is6u+kBmjdV3G/N5TkHqP/pjC17M=
github.com/gogf/gf/contrib/trace/jaeger/v2 v2.6.2/go.mod h1:yI+aJpR6kU1ATLaRaCzI1o7fY/HNDDCfR5RS3mPNr0s=
github.com/gogf/gf/v2 v2.6.2 h1:TvI1UEH2RDbgFVlJJjkc/6ct6+5zjbOS5MiJ2ESG8qg=
github.com/gogf/gf/v2 v2.6.2/go.mod h1:x2XONYcI4hRQ/4gMNbWHmZrNzSEIg20s2NULbzom5k0=
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-module/carbon/v2 v2.2.8 h1:a1VxHHKAR7fc1ho7sYXhS1s5S4x7+oqAf2EY5p8C46A=
Expand Down
32 changes: 22 additions & 10 deletions manifest/config/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ system:
isDemo: false # 是否为演示系统
isCluster: false # 是否为集群部署默认为false开启集群必须配置redis
deviceCacheData:
poolSize: 300 #设备数据缓存池连接数
poolSize: 50 #设备数据缓存池连接数
recordDuration: "20m" #设备数据缓存时长超过时长的数据将被清除默认为10分钟
recordLimit: 1000 #设备数据缓存条数限制超过条数的数据将被清除默认为1000条
pluginsPath: "./plugins/built"
Expand Down Expand Up @@ -78,14 +78,13 @@ database:
stdout: false
ctxKeys: [ "RequestId" ]
default:
# link: "mysql:root:DbyTYGu3s4WuAF4TTq7@tcp(127.0.0.1:3307)/zhgy_sagoo_cn?loc=Local&parseTime=true"
link: "mysql:root:DbyTYGu3s4WuAF4TTq7@tcp(127.0.0.1:3307)/sagooiot2024?loc=Local&parseTime=true"
link: "mysql:root:DbyTYGu3s4WuAF4TTq7@tcp(127.0.0.1:3307)/sagooiot-community?loc=Local&parseTime=true"
debug: true #开启调试模式
charset: "utf8mb4" #数据库编码(: utf8/gbk/gb2312),一般设置为utf8
dryRun: false #ORM空跑(只读不写)
maxIdle: 50 #连接池最大闲置的连接数
maxIdle: 30 #连接池最大闲置的连接数
maxOpen: 100 #连接池最大打开的连接数
maxLifetime: 60 #(单位秒)连接对象可重复使用的时间长度
maxLifetime: 120 #(单位秒)连接对象可重复使用的时间长度

# 这个mqtt客户端主要是服务端内部处理消息使用的通道
mqtt:
Expand All @@ -100,15 +99,27 @@ mqtt:

# 时序数据库配置
tsd:
database: "TdEngine"
database: "Influxdb" #可选择 TdEngineInfluxdb
tdengine:
# type: "taosRestful" #http连接方式端口是6041
# dsn: "root:taosdata@http(127.0.0.1:6041)/"
type: "taosWS" #http连接方式端口是6041
dsn: "root:taosdata@ws(127.0.0.1:6041)/"
dbName: "sagoo_iot"
maxOpenConns: 1000
maxIdleConns: 50
maxOpenConns: 2000
maxIdleConns: 80
influxdb:
addr: "http://localhost:8086"
org: "sagoo"
dbName: "sagoo_iot"
token: "ez4BQ5QQCUpcAp1FDhhdY9jfcvxq2Z9OLkQSuQG_IPOzE9GvGRHfRm_YYwfuHtCaS7TVefxhEnzCOHi_nGtsCw=="
userName:
passWord:
aggregator: #聚合器配置用来配置批量写入时的聚合器
batchSize: 1000 #批处理数
workers: 100 #工作线程数
channelBufferSize: 50000 #通道缓冲数
lingerTime: 100 # 防止数据积压时间单位毫秒

# Redis 配置示例
redis:
Expand All @@ -118,9 +129,10 @@ redis:
db: 0
pass:
minIdle: 50 #最小空闲连接数
maxIdle: 100 #允许闲置的最大连接数
maxIdle: 20 #允许闲置的最大连接数
maxActive: 16 #最大活跃连接数
idleTimeout: "30m" #连接最大空闲时间使用时间字符串例如30s/2m/2d
idleTimeout: "30s" #连接最大空闲时间使用时间字符串例如30s/2m/2d
ClientName: "SystemCache"

#消息队列
queue:
Expand Down
94 changes: 48 additions & 46 deletions network/core/logic/model/up/property/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"encoding/json"
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/guid"
"sagooiot/internal/consts"
"sagooiot/internal/model"
"sagooiot/internal/mqtt"
"sagooiot/internal/service"
"sagooiot/network/core"
"sagooiot/network/core/logic/baseLogic"
"sagooiot/network/core/tunnel/base"
"sagooiot/pkg/dcache"
"sagooiot/pkg/iotModel"
"sagooiot/pkg/iotModel/sagooProtocol"
"sagooiot/pkg/iotModel/sagooProtocol/north"
Expand Down Expand Up @@ -38,29 +42,22 @@ func GatewayBatchReportProperty(ctx context.Context, data topicModel.TopicHandle
return logError(ctx, "parse data error", err, data)
}

//网关属性处理
if len(gatewayBatchReport.Params.Properties) > 0 {
if err := handleProperties(ctx, data, gatewayBatchReport.Params.Properties); err != nil {
return err
}
}
//网关子设备处理
for _, sub := range gatewayBatchReport.Params.SubDevices {

//网关事件处理
if len(gatewayBatchReport.Params.Events) > 0 {
if err := handleEvents(ctx, data, gatewayBatchReport.Params.Events, ""); err != nil {
return err
subDevice, err := dcache.GetDeviceDetailInfo(sub.Identity.DeviceKey)
if err != nil {
continue
}
}
dcache.UpdateStatus(ctx, subDevice) //更新子设备状态

//网关子设备处理
for _, sub := range gatewayBatchReport.Params.SubDevices {
if len(sub.Properties) > 0 {
if err := handleProperties(ctx, data, sub.Properties); err != nil {
if err := handleProperties(ctx, data, subDevice, sub.Properties); err != nil {
return err
}
}
if len(sub.Events) > 0 {
if err := handleEvents(ctx, data, sub.Events, sub.Identity.DeviceKey); err != nil {
if err := handleEvents(ctx, data, subDevice, sub.Events); err != nil {
return err
}
}
Expand All @@ -77,55 +74,60 @@ func logError(ctx context.Context, message string, err error, data topicModel.To
}

// handleProperties 处理属性上报
func handleProperties(ctx context.Context, data topicModel.TopicHandlerData, properties map[string]interface{}) error {
reportDataInfo, err := service.DevTSLParse().HandleProperties(ctx, data.DeviceDetail, properties)
func handleProperties(ctx context.Context, data topicModel.TopicHandlerData, subDevice *model.DeviceOutput, properties map[string]interface{}) error {
reportDataInfo, err := service.DevTSLParse().HandleProperties(ctx, subDevice, properties)
if err != nil {
return logError(ctx, "parse property error", err, data)
}

// 上报处理结果
if len(reportDataInfo) > 0 {
//if err := service.DevDataReport().Property(ctx, data.DeviceKey, reportDataInfo, subDeviceKey); err != nil {
// return logError(ctx, "report property error", err, data)
//}
north.WriteMessage(ctx, north.PropertyReportMessageTopic, nil, data.ProductKey, data.DeviceKey, iotModel.PropertyReportMessage{

var reportData = new(sagooProtocol.ReportPropertyReq)
reportData.Id = guid.S()
reportData.Version = "1.0"
reportData.Sys = sagooProtocol.SysInfo{
Ack: 0,
}
reportData.Params = properties
reportData.Method = "thing.event.property.post"
// 上报数据存入日志库
go baseLogic.InertTdLog(ctx, consts.MsgTypePropertyReport, subDevice.Key, reportData)

north.WriteMessage(ctx, north.PropertyReportMessageTopic, nil, subDevice.ProductKey, subDevice.Key, iotModel.PropertyReportMessage{
Properties: reportDataInfo,
})

// 检查报警规则
if err := service.AlarmRule().Check(ctx, data.DeviceKey, data.DeviceKey, consts.AlarmTriggerTypeProperty, reportDataInfo); err != nil {
return logError(ctx, "alarm check error", err, data)
if err := service.AlarmRule().Check(ctx, subDevice.ProductKey, subDevice.Key, consts.AlarmTriggerTypeProperty, reportDataInfo); err != nil {
return logError(ctx, "handleProperties alarm check error", err, data)
}
}

return nil
}

// handleEvents 处理事件上报
func handleEvents(ctx context.Context, data topicModel.TopicHandlerData, events map[string]sagooProtocol.EventNode, subDeviceKey string) error {
resList, err := service.DevTSLParse().HandleEvents(ctx, data.DeviceDetail, events)
if err != nil {
return logError(ctx, "parse event error", err, data)
}

for _, e := range resList {
// 上报事件
if len(e.Param.Value) > 0 {
//if err := service.DevDataReport().Event(ctx, data.DeviceKey, reportEventData, subDeviceKey); err != nil {
// return logError(ctx, "report event error", err, data)
//}

north.WriteMessage(ctx, north.EventReportMessageTopic, nil, data.ProductKey, data.DeviceDetail.Key, iotModel.EventReportMessage{
EventId: e.Key,
Events: e.Param.Value,
Timestamp: time.Now().UnixMilli(),
})

// 检查报警规则
if err := service.AlarmRule().Check(ctx, data.DeviceKey, data.ProductKey, consts.AlarmTriggerTypeProperty, e); err != nil {
return logError(ctx, "alarm check error", err, data)
}
func handleEvents(ctx context.Context, data topicModel.TopicHandlerData, subDevice *model.DeviceOutput, events map[string]sagooProtocol.EventNode) error {
for eventName, eventData := range events {
var reportData = new(sagooProtocol.ReportPropertyReq)
reportData.Id = guid.S()
reportData.Version = "1.0"
reportData.Sys = sagooProtocol.SysInfo{
Ack: 0,
}
reportData.Params = eventData.Value
reportData.Method = "thing.event." + eventName + ".post"

// 上报数据存入日志库
go baseLogic.InertTdLog(ctx, consts.MsgTypeEvent, subDevice.Key, reportData)

north.WriteMessage(ctx, north.EventReportMessageTopic, nil, subDevice.ProductKey, subDevice.Key, iotModel.EventReportMessage{
EventId: eventName,
Events: eventData.Value,
Timestamp: time.Now().UnixMilli(),
})

}
return nil
}
Expand Down

0 comments on commit 75356d4

Please sign in to comment.