Skip to content

Commit

Permalink
Merge pull request #563 from go-faster/feat/improve-logql-bench-report
Browse files Browse the repository at this point in the history
chore(ch-log-bench-read): add more benchmark queries
  • Loading branch information
tdakkota authored Dec 9, 2024
2 parents 69d0d53 + d7ea501 commit 853277a
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 163 deletions.
33 changes: 26 additions & 7 deletions cmd/otelbench/chtracker/chtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/tempoapi"
)
Expand Down Expand Up @@ -91,21 +92,39 @@ func (t *Tracker[Q]) Track(ctx context.Context, meta Q, cb func(context.Context,
}

// Report iterates over tracked queries.
func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, TrackedQuery[Q], []QueryReport) error) error {
func (t *Tracker[Q]) Report(ctx context.Context, cb func(context.Context, TrackedQuery[Q], []QueryReport, error) error) error {
if err := t.Flush(ctx); err != nil {
return err
}

t.queriesMux.Lock()
defer t.queriesMux.Unlock()

for _, tq := range t.queries {
reports, err := t.retrieveReports(ctx, tq)
if err != nil {
return errors.Wrapf(err, "retrieve reports for %q", tq.TraceID)
}
grp, grpCtx := errgroup.WithContext(ctx)
type retrivalResult struct {
Reports []QueryReport
Err error
}
queries := make([]retrivalResult, len(t.queries))
for i, tq := range t.queries {
i, tq := i, tq
grp.Go(func() error {
r, err := t.retrieveReports(grpCtx, tq)
if err != nil {
err = errors.Wrapf(err, "retrieve reports for %q", tq.TraceID)
}
queries[i] = retrivalResult{Reports: r, Err: err}
return nil
})
}
if err := grp.Wait(); err != nil {
return errors.Wrap(err, "retrieve reports")
}

for i, result := range queries {
tq := t.queries[i]

if err := cb(ctx, tq, reports); err != nil {
if err := cb(ctx, tq, result.Reports, result.Err); err != nil {
return errors.Wrap(err, "report callback")
}
}
Expand Down
11 changes: 10 additions & 1 deletion cmd/otelbench/logql_analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (a LogQLAnalyze) renderPretty(report logqlbench.LogQLReport, w io.Writer) e
d := time.Duration(nanos) * time.Nanosecond
return d.Round(time.Millisecond / 20).String()
}
fmt.Fprintln(&buf, " duration:", formatNanos(q.DurationNanos))
fmt.Fprint(&buf, " duration:", formatNanos(q.DurationNanos))
if q.Timeout {
fmt.Fprint(&buf, " (timeout)")
}
fmt.Fprintln(&buf)

if len(q.Queries) > 0 {
fmt.Fprintln(&buf, " sql queries:", len(q.Queries))
Expand All @@ -81,6 +85,10 @@ func (a LogQLAnalyze) renderPretty(report logqlbench.LogQLReport, w io.Writer) e
func (a LogQLAnalyze) renderBenchstat(report logqlbench.LogQLReport, w io.Writer) error {
var recs []benchfmt.Result
for _, q := range report.Queries {
if q.ReportError != "" {
continue
}

name := normalizeBenchName(q.Title)
if len(name) == 0 {
name = fmt.Appendf(name, "Query%d", q.ID)
Expand All @@ -89,6 +97,7 @@ func (a LogQLAnalyze) renderBenchstat(report logqlbench.LogQLReport, w io.Writer
Name: bytes.Join(
[][]byte{
[]byte(`LogQL`),
[]byte(q.Type),
name,
},
[]byte{'/'},
Expand Down
26 changes: 20 additions & 6 deletions cmd/otelbench/logqlbench/logqlbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type LogQLBenchmark struct {
client *lokiapi.Client
start time.Time
end time.Time
limit int
}

// Setup setups benchmark using given flags.
Expand All @@ -53,6 +54,7 @@ func (p *LogQLBenchmark) Setup(cmd *cobra.Command) error {
if p.end, err = lokihandler.ParseTimestamp(p.EndTime, time.Time{}); err != nil {
return errors.Wrap(err, "parse end time")
}
p.limit = 1000

p.tracker, err = chtracker.Setup[Query](ctx, "logql", p.TrackerOptions)
if err != nil {
Expand Down Expand Up @@ -130,16 +132,28 @@ func (p *LogQLBenchmark) Run(ctx context.Context) error {

var reports []LogQLReportQuery
if err := p.tracker.Report(ctx,
func(ctx context.Context, tq chtracker.TrackedQuery[Query], queries []chtracker.QueryReport) error {
func(ctx context.Context, tq chtracker.TrackedQuery[Query], queries []chtracker.QueryReport, retriveErr error) error {
var errMsg string
if retriveErr != nil {
if errors.Is(retriveErr, context.DeadlineExceeded) {
errMsg = "no queries"
} else {
errMsg = retriveErr.Error()
}
}

header := tq.Meta.Header()
reports = append(reports, LogQLReportQuery{
ID: tq.Meta.ID,
Title: tq.Meta.Title,
Description: tq.Meta.Description,
Query: tq.Meta.Query,
Matchers: tq.Meta.Match,
ID: header.ID,
Type: string(tq.Meta.Type()),
Title: header.Title,
Description: header.Description,
Query: tq.Meta.Query(),
Matchers: tq.Meta.Matchers(),
DurationNanos: tq.Duration.Nanoseconds(),
Queries: queries,
Timeout: tq.Timeout,
ReportError: errMsg,
})
return nil
},
Expand Down
Loading

0 comments on commit 853277a

Please sign in to comment.