Skip to content

Commit

Permalink
优化插件通讯协议处理
Browse files Browse the repository at this point in the history
修正TCP网络服务通道BUG
  • Loading branch information
microrain authored and microrain committed Aug 17, 2023
1 parent f415435 commit 26ab38f
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 41 deletions.
5 changes: 2 additions & 3 deletions extend/extend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
"github.com/sagoo-cloud/sagooiot/extend/model"
"github.com/sagoo-cloud/sagooiot/extend/module"
"sync"
Expand Down Expand Up @@ -96,7 +95,7 @@ func (pm *SysPlugin) GetProtocolPlugin(protocolName string) (obj module.Protocol

// GetProtocolUnpackData 通过协议解析插件处理后,获取解析数据。protocolType 为协议名称
// todo 需要标记数据协议子类型
func (pm *SysPlugin) GetProtocolUnpackData(protocolType string, data []byte) (res string, err error) {
func (pm *SysPlugin) GetProtocolUnpackData(protocolType string, data []byte) (res model.JsonRes, err error) {
//获取插件
p, err := pm.pluginManager.GetInterface(protocolType)
if err != nil {
Expand All @@ -106,7 +105,7 @@ func (pm *SysPlugin) GetProtocolUnpackData(protocolType string, data []byte) (re
var rd = model.DataReq{}
rd.Data = data
resData := p.(module.Protocol).Decode(rd)
return gconv.String(resData), err
return resData, err
}

// NoticeSend 通过插件发送通知信息。noticeName 为通知插件名称;msg为通知内容
Expand Down
30 changes: 30 additions & 0 deletions extend/model/sagoomqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package model

import "encoding/gob"

func init() {
gob.Register(SagooMqttModel{})
}

// SagooMqttModel 主结构
type (
SagooMqttModel struct {
Id string `json:"id"`
Version string `json:"version"`
Sys SysInfo `json:"sys"`
Params map[string]Param `json:"params"`
Method string `json:"method"`
ModelFuncName string `json:"model_func_name"`
ModelFuncIdentify string `json:"model_func_identify"`
}

SysInfo struct {
Ack int `json:"ack"`
}

// Param 属性
Param struct {
Value interface{} `json:"value"`
Time int64 `json:"time"`
}
)
10 changes: 8 additions & 2 deletions network/core/mqtt-device.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,17 @@ func deviceDataHandler(ctx context.Context, client MQTT.Client, message MQTT.Mes
if extend.GetProtocolPlugin() == nil {
return
}
res, err = extend.GetProtocolPlugin().GetProtocolUnpackData(messageProtocol, message.Payload())
pluginData, err := extend.GetProtocolPlugin().GetProtocolUnpackData(messageProtocol, message.Payload())
if err != nil {
g.Log().Errorf(ctx, "get plugin error: %w, topic:%s, message:%s, message ignored", err, message.Topic(), string(message.Payload()))
g.Log().Errorf(ctx, "get plugin error: %w, deviceKey:%s, data:%s, message ignored", err, deviceDetail.Key, string(message.Payload()))
return
}
if pluginData.Code != 0 {
g.Log().Errorf(ctx, "plugin parse error: code:%d message:%s, deviceKey:%s, data:%s, message ignored", pluginData.Code, pluginData.Message, deviceDetail.Key, string(message.Payload()))
return
}
pluginDataByte, _ := json.Marshal(pluginData.Data)
res = string(pluginDataByte)
}
var reportData networkModel.DefaultMessageType
if reportDataErr := json.Unmarshal([]byte(res), &reportData); reportDataErr != nil {
Expand Down
10 changes: 9 additions & 1 deletion network/core/tunnel-base.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,19 @@ func (l *tunnelBase) ReadData(ctx context.Context, deviceKey string, data []byte
if extend.GetProtocolPlugin() == nil {
return
}
res, err = extend.GetProtocolPlugin().GetProtocolUnpackData(productDetail.MessageProtocol, data)
// 通过消息协议插件解析数据
pluginData, err := extend.GetProtocolPlugin().GetProtocolUnpackData(productDetail.MessageProtocol, data)
g.Log().Debug(context.TODO(), "GetProtocolUnpackData", pluginData)
if err != nil {
g.Log().Errorf(ctx, "get plugin error: %w, message:%s, message ignored", err, res)
return
}
if pluginData.Code != 0 {
g.Log().Errorf(ctx, "plugin parse error: code:%d message:%s, deviceKey:%s, data:%s, message ignored", pluginData.Code, pluginData.Message, deviceDetail.Key, string(data))
return
}
pluginDataByte, _ := json.Marshal(pluginData.Data)
res = string(pluginDataByte)
}
var reportData networkModel.DefaultMessageType
if reportDataErr := json.Unmarshal([]byte(res), &reportData); reportDataErr != nil {
Expand Down
4 changes: 3 additions & 1 deletion network/model/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package model
import (
"bytes"
"encoding/hex"
"github.com/gogf/gf/v2/text/gregex"
"regexp"
"time"
)
Expand Down Expand Up @@ -71,7 +72,8 @@ func (p *RegisterPacket) Check(buf []byte) (deviceKey string, checkOk bool) {
if p.regex == nil {
p.regex = regexp.MustCompile(p.Regex)
}
return string(buf), p.regex.MatchString(string(buf))
match, _ := gregex.MatchString(p.Regex, string(buf))
return match[1], p.regex.Match(buf)
}
if p.Length > 0 {
if len(buf) != p.Length {
Expand Down
18 changes: 0 additions & 18 deletions plugins/protocol/tgn52/model.go

This file was deleted.

41 changes: 25 additions & 16 deletions plugins/protocol/tgn52/tgn52.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package main

import (
"fmt"
"github.com/gogf/gf/v2/util/guid"
"github.com/sagoo-cloud/sagooiot/extend/model"
"net/rpc"
"strings"
"time"

gplugin "github.com/hashicorp/go-plugin"
plugin "github.com/sagoo-cloud/sagooiot/extend/module"
Expand Down Expand Up @@ -32,30 +34,37 @@ func (p *ProtocolTgn52) Encode(args interface{}) model.JsonRes {
func (p *ProtocolTgn52) Decode(data model.DataReq) model.JsonRes {
var resp model.JsonRes
resp.Code = 0

tmpData := strings.Split(string(data.Data), ";")
var rd = DeviceData{}
var rd = make(map[string]model.Param)

l := len(tmpData)
nowTime := time.Now().Unix()
if l > 7 {
rd.HeadStr = tmpData[0]
rd.DeviceID = tmpData[1]
rd.Signal = tmpData[2]
rd.Battery = tmpData[3]
rd.Temperature = tmpData[4]
rd.Humidity = tmpData[5]
rd.Cycle = tmpData[6]
rd["HeadStr"] = model.Param{Value: tmpData[0], Time: nowTime}
rd["DeviceID"] = model.Param{Value: tmpData[1], Time: nowTime}
rd["Signal"] = model.Param{Value: tmpData[2], Time: nowTime}
rd["Battery"] = model.Param{Value: tmpData[3], Time: nowTime}
rd["Temperature"] = model.Param{Value: tmpData[4], Time: nowTime}
rd["Humidity"] = model.Param{Value: tmpData[5], Time: nowTime}
rd["Cycle"] = model.Param{Value: tmpData[6], Time: nowTime}
//处理续传数据
updateStr := make([]string, 0)
for i := 7; i < l; i++ {
rd.Update = append(rd.Update, tmpData[i])
updateStr = append(updateStr, tmpData[i])
}
rd["Update"] = model.Param{Value: updateStr, Time: nowTime}
}
res := plugin.OutJsonRes(0, "", rd)
if rd.IsEmpty() {
resp.Code = 1
resp.Message = "数据为空,或数据结构不对"
return resp
}

resp.Code = 0
resp.Data = res
resp.Data = model.SagooMqttModel{
Id: guid.S(),
Version: "1.0",
Sys: model.SysInfo{Ack: 0},
Params: rd,
Method: "thing.event.property.post",
ModelFuncName: "upProperty",
}
return resp
}

Expand Down

0 comments on commit 26ab38f

Please sign in to comment.