From d68b1810dfc0e6bba548b9ce65dbbd6bfc8d2c1c Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Mon, 9 Dec 2024 11:27:09 +0800 Subject: [PATCH] fix: parse 1d aggr table failed --- server/ingester/ckissu/ckissu.go | 35 +++++++++++++------ .../exporters/universal_tag/universal_tag.go | 2 +- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/server/ingester/ckissu/ckissu.go b/server/ingester/ckissu/ckissu.go index 9f679fc4924e..bebbcd5cec52 100644 --- a/server/ingester/ckissu/ckissu.go +++ b/server/ingester/ckissu/ckissu.go @@ -19,6 +19,7 @@ package ckissu import ( "fmt" "regexp" + "strconv" "strings" "sync" "time" @@ -232,7 +233,7 @@ func (i *Issu) getDatasourceInfo(connect *sql.DB, db, mvTableName string) (*Data return nil, err } } - log.Infof("getDatasourceInfo sql: %s createSql: %s ", sql, createSql) + log.Debugf("getDatasourceInfo sql: %s createSql: %s ", sql, createSql) var summable, unsummable, interval, baseTable string var matchs [4]string // 匹配 `packet_tx__agg` AggregateFunction(sum, UInt64), 中的 'sum' 为可累加聚合的方法 @@ -245,8 +246,8 @@ func (i *Issu) getDatasourceInfo(connect *sql.DB, db, mvTableName string) (*Data } // 匹配 toStartOfHour(time) AS time, 中的 'Hour' 为聚合时长 intervalReg := regexp.MustCompile("toStartOf([a-zA-Z]+)") - // 匹配 FROM vtap_flow.`1m_local` 中的'1m' 为原始数据源 - baseTableReg := regexp.MustCompile("FROM .*.`.*\\.(.*)_local`") + // 匹配 FROM vtap_flow.`1m_local` 中的'1m' 为原始数据源, + baseTableReg := regexp.MustCompile("FROM .*.`.*\\.(.*)_(local|agg)`") for i, reg := range []*regexp.Regexp{summableReg, unsummableReg, intervalReg, baseTableReg} { submatchs := reg.FindStringSubmatch(createSql) @@ -379,12 +380,14 @@ func (i *Issu) addColumnDatasource(index int, d *DatasourceInfo, isMapTable, isA if lastDotIndex < 0 { return nil, fmt.Errorf("invalid table name %s", d.name) } - dstTableName := d.name[lastDotIndex+1:] + rawTable := flow_metrics.GetMetricsTables(ckdb.MergeTree, common.CK_VERSION, ckdb.DF_CLUSTER, ckdb.DF_STORAGE_POLICY, i.ckdbType, 7, 1, 7, 1, i.cfg.GetCKDBColdStorages())[flow_metrics.MetricsTableNameToID(d.name[:lastDotIndex+1]+d.baseTable)] // create table mv - createMvSql := datasource.MakeMVTableCreateSQL( - rawTable, d.db, dstTableName, - d.summable, d.unsummable, d.interval) + aggrInterval := ckdb.AggregationHour + if d.interval == ckdb.TimeFuncDay { + aggrInterval = ckdb.AggregationDay + } + createMvSql := rawTable.MakeAggrMVTableCreateSQL(parseOrgId(d.db), aggrInterval) log.Info(createMvSql) _, err = Exec(connect, createMvSql) if err != nil { @@ -400,9 +403,7 @@ func (i *Issu) addColumnDatasource(index int, d *DatasourceInfo, isMapTable, isA } // create table local - createLocalSql := datasource.MakeCreateTableLocal( - rawTable, d.db, dstTableName, - d.summable, d.unsummable) + createLocalSql := rawTable.MakeAggrLocalTableCreateSQL(parseOrgId(d.db), aggrInterval) log.Info(createLocalSql) _, err = Exec(connect, createLocalSql) if err != nil { @@ -410,7 +411,7 @@ func (i *Issu) addColumnDatasource(index int, d *DatasourceInfo, isMapTable, isA } // create table global - createGlobalSql := datasource.MakeGlobalTableCreateSQL(rawTable, d.db, dstTableName) + createGlobalSql := rawTable.MakeAggrGlobalTableCreateSQL(parseOrgId(d.db), aggrInterval) log.Info(createGlobalSql) _, err = Exec(connect, createGlobalSql) if err != nil { @@ -1198,6 +1199,18 @@ func parseOrgDatabase(db string) (string, string) { return "", db } +func parseOrgId(db string) uint16 { + orgIdStr, _ := parseOrgDatabase(db) + if orgIdStr == "" { + return ckdb.DEFAULT_ORG_ID + } + orgId, err := strconv.Atoi(orgIdStr) + if err != nil { + return ckdb.DEFAULT_ORG_ID + } + return uint16(orgId) +} + func getOrgDatabase(db string, orgIDPrefix string) string { _, rawDb := parseOrgDatabase(db) return orgIDPrefix + rawDb diff --git a/server/ingester/exporters/universal_tag/universal_tag.go b/server/ingester/exporters/universal_tag/universal_tag.go index 14bb63cb95df..e9f6e9a8250c 100644 --- a/server/ingester/exporters/universal_tag/universal_tag.go +++ b/server/ingester/exporters/universal_tag/universal_tag.go @@ -356,7 +356,7 @@ func (u *UniversalTagsManager) Reload(orgId uint16) error { u.universalTagMaps[orgId] = u.GetUniversalTagMaps(response) - log.Infof("eporter update rpc universalTagNames version %d -> %d", u.versionUniversalTagMaps, newVersion, logger.NewORGPrefix(int(orgId))) + log.Infof("eporter update rpc universalTagNames version %d -> %d", u.versionUniversalTagMaps[orgId], newVersion, logger.NewORGPrefix(int(orgId))) u.versionUniversalTagMaps[orgId] = newVersion return nil