Skip to content

Commit

Permalink
slack-vitess-r15.0.5: backport required Transaction Throttler PRs, …
Browse files Browse the repository at this point in the history
…pt. 3 + ci fixes (#351)

* txthrottler: add metrics for topoWatcher and healthCheckStreamer (vitessio#13153)

Signed-off-by: Tim Vaillancourt <[email protected]>

* Replace deprecated `github.com/golang/mock` with `go.uber.org/mock` (vitessio#13512)

Signed-off-by: Eng Zer Jun <[email protected]>
Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>

* Per workload TxThrottler metrics (vitessio#13526)

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* tx throttler: healthcheck all cells if `--tx-throttler-healthcheck-cells` is undefined (vitessio#12477)

Signed-off-by: Tim Vaillancourt <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>

* Add dry-run/monitoring-only mode for TxThrottler (vitessio#13604)

Signed-off-by: Eduardo J. Ortega U <[email protected]>
Signed-off-by: Eduardo J. Ortega U. <[email protected]>

* `txthrottler`: remove `txThrottlerConfig` struct, rely on `tabletenv` (vitessio#13624)

Signed-off-by: Tim Vaillancourt <[email protected]>

* tx throttler: remove unused topology watchers (vitessio#14412)

Signed-off-by: deepthi <[email protected]>

* tx_throttler: delete topo watcher metric instead of deprecating (vitessio#14445)

Signed-off-by: deepthi <[email protected]>

* TxThrottler: dont throttle unless lag (vitessio#14789)

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* go test -v

Signed-off-by: Tim Vaillancourt <[email protected]>

* add mutex to flaky parseFlags()

Signed-off-by: Tim Vaillancourt <[email protected]>

* revert tweaks for flaky tests

Signed-off-by: Tim Vaillancourt <[email protected]>

* fix protojson err

Signed-off-by: Tim Vaillancourt <[email protected]>

* make vtadmin_web_proto_types

Signed-off-by: Tim Vaillancourt <[email protected]>

* remove debug t.Logf(...)

Signed-off-by: Tim Vaillancourt <[email protected]>

---------

Signed-off-by: Tim Vaillancourt <[email protected]>
Signed-off-by: Eng Zer Jun <[email protected]>
Signed-off-by: Shlomi Noach <[email protected]>
Signed-off-by: Eduardo J. Ortega U <[email protected]>
Signed-off-by: Eduardo J. Ortega U. <[email protected]>
Signed-off-by: deepthi <[email protected]>
Co-authored-by: Eng Zer Jun <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>
Co-authored-by: Eduardo J. Ortega U <[email protected]>
Co-authored-by: Deepthi Sigireddi <[email protected]>
  • Loading branch information
5 people authored May 21, 2024
1 parent a2a622a commit 4a92ae7
Show file tree
Hide file tree
Showing 21 changed files with 725 additions and 465 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ require (
github.com/hashicorp/go-version v1.6.0
github.com/planetscale/log v0.0.0-20221118170849-fb599bc35c50
github.com/slok/noglog v0.2.0
go.uber.org/mock v0.4.0
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/sync v0.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,10 @@ Usage of vttablet:
--twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.
--tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-dry-run If present, the transaction throttler only records metrics about requests received and throttled, but does not actually throttle any requests.
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)
--tx-throttler-topo-refresh-interval duration The rate that the transaction throttler will refresh the topology to find cells. (default 5m0s)
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
--tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.
--unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s)
Expand Down
30 changes: 16 additions & 14 deletions go/vt/discovery/replicationlag.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ func SetMinNumTablets(numTablets int) {
minNumTablets = numTablets
}

// IsReplicationLagHigh verifies that the given LegacytabletHealth refers to a tablet with high
// IsReplicationLagHigh verifies that the given TabletHealth refers to a tablet with high
// replication lag, i.e. higher than the configured discovery_low_replication_lag flag.
func IsReplicationLagHigh(tabletHealth *TabletHealth) bool {
return float64(tabletHealth.Stats.ReplicationLagSeconds) > lowReplicationLag.Seconds()
}

// IsReplicationLagVeryHigh verifies that the given LegacytabletHealth refers to a tablet with very high
// IsReplicationLagVeryHigh verifies that the given TabletHealth refers to a tablet with very high
// replication lag, i.e. higher than the configured discovery_high_replication_lag_minimum_serving flag.
func IsReplicationLagVeryHigh(tabletHealth *TabletHealth) bool {
return float64(tabletHealth.Stats.ReplicationLagSeconds) > highReplicationLagMinServing.Seconds()
Expand Down Expand Up @@ -117,7 +117,7 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal
return filterStatsByLag(tabletHealthList)
}
res := filterStatsByLagWithLegacyAlgorithm(tabletHealthList)
// run the filter again if exactly one tablet is removed,
// Run the filter again if exactly one tablet is removed,
// and we have spare tablets.
if len(res) > minNumTablets && len(res) == len(tabletHealthList)-1 {
res = filterStatsByLagWithLegacyAlgorithm(res)
Expand All @@ -128,12 +128,12 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal

func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
list := make([]tabletLagSnapshot, 0, len(tabletHealthList))
// filter non-serving tablets and those with very high replication lag
// Filter out non-serving tablets and those with very high replication lag.
for _, ts := range tabletHealthList {
if !ts.Serving || ts.LastError != nil || ts.Stats == nil || IsReplicationLagVeryHigh(ts) {
continue
}
// Pull the current replication lag for a stable sort later.
// Save the current replication lag for a stable sort later.
list = append(list, tabletLagSnapshot{
ts: ts,
replag: ts.Stats.ReplicationLagSeconds})
Expand All @@ -142,7 +142,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {
// Sort by replication lag.
sort.Sort(tabletLagSnapshotList(list))

// Pick those with low replication lag, but at least minNumTablets tablets regardless.
// Pick tablets with low replication lag, but at least minNumTablets tablets regardless.
res := make([]*TabletHealth, 0, len(list))
for i := 0; i < len(list); i++ {
if !IsReplicationLagHigh(list[i].ts) || i < minNumTablets {
Expand All @@ -154,7 +154,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth {

func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*TabletHealth {
list := make([]*TabletHealth, 0, len(tabletHealthList))
// filter non-serving tablets
// Filter out non-serving tablets.
for _, ts := range tabletHealthList {
if !ts.Serving || ts.LastError != nil || ts.Stats == nil {
continue
Expand All @@ -164,7 +164,7 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if len(list) <= 1 {
return list
}
// if all have low replication lag (<=30s), return all tablets.
// If all tablets have low replication lag (<=30s), return all of them.
allLowLag := true
for _, ts := range list {
if IsReplicationLagHigh(ts) {
Expand All @@ -175,12 +175,12 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if allLowLag {
return list
}
// filter those affecting "mean" lag significantly
// calculate mean for all tablets
// We want to filter out tablets that are affecting "mean" lag significantly.
// We first calculate the mean across all tablets.
res := make([]*TabletHealth, 0, len(list))
m, _ := mean(list, -1)
for i, ts := range list {
// calculate mean by excluding ith tablet
// Now we calculate the mean by excluding ith tablet
mi, _ := mean(list, i)
if float64(mi) > float64(m)*0.7 {
res = append(res, ts)
Expand All @@ -189,9 +189,11 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta
if len(res) >= minNumTablets {
return res
}
// return at least minNumTablets tablets to avoid over loading,
// if there is enough tablets with replication lag < highReplicationLagMinServing.
// Pull the current replication lag for a stable sort.

// We want to return at least minNumTablets tablets to avoid overloading,
// as long as there are enough tablets with replication lag < highReplicationLagMinServing.

// Save the current replication lag for a stable sort.
snapshots := make([]tabletLagSnapshot, 0, len(list))
for _, ts := range list {
if !IsReplicationLagVeryHigh(ts) {
Expand Down
70 changes: 34 additions & 36 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ var (
"Operation", topologyWatcherOpListTablets, topologyWatcherOpGetTablet)
)

// tabletInfo is used internally by the TopologyWatcher class
// tabletInfo is used internally by the TopologyWatcher struct.
type tabletInfo struct {
alias string
tablet *topodata.Tablet
}

// TopologyWatcher polls tablet from a configurable set of tablets
// periodically. When tablets are added / removed, it calls
// the LegacyTabletRecorder AddTablet / RemoveTablet interface appropriately.
// TopologyWatcher polls the topology periodically for changes to
// the set of tablets. When tablets are added / removed / modified,
// it calls the AddTablet / RemoveTablet interface appropriately.
type TopologyWatcher struct {
// set at construction time
topoServer *topo.Server
Expand All @@ -80,20 +80,21 @@ type TopologyWatcher struct {

// mu protects all variables below
mu sync.Mutex
// tablets contains a map of alias -> tabletInfo for all known tablets
// tablets contains a map of alias -> tabletInfo for all known tablets.
tablets map[string]*tabletInfo
// topoChecksum stores a crc32 of the tablets map and is exported as a metric
// topoChecksum stores a crc32 of the tablets map and is exported as a metric.
topoChecksum uint32
// lastRefresh records the timestamp of the last topo refresh
// lastRefresh records the timestamp of the last refresh of the topology.
lastRefresh time.Time
// firstLoadDone is true when first load of the topology data is done.
// firstLoadDone is true when the initial load of the topology data is complete.
firstLoadDone bool
// firstLoadChan is closed when the initial loading of topology data is done.
// firstLoadChan is closed when the initial load of topology data is complete.
firstLoadChan chan struct{}
}

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
// the tablets that it is configured to watch, and reloads them periodically if needed.
// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
Expand All @@ -115,14 +116,14 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
}

// Start starts the topology watcher
// Start starts the topology watcher.
func (tw *TopologyWatcher) Start() {
tw.wg.Add(1)
go func(t *TopologyWatcher) {
Expand All @@ -140,7 +141,7 @@ func (tw *TopologyWatcher) Start() {
}(tw)
}

// Stop stops the watcher. It does not clean up the tablets added to LegacyTabletRecorder.
// Stop stops the watcher. It does not clean up the tablets added to HealthCheck.
func (tw *TopologyWatcher) Stop() {
tw.cancelFunc()
// wait for watch goroutine to finish.
Expand All @@ -151,7 +152,7 @@ func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)

// first get the list of relevant tabletAliases
// First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
Expand All @@ -166,7 +167,7 @@ func (tw *TopologyWatcher) loadTablets() {
}

// Accumulate a list of all known alias strings to use later
// when sorting
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))

tw.mu.Lock()
Expand All @@ -175,7 +176,7 @@ func (tw *TopologyWatcher) loadTablets() {
tabletAliasStrs = append(tabletAliasStrs, aliasStr)

if !tw.refreshKnownTablets {
// we already have a tabletInfo for this and the flag tells us to not refresh
// We already have a tabletInfo for this and the flag tells us to not refresh.
if val, ok := tw.tablets[aliasStr]; ok {
newTablets[aliasStr] = val
continue
Expand All @@ -188,7 +189,7 @@ func (tw *TopologyWatcher) loadTablets() {
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run
<-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
Expand Down Expand Up @@ -218,7 +219,7 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}

// trust the alias from topo and add it if it doesn't exist
// Trust the alias from topo and add it if it doesn't exist.
if val, ok := tw.tablets[alias]; ok {
// check if the host and port have changed. If yes, replace tablet.
oldKey := TabletToMapKey(val.tablet)
Expand All @@ -230,7 +231,7 @@ func (tw *TopologyWatcher) loadTablets() {
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
}
} else {
// This is a new tablet record, let's add it to the healthcheck
// This is a new tablet record, let's add it to the HealthCheck.
tw.healthcheck.AddTablet(newVal.tablet)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
}
Expand All @@ -252,8 +253,8 @@ func (tw *TopologyWatcher) loadTablets() {
close(tw.firstLoadChan)
}

// iterate through the tablets in a stable order and compute a
// checksum of the tablet map
// Iterate through the tablets in a stable order and compute a
// checksum of the tablet map.
sort.Strings(tabletAliasStrs)
var buf bytes.Buffer
for _, alias := range tabletAliasStrs {
Expand All @@ -269,15 +270,15 @@ func (tw *TopologyWatcher) loadTablets() {

}

// RefreshLag returns the time since the last refresh
// RefreshLag returns the time since the last refresh.
func (tw *TopologyWatcher) RefreshLag() time.Duration {
tw.mu.Lock()
defer tw.mu.Unlock()

return time.Since(tw.lastRefresh)
}

// TopoChecksum returns the checksum of the current state of the topo
// TopoChecksum returns the checksum of the current state of the topo.
func (tw *TopologyWatcher) TopoChecksum() uint32 {
tw.mu.Lock()
defer tw.mu.Unlock()
Expand All @@ -286,7 +287,7 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
}

// TabletFilter is an interface that can be given to a TopologyWatcher
// to be applied as an additional filter on the list of tablets returned by its getTablets function
// to be applied as an additional filter on the list of tablets returned by its getTablets function.
type TabletFilter interface {
// IsIncluded returns whether tablet is included in this filter
IsIncluded(tablet *topodata.Tablet) bool
Expand All @@ -300,18 +301,18 @@ type FilterByShard struct {
}

// filterShard describes a filter for a given shard or keyrange inside
// a keyspace
// a keyspace.
type filterShard struct {
keyspace string
shard string
keyRange *topodata.KeyRange // only set if shard is also a KeyRange
}

// NewFilterByShard creates a new FilterByShard on top of an existing
// LegacyTabletRecorder. Each filter is a keyspace|shard entry, where shard
// NewFilterByShard creates a new FilterByShard for use by a
// TopologyWatcher. Each filter is a keyspace|shard entry, where shard
// can either be a shard name, or a keyrange. All tablets that match
// at least one keyspace|shard tuple will be forwarded to the
// underlying LegacyTabletRecorder.
// at least one keyspace|shard tuple will be forwarded by the
// TopologyWatcher to its consumer.
func NewFilterByShard(filters []string) (*FilterByShard, error) {
m := make(map[string][]*filterShard)
for _, filter := range filters {
Expand Down Expand Up @@ -348,8 +349,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
}, nil
}

// IsIncluded returns true iff the tablet's keyspace and shard should be
// forwarded to the underlying LegacyTabletRecorder.
// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
if err != nil {
Expand All @@ -370,15 +370,14 @@ func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
return false
}

// FilterByKeyspace is a filter that filters tablets by
// keyspace
// FilterByKeyspace is a filter that filters tablets by keyspace.
type FilterByKeyspace struct {
keyspaces map[string]bool
}

// NewFilterByKeyspace creates a new FilterByKeyspace.
// Each filter is a keyspace entry. All tablets that match
// a keyspace will be forwarded to the underlying LegacyTabletRecorder.
// a keyspace will be forwarded to the TopologyWatcher's consumer.
func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
m := make(map[string]bool)
for _, keyspace := range selectedKeyspaces {
Expand All @@ -390,8 +389,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
}
}

// IsIncluded returns true if the tablet's keyspace should be
// forwarded to the underlying LegacyTabletRecorder.
// IsIncluded returns true if the tablet's keyspace matches what we have.
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
Expand Down
Loading

0 comments on commit 4a92ae7

Please sign in to comment.