From 4039d0aeee2a38e24568ff7abd338746a6ec9fa5 Mon Sep 17 00:00:00 2001 From: chanvi Date: Wed, 19 Jun 2024 11:05:36 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E8=AE=BE=E5=A4=87=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E7=9A=84=E6=89=A7=E8=A1=8C=E7=BB=93=E6=9E=9C=E6=97=A0=E8=BF=94?= =?UTF-8?q?=E5=9B=9Ebug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/core/logic/baseLogic/asyncMap.go | 24 ++++++++++++------------ network/core/router.go | 12 ++++++++++++ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/network/core/logic/baseLogic/asyncMap.go b/network/core/logic/baseLogic/asyncMap.go index 1ec3c1a..4ac9bec 100644 --- a/network/core/logic/baseLogic/asyncMap.go +++ b/network/core/logic/baseLogic/asyncMap.go @@ -10,7 +10,7 @@ import ( type AsyncMap struct { sync.RWMutex - info map[string]*FInfo + Info map[string]*FInfo } type FInfo struct { @@ -19,26 +19,26 @@ type FInfo struct { Response chan interface{} } -var asyncMapInfo = &AsyncMap{info: make(map[string]*FInfo)} +var AsyncMapInfo = &AsyncMap{Info: make(map[string]*FInfo)} func SyncRequest(ctx context.Context, id, funcKey string, params interface{}, timeout int) (interface{}, error) { if timeout == 0 { timeout = 45 } - responseChan := make(chan interface{}) - asyncMapInfo.Lock() - asyncMapInfo.info[id] = &FInfo{ + responseChan := make(chan interface{}, 1) + AsyncMapInfo.Lock() + AsyncMapInfo.Info[id] = &FInfo{ FuncKey: funcKey, Request: params, Response: responseChan, } - asyncMapInfo.Unlock() + AsyncMapInfo.Unlock() defer func() { - asyncMapInfo.Lock() - delete(asyncMapInfo.info, id) - asyncMapInfo.Unlock() + AsyncMapInfo.Lock() + delete(AsyncMapInfo.Info, id) + AsyncMapInfo.Unlock() close(responseChan) }() @@ -53,9 +53,9 @@ func SyncRequest(ctx context.Context, id, funcKey string, params interface{}, ti } func GetCallInfoById(ctx context.Context, id string) (funcKey string, params interface{}, response chan interface{}, err error) { - asyncMapInfo.RLock() - defer asyncMapInfo.RUnlock() - if info, ok := asyncMapInfo.info[id]; !ok { + AsyncMapInfo.RLock() + defer AsyncMapInfo.RUnlock() + if info, ok := AsyncMapInfo.Info[id]; !ok { return "", nil, nil, errors.New("cannot get call info by id " + id) } else { return info.FuncKey, info.Request, info.Response, nil diff --git a/network/core/router.go b/network/core/router.go index 2ee92f9..5df0b3d 100644 --- a/network/core/router.go +++ b/network/core/router.go @@ -7,12 +7,14 @@ import ( "fmt" MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" "sagooiot/internal/consts" "sagooiot/internal/model" "sagooiot/internal/mqtt" "sagooiot/network/core/logic/baseLogic" "sagooiot/pkg/dcache" "sagooiot/pkg/gpool" + "sagooiot/pkg/iotModel/sagooProtocol" "sagooiot/pkg/iotModel/topicModel" "sagooiot/pkg/jsinterpreter" "sagooiot/pkg/plugins" @@ -80,6 +82,16 @@ func (s *SubMap) HandleMessage(ctx context.Context, handleF handleFunc) func(con return nil } + // 处理设备应答 + if strings.HasSuffix(topicInfo[6], "reply") { + var msg sagooProtocol.ServiceCallOutputRes + json.Unmarshal([]byte(res), &msg) + + if info, ok := baseLogic.AsyncMapInfo.Info[msg.Id]; ok { + info.Response <- gconv.Map(msg) + } + } + // 获取设备详情,拿出来消息协议,然后按照产品定义的消息协议解析消息 deviceInfo, err := dcache.GetDeviceDetailInfo(deviceKey) if err != nil {