Skip to content

Commit

Permalink
feat:修复TD存入数据顺序不正确的BUG
Browse files Browse the repository at this point in the history
  • Loading branch information
microrain authored and microrain committed May 28, 2024
1 parent 9874cd3 commit ee0f2b9
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 76 deletions.
66 changes: 66 additions & 0 deletions pkg/tsd/comm/tools.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package comm

import (
"context"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"sagooiot/pkg/iotModel"
"strings"
"time"
)

// ProductTableName 获取TSD产品表名
Expand Down Expand Up @@ -39,3 +45,63 @@ func TsdTagName(key string) string {
}
return TdTagPrefix + key
}

// GetDeviceField 获取设备数据key原始顺序列表与重构后的字段
func GetDeviceField(data iotModel.ReportPropertyData) (keys, field []string) {
field = []string{"ts"}
for k := range data {
keys = append(keys, k)
k = TsdColumnName(k)
field = append(field, k)
// 属性上报时间
field = append(field, k+"_time")
}
return
}

// GetDeviceFieldAndValue 同时获取设备数据字段与值
func GetDeviceFieldAndValue(data iotModel.ReportPropertyData) (field, value []string) {
field = []string{"ts"}
for k, v := range data {
value = append(value, "'"+gvar.New(v.Value).String()+"'")
value = append(value, "'"+gtime.New(v.CreateTime).Format("Y-m-d H:i:s")+"'")
k = TsdColumnName(k)
field = append(field, k)
// 属性上报时间
field = append(field, k+"_time")
}
return
}

// GetDeviceValue 获取设备数据值
func GetDeviceValue(field []string, data iotModel.ReportPropertyData) []string {
var value []string
//跟据统一的key列表顺序,对数据值排序输出
for _, key := range field {
for k, v := range data {
_, ok := data[key]
if ok {
if k == key {

value = append(value, "'"+gvar.New(v.Value).String()+"'")
value = append(value, "'"+gtime.New(v.CreateTime).Format("Y-m-d H:i:s")+"'")
}
}
}
}
return value
}

// ChangeTime 连接时区处理
func ChangeTime(v *g.Var) (rs *g.Var) {
driver := g.Cfg().MustGet(context.TODO(), "tdengine.type")
if driver.String() == "taosRestful" {
if t, err := time.Parse("2006-01-02 15:04:05 +0000 UTC", v.String()); err == nil {
rs = gvar.New(t.Local().Format("2006-01-02 15:04:05"))
return
}
}

rs = v
return
}
123 changes: 47 additions & 76 deletions pkg/tsd/internal/tdengine/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,56 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"sagooiot/internal/service"
"sagooiot/pkg/iotModel"
"sagooiot/pkg/tsd/comm"
"sort"
"time"

"strings"
)

// Initialization 初始化tsd相关的设备数据
func (m *TdEngine) Initialization(ctx context.Context) (err error) {
// 资源锁
lockKey := "tdLock:initDb"
lockVal, err := g.Redis().Do(ctx, "SET", lockKey, gtime.Now().Unix(), "NX", "EX", "3600")
if err != nil {
return
}
if lockVal.String() != "OK" {
return
}
defer func() {
_, err = g.Redis().Do(ctx, "DEL", lockKey)
}()

taos, err := service.TdEngine().GetConn(ctx, "")
if err != nil {
err = gerror.New("获取链接失败")
return
}

dbName := g.Cfg().MustGet(context.Background(), "tsd.tdengine.dbName", "sagoo_iot").String()
_, err = taos.Exec("CREATE DATABASE IF NOT EXISTS " + dbName)

return
}

// InsertDeviceData 插入设备数据
func (m *TdEngine) InsertDeviceData(deviceKey string, data iotModel.ReportPropertyData, subKey ...string) (err error) {
if m.db == nil {
_, err = m.connect()
if err != nil {
return err
}
m.connect()
}

if len(data) == 0 {
err = errors.New("数据为空")
return
}

field := getDeviceField(data)
value := getDeviceValue(data)

field, value := comm.GetDeviceFieldAndValue(data)
table := comm.DeviceTableName(deviceKey)
if len(subKey) > 0 {
// 子设备
Expand All @@ -44,76 +66,36 @@ func (m *TdEngine) InsertDeviceData(deviceKey string, data iotModel.ReportProper
return
}

// getDeviceField 获取设备数据字段
func getDeviceField(data iotModel.ReportPropertyData) []string {
var field []string

for k := range data {
k = comm.TsdColumnName(k)
field = append(field, k)
// 属性上报时间
field = append(field, k+"_time")
}
sort.Strings(field)
return field
}

// getDeviceValue 获取设备数据值
func getDeviceValue(data iotModel.ReportPropertyData) []string {
//ts := time.Now().Format("Y-m-d H:i:s")
//var value = []string{"'" + ts + "'"}
var value []string

var keys []string
// 提取map中的键
for k := range data {
keys = append(keys, k)
}
sort.Strings(keys)

// 给key排序后,从map取值
for _, k := range keys {
v := data[k]
value = append(value, "'"+gvar.New(v.Value).String()+"'")
value = append(value, "'"+gtime.New(v.CreateTime).Format("Y-m-d H:i:s")+"'")
}
return value
}

// BatchInsertDeviceData 批量插入单设备的数据
func (m *TdEngine) BatchInsertDeviceData(deviceKey string, deviceDataList []iotModel.ReportPropertyData) (resultNum int, err error) {
if m.db == nil {
_, err = m.connect()
if err != nil {
return
}
m.connect()
}
if len(deviceDataList) == 0 {
err = errors.New("数据为空")
return
}
table := comm.DeviceTableName(deviceKey)
field := getDeviceField(deviceDataList[0])
keys, fieldList := comm.GetDeviceField(deviceDataList[0])
var (
ts = time.Now().UnixMilli() // Unix 毫秒时间戳
baseSQL = "INSERT INTO " + table + " (" + strings.Join(field, ",") + ") VALUES"
baseSQL = "INSERT INTO " + table + " (" + strings.Join(fieldList, ",") + ") VALUES"
sqlBuilder strings.Builder
allCount int
allTime int64
)
sqlBuilder.WriteString(baseSQL)
//g.Log().Debug(context.Background(), "====04====BatchInsertDeviceData 接收到:", len(deviceDataList), deviceDataList)

for i, row := range deviceDataList {
ts++
value := getDeviceValue(row) //获取设备数据值
value := comm.GetDeviceValue(keys, row) //获取设备数据值
sqlBuilder.WriteString(fmt.Sprintf(" (%s)", "'"+time.UnixMilli(ts).Format(time.RFC3339Nano)+"',"+strings.Join(value, ",")))
// 当 SQL 字符串长度超过 15K 或在最后一条数据时执行
if sqlBuilder.Len() > 15*1024 || i == len(deviceDataList)-1 {
trimmedSQL := strings.TrimRight(sqlBuilder.String(), " ")
start := time.Now() // 开始时间
//g.Log().Debug(context.Background(), "====06====BatchInsertDeviceData SQL:", trimmedSQL)
//_, err := m.db.Exec(trimmedSQL)
g.Log().Debug(context.Background(), "====06====BatchInsertDeviceData SQL:", trimmedSQL)
_, err := m.db.Exec(trimmedSQL)
if err != nil {
g.Log().Error(context.Background(), err.Error(), trimmedSQL)
}
Expand All @@ -127,17 +109,13 @@ func (m *TdEngine) BatchInsertDeviceData(deviceKey string, deviceDataList []iotM
}
}
resultNum = allCount
//g.Log().Debugf(context.Background(), "Total: %d, Time: %dms\n", allCount, allTime)
return
}

// BatchInsertMultiDeviceData 插入多设备的数据
func (m *TdEngine) BatchInsertMultiDeviceData(multiDeviceDataList map[string][]iotModel.ReportPropertyData) (resultNum int, err error) {
if m.db == nil {
_, err = m.connect()
if err != nil {
return
}
m.connect()
}
if len(multiDeviceDataList) == 0 {
err = errors.New("数据为空")
Expand All @@ -155,23 +133,21 @@ func (m *TdEngine) BatchInsertMultiDeviceData(multiDeviceDataList map[string][]i

i := 0
for deviceKey, deviceData := range multiDeviceDataList {

table := comm.DeviceTableName(deviceKey)
var field = []string{"ts"}
field = append(field, getDeviceField(deviceData[0])...)

fieldKeys, fieldList := comm.GetDeviceField(deviceData[0])
ts++
sqlBuilder.WriteString(" " + table + " (" + strings.Join(fieldList, ",") + ") VALUES")

for _, data := range deviceData {
value := getDeviceValue(data)
sqlBuilder.WriteString(" " + table + " (" + strings.Join(field, ",") + ") VALUES" + fmt.Sprintf(" (%s)", "'"+time.UnixMilli(ts).Format(time.RFC3339Nano)+"',"+strings.Join(value, ",")))

value := comm.GetDeviceValue(fieldKeys, data)
sqlBuilder.WriteString(fmt.Sprintf(" (%s)", "'"+time.UnixMilli(ts).Format(time.RFC3339Nano)+"',"+strings.Join(value, ",")))
}

// 当 SQL 字符串长度超过 15K 或在最后一条数据时执行
if sqlBuilder.Len() > 15*1024 || i == len(multiDeviceDataList)-1 {
trimmedSQL := strings.TrimRight(sqlBuilder.String(), " ")
start := time.Now() // 开始时间
g.Log().Debug(context.Background(), "====06====BatchInsertDeviceData SQL:", trimmedSQL)
//g.Log().Debug(context.Background(), "====06====BatchInsertDeviceData SQL:", trimmedSQL)
_, err := m.db.Exec(trimmedSQL)
if err != nil {
g.Log().Error(context.Background(), err.Error(), trimmedSQL)
Expand Down Expand Up @@ -200,11 +176,9 @@ func (m *TdEngine) WatchDeviceData(deviceKey string, callback func(data iotModel
// InsertLogData 插入日志数据
func (m *TdEngine) InsertLogData(log iotModel.DeviceLog) (result sql.Result, err error) {
if m.db == nil {
_, err = m.connect()
if err != nil {
return
}
m.connect()
}

table := comm.DeviceLogTable(log.Device)
baseSQL := "INSERT INTO %s USING device_log TAGS ('%s') VALUES ('%s', '%s', '%s')"
sqlStr := fmt.Sprintf(baseSQL, table, log.Device, time.Now().Format(time.RFC3339Nano), log.Type, log.Content)
Expand All @@ -216,10 +190,7 @@ func (m *TdEngine) InsertLogData(log iotModel.DeviceLog) (result sql.Result, err
// BatchInsertLogData 批量插入日志数据
func (m *TdEngine) BatchInsertLogData(deviceLogList map[string][]iotModel.DeviceLog) (resultNum int, err error) {
if m.db == nil {
_, err = m.connect()
if err != nil {
return
}
m.connect()
}
if len(deviceLogList) == 0 {
return
Expand Down

0 comments on commit ee0f2b9

Please sign in to comment.