diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 7691e0557eb..c8b718e6201 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -274,6 +274,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( Compression: c.Consistent.Compression, FlushConcurrency: c.Consistent.FlushConcurrency, } + if c.Consistent.MemoryUsage != nil { + res.Consistent.MemoryUsage = &config.ConsistentMemoryUsage{ + MemoryQuotaPercentage: c.Consistent.MemoryUsage.MemoryQuotaPercentage, + EventCachePercentage: c.Consistent.MemoryUsage.EventCachePercentage, + } + } } if c.Sink != nil { var dispatchRules []*config.DispatchRule @@ -505,7 +511,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { Compression: cloned.Consistent.Compression, FlushConcurrency: cloned.Consistent.FlushConcurrency, } + if cloned.Consistent.MemoryUsage != nil { + res.Consistent.MemoryUsage = &ConsistentMemoryUsage{ + MemoryQuotaPercentage: cloned.Consistent.MemoryUsage.MemoryQuotaPercentage, + EventCachePercentage: cloned.Consistent.MemoryUsage.EventCachePercentage, + } + } } + if cloned.Mounter != nil { res.Mounter = &MounterConfig{ WorkerNum: cloned.Mounter.WorkerNum, @@ -693,6 +706,14 @@ type ConsistentConfig struct { UseFileBackend bool `json:"use_file_backend"` Compression string `json:"compression,omitempty"` FlushConcurrency int `json:"flush_concurrency,omitempty"` + + MemoryUsage *ConsistentMemoryUsage `json:"memory_usage"` +} + +// ConsistentMemoryUsage represents memory usage of Consistent module. +type ConsistentMemoryUsage struct { + MemoryQuotaPercentage uint64 `json:"memory_quota_percentage"` + EventCachePercentage uint64 `json:"event_cache_percentage"` } // ChangefeedSchedulerConfig is per changefeed scheduler settings. diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 0ef9b028e54..e15c483f895 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -64,6 +64,10 @@ var defaultAPIConfig = &ReplicaConfig{ FlushWorkerNum: redo.DefaultFlushWorkerNum, Storage: "", UseFileBackend: false, + MemoryUsage: &ConsistentMemoryUsage{ + MemoryQuotaPercentage: 50, + EventCachePercentage: 0, + }, }, ChangefeedErrorStuckDuration: &JSONDuration{config. GetDefaultReplicaConfig().ChangefeedErrorStuckDuration}, diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 4e308b2775b..d7101b4b969 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -160,6 +160,7 @@ func New( metricsTableSinkTotalRows: metricsTableSinkTotalRows, } + totalQuota := changefeedInfo.Config.MemoryQuota if redoDMLMgr != nil && redoDMLMgr.Enabled() { m.redoDMLMgr = redoDMLMgr m.redoProgressHeap = newTableProgresses() @@ -167,13 +168,22 @@ func New( m.redoTaskChan = make(chan *redoTask) m.redoWorkerAvailable = make(chan struct{}, 1) - // Use 3/4 memory quota as redo quota, and 1/2 again for redo cache. - m.sinkMemQuota = memquota.NewMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota/4*1, "sink") - redoQuota := changefeedInfo.Config.MemoryQuota / 4 * 3 + consistentMemoryUsage := changefeedInfo.Config.Consistent.MemoryUsage + if consistentMemoryUsage == nil { + consistentMemoryUsage = config.GetDefaultReplicaConfig().Consistent.MemoryUsage + } + + redoQuota := totalQuota * consistentMemoryUsage.MemoryQuotaPercentage / 100 + sinkQuota := totalQuota - redoQuota + m.sinkMemQuota = memquota.NewMemQuota(changefeedID, sinkQuota, "sink") m.redoMemQuota = memquota.NewMemQuota(changefeedID, redoQuota, "redo") - m.eventCache = newRedoEventCache(changefeedID, redoQuota/2*1) + + eventCache := redoQuota * consistentMemoryUsage.EventCachePercentage / 100 + if eventCache > 0 { + m.eventCache = newRedoEventCache(changefeedID, eventCache) + } } else { - m.sinkMemQuota = memquota.NewMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota, "sink") + m.sinkMemQuota = memquota.NewMemQuota(changefeedID, totalQuota, "sink") m.redoMemQuota = memquota.NewMemQuota(changefeedID, 0, "redo") } diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 7c66cb17f99..112b0adcc6a 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -68,10 +68,13 @@ func createManagerWithMemEngine( } func getChangefeedInfo() *model.ChangeFeedInfo { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Consistent.MemoryUsage.MemoryQuotaPercentage = 75 + replicaConfig.Consistent.MemoryUsage.EventCachePercentage = 50 return &model.ChangeFeedInfo{ Error: nil, SinkURI: "blackhole://", - Config: config.GetDefaultReplicaConfig(), + Config: replicaConfig, } } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 2d388c5d5e2..3e83122fa3a 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -65,7 +65,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "changefeed-error-stuck-duration": 1800000000000, "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" @@ -227,7 +231,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "changefeed-error-stuck-duration": 1800000000000, "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" @@ -293,7 +301,11 @@ const ( "encoding-worker-num": 16, "flush-worker-num": 8, "storage": "", - "use-file-backend": false + "use-file-backend": false, + "memory-usage": { + "memory-quota-percentage": 50, + "event-cache-percentage": 0 + } }, "changefeed-error-stuck-duration": 1800000000000, "sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION" diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index c8f93c6eac8..3409cf8327b 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -35,6 +35,17 @@ type ConsistentConfig struct { UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` Compression string `toml:"compression" json:"compression"` FlushConcurrency int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + + MemoryUsage *ConsistentMemoryUsage `toml:"memory-usage" json:"memory-usage"` +} + +// ConsistentMemoryUsage represents memory usage of Consistent module. +type ConsistentMemoryUsage struct { + // ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 will be used for redo events. + MemoryQuotaPercentage uint64 `toml:"memory-quota-percentage" json:"memory-quota-percentage"` + // ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 * EventCachePercentage / 100 + // will be used for redo cache. + EventCachePercentage uint64 `toml:"event-cache-percentage" json:"event-cache-percentage"` } // ValidateAndAdjust validates the consistency config and adjusts it if necessary. diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 520a34f48db..634cb230b35 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -82,6 +82,10 @@ var defaultReplicaConfig = &ReplicaConfig{ Storage: "", UseFileBackend: false, Compression: "", + MemoryUsage: &ConsistentMemoryUsage{ + MemoryQuotaPercentage: 50, + EventCachePercentage: 0, + }, }, ChangefeedErrorStuckDuration: time.Minute * 30, SQLMode: defaultSQLMode,