Skip to content

Commit

Permalink
Query duration TOP
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinskiy committed Nov 27, 2024
1 parent 5e32083 commit 9e4fbc9
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 27 deletions.
3 changes: 2 additions & 1 deletion cmd/statshouse-api/statshouse-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ func run(argv args, cfg *api.Config, vkuthPublicKeys map[string][]byte) error {
m.Path("/debug/pprof/trace").Methods("GET").HandlerFunc(api.HandleProfTrace)
m.Path("/debug/pprof/symbol").Methods("GET").HandlerFunc(api.HandleProfSymbol)
m.Path("/debug/500").Methods("GET").HandlerFunc(api.DumpInternalServerErrors)
m.Path("/debug/top").Methods("GET").HandlerFunc(api.DumpQueryTop)
m.Path("/debug/top/mem").Methods("GET").HandlerFunc(api.DumpQueryTopMemUsage)
m.Path("/debug/top/time").Methods("GET").HandlerFunc(api.DumpQueryTopDuration)
m.Path("/debug/tag/draft").Methods("GET").HandlerFunc(api.HandleTagDraftList)
m.Router.PathPrefix("/").Methods("GET", "HEAD").HandlerFunc(f.HandleStatic)

Expand Down
102 changes: 84 additions & 18 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,22 @@ type (
errorX int

// TOP queries by memory usage
queryTop []queryInfo
queryTopMu sync.Mutex
queryTopMemUsage []queryTopMemUsage
queryTopMemUsageMu sync.Mutex
queryTopDuration []queryTopDuration
queryTopDurationMu sync.Mutex
}

queryInfo struct {
queryTopMemUsage struct {
queryArgs
queryStatistics
queryMemUsage
protocol int
user string
}

queryTopDuration struct {
queryArgs
duration time.Duration
protocol int
user string
}
Expand All @@ -228,7 +237,7 @@ type (
end int64
}

queryStatistics struct {
queryMemUsage struct {
rowCount int
colCount int
memUsage int
Expand Down Expand Up @@ -2993,6 +3002,7 @@ func loadPoints(ctx context.Context, h *requestHandler, pq *pointsQuery, lod dat
return nil
}})
duration := time.Since(start)
h.reportQueryDuration(duration)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -3380,14 +3390,14 @@ func (h *requestHandler) init(accessToken, version string) (err error) {
return nil
}

func (h *requestHandler) reportQueryDataSize(rowCount, colCount int) {
func (h *requestHandler) reportQueryMemUsage(rowCount, colCount int) {
memUsage := 8 * rowCount * colCount
if memUsage <= 0 {
return
}
h.queryTopMu.Lock()
defer h.queryTopMu.Unlock()
s := h.queryTop
h.queryTopMemUsageMu.Lock()
defer h.queryTopMemUsageMu.Unlock()
s := h.queryTopMemUsage
i := len(s)
for ; i > 0 && s[i-1].memUsage < memUsage; i-- {
// pass
Expand All @@ -3397,41 +3407,84 @@ func (h *requestHandler) reportQueryDataSize(rowCount, colCount int) {
switch i {
case 0:
if len(s) == 0 {
s = make([]queryInfo, 0, maxLen+1)
s = append(s, h.getQueryInfo(rowCount, colCount, memUsage))
s = make([]queryTopMemUsage, 0, maxLen+1)
s = append(s, h.queryMemUsage(rowCount, colCount, memUsage))
} else {
s = append(s[:1], s...)
if len(s) > maxLen {
s = s[:maxLen]
}
s[0] = h.queryMemUsage(rowCount, colCount, memUsage)
}
top = true
case len(s):
if len(s) < maxLen && s[len(s)-1].expr != h.query.Expr {
s = append(s, h.queryMemUsage(rowCount, colCount, memUsage))
top = true
}
default:
if s[i-1].expr != h.query.Expr {
s = append(s[:i+1], s[i+1:]...)
s[i] = h.queryMemUsage(rowCount, colCount, memUsage)
top = true
}
}
if top {
h.queryTopMemUsage = s
}
}

func (h *requestHandler) reportQueryDuration(d time.Duration) {
if d <= 0 {
return
}
h.queryTopDurationMu.Lock()
defer h.queryTopDurationMu.Unlock()
s := h.queryTopDuration
i := len(s)
for ; i > 0 && s[i-1].duration < d; i-- {
// pass
}
var top bool
const maxLen = 100
switch i {
case 0:
if len(s) == 0 {
s = make([]queryTopDuration, 0, maxLen+1)
s = append(s, h.queryDuration(d))
} else {
s = append(s[:1], s...)
if len(s) > maxLen {
s = s[:maxLen]
}
s[0] = h.getQueryInfo(rowCount, colCount, memUsage)
s[0] = h.queryDuration(d)
}
top = true
case len(s):
if len(s) < maxLen && s[len(s)-1].expr != h.query.Expr {
s = append(s, h.getQueryInfo(rowCount, colCount, memUsage))
s = append(s, h.queryDuration(d))
top = true
}
default:
if s[i-1].expr != h.query.Expr {
s = append(s[:i+1], s[i+1:]...)
s[i] = h.getQueryInfo(rowCount, colCount, memUsage)
s[i] = h.queryDuration(d)
top = true
}
}
if top {
h.queryTop = s
h.queryTopDuration = s
}
}

func (h *requestHandler) getQueryInfo(rowCount, colCount, memUsage int) queryInfo {
return queryInfo{
func (h *requestHandler) queryMemUsage(rowCount, colCount, memUsage int) queryTopMemUsage {
return queryTopMemUsage{
queryArgs: queryArgs{
expr: h.query.Expr,
start: h.query.Start,
end: h.query.End,
},
queryStatistics: queryStatistics{
queryMemUsage: queryMemUsage{
rowCount: rowCount,
colCount: colCount,
memUsage: memUsage,
Expand All @@ -3441,6 +3494,19 @@ func (h *requestHandler) getQueryInfo(rowCount, colCount, memUsage int) queryInf
}
}

func (h *requestHandler) queryDuration(d time.Duration) queryTopDuration {
return queryTopDuration{
queryArgs: queryArgs{
expr: h.query.Expr,
start: h.query.Start,
end: h.query.End,
},
duration: d,
protocol: h.endpointStat.protocol,
user: h.endpointStat.user,
}
}

func HandleTagDraftList(r *httpRequestHandler) {
m := make(map[string][]string)
for _, metric := range r.metricsStorage.GetMetaMetricList(false) {
Expand Down
47 changes: 40 additions & 7 deletions internal/api/http_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,22 @@ func DumpInternalServerErrors(r *httpRequestHandler) {
}
}

func DumpQueryTop(r *httpRequestHandler) {
func DumpQueryTopMemUsage(r *httpRequestHandler) {
w := r.Response()
if ok := r.accessInfo.insecureMode || r.accessInfo.bitAdmin; !ok {
w.WriteHeader(http.StatusForbidden)
return
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
var s []queryInfo
r.queryTopMu.Lock()
var s []queryTopMemUsage
r.queryTopMemUsageMu.Lock()
if r.FormValue("reset") != "" {
r.queryTop, s = s, r.queryTop
r.queryTopMemUsage, s = s, r.queryTopMemUsage
} else {
s = make([]queryInfo, 0, len(r.queryTop))
s = append(s, r.queryTop...)
s = make([]queryTopMemUsage, 0, len(r.queryTopMemUsage))
s = append(s, r.queryTopMemUsage...)
}
r.queryTopMu.Unlock()
r.queryTopMemUsageMu.Unlock()
for _, v := range s {
var protocol string
switch v.protocol {
Expand All @@ -198,6 +198,39 @@ func DumpQueryTop(r *httpRequestHandler) {
}
}

func DumpQueryTopDuration(r *httpRequestHandler) {
w := r.Response()
if ok := r.accessInfo.insecureMode || r.accessInfo.bitAdmin; !ok {
w.WriteHeader(http.StatusForbidden)
return
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
var s []queryTopDuration
r.queryTopDurationMu.Lock()
if r.FormValue("reset") != "" {
r.queryTopDuration, s = s, r.queryTopDuration
} else {
s = make([]queryTopDuration, 0, len(r.queryTopDuration))
s = append(s, r.queryTopDuration...)
}
r.queryTopDurationMu.Unlock()
for _, v := range s {
var protocol string
switch v.protocol {
case format.TagValueIDRPC:
protocol = "RPC"
case format.TagValueIDHTTP:
protocol = "HTTP"
default:
protocol = strconv.Itoa(v.protocol)
}
w.Write([]byte(v.expr))
w.Write([]byte(fmt.Sprintf(
"\n# duration=%v token=%s proto=%s\n\n",
v.duration, v.user, protocol)))
}
}

func (r *httpRequestHandler) Response() http.ResponseWriter {
return &r.w
}
Expand Down
2 changes: 1 addition & 1 deletion internal/api/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
tagX = make(map[tsTags]int, len(tagX))
}
res.Meta.Total = len(res.Data)
h.reportQueryDataSize(len(res.Data), len(qry.Timescale.Time))
h.reportQueryMemUsage(len(res.Data), len(qry.Timescale.Time))
succeeded = true // prevents deffered "cleanup"
return res, cleanup, nil
}
Expand Down

0 comments on commit 9e4fbc9

Please sign in to comment.