Skip to content

Commit

Permalink
Badges HTTP request handler
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinskiy committed Nov 20, 2024
1 parent fb25f2a commit e6b137c
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 19 deletions.
1 change: 1 addition & 0 deletions cmd/statshouse-api/statshouse-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ func run(argv args, cfg *api.Config, vkuthPublicKeys map[string][]byte) error {
a.Path("/" + api.EndpointMetric).Methods("POST").HandlerFunc(api.HandlePostMetric)
a.Path("/" + api.EndpointResetFlood).Methods("POST").HandlerFunc(api.HandlePostResetFlood)
a.Path("/" + api.EndpointQuery).Methods("GET").HandlerFunc(api.HandleSeriesQuery)
a.Path("/badges").Methods("GET", "POST").HandlerFunc(api.HandleBadgesQuery)
a.Path("/" + api.EndpointPoint).Methods("GET").HandlerFunc(api.HandlePointQuery)
a.Path("/" + api.EndpointPoint).Methods("POST").HandlerFunc(api.HandlePointQuery)
a.Path("/" + api.EndpointTable).Methods("GET").HandlerFunc(api.HandleGetTable)
Expand Down
81 changes: 81 additions & 0 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,6 +1968,87 @@ func HandleSeriesQuery(r *httpRequestHandler) {
}
}

func HandleBadgesQuery(r *httpRequestHandler) {
var err error
var req seriesRequest
if req, err = r.parseSeriesRequest(); err == nil {
err = req.validate(&r.requestHandler)
}
if err != nil {
respondJSON(r, nil, 0, 0, err)
return
}
var limit int
if len(req.promQL) == 0 {
if req.promQL, err = r.getPromQuery(req); err != nil {
respondJSON(r, nil, 0, 0, err)
}
} else {
limit = req.numResults
}
ctx, cancel := context.WithTimeout(r.Context(), r.querySelectTimeout)
defer cancel()
var offsets = make([]int64, 0, len(req.shifts))
for _, v := range req.shifts {
offsets = append(offsets, -toSec(v))
}
ev, err := r.promEngine.NewEvaluator(
ctx, r,
promql.Query{
Start: req.from.Unix(),
End: req.to.Unix(),
Step: req.step,
Expr: req.promQL,
Options: promql.Options{
Version: req.version,
Version3Start: r.Version3Start.Load(),
AvoidCache: req.avoidCache,
Extend: req.excessPoints,
ExplicitGrouping: true,
QuerySequential: r.querySequential,
ScreenWidth: req.screenWidth,
MaxHost: req.maxHost,
Offsets: offsets,
Limit: limit,
Vars: req.vars,
Compat: req.compat,
},
})
if err != nil {
respondJSON(r, nil, 0, 0, err)
return
}
var res SeriesResponse
if metric := ev.QueryMetric(); metric != nil {
badges, cancel := r.queryBadges(ctx, req, metric)
defer cancel()
for _, d := range badges.Series.Data {
if t, ok := d.Tags.ID2Tag["2"]; !ok || t.SValue != metric.Name {
continue
}
if t, ok := d.Tags.ID2Tag["1"]; ok {
badgeType := t.Value
if t, ok = d.Tags.ID2Tag[promql.LabelWhat]; ok {
what := data_model.DigestWhat(t.Value)
switch {
case what == data_model.DigestAvg && badgeType == format.TagValueIDBadgeAgentSamplingFactor:
res.SamplingFactorSrc = sumSeries(d.Values, 1) / float64(len(badges.Time))
case what == data_model.DigestAvg && badgeType == format.TagValueIDBadgeAggSamplingFactor:
res.SamplingFactorAgg = sumSeries(d.Values, 1) / float64(len(badges.Time))
case what == data_model.DigestCountRaw && badgeType == format.TagValueIDBadgeIngestionErrors:
res.ReceiveErrors = sumSeries(d.Values, 0)
case what == data_model.DigestCountRaw && badgeType == format.TagValueIDBadgeIngestionWarnings:
res.ReceiveWarnings = sumSeries(d.Values, 0)
case what == data_model.DigestCountRaw && badgeType == format.TagValueIDBadgeAggMappingErrors:
res.MappingErrors = sumSeries(d.Values, 0)
}
}
}
}
}
respondJSON(r, res, queryClientCache, queryClientCacheStale, nil)
}

func HandleFrontendStat(r *httpRequestHandler) {
if r.accessInfo.service {
// statistics from bots isn't welcome
Expand Down
11 changes: 5 additions & 6 deletions internal/data_model/timescale.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type Timescale struct {
UTCOffset int64
Time []int64
LODs []TimescaleLOD
Start int64
End int64
Step int64 // aggregation interval requested (former "desiredStepMul")
StartX int // requested time interval starts at "Time[StartX]"
ViewStartX int
Expand Down Expand Up @@ -325,6 +327,8 @@ func GetTimescale(args GetTimescaleArgs) (Timescale, error) {
res := Timescale{
Location: args.Location,
UTCOffset: args.UTCOffset,
Start: args.Start,
End: args.End,
Step: args.Step,
}
var resLen int
Expand Down Expand Up @@ -507,12 +511,7 @@ func (t *Timescale) Empty() bool {
}

func (t *Timescale) Duration() time.Duration {
start := t.Time[t.ViewStartX]
end := start
if 0 < t.ViewEndX {
end = t.Time[t.ViewEndX-1] + 1
}
return time.Duration(end-start) * time.Second
return time.Duration(t.End-t.Start) * time.Second
}

func (t *Timescale) appendLOD(lod TimescaleLOD) {
Expand Down
39 changes: 26 additions & 13 deletions internal/promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,25 @@ func GetMetricNameMatchers(expr string, res []*labels.Matcher) ([]*labels.Matche
}

func (ng Engine) Exec(ctx context.Context, h Handler, qry Query) (parser.Value, func(), error) {
// parse query
ev, err := ng.newEvaluator(ctx, h, qry)
ev, err := ng.NewEvaluator(ctx, h, qry)
if err != nil {
return nil, nil, Error{what: err}
return nil, nil, err
}
return ev.Run()
}

func (ev *evaluator) Run() (parser.Value, func(), error) {
// parse query
if ev.t.Empty() {
return &TimeSeries{Time: []int64{}}, func() {}, nil
}
if e, ok := ev.ast.(*parser.StringLiteral); ok {
return String{T: qry.Start, V: e.Val}, func() {}, nil
return String{T: ev.t.Start, V: e.Val}, func() {}, nil
}
// evaluate query
ev.Tracef(ev.ast.String())
if ev.opt.Debug {
ev.Tracef("requested from %d to %d, timescale from %d to %d", qry.Start, qry.End, ev.t.Time[ev.t.StartX], ev.t.Time[len(ev.t.Time)-1])
ev.Tracef("requested from %d to %d, timescale from %d to %d", ev.t.Start, ev.t.End, ev.t.Time[ev.t.StartX], ev.t.Time[len(ev.t.Time)-1])
}
var ok bool
defer func() {
Expand All @@ -179,16 +183,25 @@ func (ng Engine) Exec(ctx context.Context, h Handler, qry Query) (parser.Value,
// resolve int32 tag values into strings
for _, dat := range res.Series.Data {
for _, tg := range dat.Tags.ID2Tag {
tg.stringify(&ev)
tg.stringify(ev)
}
}
ev.Tracef("buffers alloc #%d, reuse #%d, %s", len(ev.allocMap)+len(ev.freeList), len(ev.reuseList), res.String())
ev.reportStat(qry, time.Now())
ev.reportStat(time.Now())
ok = true // prevents deffered "cancel"
return &res, ev.cancel, nil
}

func (ng Engine) newEvaluator(ctx context.Context, h Handler, qry Query) (evaluator, error) {
func (ev *evaluator) QueryMetric() *format.MetricMetaValue {
if len(ev.QueryStat.MetricOffset) == 1 {
for v := range ev.QueryStat.MetricOffset {
return v
}
}
return nil
}

func (ng Engine) NewEvaluator(ctx context.Context, h Handler, qry Query) (evaluator, error) {
timeStart := time.Now()
if qry.Options.TimeNow == 0 {
// fix the time "now"
Expand All @@ -209,7 +222,7 @@ func (ng Engine) newEvaluator(ctx context.Context, h Handler, qry Query) (evalua
var err error
ev.ast, err = parser.ParseExpr(qry.Expr)
if err != nil {
return evaluator{}, err
return evaluator{}, Error{what: err}
}
if v, ok := evalLiteral(ev.ast); ok {
ev.ast = v
Expand Down Expand Up @@ -246,7 +259,7 @@ func (ng Engine) newEvaluator(ctx context.Context, h Handler, qry Query) (evalua
return err
})
if err != nil {
return evaluator{}, err
return evaluator{}, Error{what: err}
}
// widen time range to accommodate range selectors and ensure instant query won't return empty result
qry.Start -= maxRange
Expand All @@ -270,7 +283,7 @@ func (ng Engine) newEvaluator(ctx context.Context, h Handler, qry Query) (evalua
})

if err != nil || ev.t.Empty() {
return evaluator{}, err
return evaluator{}, Error{what: err}
}
// evaluate reduction rules
ev.ars = make(map[parser.Expr]parser.Expr)
Expand Down Expand Up @@ -1503,11 +1516,11 @@ func (ev *evaluator) newWindow(v []float64, s bool) window {
return newWindow(ev.time(), v, ev.r, ev.t.LODs[len(ev.t.LODs)-1].Step, s)
}

func (ev *evaluator) reportStat(qry Query, timeEnd time.Time) {
func (ev *evaluator) reportStat(timeEnd time.Time) {
tags := statshouse.Tags{
1: srvfunc.HostnameForStatshouse(),
}
r := qry.End - qry.Start
r := ev.t.End - ev.t.Start
x := 2
switch {
// add one because UI always requests one second more
Expand Down

0 comments on commit e6b137c

Please sign in to comment.