Skip to content

Commit

Permalink
feat(otelbench): implement dump limits
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Nov 29, 2024
1 parent 29dbc7c commit 936bc51
Showing 1 changed file with 41 additions and 4 deletions.
45 changes: 41 additions & 4 deletions cmd/otelbench/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ClickHouse/ch-go"
"github.com/dustin/go-humanize"
"github.com/go-faster/errors"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
Expand All @@ -27,6 +28,9 @@ func newDumpCommand() *cobra.Command {

func newDumpCreateCommand() *cobra.Command {
var arg struct {
LimitTime time.Duration
LimitCount int

Output string
Database string

Expand All @@ -43,6 +47,7 @@ func newDumpCreateCommand() *cobra.Command {
g, ctx := errgroup.WithContext(cobraCommand.Context())
done := make(chan struct{})

fmt.Println("Dumping tables to", arg.Output)
if err := os.MkdirAll(arg.Output, 0755); err != nil {
return errors.Wrap(err, "create output directory")
}
Expand All @@ -68,7 +73,7 @@ func newDumpCreateCommand() *cobra.Command {
g.Go(func() error {
if err := checkConnection(ctx); err == nil {
// Already connected.
fmt.Println("Already connected")
fmt.Println("Clickhouse is already listening, not performing port-forwarding.")
return nil
}

Expand Down Expand Up @@ -107,7 +112,7 @@ func newDumpCreateCommand() *cobra.Command {
return ctx.Err()
}
}
fmt.Println("Port-forward ready")
fmt.Println("Clickhouse connection is ready")
tables := []string{
"logs",
"logs_attrs",
Expand All @@ -119,11 +124,23 @@ func newDumpCreateCommand() *cobra.Command {
"traces_tags",
"migration",
}
var files []os.FileInfo
for _, table := range tables {
query := fmt.Sprintf("SELECT * FROM %s.%s", arg.Database, table)
query += " WHERE true"
switch table {
case "logs", "metrics_points", "traces_spans":
if arg.LimitTime > 0 {
query += fmt.Sprintf(" AND timestamp > (now() - toIntervalSecond(%d))", int(arg.LimitTime.Seconds()))
}
}
if arg.LimitCount > 0 {
query += fmt.Sprintf(" LIMIT %d", arg.LimitCount)
}

// SELECT * FROM faster.logs INTO OUTFILE '/tmp/dump.bin' FORMAT Native;
outFile := filepath.Join(arg.Output, fmt.Sprintf("%s.bin", table))
query += fmt.Sprintf(" INTO OUTFILE '%s' TRUNCATE FORMAT Native", outFile)
outFile := filepath.Join(arg.Output, fmt.Sprintf("%s.bin.lz4", table))
query += fmt.Sprintf(" INTO OUTFILE '%s' TRUNCATE COMPRESSION 'lz4' FORMAT Native", outFile)

args := []string{
"-h", "localhost",
Expand All @@ -143,8 +160,26 @@ func newDumpCreateCommand() *cobra.Command {
if err := cmd.Run(); err != nil {
return errors.Wrapf(err, "dump table %s", table)
}

stat, err := os.Stat(outFile)
if err != nil {
return errors.Wrap(err, "stat")
}

files = append(files, stat)
}

fmt.Println("Dumps:")
var totalBytes int64
for _, file := range files {
totalBytes += file.Size()
// Pad to 35 characters, align left.
fmt.Printf(" %35s %s\n", file.Name(), humanize.Bytes(uint64(file.Size())))
}

fmt.Println("--------")
fmt.Printf(" %35s %s\n", "Total", humanize.Bytes(uint64(totalBytes)))

return nil
})

Expand All @@ -158,6 +193,8 @@ func newDumpCreateCommand() *cobra.Command {
f.StringVar(&arg.KubernetesService, "kubernetes-service", "chi-db-cluster-0-0", "Kubernetes service")
f.IntVar(&arg.RemotePort, "target-port", 9000, "Remote port")
f.IntVar(&arg.LocalPort, "port", 9000, "Local port")
f.DurationVar(&arg.LimitTime, "duration", 0, "Limit oldest data with delta from now")
f.IntVar(&arg.LimitCount, "limit", 0, "Limit oldest data with count")

return rootCmd
}

0 comments on commit 936bc51

Please sign in to comment.