Skip to content

Commit

Permalink
fix: parse 1d aggr table failed
Browse files Browse the repository at this point in the history
  • Loading branch information
lzf575 committed Dec 9, 2024
1 parent 74b1f81 commit d68b181
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
35 changes: 24 additions & 11 deletions server/ingester/ckissu/ckissu.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ckissu
import (
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -232,7 +233,7 @@ func (i *Issu) getDatasourceInfo(connect *sql.DB, db, mvTableName string) (*Data
return nil, err
}
}
log.Infof("getDatasourceInfo sql: %s createSql: %s ", sql, createSql)
log.Debugf("getDatasourceInfo sql: %s createSql: %s ", sql, createSql)
var summable, unsummable, interval, baseTable string
var matchs [4]string
// 匹配 `packet_tx__agg` AggregateFunction(sum, UInt64), 中的 'sum' 为可累加聚合的方法
Expand All @@ -245,8 +246,8 @@ func (i *Issu) getDatasourceInfo(connect *sql.DB, db, mvTableName string) (*Data
}
// 匹配 toStartOfHour(time) AS time, 中的 'Hour' 为聚合时长
intervalReg := regexp.MustCompile("toStartOf([a-zA-Z]+)")
// 匹配 FROM vtap_flow.`1m_local` 中的'1m' 为原始数据源
baseTableReg := regexp.MustCompile("FROM .*.`.*\\.(.*)_local`")
// 匹配 FROM vtap_flow.`1m_local` 中的'1m' 为原始数据源,
baseTableReg := regexp.MustCompile("FROM .*.`.*\\.(.*)_(local|agg)`")

for i, reg := range []*regexp.Regexp{summableReg, unsummableReg, intervalReg, baseTableReg} {
submatchs := reg.FindStringSubmatch(createSql)
Expand Down Expand Up @@ -379,12 +380,14 @@ func (i *Issu) addColumnDatasource(index int, d *DatasourceInfo, isMapTable, isA
if lastDotIndex < 0 {
return nil, fmt.Errorf("invalid table name %s", d.name)
}
dstTableName := d.name[lastDotIndex+1:]

rawTable := flow_metrics.GetMetricsTables(ckdb.MergeTree, common.CK_VERSION, ckdb.DF_CLUSTER, ckdb.DF_STORAGE_POLICY, i.ckdbType, 7, 1, 7, 1, i.cfg.GetCKDBColdStorages())[flow_metrics.MetricsTableNameToID(d.name[:lastDotIndex+1]+d.baseTable)]
// create table mv
createMvSql := datasource.MakeMVTableCreateSQL(
rawTable, d.db, dstTableName,
d.summable, d.unsummable, d.interval)
aggrInterval := ckdb.AggregationHour
if d.interval == ckdb.TimeFuncDay {
aggrInterval = ckdb.AggregationDay
}
createMvSql := rawTable.MakeAggrMVTableCreateSQL(parseOrgId(d.db), aggrInterval)
log.Info(createMvSql)
_, err = Exec(connect, createMvSql)
if err != nil {
Expand All @@ -400,17 +403,15 @@ func (i *Issu) addColumnDatasource(index int, d *DatasourceInfo, isMapTable, isA
}

// create table local
createLocalSql := datasource.MakeCreateTableLocal(
rawTable, d.db, dstTableName,
d.summable, d.unsummable)
createLocalSql := rawTable.MakeAggrLocalTableCreateSQL(parseOrgId(d.db), aggrInterval)
log.Info(createLocalSql)
_, err = Exec(connect, createLocalSql)
if err != nil {
return nil, err
}

// create table global
createGlobalSql := datasource.MakeGlobalTableCreateSQL(rawTable, d.db, dstTableName)
createGlobalSql := rawTable.MakeAggrGlobalTableCreateSQL(parseOrgId(d.db), aggrInterval)
log.Info(createGlobalSql)
_, err = Exec(connect, createGlobalSql)
if err != nil {
Expand Down Expand Up @@ -1198,6 +1199,18 @@ func parseOrgDatabase(db string) (string, string) {
return "", db
}

func parseOrgId(db string) uint16 {
orgIdStr, _ := parseOrgDatabase(db)
if orgIdStr == "" {
return ckdb.DEFAULT_ORG_ID
}
orgId, err := strconv.Atoi(orgIdStr)
if err != nil {
return ckdb.DEFAULT_ORG_ID
}
return uint16(orgId)
}

func getOrgDatabase(db string, orgIDPrefix string) string {
_, rawDb := parseOrgDatabase(db)
return orgIDPrefix + rawDb
Expand Down
2 changes: 1 addition & 1 deletion server/ingester/exporters/universal_tag/universal_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (u *UniversalTagsManager) Reload(orgId uint16) error {

u.universalTagMaps[orgId] = u.GetUniversalTagMaps(response)

log.Infof("eporter update rpc universalTagNames version %d -> %d", u.versionUniversalTagMaps, newVersion, logger.NewORGPrefix(int(orgId)))
log.Infof("eporter update rpc universalTagNames version %d -> %d", u.versionUniversalTagMaps[orgId], newVersion, logger.NewORGPrefix(int(orgId)))
u.versionUniversalTagMaps[orgId] = newVersion

return nil
Expand Down

0 comments on commit d68b181

Please sign in to comment.