diff --git a/pkg/tsd/comm/tools.go b/pkg/tsd/comm/tools.go index 3b911b2..c6a463d 100644 --- a/pkg/tsd/comm/tools.go +++ b/pkg/tsd/comm/tools.go @@ -1,7 +1,13 @@ package comm import ( + "context" + "github.com/gogf/gf/v2/container/gvar" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gtime" + "sagooiot/pkg/iotModel" "strings" + "time" ) // ProductTableName 获取TSD产品表名 @@ -39,3 +45,63 @@ func TsdTagName(key string) string { } return TdTagPrefix + key } + +// GetDeviceField 获取设备数据key原始顺序列表与重构后的字段 +func GetDeviceField(data iotModel.ReportPropertyData) (keys, field []string) { + field = []string{"ts"} + for k := range data { + keys = append(keys, k) + k = TsdColumnName(k) + field = append(field, k) + // 属性上报时间 + field = append(field, k+"_time") + } + return +} + +// GetDeviceFieldAndValue 同时获取设备数据字段与值 +func GetDeviceFieldAndValue(data iotModel.ReportPropertyData) (field, value []string) { + field = []string{"ts"} + for k, v := range data { + value = append(value, "'"+gvar.New(v.Value).String()+"'") + value = append(value, "'"+gtime.New(v.CreateTime).Format("Y-m-d H:i:s")+"'") + k = TsdColumnName(k) + field = append(field, k) + // 属性上报时间 + field = append(field, k+"_time") + } + return +} + +// GetDeviceValue 获取设备数据值 +func GetDeviceValue(field []string, data iotModel.ReportPropertyData) []string { + var value []string + //跟据统一的key列表顺序,对数据值排序输出 + for _, key := range field { + for k, v := range data { + _, ok := data[key] + if ok { + if k == key { + + value = append(value, "'"+gvar.New(v.Value).String()+"'") + value = append(value, "'"+gtime.New(v.CreateTime).Format("Y-m-d H:i:s")+"'") + } + } + } + } + return value +} + +// ChangeTime 连接时区处理 +func ChangeTime(v *g.Var) (rs *g.Var) { + driver := g.Cfg().MustGet(context.TODO(), "tdengine.type") + if driver.String() == "taosRestful" { + if t, err := time.Parse("2006-01-02 15:04:05 +0000 UTC", v.String()); err == nil { + rs = gvar.New(t.Local().Format("2006-01-02 15:04:05")) + return + } + } + + rs = v + return +} diff --git a/pkg/tsd/internal/tdengine/device.go b/pkg/tsd/internal/tdengine/device.go index 9aea862..a63eb89 100644 --- a/pkg/tsd/internal/tdengine/device.go +++ b/pkg/tsd/internal/tdengine/device.go @@ -5,24 +5,48 @@ import ( "database/sql" "errors" "fmt" - "github.com/gogf/gf/v2/container/gvar" + "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" + "sagooiot/internal/service" "sagooiot/pkg/iotModel" "sagooiot/pkg/tsd/comm" - "sort" "time" "strings" ) +// Initialization 初始化tsd相关的设备数据 +func (m *TdEngine) Initialization(ctx context.Context) (err error) { + // 资源锁 + lockKey := "tdLock:initDb" + lockVal, err := g.Redis().Do(ctx, "SET", lockKey, gtime.Now().Unix(), "NX", "EX", "3600") + if err != nil { + return + } + if lockVal.String() != "OK" { + return + } + defer func() { + _, err = g.Redis().Do(ctx, "DEL", lockKey) + }() + + taos, err := service.TdEngine().GetConn(ctx, "") + if err != nil { + err = gerror.New("获取链接失败") + return + } + + dbName := g.Cfg().MustGet(context.Background(), "tsd.tdengine.dbName", "sagoo_iot").String() + _, err = taos.Exec("CREATE DATABASE IF NOT EXISTS " + dbName) + + return +} + // InsertDeviceData 插入设备数据 func (m *TdEngine) InsertDeviceData(deviceKey string, data iotModel.ReportPropertyData, subKey ...string) (err error) { if m.db == nil { - _, err = m.connect() - if err != nil { - return err - } + m.connect() } if len(data) == 0 { @@ -30,9 +54,7 @@ func (m *TdEngine) InsertDeviceData(deviceKey string, data iotModel.ReportProper return } - field := getDeviceField(data) - value := getDeviceValue(data) - + field, value := comm.GetDeviceFieldAndValue(data) table := comm.DeviceTableName(deviceKey) if len(subKey) > 0 { // 子设备 @@ -44,76 +66,36 @@ func (m *TdEngine) InsertDeviceData(deviceKey string, data iotModel.ReportProper return } -// getDeviceField 获取设备数据字段 -func getDeviceField(data iotModel.ReportPropertyData) []string { - var field []string - - for k := range data { - k = comm.TsdColumnName(k) - field = append(field, k) - // 属性上报时间 - field = append(field, k+"_time") - } - sort.Strings(field) - return field -} - -// getDeviceValue 获取设备数据值 -func getDeviceValue(data iotModel.ReportPropertyData) []string { - //ts := time.Now().Format("Y-m-d H:i:s") - //var value = []string{"'" + ts + "'"} - var value []string - - var keys []string - // 提取map中的键 - for k := range data { - keys = append(keys, k) - } - sort.Strings(keys) - - // 给key排序后,从map取值 - for _, k := range keys { - v := data[k] - value = append(value, "'"+gvar.New(v.Value).String()+"'") - value = append(value, "'"+gtime.New(v.CreateTime).Format("Y-m-d H:i:s")+"'") - } - return value -} - // BatchInsertDeviceData 批量插入单设备的数据 func (m *TdEngine) BatchInsertDeviceData(deviceKey string, deviceDataList []iotModel.ReportPropertyData) (resultNum int, err error) { if m.db == nil { - _, err = m.connect() - if err != nil { - return - } + m.connect() } if len(deviceDataList) == 0 { err = errors.New("数据为空") return } table := comm.DeviceTableName(deviceKey) - field := getDeviceField(deviceDataList[0]) + keys, fieldList := comm.GetDeviceField(deviceDataList[0]) var ( ts = time.Now().UnixMilli() // Unix 毫秒时间戳 - baseSQL = "INSERT INTO " + table + " (" + strings.Join(field, ",") + ") VALUES" + baseSQL = "INSERT INTO " + table + " (" + strings.Join(fieldList, ",") + ") VALUES" sqlBuilder strings.Builder allCount int allTime int64 ) sqlBuilder.WriteString(baseSQL) - //g.Log().Debug(context.Background(), "====04====BatchInsertDeviceData 接收到:", len(deviceDataList), deviceDataList) for i, row := range deviceDataList { ts++ - value := getDeviceValue(row) //获取设备数据值 + value := comm.GetDeviceValue(keys, row) //获取设备数据值 sqlBuilder.WriteString(fmt.Sprintf(" (%s)", "'"+time.UnixMilli(ts).Format(time.RFC3339Nano)+"',"+strings.Join(value, ","))) // 当 SQL 字符串长度超过 15K 或在最后一条数据时执行 if sqlBuilder.Len() > 15*1024 || i == len(deviceDataList)-1 { trimmedSQL := strings.TrimRight(sqlBuilder.String(), " ") start := time.Now() // 开始时间 - //g.Log().Debug(context.Background(), "====06====BatchInsertDeviceData SQL:", trimmedSQL) - //_, err := m.db.Exec(trimmedSQL) + g.Log().Debug(context.Background(), "====06====BatchInsertDeviceData SQL:", trimmedSQL) + _, err := m.db.Exec(trimmedSQL) if err != nil { g.Log().Error(context.Background(), err.Error(), trimmedSQL) } @@ -127,17 +109,13 @@ func (m *TdEngine) BatchInsertDeviceData(deviceKey string, deviceDataList []iotM } } resultNum = allCount - //g.Log().Debugf(context.Background(), "Total: %d, Time: %dms\n", allCount, allTime) return } // BatchInsertMultiDeviceData 插入多设备的数据 func (m *TdEngine) BatchInsertMultiDeviceData(multiDeviceDataList map[string][]iotModel.ReportPropertyData) (resultNum int, err error) { if m.db == nil { - _, err = m.connect() - if err != nil { - return - } + m.connect() } if len(multiDeviceDataList) == 0 { err = errors.New("数据为空") @@ -155,23 +133,21 @@ func (m *TdEngine) BatchInsertMultiDeviceData(multiDeviceDataList map[string][]i i := 0 for deviceKey, deviceData := range multiDeviceDataList { - table := comm.DeviceTableName(deviceKey) - var field = []string{"ts"} - field = append(field, getDeviceField(deviceData[0])...) - + fieldKeys, fieldList := comm.GetDeviceField(deviceData[0]) ts++ + sqlBuilder.WriteString(" " + table + " (" + strings.Join(fieldList, ",") + ") VALUES") for _, data := range deviceData { - value := getDeviceValue(data) - sqlBuilder.WriteString(" " + table + " (" + strings.Join(field, ",") + ") VALUES" + fmt.Sprintf(" (%s)", "'"+time.UnixMilli(ts).Format(time.RFC3339Nano)+"',"+strings.Join(value, ","))) - + value := comm.GetDeviceValue(fieldKeys, data) + sqlBuilder.WriteString(fmt.Sprintf(" (%s)", "'"+time.UnixMilli(ts).Format(time.RFC3339Nano)+"',"+strings.Join(value, ","))) } + // 当 SQL 字符串长度超过 15K 或在最后一条数据时执行 if sqlBuilder.Len() > 15*1024 || i == len(multiDeviceDataList)-1 { trimmedSQL := strings.TrimRight(sqlBuilder.String(), " ") start := time.Now() // 开始时间 - g.Log().Debug(context.Background(), "====06====BatchInsertDeviceData SQL:", trimmedSQL) + //g.Log().Debug(context.Background(), "====06====BatchInsertDeviceData SQL:", trimmedSQL) _, err := m.db.Exec(trimmedSQL) if err != nil { g.Log().Error(context.Background(), err.Error(), trimmedSQL) @@ -200,11 +176,9 @@ func (m *TdEngine) WatchDeviceData(deviceKey string, callback func(data iotModel // InsertLogData 插入日志数据 func (m *TdEngine) InsertLogData(log iotModel.DeviceLog) (result sql.Result, err error) { if m.db == nil { - _, err = m.connect() - if err != nil { - return - } + m.connect() } + table := comm.DeviceLogTable(log.Device) baseSQL := "INSERT INTO %s USING device_log TAGS ('%s') VALUES ('%s', '%s', '%s')" sqlStr := fmt.Sprintf(baseSQL, table, log.Device, time.Now().Format(time.RFC3339Nano), log.Type, log.Content) @@ -216,10 +190,7 @@ func (m *TdEngine) InsertLogData(log iotModel.DeviceLog) (result sql.Result, err // BatchInsertLogData 批量插入日志数据 func (m *TdEngine) BatchInsertLogData(deviceLogList map[string][]iotModel.DeviceLog) (resultNum int, err error) { if m.db == nil { - _, err = m.connect() - if err != nil { - return - } + m.connect() } if len(deviceLogList) == 0 { return