Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TOPK function spans timeshifs #710

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
@@ -290,6 +290,7 @@ type (
}

DashboardVar struct {
Name string `json:"name"`
Args DashboardVarArgs `json:"args"`
Vals []string `json:"values"`
Link [][]int `json:"link"`
@@ -377,6 +378,7 @@ type (
rand *rand.Rand
stat *endpointStat
timeNow time.Time
vars map[string]promql.Variable
}

//easyjson:json
@@ -419,6 +421,7 @@ type (
renderRequest struct {
ai accessInfo
seriesRequest []seriesRequest
vars map[string]promql.Variable
renderWidth string
renderFormat string
}
@@ -1750,7 +1753,7 @@ func (h *Handler) HandleSeriesQuery(w http.ResponseWriter, r *http.Request) {
return
}
// Parse request
qry, err := h.parseHTTPRequest(r)
qry, _, err := h.parseHTTPRequest(r)
if err != nil {
respondJSON(w, nil, 0, 0, err, h.verbose, ai.user, sl)
return
@@ -1892,7 +1895,7 @@ func (h *Handler) HandleSeriesQuery(w http.ResponseWriter, r *http.Request) {

func (h *Handler) handleSeriesQueryPromQL(w http.ResponseWriter, r *http.Request, sl *endpointStat, ai accessInfo) {
// Parse request
qry, err := h.parseHTTPRequest(r)
qry, vars, err := h.parseHTTPRequest(r)
if err != nil {
respondJSON(w, nil, 0, 0, err, h.verbose, ai.user, sl)
return
@@ -1927,6 +1930,7 @@ func (h *Handler) handleSeriesQueryPromQL(w http.ResponseWriter, r *http.Request
res, freeRes, err = h.handlePromqlQuery(withHTTPEndpointStat(ctx, sl), ai, qry, seriesRequestOptions{
debugQueries: true,
stat: sl,
vars: vars,
metricNameCallback: func(name string) {
qry.metricWithNamespace = name
g.Go(func() error {
@@ -1943,6 +1947,7 @@ func (h *Handler) handleSeriesQueryPromQL(w http.ResponseWriter, r *http.Request
res, freeRes, err = h.handlePromqlQuery(withHTTPEndpointStat(ctx, sl), ai, qry, seriesRequestOptions{
debugQueries: true,
stat: sl,
vars: vars,
})
}
var traces []string
@@ -2096,6 +2101,7 @@ func (h *Handler) handlePromqlQuery(ctx context.Context, ai accessInfo, req seri
}
},
SeriesQueryCallback: seriesQueryCallback,
Vars: opt.vars,
}
)
if req.widthKind == widthAutoRes {
@@ -2523,7 +2529,7 @@ func (h *Handler) HandleGetPoint(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), h.querySelectTimeout)
defer cancel()

req, err := h.parseHTTPRequest(r)
req, _, err := h.parseHTTPRequest(r)
if err != nil {
respondJSON(w, nil, 0, 0, err, h.verbose, ai.user, sl)
return
@@ -2746,7 +2752,7 @@ func (h *Handler) HandleGetRender(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), h.querySelectTimeout)
defer cancel()

s, err := h.parseHTTPRequestS(r, 12)
s, vars, err := h.parseHTTPRequestS(r, 12)
if err != nil {
respondJSON(w, nil, 0, 0, err, h.verbose, ai.user, sl)
return
@@ -2756,6 +2762,7 @@ func (h *Handler) HandleGetRender(w http.ResponseWriter, r *http.Request) {
ctx, ai,
renderRequest{
seriesRequest: s,
vars: vars,
renderWidth: r.FormValue(paramRenderWidth),
renderFormat: r.FormValue(paramDataFormat),
})
@@ -2853,9 +2860,12 @@ func (h *Handler) handleGetRender(ctx context.Context, ai accessInfo, req render
err error
start = time.Now()
)
data, cancel, err = h.handlePromqlQuery(ctx, ai, r, seriesRequestOptions{metricNameCallback: func(s string) {
req.seriesRequest[i].metricWithNamespace = s
}})
data, cancel, err = h.handlePromqlQuery(ctx, ai, r, seriesRequestOptions{
vars: req.vars,
metricNameCallback: func(s string) {
req.seriesRequest[i].metricWithNamespace = s
},
})
if err != nil {
return nil, false, err
}
@@ -3648,18 +3658,18 @@ func getQueryRespEqual(a, b *SeriesResponse) bool {
return true
}

func (h *Handler) parseHTTPRequest(r *http.Request) (seriesRequest, error) {
res, err := h.parseHTTPRequestS(r, 1)
func (h *Handler) parseHTTPRequest(r *http.Request) (seriesRequest, map[string]promql.Variable, error) {
res, vars, err := h.parseHTTPRequestS(r, 1)
if err != nil {
return seriesRequest{}, err
return seriesRequest{}, nil, err
}
if len(res) == 0 {
return seriesRequest{}, httpErr(http.StatusBadRequest, fmt.Errorf("request is empty"))
return seriesRequest{}, nil, httpErr(http.StatusBadRequest, fmt.Errorf("request is empty"))
}
return res[0], nil
return res[0], vars, nil
}

func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesRequest, err error) {
func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesRequest, env map[string]promql.Variable, err error) {
defer func() {
var dummy httpError
if err != nil && !errors.As(err, &dummy) {
@@ -3763,7 +3773,13 @@ func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesR
tab.maxHost = v.MaxHost
n++
}
env = make(map[string]promql.Variable)
for _, v := range dash.Vars {
env[v.Name] = promql.Variable{
Value: v.Vals,
Group: v.Args.Group,
Negate: v.Args.Negate,
}
for _, link := range v.Link {
if len(link) != 2 {
continue
@@ -3894,7 +3910,7 @@ func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesR
switch s[1] {
case "g":
varByName(s[0]).group = first(v)
case "nk":
case "nv":
varByName(s[0]).negate = first(v)
}
}
@@ -3949,7 +3965,7 @@ func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesR
var tid string
tid, err = parseTagID(s)
if err != nil {
return nil, err
return nil, nil, err
}
t.by = append(t.by, tid)
}
@@ -3969,7 +3985,7 @@ func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesR
case Version1, Version2:
t.version = s
default:
return nil, fmt.Errorf("invalid version: %q", s)
return nil, nil, fmt.Errorf("invalid version: %q", s)
}
case ParamWidth:
t.strWidth = first(v)
@@ -3987,17 +4003,22 @@ func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesR
t.expandToLODBoundary = true
}
if err != nil {
return nil, err
return nil, nil, err
}
}
if len(tabs) == 0 {
return nil, nil
return nil, nil, nil
}
for _, v := range vars {
vv := varM[v.name]
if vv == nil {
continue
}
env[v.name] = promql.Variable{
Value: vv.val,
Group: vv.group == "1",
Negate: vv.negate == "1",
}
for _, link := range v.link {
if len(link) != 2 {
continue
@@ -4068,28 +4089,28 @@ func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesR
if len(tab0.strFrom) != 0 || len(tab0.strTo) != 0 {
tab0.from, tab0.to, err = parseFromTo(tab0.strFrom, tab0.strTo)
if err != nil {
return nil, err
return nil, nil, err
}
}
err = finalize(tab0)
if err != nil {
return nil, err
return nil, nil, err
}
for i := range tabs[1:] {
t := &tabs[i+1]
t.from = tab0.from
t.to = tab0.to
err = finalize(t)
if err != nil {
return nil, err
return nil, nil, err
}
}
// build resulting slice
if tabX != -1 {
if tabs[tabX].strType == "1" {
return nil, nil
return nil, nil, nil
}
return []seriesRequest{tabs[tabX].seriesRequest}, nil
return []seriesRequest{tabs[tabX].seriesRequest}, env, nil
}
res = make([]seriesRequest, 0, len(tabs))
for _, t := range tabs {
@@ -4100,7 +4121,7 @@ func (h *Handler) parseHTTPRequestS(r *http.Request, maxTabs int) (res []seriesR
res = append(res, t.seriesRequest)
}
}
return res, nil
return res, env, nil
}

func (r *DashboardTimeRange) UnmarshalJSON(bs []byte) error {
4 changes: 2 additions & 2 deletions internal/api/promql.go
Original file line number Diff line number Diff line change
@@ -536,7 +536,7 @@ func (h *Handler) QueryTagValueIDs(ctx context.Context, qry promql.TagValuesQuer
panic("metric access violation") // should not happen
}
var (
version = promqlVersionOrDefault(qry.Version)
version = promqlVersionOrDefault(qry.Options.Version)
fl = qry.Timescale.LODs[0] // first LOD
ll = qry.Timescale.LODs[len(qry.Timescale.LODs)-1] // last LOD
lods = selectTagValueLODs(
@@ -594,7 +594,7 @@ func (h *Handler) QuerySTagValues(ctx context.Context, qry promql.TagValuesQuery
panic("metric access violation") // should not happen
}
var (
version = promqlVersionOrDefault(qry.Version)
version = promqlVersionOrDefault(qry.Options.Version)
fl = qry.Timescale.LODs[0] // first LOD
ll = qry.Timescale.LODs[len(qry.Timescale.LODs)-1] // last LOD
lods = selectTagValueLODs(
Loading
Loading