diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 33a2b885c2d..1887709830d 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -255,11 +255,12 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( } if c.Consistent != nil { res.Consistent = &config.ConsistentConfig{ - Level: c.Consistent.Level, - MaxLogSize: c.Consistent.MaxLogSize, - FlushIntervalInMs: c.Consistent.FlushIntervalInMs, - Storage: c.Consistent.Storage, - UseFileBackend: c.Consistent.UseFileBackend, + Level: c.Consistent.Level, + MaxLogSize: c.Consistent.MaxLogSize, + FlushIntervalInMs: c.Consistent.FlushIntervalInMs, + MetaFlushIntervalInMs: c.Consistent.MetaFlushIntervalInMs, + Storage: c.Consistent.Storage, + UseFileBackend: c.Consistent.UseFileBackend, } } if c.Sink != nil { @@ -455,11 +456,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { } if cloned.Consistent != nil { res.Consistent = &ConsistentConfig{ - Level: cloned.Consistent.Level, - MaxLogSize: cloned.Consistent.MaxLogSize, - FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs, - Storage: cloned.Consistent.Storage, - UseFileBackend: cloned.Consistent.UseFileBackend, + Level: cloned.Consistent.Level, + MaxLogSize: cloned.Consistent.MaxLogSize, + FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs, + MetaFlushIntervalInMs: cloned.Consistent.MetaFlushIntervalInMs, + Storage: cloned.Consistent.Storage, + UseFileBackend: cloned.Consistent.UseFileBackend, } } if cloned.Mounter != nil { @@ -627,11 +629,12 @@ type ColumnSelector struct { // ConsistentConfig represents replication consistency config for a changefeed // This is a duplicate of config.ConsistentConfig type ConsistentConfig struct { - Level string `json:"level"` - MaxLogSize int64 `json:"max_log_size"` - FlushIntervalInMs int64 `json:"flush_interval"` - Storage string `json:"storage"` - UseFileBackend bool `json:"use_file_backend"` + Level string `json:"level,omitempty"` + MaxLogSize int64 `json:"max_log_size"` + FlushIntervalInMs int64 `json:"flush_interval"` + MetaFlushIntervalInMs int64 `json:"meta_flush_interval"` + Storage string `json:"storage,omitempty"` + UseFileBackend bool `json:"use_file_backend"` } // ChangefeedSchedulerConfig is per changefeed scheduler settings. 
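For readers integrating against the v2 open API, the sketch below shows how the new `meta_flush_interval` key serializes alongside the existing fields. The struct is a local stand-in that mirrors the JSON tags added above, not the actual `cdc/api/v2` type, and the values are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// consistentConfig is a stand-in mirroring the v2 API ConsistentConfig
// JSON tags from the diff above (including the new meta_flush_interval).
type consistentConfig struct {
	Level                 string `json:"level,omitempty"`
	MaxLogSize            int64  `json:"max_log_size"`
	FlushIntervalInMs     int64  `json:"flush_interval"`
	MetaFlushIntervalInMs int64  `json:"meta_flush_interval"`
	Storage               string `json:"storage,omitempty"`
	UseFileBackend        bool   `json:"use_file_backend"`
}

func main() {
	cfg := consistentConfig{
		Level:                 "eventual",
		MaxLogSize:            64,
		FlushIntervalInMs:     2000,
		MetaFlushIntervalInMs: 200,                  // new knob: flush interval for redo meta
		Storage:               "s3://bucket/prefix", // illustrative placeholder URI
		UseFileBackend:        false,
	}
	out, _ := json.MarshalIndent(cfg, "", "  ")
	fmt.Println(string(out))
}
```

Note that `meta_flush_interval` carries no `omitempty`, so a request that leaves it unset round-trips as `0`; the validation added to `pkg/config/consistent.go` later in this diff replaces that zero with the default.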
diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 9382356a670..f04fa1386fd 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -56,11 +56,12 @@ var defaultAPIConfig = &ReplicaConfig{ AdvanceTimeoutInSec: config.DefaultAdvanceTimeoutInSec, }, Consistent: &ConsistentConfig{ - Level: "none", - MaxLogSize: 64, - FlushIntervalInMs: redo.DefaultFlushIntervalInMs, - Storage: "", - UseFileBackend: false, + Level: "none", + MaxLogSize: 64, + FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs, + Storage: "", + UseFileBackend: false, }, } diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 6d6d9536117..b257b1afba7 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -504,9 +504,10 @@ func TestRemoveChangefeed(t *testing.T) { info := ctx.ChangefeedVars().Info dir := t.TempDir() info.Config.Consistent = &config.ConsistentConfig{ - Level: "eventual", - Storage: filepath.Join("nfs://", dir), - FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + Level: "eventual", + Storage: filepath.Join("nfs://", dir), + FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs, } ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: ctx.ChangefeedVars().ID, diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 30c62e86aff..69676a10e11 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -63,11 +63,11 @@ type feedStateManager struct { // resolvedTs and initCheckpointTs is for checking whether resolved timestamp // has been advanced or not. - resolvedTs model.Ts - initCheckpointTs model.Ts - + resolvedTs model.Ts + checkpointTs model.Ts checkpointTsAdvanced time.Time - lastCheckpointTs model.Ts + + changefeedErrorStuckDuration time.Duration } // newFeedStateManager creates feedStateManager and initialize the exponential backoff @@ -83,34 +83,61 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager { // backoff will stop once the defaultBackoffMaxElapsedTime has elapsed. f.errBackoff.MaxElapsedTime = defaultBackoffMaxElapsedTime - f.resetErrRetry() + f.changefeedErrorStuckDuration = defaultBackoffMaxElapsedTime + f.resetErrRetry() + f.isRetrying = false return f } +func (m *feedStateManager) shouldRetry() bool { + // changefeed should not retry within [m.lastErrorRetryTime, m.lastErrorRetryTime + m.backoffInterval). + return time.Since(m.lastErrorRetryTime) >= m.backoffInterval +} + +func (m *feedStateManager) shouldFailWhenRetry() bool { + // retry the changefeed + m.backoffInterval = m.errBackoff.NextBackOff() + // NextBackOff() will return -1 once the MaxElapsedTime has elapsed, + // set the changefeed to failed state. 
+ if m.backoffInterval == m.errBackoff.Stop { + return true + } + + m.lastErrorRetryTime = time.Now() + return false +} + +// resetErrRetry resets the error retry related fields +func (m *feedStateManager) resetErrRetry() { + m.errBackoff.Reset() + m.backoffInterval = m.errBackoff.NextBackOff() + m.lastErrorRetryTime = time.Unix(0, 0) +} + func (m *feedStateManager) Tick( state *orchestrator.ChangefeedReactorState, resolvedTs model.Ts, ) (adminJobPending bool) { + m.checkAndInitLastRetryCheckpointTs(state.Status) + if state.Status != nil { - if m.lastCheckpointTs < state.Status.CheckpointTs { - m.lastCheckpointTs = state.Status.CheckpointTs + if m.checkpointTs < state.Status.CheckpointTs { + m.checkpointTs = state.Status.CheckpointTs m.checkpointTsAdvanced = time.Now() } - if m.state == nil || m.state.Status == nil { - // It's the first time `m.state.Status` gets filled. - m.initCheckpointTs = state.Status.CheckpointTs + if m.resolvedTs < resolvedTs { + m.resolvedTs = resolvedTs + } + if m.checkpointTs >= m.resolvedTs { + m.checkpointTsAdvanced = time.Now() } } - - m.checkAndInitLastRetryCheckpointTs(state.Status) - m.state = state - m.resolvedTs = resolvedTs m.shouldBeRunning = true defer func() { if !m.shouldBeRunning { - m.cleanUpTaskPositions() + m.cleanUp() } }() if m.handleAdminJob() { @@ -131,16 +158,12 @@ func (m *feedStateManager) Tick( m.shouldBeRunning = false return case model.StatePending: - if time.Since(m.lastErrorRetryTime) < m.backoffInterval { + if !m.shouldRetry() { m.shouldBeRunning = false return } - // retry the changefeed - oldBackoffInterval := m.backoffInterval - m.backoffInterval = m.errBackoff.NextBackOff() - // NextBackOff() will return -1 once the MaxElapsedTime has elapsed, - // set the changefeed to failed state. - if m.backoffInterval == m.errBackoff.Stop { + + if m.shouldFailWhenRetry() { log.Error("The changefeed won't be restarted as it has been experiencing failures for "+ "an extended duration", zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime), @@ -154,18 +177,17 @@ func (m *feedStateManager) Tick( return } - m.lastErrorRetryTime = time.Now() + // retry the changefeed + m.shouldBeRunning = true if m.state.Status != nil { m.lastErrorRetryCheckpointTs = m.state.Status.CheckpointTs } - m.shouldBeRunning = true m.patchState(model.StateWarning) log.Info("changefeed retry backoff interval is elapsed,"+ "changefeed will be restarted", zap.String("namespace", m.state.ID.Namespace), zap.String("changefeed", m.state.ID.ID), zap.Time("lastErrorRetryTime", m.lastErrorRetryTime), - zap.Duration("lastRetryInterval", oldBackoffInterval), zap.Duration("nextRetryInterval", m.backoffInterval)) case model.StateNormal, model.StateWarning: m.checkAndChangeState() @@ -274,7 +296,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { m.shouldBeRunning = true // when the changefeed is manually resumed, we must reset the backoff m.resetErrRetry() - // The lastErrorTime also needs to be cleared before a fresh run.
+ m.isRetrying = false jobsPending = true m.patchState(model.StateNormal) @@ -407,12 +429,15 @@ func (m *feedStateManager) patchState(feedState model.FeedState) { }) } -func (m *feedStateManager) cleanUpTaskPositions() { +func (m *feedStateManager) cleanUp() { for captureID := range m.state.TaskPositions { m.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return nil, position != nil, nil }) } + m.checkpointTs = 0 + m.checkpointTsAdvanced = time.Time{} + m.resolvedTs = 0 } func (m *feedStateManager) errorsReportedByProcessors() []*model.RunningError { @@ -535,12 +560,12 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) { info.Error = lastError return info, true, nil }) - } - // The errBackoff needs to be reset before the first retry. - if !m.isRetrying { - m.resetErrRetry() - m.isRetrying = true + // The errBackoff needs to be reset before the first retry. + if !m.isRetrying { + m.resetErrRetry() + m.isRetrying = true + } } } @@ -554,13 +579,14 @@ func (m *feedStateManager) handleWarning(errs ...*model.RunningError) { currTime := m.upstream.PDClock.CurrentTime() ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs) m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs - // Conditions: - // 1. checkpoint lag is large enough; - // 2. checkpoint hasn't been advanced for a long while; - // 3. the changefeed has been initialized. - if currTime.Sub(ckptTime) > defaultBackoffMaxElapsedTime && - time.Since(m.checkpointTsAdvanced) > defaultBackoffMaxElapsedTime && - m.resolvedTs > m.initCheckpointTs { + + checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration + if checkpointTsStuck { + log.Info("changefeed retry on warning for a very long time and does not resume, "+ + "it will be failed", zap.String("changefeed", m.state.ID.ID), + zap.Uint64("checkpointTs", m.state.Status.CheckpointTs), + zap.Duration("checkpointTime", currTime.Sub(ckptTime)), + ) code, _ := cerrors.RFCCode(cerrors.ErrChangefeedUnretryable) m.handleError(&model.RunningError{ Time: lastError.Time, @@ -592,13 +618,6 @@ func GenerateChangefeedEpoch(ctx context.Context, pdClient pd.Client) uint64 { return oracle.ComposeTS(phyTs, logical) } -// resetErrRetry reset the error retry related fields -func (m *feedStateManager) resetErrRetry() { - m.errBackoff.Reset() - m.backoffInterval = m.errBackoff.NextBackOff() - m.lastErrorRetryTime = time.Unix(0, 0) -} - // checkAndChangeState checks the state of the changefeed and change it if needed. // if the state of the changefeed is warning and the changefeed's checkpointTs is // greater than the lastRetryCheckpointTs, it will change the state to normal. 
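The `feedStateManager` changes above split the retry decision into two predicates: `shouldRetry` gates on the time since the last retry, and `shouldFailWhenRetry` advances the exponential backoff and fails the changefeed once the backoff gives up. The standalone sketch below illustrates that gating pattern, assuming the same `github.com/cenkalti/backoff/v4` semantics the manager relies on; `retryGate` and its methods are invented names for this example and do not exist in tiflow.

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// retryGate models the retry bookkeeping sketched in the diff above:
// a pending changefeed is retried only after the current backoff interval
// has elapsed, and is failed once the exponential backoff gives up.
type retryGate struct {
	errBackoff         *backoff.ExponentialBackOff
	backoffInterval    time.Duration
	lastErrorRetryTime time.Time
}

func newRetryGate(maxElapsed time.Duration) *retryGate {
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = maxElapsed // NextBackOff returns Stop once this much time has elapsed
	g := &retryGate{errBackoff: b}
	g.reset()
	return g
}

// reset mirrors resetErrRetry: restart the backoff and forget the last retry time.
func (g *retryGate) reset() {
	g.errBackoff.Reset()
	g.backoffInterval = g.errBackoff.NextBackOff()
	g.lastErrorRetryTime = time.Unix(0, 0)
}

// shouldRetry reports whether the backoff interval since the last retry has elapsed.
func (g *retryGate) shouldRetry() bool {
	return time.Since(g.lastErrorRetryTime) >= g.backoffInterval
}

// shouldFail advances the backoff; a Stop result means the retry budget is exhausted.
func (g *retryGate) shouldFail() bool {
	g.backoffInterval = g.errBackoff.NextBackOff()
	if g.backoffInterval == g.errBackoff.Stop {
		return true
	}
	g.lastErrorRetryTime = time.Now()
	return false
}

func main() {
	g := newRetryGate(2 * time.Second)
	fmt.Println("retry allowed right after reset:", g.shouldRetry())
	for !g.shouldFail() {
		fmt.Println("next retry in", g.backoffInterval)
		time.Sleep(g.backoffInterval)
	}
	fmt.Println("backoff exhausted: the changefeed would be marked failed")
}
```

The manual-resume path in the diff resets this state (`resetErrRetry` plus clearing `isRetrying`), which is why the tests below assert that `manager.isRetrying` flips back to false after an `AdminResume` job.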
diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index ed738e04b55..6710063f9fe 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -64,6 +64,8 @@ func newFeedStateManager4Test( f.resetErrRetry() + f.changefeedErrorStuckDuration = time.Second * 3 + return f } @@ -208,6 +210,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { require.Equal(t, state.Status.AdminJobType, model.AdminStop) // resume the changefeed in failed state + manager.isRetrying = true manager.PushAdminJob(&model.AdminJob{ CfID: ctx.ChangefeedVars().ID, Type: model.AdminResume, @@ -220,6 +223,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { require.Equal(t, state.Info.State, model.StateNormal) require.Equal(t, state.Info.AdminJobType, model.AdminNone) require.Equal(t, state.Status.AdminJobType, model.AdminNone) + require.False(t, manager.isRetrying) } func TestMarkFinished(t *testing.T) { @@ -743,6 +747,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) { func TestHandleWarning(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) manager := newFeedStateManager4Test(200, 1600, 0, 2.0) + manager.changefeedErrorStuckDuration = 100 * time.Millisecond state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID) tester := orchestrator.NewReactorStateTester(t, state, nil) @@ -935,3 +940,114 @@ func TestErrorAfterWarning(t *testing.T) { require.Equal(t, model.StateWarning, state.Info.State) require.True(t, manager.ShouldRunning()) } + +func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { + t.Parallel() + + maxElapsedTimeInMs := 2000 + ctx := cdcContext.NewBackendContext4Test(true) + manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) + tester := orchestrator.NewReactorStateTester(t, state, nil) + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + require.Nil(t, info) + return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil + }) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.Nil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + + tester.MustApplyPatches() + manager.Tick(state, 200) + tester.MustApplyPatches() + require.Equal(t, model.StateNormal, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 1. test when a warning occurs, the changefeed state will be changed to warning + // and it will still keep running + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 200) + // some patches will be generated when the manager.Tick is called + // so we need to apply the patches before we check the state + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs are not progressing, + // the changefeed state will remain warning when a new warning is encountered.
+ time.Sleep(manager.changefeedErrorStuckDuration + 10) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 200) + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 3. Test changefeed remain warning when resolvedTs is progressing after stuck beyond the detection time. + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 400) + tester.MustApplyPatches() + require.Equal(t, model.StateWarning, state.Info.State) + require.True(t, manager.ShouldRunning()) + + // 4. Test changefeed failed when checkpointTs is not progressing for changefeedErrorStuckDuration time. + time.Sleep(manager.changefeedErrorStuckDuration + 10) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 200, + }, true, nil + }) + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + manager.Tick(state, 400) + tester.MustApplyPatches() + require.Equal(t, model.StateFailed, state.Info.State) + require.False(t, manager.ShouldRunning()) +} diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index 7abea626afb..53b3f81c608 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -265,7 +265,26 @@ func newLogManager( func (m *logManager) Run(ctx context.Context) error { defer m.close() - return m.bgUpdateLog(ctx) + start := time.Now() + w, err := factory.NewRedoLogWriter(ctx, m.cfg) + if err != nil { + log.Error("redo: failed to create redo log writer", + zap.String("namespace", m.cfg.ChangeFeedID.Namespace), + zap.String("changefeed", m.cfg.ChangeFeedID.ID), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + return err + } + m.writer = w + return m.bgUpdateLog(ctx, m.getFlushDuration()) +} + +func (m *logManager) getFlushDuration() time.Duration { + flushIntervalInMs := m.cfg.FlushIntervalInMs + if m.cfg.LogType == redo.RedoDDLLogFileType { + flushIntervalInMs = m.cfg.MetaFlushIntervalInMs + } + return time.Duration(flushIntervalInMs) * time.Millisecond } // Enabled returns whether this 
log manager is enabled @@ -468,15 +487,14 @@ func (m *logManager) onResolvedTsMsg(tableID model.TableID, resolvedTs model.Ts) } } -func (m *logManager) bgUpdateLog(ctx context.Context) error { +func (m *logManager) bgUpdateLog(ctx context.Context, flushDuration time.Duration) error { m.releaseMemoryCbs = make([]func(), 0, 1024) - flushIntervalInMs := m.cfg.FlushIntervalInMs - ticker := time.NewTicker(time.Duration(flushIntervalInMs) * time.Millisecond) + ticker := time.NewTicker(flushDuration) defer ticker.Stop() log.Info("redo manager bgUpdateLog is running", zap.String("namespace", m.cfg.ChangeFeedID.Namespace), zap.String("changefeed", m.cfg.ChangeFeedID.ID), - zap.Int64("flushIntervalInMs", flushIntervalInMs)) + zap.Duration("flushIntervalInMs", flushDuration)) var err error // logErrCh is used to retrieve errors from log flushing goroutines. diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go index a84d5a5a4dc..9d29fe3d74d 100644 --- a/cdc/redo/manager_test.go +++ b/cdc/redo/manager_test.go @@ -16,7 +16,6 @@ package redo import ( "context" "fmt" - "math" "math/rand" "sync" "testing" @@ -24,8 +23,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/redo/writer" - "github.com/pingcap/tiflow/cdc/redo/writer/blackhole" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/redo" "github.com/stretchr/testify/require" @@ -33,279 +30,6 @@ import ( "golang.org/x/sync/errgroup" ) -func checkResolvedTs(t *testing.T, mgr *logManager, expectedRts uint64) { - time.Sleep(time.Duration(redo.MinFlushIntervalInMs+200) * time.Millisecond) - resolvedTs := uint64(math.MaxUint64) - mgr.rtsMap.Range(func(tableID any, value any) bool { - v, ok := value.(*statefulRts) - require.True(t, ok) - ts := v.getFlushed() - if ts < resolvedTs { - resolvedTs = ts - } - return true - }) - require.Equal(t, expectedRts, resolvedTs) -} - -func TestConsistentConfig(t *testing.T) { - t.Parallel() - levelCases := []struct { - level string - valid bool - }{ - {"none", true}, - {"eventual", true}, - {"NONE", false}, - {"", false}, - } - for _, lc := range levelCases { - require.Equal(t, lc.valid, redo.IsValidConsistentLevel(lc.level)) - } - - levelEnableCases := []struct { - level string - consistent bool - }{ - {"invalid-level", false}, - {"none", false}, - {"eventual", true}, - } - for _, lc := range levelEnableCases { - require.Equal(t, lc.consistent, redo.IsConsistentEnabled(lc.level)) - } - - storageCases := []struct { - storage string - valid bool - }{ - {"local", true}, - {"nfs", true}, - {"s3", true}, - {"blackhole", true}, - {"Local", false}, - {"", false}, - } - for _, sc := range storageCases { - require.Equal(t, sc.valid, redo.IsValidConsistentStorage(sc.storage)) - } - - s3StorageCases := []struct { - storage string - s3Enabled bool - }{ - {"local", false}, - {"nfs", false}, - {"s3", true}, - {"blackhole", false}, - } - for _, sc := range s3StorageCases { - require.Equal(t, sc.s3Enabled, redo.IsExternalStorage(sc.storage)) - } -} - -// TestLogManagerInProcessor tests how redo log manager is used in processor. 
-func TestLogManagerInProcessor(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - testWriteDMLs := func(storage string, useFileBackend bool) { - ctx, cancel := context.WithCancel(ctx) - cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: storage, - FlushIntervalInMs: redo.MinFlushIntervalInMs, - UseFileBackend: useFileBackend, - } - dmlMgr, err := NewDMLManager(ctx, cfg) - require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - dmlMgr.Run(ctx) - }() - - // check emit row changed events can move forward resolved ts - spans := []model.TableID{53, 55, 57, 59} - - startTs := uint64(100) - for _, span := range spans { - dmlMgr.AddTable(span, startTs) - } - testCases := []struct { - span model.TableID - rows []*model.RowChangedEvent - }{ - { - span: 53, - rows: []*model.RowChangedEvent{ - {CommitTs: 120, Table: &model.TableName{TableID: 53}}, - {CommitTs: 125, Table: &model.TableName{TableID: 53}}, - {CommitTs: 130, Table: &model.TableName{TableID: 53}}, - }, - }, - { - span: 55, - rows: []*model.RowChangedEvent{ - {CommitTs: 130, Table: &model.TableName{TableID: 55}}, - {CommitTs: 135, Table: &model.TableName{TableID: 55}}, - }, - }, - { - span: 57, - rows: []*model.RowChangedEvent{ - {CommitTs: 130, Table: &model.TableName{TableID: 57}}, - }, - }, - { - span: 59, - rows: []*model.RowChangedEvent{ - {CommitTs: 128, Table: &model.TableName{TableID: 59}}, - {CommitTs: 130, Table: &model.TableName{TableID: 59}}, - {CommitTs: 133, Table: &model.TableName{TableID: 59}}, - }, - }, - } - for _, tc := range testCases { - err := dmlMgr.EmitRowChangedEvents(ctx, tc.span, nil, tc.rows...) - require.NoError(t, err) - } - - // check UpdateResolvedTs can move forward the resolved ts when there is not row event. - flushResolvedTs := uint64(150) - for _, span := range spans { - checkResolvedTs(t, dmlMgr.logManager, startTs) - err := dmlMgr.UpdateResolvedTs(ctx, span, flushResolvedTs) - require.NoError(t, err) - } - checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs) - - // check remove table can work normally - removeTable := spans[len(spans)-1] - spans = spans[:len(spans)-1] - dmlMgr.RemoveTable(removeTable) - flushResolvedTs = uint64(200) - for _, span := range spans { - err := dmlMgr.UpdateResolvedTs(ctx, span, flushResolvedTs) - require.NoError(t, err) - } - checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs) - - cancel() - wg.Wait() - } - - testWriteDMLs("blackhole://", true) - storages := []string{ - fmt.Sprintf("file://%s", t.TempDir()), - fmt.Sprintf("nfs://%s", t.TempDir()), - } - for _, storage := range storages { - testWriteDMLs(storage, true) - testWriteDMLs(storage, false) - } -} - -// TestLogManagerInOwner tests how redo log manager is used in owner, -// where the redo log manager needs to handle DDL event only. 
-func TestLogManagerInOwner(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - testWriteDDLs := func(storage string, useFileBackend bool) { - ctx, cancel := context.WithCancel(ctx) - cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: storage, - FlushIntervalInMs: redo.MinFlushIntervalInMs, - UseFileBackend: useFileBackend, - } - startTs := model.Ts(10) - ddlMgr, err := NewDDLManager(ctx, cfg, startTs) - require.NoError(t, err) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - ddlMgr.Run(ctx) - }() - - require.Equal(t, startTs, ddlMgr.GetResolvedTs()) - ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"} - err = ddlMgr.EmitDDLEvent(ctx, ddl) - require.NoError(t, err) - require.Equal(t, startTs, ddlMgr.GetResolvedTs()) - - ddlMgr.UpdateResolvedTs(ctx, ddl.CommitTs) - checkResolvedTs(t, ddlMgr.logManager, ddl.CommitTs) - - cancel() - wg.Wait() - } - - testWriteDDLs("blackhole://", true) - storages := []string{ - fmt.Sprintf("file://%s", t.TempDir()), - fmt.Sprintf("nfs://%s", t.TempDir()), - } - for _, storage := range storages { - testWriteDDLs(storage, true) - testWriteDDLs(storage, false) - } -} - -// TestManagerError tests whether internal error in bgUpdateLog could be managed correctly. -func TestLogManagerError(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: "blackhole://", - FlushIntervalInMs: redo.MinFlushIntervalInMs, - } - logMgr, err := NewDMLManager(ctx, cfg) - require.NoError(t, err) - err = logMgr.writer.Close() - require.NoError(t, err) - logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer) - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - err := logMgr.Run(ctx) - require.Regexp(t, ".*invalid black hole writer.*", err) - require.Regexp(t, ".*WriteLog.*", err) - }() - - testCases := []struct { - span model.TableID - rows []writer.RedoEvent - }{ - { - span: 53, - rows: []writer.RedoEvent{ - &model.RowChangedEvent{CommitTs: 120, Table: &model.TableName{TableID: 53}}, - &model.RowChangedEvent{CommitTs: 125, Table: &model.TableName{TableID: 53}}, - &model.RowChangedEvent{CommitTs: 130, Table: &model.TableName{TableID: 53}}, - }, - }, - } - for _, tc := range testCases { - err := logMgr.emitRedoEvents(ctx, tc.span, nil, tc.rows...) 
- require.NoError(t, err) - } - wg.Wait() -} - func BenchmarkBlackhole(b *testing.B) { runBenchTest(b, "blackhole://", false) } @@ -323,11 +47,12 @@ func BenchmarkFileWriter(b *testing.B) { func runBenchTest(b *testing.B, storage string, useFileBackend bool) { ctx, cancel := context.WithCancel(context.Background()) cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: storage, - FlushIntervalInMs: redo.MinFlushIntervalInMs, - UseFileBackend: useFileBackend, + Level: string(redo.ConsistentLevelEventual), + MaxLogSize: redo.DefaultMaxLogSize, + Storage: storage, + FlushIntervalInMs: redo.MinFlushIntervalInMs, + MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, + UseFileBackend: useFileBackend, } dmlMgr, err := NewDMLManager(ctx, cfg) require.Nil(b, err) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index 231f6db99e8..86ce58313eb 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -120,7 +120,13 @@ func NewMetaManager(ctx context.Context, cfg *config.ConsistentConfig) (*metaMan changeFeedID: contextutil.ChangefeedIDFromCtx(ctx), uuidGenerator: uuid.NewGenerator(), enabled: true, - flushIntervalInMs: cfg.FlushIntervalInMs, + flushIntervalInMs: cfg.MetaFlushIntervalInMs, + } + + if m.flushIntervalInMs < redo.MinFlushIntervalInMs { + log.Warn("redo flush interval is too small, use default value", + zap.Int64("interval", m.flushIntervalInMs)) + m.flushIntervalInMs = redo.DefaultMetaFlushIntervalInMs } uri, err := storage.ParseRawURL(cfg.Storage) diff --git a/cdc/redo/meta_manager_test.go b/cdc/redo/meta_manager_test.go index 3a13fdccb35..e630d19f249 100644 --- a/cdc/redo/meta_manager_test.go +++ b/cdc/redo/meta_manager_test.go @@ -72,10 +72,11 @@ func TestInitAndWriteMeta(t *testing.T) { startTs := uint64(10) cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: uri.String(), - FlushIntervalInMs: redo.MinFlushIntervalInMs, + Level: string(redo.ConsistentLevelEventual), + MaxLogSize: redo.DefaultMaxLogSize, + Storage: uri.String(), + FlushIntervalInMs: redo.MinFlushIntervalInMs, + MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) @@ -136,10 +137,11 @@ func TestPreCleanupAndWriteMeta(t *testing.T) { startTs := uint64(10) cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: uri.String(), - FlushIntervalInMs: redo.MinFlushIntervalInMs, + Level: string(redo.ConsistentLevelEventual), + MaxLogSize: redo.DefaultMaxLogSize, + Storage: uri.String(), + FlushIntervalInMs: redo.MinFlushIntervalInMs, + MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) @@ -267,10 +269,11 @@ func TestGCAndCleanup(t *testing.T) { startTs := uint64(3) cfg := &config.ConsistentConfig{ - Level: string(redo.ConsistentLevelEventual), - MaxLogSize: redo.DefaultMaxLogSize, - Storage: uri.String(), - FlushIntervalInMs: redo.MinFlushIntervalInMs, + Level: string(redo.ConsistentLevelEventual), + MaxLogSize: redo.DefaultMaxLogSize, + Storage: uri.String(), + FlushIntervalInMs: redo.MinFlushIntervalInMs, + MetaFlushIntervalInMs: redo.MinFlushIntervalInMs, } m, err := NewMetaManagerWithInit(ctx, cfg, startTs) require.NoError(t, err) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 6482a027f8c..e9def7b0406 100644 --- 
a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1764,6 +1764,9 @@ var doc = `{ "max_log_size": { "type": "integer" }, + "meta_flush_interval": { + "type": "integer" + }, "storage": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 59d27bf5b9b..8eb9fec990b 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1745,6 +1745,9 @@ "max_log_size": { "type": "integer" }, + "meta_flush_interval": { + "type": "integer" + }, "storage": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 753fcd376bd..3b84a6495f2 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -392,6 +392,8 @@ definitions: type: string max_log_size: type: integer + meta_flush_interval: + type: integer storage: type: string use_file_backend: diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 57a5b6bb01e..2d1be90650d 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -60,6 +60,7 @@ const ( "level": "none", "max-log-size": 64, "flush-interval": 2000, + "meta-flush-interval": 200, "storage": "", "use-file-backend": false } @@ -211,6 +212,7 @@ const ( "level": "none", "max-log-size": 64, "flush-interval": 2000, + "meta-flush-interval": 200, "storage": "", "use-file-backend": false } @@ -271,6 +273,7 @@ const ( "level": "none", "max-log-size": 64, "flush-interval": 2000, + "meta-flush-interval": 200, "storage": "", "use-file-backend": false } diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go index 636edcf865f..4a521f98da5 100644 --- a/pkg/config/consistent.go +++ b/pkg/config/consistent.go @@ -24,11 +24,12 @@ import ( // ConsistentConfig represents replication consistency config for a changefeed. type ConsistentConfig struct { - Level string `toml:"level" json:"level"` - MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"` - FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"` - Storage string `toml:"storage" json:"storage"` - UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` + Level string `toml:"level" json:"level"` + MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"` + FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"` + MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"` + Storage string `toml:"storage" json:"storage"` + UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` } // ValidateAndAdjust validates the consistency config and adjusts it if necessary. 
@@ -50,6 +51,15 @@ func (c *ConsistentConfig) ValidateAndAdjust() error { c.FlushIntervalInMs, redo.MinFlushIntervalInMs)) } + if c.MetaFlushIntervalInMs == 0 { + c.MetaFlushIntervalInMs = redo.DefaultMetaFlushIntervalInMs + } + if c.MetaFlushIntervalInMs < redo.MinFlushIntervalInMs { + return cerror.ErrInvalidReplicaConfig.FastGenByArgs( + fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d", + c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs)) + } + uri, err := storage.ParseRawURL(c.Storage) if err != nil { return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs( diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index a602056300b..6dde6c2df06 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -65,11 +65,12 @@ var defaultReplicaConfig = &ReplicaConfig{ AdvanceTimeoutInSec: DefaultAdvanceTimeoutInSec, }, Consistent: &ConsistentConfig{ - Level: "none", - MaxLogSize: redo.DefaultMaxLogSize, - FlushIntervalInMs: redo.DefaultFlushIntervalInMs, - Storage: "", - UseFileBackend: false, + Level: "none", + MaxLogSize: redo.DefaultMaxLogSize, + FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs, + Storage: "", + UseFileBackend: false, }, } diff --git a/pkg/redo/config.go b/pkg/redo/config.go index fbcdb3f1eb3..d0562b83be5 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -44,6 +44,8 @@ const ( FlushWarnDuration = time.Second * 20 // DefaultFlushIntervalInMs is the default flush interval for redo log. DefaultFlushIntervalInMs = 2000 + // DefaultMetaFlushIntervalInMs is the default flush interval for redo meta. + DefaultMetaFlushIntervalInMs = 200 // MinFlushIntervalInMs is the minimum flush interval for redo log. MinFlushIntervalInMs = 50 diff --git a/tests/integration_tests/api_v2/cases.go b/tests/integration_tests/api_v2/cases.go index 36f7fbe0df9..e5d90e4e88a 100644 --- a/tests/integration_tests/api_v2/cases.go +++ b/tests/integration_tests/api_v2/cases.go @@ -125,11 +125,12 @@ var defaultReplicaConfig = &ReplicaConfig{ EnablePartitionSeparator: true, }, Consistent: &ConsistentConfig{ - Level: "none", - MaxLogSize: redo.DefaultMaxLogSize, - FlushIntervalInMs: redo.DefaultFlushIntervalInMs, - Storage: "", - UseFileBackend: false, + Level: "none", + MaxLogSize: redo.DefaultMaxLogSize, + FlushIntervalInMs: redo.DefaultFlushIntervalInMs, + MetaFlushIntervalInMs: redo.DefaultMetaFlushIntervalInMs, + Storage: "", + UseFileBackend: false, }, } diff --git a/tests/integration_tests/api_v2/model.go b/tests/integration_tests/api_v2/model.go index b25e8e5355c..e6b669aabbe 100644 --- a/tests/integration_tests/api_v2/model.go +++ b/tests/integration_tests/api_v2/model.go @@ -271,11 +271,12 @@ type ColumnSelector struct { // ConsistentConfig represents replication consistency config for a changefeed // This is a duplicate of config.ConsistentConfig type ConsistentConfig struct { - Level string `json:"level"` - MaxLogSize int64 `json:"max_log_size"` - FlushIntervalInMs int64 `json:"flush_interval"` - Storage string `json:"storage"` - UseFileBackend bool `json:"use_file_backend"` + Level string `json:"level"` + MaxLogSize int64 `json:"max_log_size"` + FlushIntervalInMs int64 `json:"flush_interval"` + MetaFlushIntervalInMs int64 `json:"meta_flush_interval"` + Storage string `json:"storage"` + UseFileBackend bool `json:"use_file_backend"` } // ChangefeedSchedulerConfig is per changefeed scheduler settings.
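To make the new validation concrete: per the `ValidateAndAdjust` change earlier in this diff, a zero `consistent.meta-flush-interval` falls back to `DefaultMetaFlushIntervalInMs` (200 ms), while anything below `MinFlushIntervalInMs` (50 ms) is rejected. The sketch below reproduces just that defaulting/validation rule as a standalone helper; the constant values come from `pkg/redo/config.go` above, but the helper name and the plain `fmt.Errorf` error are local to this example (tiflow wraps the error in `cerror.ErrInvalidReplicaConfig`).

```go
package main

import "fmt"

// Values taken from pkg/redo/config.go in this diff.
const (
	defaultMetaFlushIntervalInMs = 200
	minFlushIntervalInMs         = 50
)

// adjustMetaFlushInterval mirrors the rule added to ConsistentConfig.ValidateAndAdjust:
// zero means "use the default", values below the minimum are invalid.
func adjustMetaFlushInterval(ms int64) (int64, error) {
	if ms == 0 {
		return defaultMetaFlushIntervalInMs, nil
	}
	if ms < minFlushIntervalInMs {
		return 0, fmt.Errorf(
			"consistent.meta-flush-interval:%d must be equal to or greater than %d",
			ms, minFlushIntervalInMs)
	}
	return ms, nil
}

func main() {
	for _, in := range []int64{0, 10, 500} {
		out, err := adjustMetaFlushInterval(in)
		fmt.Printf("meta-flush-interval=%d -> %d, err=%v\n", in, out, err)
	}
}
```

With the defaults in this diff, redo meta is flushed every 200 ms while redo log data keeps the existing 2000 ms `flush-interval`, and the DDL log manager selects the meta interval through `getFlushDuration`.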