Skip to content

Commit

Permalink
API refactroing
Browse files Browse the repository at this point in the history
Deduplicate "pointsQuery" fields
Deduplicate "preparedTagValuesQuery" fields
  • Loading branch information
alpinskiy committed Nov 27, 2024
1 parent cef328b commit d76cdc4
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 81 deletions.
8 changes: 3 additions & 5 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1825,9 +1825,7 @@ func (h *requestHandler) handleGetMetricTagValues(ctx context.Context, req getMe
}

pq := &preparedTagValuesQuery{
metricID: metricMeta.MetricID,
preKeyTagX: format.TagIndex(metricMeta.PreKeyTagID),
preKeyTagID: metricMeta.PreKeyTagID,
metric: metricMeta,
tagID: tagID,
numResults: numResults,
filterIn: mappedFilterIn,
Expand Down Expand Up @@ -2962,7 +2960,7 @@ func loadPoints(ctx context.Context, h *requestHandler, pq *pointsQuery, lod dat
isFast := lod.IsFast()
isLight := cols.isLight()
IsHardware := cols.IsHardware()
metric := pq.metricID
metric := pq.metricID()
table := lod.Table
kind := pq.kind
start := time.Now()
Expand Down Expand Up @@ -3028,7 +3026,7 @@ func loadPoint(ctx context.Context, h *requestHandler, pq *pointsQuery, lod data
isFast := lod.IsFast()
isLight := cols.isLight()
isHardware := cols.IsHardware()
metric := pq.metricID
metric := pq.metricID()
table := lod.Table
kind := pq.kind
err = h.doSelect(ctx, util.QueryMetaInto{
Expand Down
20 changes: 7 additions & 13 deletions internal/api/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,9 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
var tx int // time index
for _, lod := range lods {
pq := pointsQuery{
metric: qry.Metric,
version: h.version,
user: h.accessInfo.user,
metricID: qry.Metric.MetricID,
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
preKeyTagID: qry.Metric.PreKeyTagID,
kind: kind,
by: qry.GroupBy,
filterIn: qry.FilterIn,
Expand Down Expand Up @@ -651,11 +649,9 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
func (h *requestHandler) QueryTagValueIDs(ctx context.Context, qry promql.TagValuesQuery) ([]int32, error) {
var (
pq = &preparedTagValuesQuery{
metricID: qry.Metric.MetricID,
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
preKeyTagID: qry.Metric.PreKeyTagID,
tagID: format.TagID(qry.TagIndex),
numResults: math.MaxInt - 1,
metric: qry.Metric,
tagID: format.TagID(qry.TagIndex),
numResults: math.MaxInt - 1,
}
tags = make(map[int32]bool)
)
Expand Down Expand Up @@ -692,11 +688,9 @@ func (h *requestHandler) QueryTagValueIDs(ctx context.Context, qry promql.TagVal
func (h *requestHandler) QueryStringTop(ctx context.Context, qry promql.TagValuesQuery) ([]string, error) {
var (
pq = &preparedTagValuesQuery{
metricID: qry.Metric.MetricID,
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
preKeyTagID: qry.Metric.PreKeyTagID,
tagID: format.StringTopTagID,
numResults: math.MaxInt - 1,
metric: qry.Metric,
tagID: format.StringTopTagID,
numResults: math.MaxInt - 1,
}
tags = make(map[string]bool)
)
Expand Down
109 changes: 76 additions & 33 deletions internal/api/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
)

type preparedTagValuesQuery struct {
metricID int32
preKeyTagX int
preKeyTagID string
metric *format.MetricMetaValue
tagID string
numResults int
filterIn data_model.TagFilters
Expand All @@ -32,12 +30,9 @@ func (q *preparedTagValuesQuery) stringTag() bool {
}

type pointsQuery struct {
metric *format.MetricMetaValue
version string
user string
metricID int32
preKeyTagX int
preKeyTagID string
isStringTop bool
kind data_model.DigestKind
by []string
filterIn data_model.TagFilters
Expand All @@ -63,11 +58,11 @@ func (pq *pointsQuery) cacheKey() string {
sb.WriteString(Version3)
}
sb.WriteString(";m=")
sb.WriteString(fmt.Sprint(pq.metricID))
sb.WriteString(fmt.Sprint(pq.metricID()))
sb.WriteString(";pk=")
sb.WriteString(fmt.Sprint(pq.preKeyTagX))
sb.WriteString(pq.preKeyTagID())
sb.WriteString(";st=")
sb.WriteString(fmt.Sprint(pq.isStringTop))
sb.WriteString(fmt.Sprint(pq.isStringTop()))
sb.WriteString(";kind=")
sb.WriteString(fmt.Sprint(int(pq.kind)))
sb.WriteString(";by=")
Expand Down Expand Up @@ -177,20 +172,20 @@ func (pq *preparedTagValuesQuery) tagValuesQueryV2(lod data_model.LOD) (string,
// no need to escape anything as long as table and tag names are fixed
var sb strings.Builder
sb.WriteString("SELECT ")
sb.WriteString(mappedColumnName(lod.HasPreKey, pq.tagID, pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, pq.tagID, pq.preKeyTagID()))
sb.WriteString(" AS ")
sb.WriteString(valueName)
sb.WriteString(",toFloat64(")
sb.WriteString(sqlAggFn(lod.Version, "sum"))
sb.WriteString("(count)) AS _count FROM ")
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeV1DateFilter(&sb, &lod)
for i, ids := range pq.filterIn.Tags {
if len(ids.Values) > 0 {
sb.WriteString(" AND ")
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID()))
sb.WriteString(" IN (")
expandTagsMapped(&sb, ids.Values)
sb.WriteString(")")
Expand All @@ -204,7 +199,7 @@ func (pq *preparedTagValuesQuery) tagValuesQueryV2(lod data_model.LOD) (string,
for i, ids := range pq.filterNotIn.Tags {
if len(ids.Values) > 0 {
sb.WriteString(" AND ")
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID()))
sb.WriteString(" NOT IN (")
expandTagsMapped(&sb, ids.Values)
sb.WriteString(")")
Expand All @@ -216,7 +211,7 @@ func (pq *preparedTagValuesQuery) tagValuesQueryV2(lod data_model.LOD) (string,
sb.WriteString(")")
}
sb.WriteString(" GROUP BY ")
sb.WriteString(mappedColumnName(lod.HasPreKey, pq.tagID, pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, pq.tagID, pq.preKeyTagID()))
sb.WriteString(" HAVING _count>0 ORDER BY _count DESC,")
sb.WriteString(valueName)
sb.WriteString(" LIMIT ")
Expand All @@ -234,7 +229,7 @@ func (pq *preparedTagValuesQuery) tagValuesQueryV3(lod data_model.LOD) (string,
sb.WriteString(" AS _unmapped,toFloat64(sum(count)) AS _count FROM ")
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeTagCond(&sb, pq.filterIn, true)
writeTagCond(&sb, pq.filterNotIn, false)
sb.WriteString(" GROUP BY _mapped,_unmapped HAVING _count>0 ORDER BY _count,_mapped,_unmapped DESC LIMIT ")
Expand All @@ -259,7 +254,7 @@ func (pq *preparedTagValuesQuery) tagValueIDsQueryV3(lod data_model.LOD) (string
sb.WriteString(" AS _mapped,toFloat64(sum(count)) AS _count FROM ")
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeTagCond(&sb, pq.filterIn, true)
writeTagCond(&sb, pq.filterNotIn, false)
sb.WriteString(" GROUP BY _mapped HAVING _count>0 ORDER BY _count,_mapped DESC LIMIT ")
Expand Down Expand Up @@ -414,7 +409,7 @@ type pointsQueryMeta struct {

func loadPointsSelectWhat(sb *strings.Builder, pq *pointsQuery, version string) (int, error) {
var (
isStringTop = pq.isStringTop
isStringTop = pq.isStringTop()
kind = pq.kind
)
if version == Version1 && isStringTop {
Expand Down Expand Up @@ -492,7 +487,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
sb.WriteString(fmt.Sprintf("toInt64(%d) AS _stepSec", lod.StepSec))
}
for _, b := range pq.by {
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID), b))
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID()), b))
}
cnt, err := loadPointsSelectWhat(&sb, pq, lod.Version)
if err != nil {
Expand All @@ -501,12 +496,12 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
sb.WriteString(" FROM ")
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeV1DateFilter(&sb, &lod)
for i, ids := range pq.filterIn.Tags {
if len(ids.Values) > 0 {
sb.WriteString(" AND ")
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID()))
sb.WriteString(" IN (")
expandTagsMapped(&sb, ids.Values)
sb.WriteString(")")
Expand All @@ -520,7 +515,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
for i, ids := range pq.filterNotIn.Tags {
if len(ids.Values) > 0 {
sb.WriteString(" AND ")
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID))
sb.WriteString(mappedColumnName(lod.HasPreKey, format.TagID(i), pq.preKeyTagID()))
sb.WriteString(" NOT IN (")
expandTagsMapped(&sb, ids.Values)
sb.WriteString(")")
Expand All @@ -533,7 +528,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
}
sb.WriteString(" GROUP BY _time")
for _, b := range pq.by {
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID), b))
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID()), b))
}
var having bool
switch pq.kind {
Expand All @@ -554,7 +549,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
}
sb.WriteString(" ORDER BY _time")
for _, b := range pq.by {
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID), b))
sb.WriteString(fmt.Sprintf(",%s AS key%s", mappedColumnName(lod.HasPreKey, b, pq.preKeyTagID()), b))
}
if pq.sort == sortDescending {
sb.WriteString(" DESC")
Expand All @@ -563,7 +558,7 @@ func (pq *pointsQuery) loadPointsQueryV2(lod data_model.LOD, utcOffset int64, us
sb.WriteString(fmt.Sprintf(" LIMIT %v SETTINGS optimize_aggregation_in_order=1", limit))
cols := newPointsSelectColsV2(pointsQueryMeta{
queryMeta: queryMeta{
metricID: pq.metricID,
metricID: pq.metricID(),
kind: pq.kind,
user: pq.user,
},
Expand Down Expand Up @@ -599,7 +594,7 @@ func (pq *pointsQuery) loadPointsQueryV3(lod data_model.LOD, utcOffset int64, us
sb.WriteString(pq.preKeyTableName(lod))
writeWhereTimeFilter(&sb, &lod)
sb.WriteString(" AND index_type=0")
writeMetricFilter(&sb, pq.metricID, pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeMetricFilter(&sb, pq.metricID(), pq.filterIn.Metrics, pq.filterNotIn.Metrics, lod.Version)
writeTagCond(&sb, pq.filterIn, true)
writeTagCond(&sb, pq.filterNotIn, false)
sb.WriteString(" GROUP BY _time")
Expand Down Expand Up @@ -642,7 +637,7 @@ func (pq *pointsQuery) loadPointsQueryV3(lod data_model.LOD, utcOffset int64, us
sb.WriteString(" SETTINGS optimize_aggregation_in_order=1")
cols := newPointsSelectColsV3(pointsQueryMeta{
queryMeta: queryMeta{
metricID: pq.metricID,
metricID: pq.metricID(),
kind: pq.kind,
user: pq.user,
},
Expand All @@ -654,6 +649,52 @@ func (pq *pointsQuery) loadPointsQueryV3(lod data_model.LOD, utcOffset int64, us
return sb.String(), cols, err
}

func (pq *pointsQuery) isStringTop() bool {
return pq.metric != nil && pq.metric.StringTopDescription != ""
}

func (pq *pointsQuery) metricID() int32 {
if pq.metric == nil {
return 0
}
return pq.metric.MetricID
}

func (pq *pointsQuery) preKeyTagID() string {
if pq.metric == nil {
return ""
}
return pq.metric.PreKeyTagID
}

func (pq *pointsQuery) preKeyTagX() int {
if pq.metric == nil {
return -1
}
return format.TagIndex(pq.metric.PreKeyTagID)
}

func (pq *preparedTagValuesQuery) metricID() int32 {
if pq.metric == nil {
return 0
}
return pq.metric.MetricID
}

func (pq *preparedTagValuesQuery) preKeyTagID() string {
if pq.metric == nil {
return ""
}
return pq.metric.PreKeyTagID
}

func (pq *preparedTagValuesQuery) preKeyTagX() int {
if pq.metric == nil {
return -1
}
return format.TagIndex(pq.metric.PreKeyTagID)
}

func sqlAggFn(version string, fn string) string {
if version == Version1 {
return fn + "Merge"
Expand Down Expand Up @@ -755,12 +796,13 @@ func (s *stringFixed) String() string {
func (pq *pointsQuery) preKeyTableName(lod data_model.LOD) string {
var usePreKey bool
if lod.HasPreKey {
preKeyTagX := pq.preKeyTagX()
usePreKey = lod.PreKeyOnly ||
pq.filterIn.Contains(pq.preKeyTagX) ||
pq.filterNotIn.Contains(pq.preKeyTagX)
pq.filterIn.Contains(preKeyTagX) ||
pq.filterNotIn.Contains(preKeyTagX)
if !usePreKey {
for _, v := range pq.by {
if v == format.TagID(pq.preKeyTagX) {
if v == pq.preKeyTagID() {
usePreKey = true
break
}
Expand All @@ -774,11 +816,12 @@ func (pq *pointsQuery) preKeyTableName(lod data_model.LOD) string {
}

func (pq *preparedTagValuesQuery) preKeyTableName(lod data_model.LOD) string {
preKeyTagX := pq.preKeyTagX()
usePreKey := (lod.HasPreKey &&
(lod.PreKeyOnly ||
(pq.tagID != "" && pq.tagID == pq.preKeyTagID) ||
pq.filterIn.Contains(pq.preKeyTagX) ||
pq.filterNotIn.Contains(pq.preKeyTagX)))
(pq.tagID != "" && pq.tagID == pq.preKeyTagID()) ||
pq.filterIn.Contains(preKeyTagX) ||
pq.filterNotIn.Contains(preKeyTagX)))
if usePreKey {
return preKeyTableNames[lod.Table]
}
Expand Down
Loading

0 comments on commit d76cdc4

Please sign in to comment.