Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into vstream_col_colls
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 25, 2024
2 parents a77d394 + 47e1375 commit 2a0ad87
Show file tree
Hide file tree
Showing 118 changed files with 1,869 additions and 1,191 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/cluster_endtoend_vtorc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ jobs:
draft=$(echo "$PR_DATA" | jq .draft -r)
echo "is_draft=${draft}" >> $GITHUB_OUTPUT
- name: Check Memory
run: |
totalMem=$(free -g | awk 'NR==2 {print $2}')
echo "total memory $totalMem GB"
if [[ "$totalMem" -lt 15 ]]; then
echo "Less memory than required"
exit 1
fi
- name: Check out code
if: steps.skip-workflow.outputs.skip-workflow == 'false'
uses: actions/checkout@v3
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/cluster_endtoend_vtorc_mysql57.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ jobs:
draft=$(echo "$PR_DATA" | jq .draft -r)
echo "is_draft=${draft}" >> $GITHUB_OUTPUT
- name: Check Memory
run: |
totalMem=$(free -g | awk 'NR==2 {print $2}')
echo "total memory $totalMem GB"
if [[ "$totalMem" -lt 15 ]]; then
echo "Less memory than required"
exit 1
fi
- name: Check out code
if: steps.skip-workflow.outputs.skip-workflow == 'false'
uses: actions/checkout@v3
Expand Down
3 changes: 0 additions & 3 deletions config/tablet/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,18 @@ oltpReadPool:
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
maxLifetimeSeconds: 0 # queryserver-config-pool-conn-max-lifetime
prefillParallelism: 0 # queryserver-config-pool-prefill-parallelism
maxWaiters: 50000 # queryserver-config-query-pool-waiter-cap

olapReadPool:
size: 200 # queryserver-config-stream-pool-size
timeoutSeconds: 0 # queryserver-config-query-pool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-stream-pool-prefill-parallelism
maxWaiters: 0

txPool:
size: 20 # queryserver-config-transaction-cap
timeoutSeconds: 1 # queryserver-config-txpool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-transaction-prefill-parallelism
maxWaiters: 50000 # queryserver-config-txpool-waiter-cap

oltp:
queryTimeoutSeconds: 30 # queryserver-config-query-timeout
Expand Down
3 changes: 0 additions & 3 deletions doc/design-docs/TabletServerParamsAsYAML.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,18 @@ oltpReadPool:
timeoutSeconds: 0 # queryserver-config-query-pool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-pool-prefill-parallelism
maxWaiters: 50000 # queryserver-config-query-pool-waiter-cap
olapReadPool:
size: 200 # queryserver-config-stream-pool-size
timeoutSeconds: 0 # queryserver-config-query-pool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-stream-pool-prefill-parallelism
maxWaiters: 0
txPool:
size: 20 # queryserver-config-transaction-cap
timeoutSeconds: 1 # queryserver-config-txpool-timeout
idleTimeoutSeconds: 1800 # queryserver-config-idle-timeout
prefillParallelism: 0 # queryserver-config-transaction-prefill-parallelism
maxWaiters: 50000 # queryserver-config-txpool-waiter-cap
oltp:
queryTimeoutSeconds: 30 # queryserver-config-query-timeout
Expand Down
34 changes: 10 additions & 24 deletions go/cache/lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package cache

import (
"testing"

"github.com/stretchr/testify/assert"
)

type CacheValue struct {
Expand All @@ -27,24 +29,12 @@ type CacheValue struct {
func TestInitialState(t *testing.T) {
cache := NewLRUCache[*CacheValue](5)
l, sz, c, e, h, m := cache.Len(), cache.UsedCapacity(), cache.MaxCapacity(), cache.Evictions(), cache.Hits(), cache.Misses()
if l != 0 {
t.Errorf("length = %v, want 0", l)
}
if sz != 0 {
t.Errorf("size = %v, want 0", sz)
}
if c != 5 {
t.Errorf("capacity = %v, want 5", c)
}
if e != 0 {
t.Errorf("evictions = %v, want 0", c)
}
if h != 0 {
t.Errorf("hits = %v, want 0", c)
}
if m != 0 {
t.Errorf("misses = %v, want 0", c)
}
assert.Zero(t, l)
assert.EqualValues(t, 0, sz)
assert.EqualValues(t, 5, c)
assert.EqualValues(t, 0, e)
assert.EqualValues(t, 0, h)
assert.EqualValues(t, 0, m)
}

func TestSetInsertsValue(t *testing.T) {
Expand Down Expand Up @@ -137,12 +127,8 @@ func TestCapacityIsObeyed(t *testing.T) {
// Insert one more; something should be evicted to make room.
cache.Set("key4", value)
sz, evictions := cache.UsedCapacity(), cache.Evictions()
if sz != size {
t.Errorf("post-evict cache.UsedCapacity() = %v, expected %v", sz, size)
}
if evictions != 1 {
t.Errorf("post-evict cache.Evictions() = %v, expected 1", evictions)
}
assert.Equal(t, size, sz)
assert.EqualValues(t, 1, evictions)

// Check various other stats
if l := cache.Len(); int64(l) != size {
Expand Down
14 changes: 8 additions & 6 deletions go/cache/theine/singleflight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func TestDo(t *testing.T) {
return "bar", nil
})

assert.Equal(t, "bar (string)", fmt.Sprintf("%v (%T)", v, v), "incorrect Do value")
assert.NoError(t, err, "got Do error")
assert.Equal(t, "bar (string)", fmt.Sprintf("%v (%T)", v, v))
assert.NoError(t, err)
}

func TestDoErr(t *testing.T) {
Expand Down Expand Up @@ -85,11 +85,11 @@ func TestDoDupSuppress(t *testing.T) {
defer wg2.Done()
wg1.Done()
v, err, _ := g.Do("key", fn)
if !assert.NoError(t, err, "unexpected Do error") {
if !assert.NoError(t, err) {
return
}

assert.Equal(t, "bar", v, "unexpected Do value")
assert.Equal(t, "bar", v)
}()
}
wg1.Wait()
Expand All @@ -98,7 +98,8 @@ func TestDoDupSuppress(t *testing.T) {
c <- "bar"
wg2.Wait()
got := atomic.LoadInt32(&calls)
assert.True(t, got > 0 && got < n, "number of calls not between 0 and %d", n)
assert.Greater(t, got, int32(0))
assert.Less(t, got, int32(n))
}

// Test singleflight behaves correctly after Do panic.
Expand Down Expand Up @@ -131,7 +132,7 @@ func TestPanicDo(t *testing.T) {

select {
case <-done:
assert.Equal(t, int32(n), panicCount, "unexpected number of panics")
assert.EqualValues(t, n, panicCount)
case <-time.After(time.Second):
require.Fail(t, "Do hangs")
}
Expand All @@ -152,6 +153,7 @@ func TestGoexitDo(t *testing.T) {
var err error
defer func() {
assert.NoError(t, err)

if atomic.AddInt32(&waited, -1) == 0 {
close(done)
}
Expand Down
10 changes: 7 additions & 3 deletions go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -84,6 +85,8 @@ In particular, it contains:
tabletTypesToWait []topodatapb.TabletType

env *vtenv.Environment

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
Expand Down Expand Up @@ -131,6 +134,7 @@ func init() {
if err != nil {
log.Fatalf("unable to initialize env: %v", err)
}
srvTopoCounts = stats.NewCountersWithSingleLabel("ResilientSrvTopoServer", "Resilient srvtopo server operations", "type")
}

func startMysqld(uid uint32) (mysqld *mysqlctl.Mysqld, cnf *mysqlctl.Mycnf, err error) {
Expand Down Expand Up @@ -234,7 +238,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
// to be the "internal" protocol that InitTabletMap registers.
cmd.Flags().Set("tablet_manager_protocol", "internal")
cmd.Flags().Set("tablet_protocol", "internal")
uid, err := vtcombo.InitTabletMap(env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, startMysql)
uid, err := vtcombo.InitTabletMap(env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, startMysql, srvTopoCounts)
if err != nil {
// ensure we start mysql in the event we fail here
if startMysql {
Expand All @@ -260,7 +264,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
}

wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr, srvTopoCounts)
if err != nil {
return err
}
Expand Down Expand Up @@ -297,7 +301,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
}

// vtgate configuration and init
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, srvTopoCounts)

tabletTypes := make([]topodatapb.TabletType, 0, 1)
if len(tabletTypesToWait) != 0 {
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtexplain/cli/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand Down Expand Up @@ -186,7 +187,8 @@ func parseAndRun() error {
}
ctx := context.Background()
ts := memorytopo.NewServer(ctx, vtexplain.Cell)
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts)
srvTopoCounts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts, srvTopoCounts)
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion go/cmd/vtgate/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -73,8 +74,14 @@ var (
PreRunE: servenv.CobraPreRunE,
RunE: run,
}

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
srvTopoCounts = stats.NewCountersWithSingleLabel("ResilientSrvTopoServer", "Resilient srvtopo server operations", "type")
}

// CheckCellFlags will check validation of cell and cells_to_watch flag
// it will help to avoid strange behaviors when vtgate runs but actually does not work
func CheckCellFlags(ctx context.Context, serv srvtopo.Server, cell string, cellsToWatch string) error {
Expand Down Expand Up @@ -139,7 +146,7 @@ func run(cmd *cobra.Command, args []string) error {
ts := topo.Open()
defer ts.Close()

resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, srvTopoCounts)

tabletTypes := make([]topodatapb.TabletType, 0, 1)
for _, tt := range tabletTypesToWait {
Expand Down
13 changes: 10 additions & 3 deletions go/cmd/vttablet/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -102,8 +103,14 @@ vttablet \
PreRunE: servenv.CobraPreRunE,
RunE: run,
}

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
srvTopoCounts = stats.NewCountersWithSingleLabel("TabletSrvTopo", "Resilient srvtopo server operations", "type")
}

func run(cmd *cobra.Command, args []string) error {
servenv.Init()

Expand All @@ -129,7 +136,7 @@ func run(cmd *cobra.Command, args []string) error {
}

ts := topo.Open()
qsc, err := createTabletServer(context.Background(), env, config, ts, tabletAlias)
qsc, err := createTabletServer(context.Background(), env, config, ts, tabletAlias, srvTopoCounts)
if err != nil {
ts.Close()
return err
Expand Down Expand Up @@ -249,7 +256,7 @@ func extractOnlineDDL() error {
return nil
}

func createTabletServer(ctx context.Context, env *vtenv.Environment, config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias) (*tabletserver.TabletServer, error) {
func createTabletServer(ctx context.Context, env *vtenv.Environment, config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, srvTopoCounts *stats.CountersWithSingleLabel) (*tabletserver.TabletServer, error) {
if tableACLConfig != "" {
// To override default simpleacl, other ACL plugins must set themselves to be default ACL factory
tableacl.Register("simpleacl", &simpleacl.Factory{})
Expand All @@ -258,7 +265,7 @@ func createTabletServer(ctx context.Context, env *vtenv.Environment, config *tab
}

// creates and registers the query service
qsc := tabletserver.NewTabletServer(ctx, env, "", config, ts, tabletAlias)
qsc := tabletserver.NewTabletServer(ctx, env, "", config, ts, tabletAlias, srvTopoCounts)
servenv.OnRun(func() {
qsc.Register()
addStatusParts(qsc)
Expand Down
Loading

0 comments on commit 2a0ad87

Please sign in to comment.