diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index ebc0c485fc1..fdc68ca7689 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -11,6 +11,9 @@
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
+- **[Minor Changes](#minor-changes)**
+ - **[VTTablet](#vttablet)**
+ - [VTTablet: Query Consolidation Waiter Cap](#vttablet-consolidator-query-waiter-cap)
## Major Changes
@@ -67,3 +70,11 @@ To upgrade to the newer version of the configuration file, first switch to using
- `twopc_abandon_age` flag now supports values in the time.Duration format (e.g., 1s, 2m, 1h).
While the flag will continue to accept float values (interpreted as seconds) for backward compatibility,
**float inputs are deprecated** and will be removed in a future release.
+
+## Minor Changes
+
+### VTTablet
+
+#### --consolidator-query-waiter-cap flag
+
+A new CLI flag `--consolidator-query-waiter-cap` to set the maximum number of clients allowed to wait on the consolidator. The default value is set to 0 for unlimited wait. Users can adjust this value based on the performance of VTTablet to avoid excessive memory usage and the risk of being OOMKilled, particularly in Kubernetes deployments.
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt
index 7348fcf1753..dcebe87ef72 100644
--- a/go/flags/endtoend/vtcombo.txt
+++ b/go/flags/endtoend/vtcombo.txt
@@ -52,6 +52,7 @@ Flags:
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
+ --consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
--consul_auth_static_file string JSON File to read the topos/tokens from.
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index e4c6fde66af..0b510c90adc 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -86,6 +86,7 @@ Flags:
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
+ --consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
--consul_auth_static_file string JSON File to read the topos/tokens from.
diff --git a/go/sync2/consolidator.go b/go/sync2/consolidator.go
index 401daaef1f1..b777a066e2e 100644
--- a/go/sync2/consolidator.go
+++ b/go/sync2/consolidator.go
@@ -40,6 +40,7 @@ type PendingResult interface {
SetResult(*sqltypes.Result)
Result() *sqltypes.Result
Wait()
+ AddWaiterCounter(int64) *int64
}
type consolidator struct {
@@ -77,6 +78,7 @@ func (co *consolidator) Create(query string) (PendingResult, bool) {
defer co.mu.Unlock()
var r *pendingResult
if r, ok := co.queries[query]; ok {
+ r.AddWaiterCounter(int64(1))
return r, false
}
r = &pendingResult{consolidator: co, query: query}
@@ -122,17 +124,23 @@ func (rs *pendingResult) Wait() {
rs.executing.RLock()
}
+func (rs *pendingResult) AddWaiterCounter(c int64) *int64 {
+ atomic.AddInt64(rs.consolidator.totalWaiterCount, c)
+ return rs.consolidator.totalWaiterCount
+}
+
// ConsolidatorCache is a thread-safe object used for counting how often recent
// queries have been consolidated.
// It is also used by the txserializer package to count how often transactions
// have been queued and had to wait because they targeted the same row (range).
type ConsolidatorCache struct {
*cache.LRUCache[*ccount]
+ totalWaiterCount *int64
}
// NewConsolidatorCache creates a new cache with the given capacity.
func NewConsolidatorCache(capacity int64) *ConsolidatorCache {
- return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity)}
+ return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity), new(int64)}
}
// Record increments the count for "query" by 1.
diff --git a/go/sync2/consolidator_test.go b/go/sync2/consolidator_test.go
index 132a253ba29..5437bb335a6 100644
--- a/go/sync2/consolidator_test.go
+++ b/go/sync2/consolidator_test.go
@@ -18,11 +18,42 @@ package sync2
import (
"reflect"
+ "sync"
"testing"
"vitess.io/vitess/go/sqltypes"
)
+func TestAddWaiterCount(t *testing.T) {
+ con := NewConsolidator()
+ sql := "select * from SomeTable"
+ pr, _ := con.Create(sql)
+ var wgAdd sync.WaitGroup
+ var wgSub sync.WaitGroup
+
+ var concurrent = 1000
+
+ for i := 0; i < concurrent; i++ {
+ wgAdd.Add(1)
+ wgSub.Add(1)
+ go func() {
+ defer wgAdd.Done()
+ pr.AddWaiterCounter(1)
+ }()
+ go func() {
+ defer wgSub.Done()
+ pr.AddWaiterCounter(-1)
+ }()
+ }
+
+ wgAdd.Wait()
+ wgSub.Wait()
+
+ if *pr.AddWaiterCounter(0) != 0 {
+ t.Fatalf("Expect 0 totalWaiterCount but got: %v", *pr.AddWaiterCounter(0))
+ }
+}
+
func TestConsolidator(t *testing.T) {
con := NewConsolidator()
sql := "select * from SomeTable"
diff --git a/go/sync2/fake_consolidator.go b/go/sync2/fake_consolidator.go
index 64c59e78a5a..aadee1d37ce 100644
--- a/go/sync2/fake_consolidator.go
+++ b/go/sync2/fake_consolidator.go
@@ -112,3 +112,8 @@ func (fr *FakePendingResult) SetResult(result *sqltypes.Result) {
func (fr *FakePendingResult) Wait() {
fr.WaitCalls++
}
+
+// AddWaiterCounter is currently a no-op.
+func (fr *FakePendingResult) AddWaiterCounter(int64) *int64 {
+ return new(int64)
+}
diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 519b60b79d6..95535fa5556 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -716,10 +716,14 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
q.SetErr(err)
}
} else {
- qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
- startTime := time.Now()
- q.Wait()
- qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
+ waiterCap := qre.tsv.config.ConsolidatorQueryWaiterCap
+ if waiterCap == 0 || *q.AddWaiterCounter(0) <= waiterCap {
+ qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
+ startTime := time.Now()
+ q.Wait()
+ qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
+ }
+ q.AddWaiterCounter(-1)
}
if q.Err() != nil {
return nil, q.Err()
diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go
index 994999f2368..bdebdfa5fa2 100644
--- a/go/vt/vttablet/tabletserver/tabletenv/config.go
+++ b/go/vt/vttablet/tabletserver/tabletenv/config.go
@@ -195,6 +195,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.Int64Var(¤tConfig.ConsolidatorStreamQuerySize, "consolidator-stream-query-size", defaultConfig.ConsolidatorStreamQuerySize, "Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator.")
fs.Int64Var(¤tConfig.ConsolidatorStreamTotalSize, "consolidator-stream-total-size", defaultConfig.ConsolidatorStreamTotalSize, "Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator.")
+ fs.Int64Var(¤tConfig.ConsolidatorQueryWaiterCap, "consolidator-query-waiter-cap", 0, "Configure the maximum number of clients allowed to wait on the consolidator.")
fs.DurationVar(&healthCheckInterval, "health_check_interval", defaultConfig.Healthcheck.Interval, "Interval between health checks")
fs.DurationVar(°radedThreshold, "degraded_threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded")
fs.DurationVar(&unhealthyThreshold, "unhealthy_threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy")
@@ -320,6 +321,7 @@ type TabletConfig struct {
StreamBufferSize int `json:"streamBufferSize,omitempty"`
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
+ ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"`
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`
SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"`