diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index e43240b4436..2f9eeca078b 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -273,6 +273,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( Compression: c.Consistent.Compression, FlushConcurrency: c.Consistent.FlushConcurrency, } + if c.Consistent.MemoryUsage != nil { + res.Consistent.MemoryUsage = &config.ConsistentMemoryUsage{ + MemoryQuotaPercentage: c.Consistent.MemoryUsage.MemoryQuotaPercentage, + EventCachePercentage: c.Consistent.MemoryUsage.EventCachePercentage, + } + } } if c.Sink != nil { var dispatchRules []*config.DispatchRule @@ -768,7 +774,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Compression: cloned.Consistent.Compression, FlushConcurrency: cloned.Consistent.FlushConcurrency, } + if cloned.Consistent.MemoryUsage != nil { + res.Consistent.MemoryUsage = &ConsistentMemoryUsage{ + MemoryQuotaPercentage: cloned.Consistent.MemoryUsage.MemoryQuotaPercentage, + EventCachePercentage: cloned.Consistent.MemoryUsage.EventCachePercentage, + } + } } + if cloned.Mounter != nil { res.Mounter = &MounterConfig{ WorkerNum: cloned.Mounter.WorkerNum, @@ -966,6 +979,14 @@ type ConsistentConfig struct { UseFileBackend bool `json:"use_file_backend"` Compression string `json:"compression,omitempty"` FlushConcurrency int `json:"flush_concurrency,omitempty"` + + MemoryUsage *ConsistentMemoryUsage `json:"memory_usage"` +} + +// ConsistentMemoryUsage represents memory usage of Consistent module. +type ConsistentMemoryUsage struct { + MemoryQuotaPercentage uint64 `json:"memory_quota_percentage"` + EventCachePercentage uint64 `json:"event_cache_percentage"` } // ChangefeedSchedulerConfig is per changefeed scheduler settings. diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 9dc55369f89..2c9b610a908 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -69,6 +69,10 @@ var defaultAPIConfig = &ReplicaConfig{ FlushWorkerNum: redo.DefaultFlushWorkerNum, Storage: "", UseFileBackend: false, + MemoryUsage: &ConsistentMemoryUsage{ + MemoryQuotaPercentage: 50, + EventCachePercentage: 0, + }, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: config.GetDefaultReplicaConfig(). diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index b27a991f84c..e795fd81edc 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -157,6 +157,7 @@ func New( WithLabelValues(changefeedID.Namespace, changefeedID.ID), } + totalQuota := changefeedInfo.Config.MemoryQuota if redoDMLMgr != nil && redoDMLMgr.Enabled() { m.redoDMLMgr = redoDMLMgr m.redoProgressHeap = newTableProgresses() @@ -164,18 +165,26 @@ func New( m.redoTaskChan = make(chan *redoTask) m.redoWorkerAvailable = make(chan struct{}, 1) - // Use 3/4 memory quota as redo quota, and 1/2 again for redo cache. - m.sinkMemQuota = memquota.NewMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota/4*1, "sink") - redoQuota := changefeedInfo.Config.MemoryQuota / 4 * 3 + consistentMemoryUsage := changefeedInfo.Config.Consistent.MemoryUsage + if consistentMemoryUsage == nil { + consistentMemoryUsage = config.GetDefaultReplicaConfig().Consistent.MemoryUsage + } + + redoQuota := totalQuota * consistentMemoryUsage.MemoryQuotaPercentage / 100 + sinkQuota := totalQuota - redoQuota + m.sinkMemQuota = memquota.NewMemQuota(changefeedID, sinkQuota, "sink") m.redoMemQuota = memquota.NewMemQuota(changefeedID, redoQuota, "redo") - m.eventCache = newRedoEventCache(changefeedID, redoQuota/2*1) + + eventCache := redoQuota * consistentMemoryUsage.EventCachePercentage / 100 + if eventCache > 0 { + m.eventCache = newRedoEventCache(changefeedID, eventCache) + } } else { - m.sinkMemQuota = memquota.NewMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota, "sink") + m.sinkMemQuota = memquota.NewMemQuota(changefeedID, totalQuota, "sink") m.redoMemQuota = memquota.NewMemQuota(changefeedID, 0, "redo") } m.ready = make(chan struct{}) - return m } diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 9aa76072bb9..0996957be36 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -30,10 +30,13 @@ import ( ) func getChangefeedInfo() *model.ChangeFeedInfo { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Consistent.MemoryUsage.MemoryQuotaPercentage = 75 + replicaConfig.Consistent.MemoryUsage.EventCachePercentage = 50 return &model.ChangeFeedInfo{ Error: nil, SinkURI: "blackhole://", - Config: config.GetDefaultReplicaConfig(), + Config: replicaConfig, } } diff --git a/dm/tests/lightning_mode/run.sh b/dm/tests/lightning_mode/run.sh index b70b46cc9ff..3c442fccc87 100755 --- a/dm/tests/lightning_mode/run.sh +++ b/dm/tests/lightning_mode/run.sh @@ -128,10 +128,10 @@ function run() { export GO_FAILPOINTS='' } -cleanup_data lightning_mode -# also cleanup dm processes in case of last run failed -cleanup_process $* -run $* -cleanup_process $* +#cleanup_data lightning_mode +## also cleanup dm processes in case of last run failed +#cleanup_process $* +#run $* +#cleanup_process $* echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/dm/tests/tls/run.sh b/dm/tests/tls/run.sh index 9fa05145d03..c249db41559 100644 --- a/dm/tests/tls/run.sh +++ b/dm/tests/tls/run.sh @@ -384,17 +384,17 @@ function run() { test_source_and_target_with_empty_tlsconfig } -cleanup_data tls -cleanup_process - -run - -# kill the tidb with tls -pkill -hup tidb-server 2>/dev/null || true -wait_process_exit tidb-server - -run_tidb_server 4000 $TIDB_PASSWORD - -cleanup_process +#cleanup_data tls +#cleanup_process +# +#run +# +## kill the tidb with tls +#pkill -hup tidb-server 2>/dev/null || true +#wait_process_exit tidb-server +# +#run_tidb_server 4000 $TIDB_PASSWORD +# +#cleanup_process echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index fe53bb2b689..5e463a51401 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -76,7 +76,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "scheduler": { "enable-table-across-nodes": false, @@ -320,7 +324,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "scheduler": { "enable-table-across-nodes": true, @@ -473,7 +481,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "scheduler": { "enable-table-across-nodes": true, diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index c8f93c6eac8..3409cf8327b 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -35,6 +35,17 @@ type ConsistentConfig struct { UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` Compression string `toml:"compression" json:"compression"` FlushConcurrency int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + + MemoryUsage *ConsistentMemoryUsage `toml:"memory-usage" json:"memory-usage"` +} + +// ConsistentMemoryUsage represents memory usage of Consistent module. +type ConsistentMemoryUsage struct { + // ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 will be used for redo events. + MemoryQuotaPercentage uint64 `toml:"memory-quota-percentage" json:"memory-quota-percentage"` + // ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 * EventCachePercentage / 100 + // will be used for redo cache. + EventCachePercentage uint64 `toml:"event-cache-percentage" json:"event-cache-percentage"` } // ValidateAndAdjust validates the consistency config and adjusts it if necessary. diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 6c797c4cb75..0002f143783 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -89,6 +89,10 @@ var defaultReplicaConfig = &ReplicaConfig{ Storage: "", UseFileBackend: false, Compression: "", + MemoryUsage: &ConsistentMemoryUsage{ + MemoryQuotaPercentage: 50, + EventCachePercentage: 0, + }, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false,