From 865ccc47d979521fcc4f9b81ab634eb1a8331d8e Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Tue, 31 Oct 2023 14:20:07 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #9959 Signed-off-by: ti-chi-bot --- cdc/api/v2/model.go | 31 +++++++++++++++++-------- cdc/api/v2/model_test.go | 11 +++++---- cdc/owner/changefeed_test.go | 7 +++--- cdc/processor/processor_test.go | 10 ++++++++ cdc/redo/meta_manager.go | 8 ++++++- cdc/redo/meta_manager_test.go | 27 +++++++++++---------- pkg/config/config_test_data.go | 3 +++ pkg/config/consistent.go | 20 ++++++++++++---- pkg/config/replica_config.go | 11 +++++---- pkg/redo/config.go | 2 ++ tests/integration_tests/api_v2/model.go | 11 +++++---- 11 files changed, 95 insertions(+), 46 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 3206f2f1899..6b585d94cf2 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -260,11 +260,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( } if c.Consistent != nil { res.Consistent = &config.ConsistentConfig{ - Level: c.Consistent.Level, - MaxLogSize: c.Consistent.MaxLogSize, - FlushIntervalInMs: c.Consistent.FlushIntervalInMs, - Storage: c.Consistent.Storage, - UseFileBackend: c.Consistent.UseFileBackend, + Level: c.Consistent.Level, + MaxLogSize: c.Consistent.MaxLogSize, + FlushIntervalInMs: c.Consistent.FlushIntervalInMs, + MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs, + Storage: c.Consistent.Storage, + UseFileBackend: c.Consistent.UseFileBackend, } } if c.Sink != nil { @@ -627,11 +628,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { } if cloned.Consistent != nil { res.Consistent = &ConsistentConfig{ - Level: cloned.Consistent.Level, - MaxLogSize: cloned.Consistent.MaxLogSize, - FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs, - Storage: cloned.Consistent.Storage, - UseFileBackend: cloned.Consistent.UseFileBackend, + Level: cloned.Consistent.Level, + MaxLogSize: cloned.Consistent.MaxLogSize, + FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs, + MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs, + Storage: cloned.Consistent.Storage, + UseFileBackend: cloned.Consistent.UseFileBackend, } } if cloned.Mounter != nil { @@ -807,11 +809,20 @@ type ColumnSelector struct { // ConsistentConfig represents replication consistency config for a changefeed // This is a duplicate of config.ConsistentConfig type ConsistentConfig struct { +<<<<<<< HEAD Level string `json:"level"` MaxLogSize int64 `json:"max_log_size"` FlushIntervalInMs int64 `json:"flush_interval"` Storage string `json:"storage"` UseFileBackend bool `json:"use_file_backend"` +======= + Level string `json:"level,omitempty"` + MaxLogSize int64 `json:"max_log_size"` + FlushIntervalInMs int64 `json:"flush_interval"` + MetaFlushIntervalInMs int64 `json:"meta_flush_interval"` + Storage string `json:"storage,omitempty"` + UseFileBackend bool `json:"use_file_backend"` +>>>>>>> afe43311da (redo(ticdc): add meta flush interval configuration (#9959)) } // ChangefeedSchedulerConfig is per changefeed scheduler settings. diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 2ed6dc97f84..1a7b0c6150f 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -58,11 +58,12 @@ var defaultAPIConfig = &ReplicaConfig{ AdvanceTimeoutInSec: util.AddressOf(uint(150)), }, Consistent: &ConsistentConfig{ - Level: "none", - MaxLogSize: 64, - FlushIntervalInMs: redo.DefaultFlushIntervalInMs, - Storage: "", - UseFileBackend: false, + Level: "none", + MaxLogSize: 64, + FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs, + Storage: "", + UseFileBackend: false, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: config.GetDefaultReplicaConfig(). diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 81fb4716b2b..075a681fbb5 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -514,9 +514,10 @@ func TestRemoveChangefeed(t *testing.T) { info := ctx.ChangefeedVars().Info dir := t.TempDir() info.Config.Consistent = &config.ConsistentConfig{ - Level: "eventual", - Storage: filepath.Join("nfs://", dir), - FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + Level: "eventual", + Storage: filepath.Join("nfs://", dir), + FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs, } ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: ctx.ChangefeedVars().ID, diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 9bc810a6c4b..6534792136a 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -70,12 +70,22 @@ func newProcessor4Test( } else { tmpDir := t.TempDir() redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID) +<<<<<<< HEAD dmlMgr, err := redo.NewDMLManager(ctx, &config.ConsistentConfig{ Level: string(redoPkg.ConsistentLevelEventual), MaxLogSize: redoPkg.DefaultMaxLogSize, FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, Storage: "file://" + redoDir, UseFileBackend: false, +======= + dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{ + Level: string(redoPkg.ConsistentLevelEventual), + MaxLogSize: redoPkg.DefaultMaxLogSize, + FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redoPkg.DefaultMetaFlushIntervalInMs, + Storage: "file://" + redoDir, + UseFileBackend: false, +>>>>>>> afe43311da (redo(ticdc): add meta flush interval configuration (#9959)) }) require.NoError(t, err) p.redo.r = dmlMgr diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 54f888a13f0..6e1baf456ac 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -120,7 +120,13 @@ func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaMan changeFeedID: contextutil.ChangefeedIDFromCtx(ctx), uuidGenerator: uuid.NewGenerator(), enabled: true, - flushIntervalInMs: cfg.FlushIntervalInMs, + flushIntervalInMs: cfg.MetaFlushIntervalInMs, + } + + if m.flushIntervalInMs < redo.MinFlushIntervalInMs { + log.Warn("redo flush interval is too small, use default value", + zap.Int64("interval", m.flushIntervalInMs)) + m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs } uri, err := storage.ParseRawURL(cfg.Storage) diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index 8879e7c4864..ad2384ba637 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -72,10 +72,11 @@ func TestInitAndWriteMeta(t *testing.T) { startTs := uint64(10) cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: uri.String(), - FlushIntervalInMs: redo.MinFlushIntervalInMs, + Level: string(redo.ConsistentLevelEventual), + MaxLogSize: redo.DefaultMaxLogSize, + Storage: uri.String(), + FlushIntervalInMs: redo.MinFlushIntervalInMs, + MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) @@ -136,10 +137,11 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { startTs := uint64(10) cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: uri.String(), - FlushIntervalInMs: redo.MinFlushIntervalInMs, + Level: string(redo.ConsistentLevelEventual), + MaxLogSize: redo.DefaultMaxLogSize, + Storage: uri.String(), + FlushIntervalInMs: redo.MinFlushIntervalInMs, + MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) @@ -261,10 +263,11 @@ func TestGCAndCleanup(t *testing.T) { startTs := uint64(3) cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: uri.String(), - FlushIntervalInMs: redo.MinFlushIntervalInMs, + Level: string(redo.ConsistentLevelEventual), + MaxLogSize: redo.DefaultMaxLogSize, + Storage: uri.String(), + FlushIntervalInMs: redo.MinFlushIntervalInMs, + MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 5a44aac5aac..b652a3760ce 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -60,6 +60,7 @@ const ( "level": "none", "max-log-size": 64, "flush-interval": 2000, + "meta-flush-interval": 200, "storage": "", "use-file-backend": false }, @@ -273,6 +274,7 @@ const ( "level": "none", "max-log-size": 64, "flush-interval": 2000, + "meta-flush-interval": 200, "storage": "", "use-file-backend": false }, @@ -406,6 +408,7 @@ const ( "level": "none", "max-log-size": 64, "flush-interval": 2000, + "meta-flush-interval": 200, "storage": "", "use-file-backend": false }, diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index 636edcf865f..4a521f98da5 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -24,11 +24,12 @@ import ( // ConsistentConfig represents replication consistency config for a changefeed. type ConsistentConfig struct { - Level string `toml:"level" json:"level"` - MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"` - FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"` - Storage string `toml:"storage" json:"storage"` - UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` + Level string `toml:"level" json:"level"` + MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"` + FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"` + MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"` + Storage string `toml:"storage" json:"storage"` + UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` } // ValidateAndAdjust validates the consistency config and adjusts it if necessary. @@ -50,6 +51,15 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { c.FlushIntervalInMs, redo.MinFlushIntervalInMs)) } + if c.MetaFlushIntervalInMs == 0 { + c.MetaFlushIntervalInMs = redo.DefaultMetaFlushIntervalInMs + } + if c.MetaFlushIntervalInMs < redo.MinFlushIntervalInMs { + return cerror.ErrInvalidReplicaConfig.FastGenByArgs( + fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d", + c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs)) + } + uri, err := storage.ParseRawURL(c.Storage) if err != nil { return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs( diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index c08b1bc6879..20eaf2ace25 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -69,11 +69,12 @@ var defaultReplicaConfig = &ReplicaConfig{ AdvanceTimeoutInSec: util.AddressOf(DefaultAdvanceTimeoutInSec), }, Consistent: &ConsistentConfig{ - Level: "none", - MaxLogSize: redo.DefaultMaxLogSize, - FlushIntervalInMs: redo.DefaultFlushIntervalInMs, - Storage: "", - UseFileBackend: false, + Level: "none", + MaxLogSize: redo.DefaultMaxLogSize, + FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs, + Storage: "", + UseFileBackend: false, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false, diff --git a/pkg/redo/config.go b/pkg/redo/config.go index fbcdb3f1eb3..d0562b83be5 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -44,6 +44,8 @@ const ( FlushWarnDuration = time.Second * 20 // DefaultFlushIntervalInMs is the default flush interval for redo log. DefaultFlushIntervalInMs = 2000 + // DefaultMetaFlushIntervalInMs is the default flush interval for redo meta. + DefaultMetaFlushIntervalInMs = 200 // MinFlushIntervalInMs is the minimum flush interval for redo log. MinFlushIntervalInMs = 50 diff --git a/tests/integration_tests/api_v2/model.go b/tests/integration_tests/api_v2/model.go index 3d7ac7c28f3..2e450c806c6 100644 --- a/tests/integration_tests/api_v2/model.go +++ b/tests/integration_tests/api_v2/model.go @@ -273,11 +273,12 @@ type ColumnSelector struct { // ConsistentConfig represents replication consistency config for a changefeed // This is a duplicate of config.ConsistentConfig type ConsistentConfig struct { - Level string `json:"level"` - MaxLogSize int64 `json:"max_log_size"` - FlushIntervalInMs int64 `json:"flush_interval"` - Storage string `json:"storage"` - UseFileBackend bool `json:"use_file_backend"` + Level string `json:"level"` + MaxLogSize int64 `json:"max_log_size"` + FlushIntervalInMs int64 `json:"flush_interval"` + MetaFlushIntervalInMs int64 `json:"meta_flush_interval"` + Storage string `json:"storage"` + UseFileBackend bool `json:"use_file_backend"` } // ChangefeedSchedulerConfig is per changefeed scheduler settings. From cd7ede00b442ce25e014f52166f5608abad26d83 Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Mon, 4 Dec 2023 17:41:59 +0800 Subject: [PATCH 2/3] add meta flush interval configuration --- cdc/api/v2/model.go | 8 -------- cdc/processor/processor_test.go | 9 --------- 2 files changed, 17 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 6b585d94cf2..989270e07fd 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -809,20 +809,12 @@ type ColumnSelector struct { // ConsistentConfig represents replication consistency config for a changefeed // This is a duplicate of config.ConsistentConfig type ConsistentConfig struct { -<<<<<<< HEAD - Level string `json:"level"` - MaxLogSize int64 `json:"max_log_size"` - FlushIntervalInMs int64 `json:"flush_interval"` - Storage string `json:"storage"` - UseFileBackend bool `json:"use_file_backend"` -======= Level string `json:"level,omitempty"` MaxLogSize int64 `json:"max_log_size"` FlushIntervalInMs int64 `json:"flush_interval"` MetaFlushIntervalInMs int64 `json:"meta_flush_interval"` Storage string `json:"storage,omitempty"` UseFileBackend bool `json:"use_file_backend"` ->>>>>>> afe43311da (redo(ticdc): add meta flush interval configuration (#9959)) } // ChangefeedSchedulerConfig is per changefeed scheduler settings. diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 6534792136a..d872ab2d9d4 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -70,22 +70,13 @@ func newProcessor4Test( } else { tmpDir := t.TempDir() redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID) -<<<<<<< HEAD dmlMgr, err := redo.NewDMLManager(ctx, &config.ConsistentConfig{ - Level: string(redoPkg.ConsistentLevelEventual), - MaxLogSize: redoPkg.DefaultMaxLogSize, - FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, - Storage: "file://" + redoDir, - UseFileBackend: false, -======= - dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{ Level: string(redoPkg.ConsistentLevelEventual), MaxLogSize: redoPkg.DefaultMaxLogSize, FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, MetaFlushIntervalInMs: redoPkg.DefaultMetaFlushIntervalInMs, Storage: "file://" + redoDir, UseFileBackend: false, ->>>>>>> afe43311da (redo(ticdc): add meta flush interval configuration (#9959)) }) require.NoError(t, err) p.redo.r = dmlMgr From 92d1b4148448717204a778724a363c9b7375f5cf Mon Sep 17 00:00:00 2001 From: jiangjianyuan Date: Mon, 4 Dec 2023 20:02:53 +0800 Subject: [PATCH 3/3] fix integration test --- tests/integration_tests/api_v2/cases.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/integration_tests/api_v2/cases.go b/tests/integration_tests/api_v2/cases.go index 579fbd64d41..0325867ff44 100644 --- a/tests/integration_tests/api_v2/cases.go +++ b/tests/integration_tests/api_v2/cases.go @@ -90,11 +90,12 @@ var customReplicaConfig = &ReplicaConfig{ EnablePartitionSeparator: true, }, Consistent: &ConsistentConfig{ - Level: "", - MaxLogSize: 65, - FlushIntervalInMs: 500, - Storage: "local://test", - UseFileBackend: true, + Level: "", + MaxLogSize: 65, + MetaFlushIntervalInMs: 201, + FlushIntervalInMs: 500, + Storage: "local://test", + UseFileBackend: true, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false, @@ -133,11 +134,12 @@ var defaultReplicaConfig = &ReplicaConfig{ EnablePartitionSeparator: true, }, Consistent: &ConsistentConfig{ - Level: "none", - MaxLogSize: redo.DefaultMaxLogSize, - FlushIntervalInMs: redo.DefaultFlushIntervalInMs, - Storage: "", - UseFileBackend: false, + Level: "none", + MaxLogSize: redo.DefaultMaxLogSize, + MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs, + FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + Storage: "", + UseFileBackend: false, }, Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false,