Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance Fixes for Vitess 18 #14383

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,19 @@ var (
Name: "hash",
}},
},
"oltp_test": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id",
Name: "hash",
}},
Columns: []*vschemapb.Column{{
Name: "c",
Type: sqltypes.Char,
}, {
Name: "pad",
Type: sqltypes.Char,
}},
},
},
}

Expand Down
132 changes: 132 additions & 0 deletions go/vt/vtgate/endtoend/oltp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package endtoend
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Header is missing here


import (
"bytes"
"context"
"fmt"
"math/rand"
"sync"
"testing"

"vitess.io/vitess/go/mysql"
)

// 10 groups, 119 characters
const cValueTemplate = "###########-###########-###########-" +
"###########-###########-###########-" +
"###########-###########-###########-" +
"###########"

// 5 groups, 59 characters
const padValueTemplate = "###########-###########-###########-" +
"###########-###########"

func sysbenchRandom(rng *rand.Rand, template string) []byte {
out := make([]byte, 0, len(template))
for i := range template {
switch template[i] {
case '#':
out = append(out, '0'+byte(rng.Intn(10)))
default:
out = append(out, template[i])
}
}
return out
}

var oltpInitOnce sync.Once

func BenchmarkOLTP(b *testing.B) {
const MaxRows = 10000
const RangeSize = 100

rng := rand.New(rand.NewSource(1234))

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
b.Fatal(err)
}
defer conn.Close()

var query bytes.Buffer

oltpInitOnce.Do(func() {
b.Logf("seeding database for benchmark...")

var rows int = 1
for i := 0; i < MaxRows/10; i++ {
query.Reset()
query.WriteString("insert into oltp_test(id, k, c, pad) values ")
for j := 0; j < 10; j++ {
if j > 0 {
query.WriteString(", ")
}
_, _ = fmt.Fprintf(&query, "(%d, %d, '%s', '%s')", rows, rng.Int31n(0xFFFF), sysbenchRandom(rng, cValueTemplate), sysbenchRandom(rng, padValueTemplate))
rows++
}

_, err = conn.ExecuteFetch(query.String(), -1, false)
if err != nil {
b.Fatal(err)
}
}
b.Logf("finshed (inserted %d rows)", rows)
})

b.Run("SimpleRanges", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
id := rng.Intn(MaxRows)

query.Reset()
_, _ = fmt.Fprintf(&query, "SELECT c FROM oltp_test WHERE id BETWEEN %d AND %d", id, id+rng.Intn(RangeSize)-1)
_, err := conn.ExecuteFetch(query.String(), 1000, false)
if err != nil {
b.Error(err)
}
}
})

b.Run("SumRanges", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
id := rng.Intn(MaxRows)

query.Reset()
_, _ = fmt.Fprintf(&query, "SELECT SUM(k) FROM oltp_test WHERE id BETWEEN %d AND %d", id, id+rng.Intn(RangeSize)-1)
_, err := conn.ExecuteFetch(query.String(), 1000, false)
if err != nil {
b.Error(err)
}
}
})

b.Run("OrderRanges", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
id := rng.Intn(MaxRows)

query.Reset()
_, _ = fmt.Fprintf(&query, "SELECT c FROM oltp_test WHERE id BETWEEN %d AND %d ORDER BY c", id, id+rng.Intn(RangeSize)-1)
_, err := conn.ExecuteFetch(query.String(), 1000, false)
if err != nil {
b.Error(err)
}
}
})

b.Run("DistinctRanges", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
id := rng.Intn(MaxRows)

query.Reset()
_, _ = fmt.Fprintf(&query, "SELECT DISTINCT c FROM oltp_test WHERE id BETWEEN %d AND %d ORDER BY c", id, id+rng.Intn(RangeSize)-1)
_, err := conn.ExecuteFetch(query.String(), 1000, false)
if err != nil {
b.Error(err)
}
}
})
}
8 changes: 8 additions & 0 deletions go/vt/vtgate/endtoend/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,11 @@ create table t1_sharded(
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table oltp_test(
id bigint not null auto_increment,
k bigint default 0 not null,
c char(120) default '' not null,
pad char(60) default '' not null,
primary key (id)
) Engine=InnoDB;
117 changes: 91 additions & 26 deletions go/vt/vtgate/engine/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,35 @@ func (oa *OrderedAggregate) TryExecute(ctx context.Context, vcursor VCursor, bin
return qr.Truncate(oa.TruncateColumnCount), nil
}

func (oa *OrderedAggregate) executeGroupBy(result *sqltypes.Result) (*sqltypes.Result, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 🌟

if len(result.Rows) < 1 {
return result, nil
}

out := &sqltypes.Result{
Fields: result.Fields,
Rows: result.Rows[:0],
}

var currentKey []sqltypes.Value
var lastRow sqltypes.Row
var err error
for _, row := range result.Rows {
var nextGroup bool

currentKey, nextGroup, err = oa.nextGroupBy(currentKey, row)
if err != nil {
return nil, err
}
if nextGroup {
out.Rows = append(out.Rows, lastRow)
}
lastRow = row
}
out.Rows = append(out.Rows, lastRow)
return out, nil
}

func (oa *OrderedAggregate) execute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
result, err := vcursor.ExecutePrimitive(
ctx,
Expand All @@ -124,6 +153,10 @@ func (oa *OrderedAggregate) execute(ctx context.Context, vcursor VCursor, bindVa
if err != nil {
return nil, err
}
if len(oa.Aggregates) == 0 {
return oa.executeGroupBy(result)
}

agg, fields, err := newAggregation(result.Fields, oa.Aggregates)
if err != nil {
return nil, err
Expand Down Expand Up @@ -169,43 +202,75 @@ func (oa *OrderedAggregate) TryStreamExecute(ctx context.Context, vcursor VCurso
var agg aggregationState
var fields []*querypb.Field
var currentKey []sqltypes.Value
var lastRow sqltypes.Row
var visitor func(qr *sqltypes.Result) error

visitor := func(qr *sqltypes.Result) error {
var err error
if len(oa.Aggregates) > 0 {
visitor = func(qr *sqltypes.Result) error {
var err error

if agg == nil && len(qr.Fields) != 0 {
agg, fields, err = newAggregation(qr.Fields, oa.Aggregates)
if err != nil {
return err
}
if err = cb(&sqltypes.Result{Fields: fields}); err != nil {
return err
if agg == nil && len(qr.Fields) != 0 {
agg, fields, err = newAggregation(qr.Fields, oa.Aggregates)
if err != nil {
return err
}
if err = cb(&sqltypes.Result{Fields: fields}); err != nil {
return err
}
}
}

// This code is similar to the one in Execute.
for _, row := range qr.Rows {
var nextGroup bool
// This code is similar to the one in Execute.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can these be moved to their own little functions?

for _, row := range qr.Rows {
var nextGroup bool

currentKey, nextGroup, err = oa.nextGroupBy(currentKey, row)
if err != nil {
return err
currentKey, nextGroup, err = oa.nextGroupBy(currentKey, row)
if err != nil {
return err
}

if nextGroup {
// this is a new grouping. let's yield the old one, and start a new
if err := cb(&sqltypes.Result{Rows: [][]sqltypes.Value{agg.finish()}}); err != nil {
return err
}

agg.reset()
}

if err := agg.add(row); err != nil {
return err
}
}
return nil
}
} else {
visitor = func(qr *sqltypes.Result) error {
var err error
if fields == nil && len(qr.Fields) > 0 {
fields = qr.Fields
if err = cb(&sqltypes.Result{Fields: fields}); err != nil {
return err
}
}
for _, row := range qr.Rows {
var nextGroup bool

if nextGroup {
// this is a new grouping. let's yield the old one, and start a new
if err := cb(&sqltypes.Result{Rows: [][]sqltypes.Value{agg.finish()}}); err != nil {
currentKey, nextGroup, err = oa.nextGroupBy(currentKey, row)
if err != nil {
return err
}

agg.reset()
}
if nextGroup {
// this is a new grouping. let's yield the old one, and start a new
if err := cb(&sqltypes.Result{Rows: []sqltypes.Row{lastRow}}); err != nil {
return err
}
}

if err := agg.add(row); err != nil {
return err
lastRow = row
}
return nil
}
return nil
}

/* we need the input fields types to correctly calculate the output types */
Expand All @@ -214,8 +279,8 @@ func (oa *OrderedAggregate) TryStreamExecute(ctx context.Context, vcursor VCurso
return err
}

if currentKey != nil {
if err := cb(&sqltypes.Result{Rows: [][]sqltypes.Value{agg.finish()}}); err != nil {
if lastRow != nil {
if err := cb(&sqltypes.Result{Rows: [][]sqltypes.Value{lastRow}}); err != nil {
return err
}
}
Expand Down
17 changes: 9 additions & 8 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"context"
"fmt"
"math/rand"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -428,27 +429,27 @@

comparers := extractSlices(route.OrderBy)

sort.Slice(out.Rows, func(i, j int) bool {
slices.SortFunc(out.Rows, func(a, b sqltypes.Row) int {
var cmp int
if err != nil {
return true
return -1
}
// If there are any errors below, the function sets
// the external err and returns true. Once err is set,
// all subsequent calls return true. This will make
// Slice think that all elements are in the correct
// order and return more quickly.
for _, c := range comparers {
cmp, err = c.compare(out.Rows[i], out.Rows[j])
cmp, err = c.compare(a, b)
if err != nil {
return true
return -1
}
if cmp == 0 {
continue
if cmp != 0 {
return cmp
}
return cmp < 0
return cmp

Check failure on line 450 in go/vt/vtgate/engine/route.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

SA4004: the surrounding loop is unconditionally terminated (staticcheck)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, if cmp==0 then the comparers loop should continue than return

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah I fucked this up when rebasing between 17 and 18 (I was trying the patch on both versions). Sorry! Will fix

}
return true
return 0
})

return out.Truncate(route.TruncateColumnCount), err
Expand Down
Loading
Loading