From 985f8af54726a3eaa5c0b74ed173af59a2c2b64c Mon Sep 17 00:00:00 2001
From: wk989898
Date: Mon, 13 May 2024 22:35:41 +0800
Subject: [PATCH] owner(ticdc): update changefeed configuration changefeed-error-stuck-duration correctly (#11042)

close pingcap/tiflow#10998
---
 cdc/owner/feed_state_manager.go      |  10 +++
 cdc/owner/feed_state_manager_test.go | 104 ++++++++++++++++++++++++++-
 2 files changed, 113 insertions(+), 1 deletion(-)

diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go
index ba2ec2f9943..dfa6748df59 100644
--- a/cdc/owner/feed_state_manager.go
+++ b/cdc/owner/feed_state_manager.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	cerrors "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/upstream"
+	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/tikv/client-go/v2/oracle"
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
@@ -168,6 +169,15 @@ func (m *feedStateManager) Tick(resolvedTs model.Ts,
 		// `handleAdminJob` returns true means that some admin jobs are pending
 		// skip to the next tick until all the admin jobs is handled
 		adminJobPending = true
+		changefeedErrorStuckDuration := util.GetOrZero(m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration)
+		if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration {
+			log.Info("changefeedErrorStuckDuration update",
+				zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration),
+				zap.Duration("newChangefeedErrorStuckDuration", changefeedErrorStuckDuration),
+			)
+			m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration
+			m.changefeedErrorStuckDuration = changefeedErrorStuckDuration
+		}
 		return
 	}
 
diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go
index 18747a4d8cb..72250a499a6 100644
--- a/cdc/owner/feed_state_manager_test.go
+++ b/cdc/owner/feed_state_manager_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/orchestrator"
 	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/upstream"
+	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/stretchr/testify/require"
 	pd "github.com/tikv/pd/client"
 )
@@ -999,7 +1000,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
 	require.True(t, manager.ShouldRunning())
 
 	// 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs is not progressing,
-	// the changefeed state will remain warning whena new warning is encountered.
+	// the changefeed state will remain warning when a new warning is encountered.
 	time.Sleep(manager.changefeedErrorStuckDuration + 10)
 	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
 		require.NotNil(t, status)
@@ -1064,3 +1065,104 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
 	require.Equal(t, model.StateFailed, state.Info.State)
 	require.False(t, manager.ShouldRunning())
 }
+
+func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) {
+	globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
+	manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
+	state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
+		model.DefaultChangeFeedID(changefeedInfo.ID))
+	tester := orchestrator.NewReactorStateTester(t, state, nil)
+	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
+		require.Nil(t, info)
+		return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
+	})
+	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+		require.Nil(t, status)
+		return &model.ChangeFeedStatus{}, true, nil
+	})
+	tester.MustApplyPatches()
+	manager.state = state
+	manager.Tick(0, state.Status, state.Info)
+	tester.MustApplyPatches()
+	require.True(t, manager.ShouldRunning())
+
+	stuckDuration := manager.changefeedErrorStuckDuration + time.Second*3
+	state.PatchTaskPosition(globalVars.CaptureInfo.ID,
+		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+			return &model.TaskPosition{Warning: &model.RunningError{
+				Addr:    globalVars.CaptureInfo.AdvertiseAddr,
+				Code:    "[CDC:ErrSinkManagerRunError]", // it is fake error
+				Message: "fake error for test",
+			}}, true, nil
+		})
+	tester.MustApplyPatches()
+	time.Sleep(stuckDuration - time.Second)
+	manager.Tick(100, state.Status, state.Info)
+	tester.MustApplyPatches()
+	require.False(t, manager.ShouldRunning())
+	require.Less(t, manager.changefeedErrorStuckDuration, stuckDuration)
+	require.Equal(t, state.Info.State, model.StateFailed)
+
+	// update ChangefeedErrorStuckDuration
+	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
+		require.NotNil(t, info)
+		info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration)
+		return info, true, nil
+	})
+	// update status
+	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+		require.NotNil(t, status)
+		return &model.ChangeFeedStatus{
+			CheckpointTs: 100,
+		}, true, nil
+	})
+	tester.MustApplyPatches()
+
+	// resume the changefeed in failed state
+	manager.PushAdminJob(&model.AdminJob{
+		CfID:                  model.DefaultChangeFeedID(changefeedInfo.ID),
+		Type:                  model.AdminResume,
+		OverwriteCheckpointTs: 100,
+	})
+
+	manager.Tick(101, state.Status, state.Info)
+	tester.MustApplyPatches()
+	require.True(t, manager.ShouldRunning())
+	require.False(t, manager.ShouldRemoved())
+	require.Equal(t, manager.changefeedErrorStuckDuration, stuckDuration)
+	require.Equal(t, state.Info.State, model.StateNormal)
+	require.Equal(t, state.Info.AdminJobType, model.AdminNone)
+	require.Equal(t, state.Status.AdminJobType, model.AdminNone)
+
+	state.PatchTaskPosition(globalVars.CaptureInfo.ID,
+		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+			return &model.TaskPosition{Warning: &model.RunningError{
+				Addr:    globalVars.CaptureInfo.AdvertiseAddr,
+				Code:    "[CDC:ErrSinkManagerRunError]", // it is fake error
+				Message: "fake error for test",
+			}}, true, nil
+		})
+	tester.MustApplyPatches()
+
+	time.Sleep(stuckDuration - time.Second)
+	manager.Tick(200, state.Status, state.Info)
+	tester.MustApplyPatches()
+	require.True(t, manager.ShouldRunning())
+	require.Equal(t, state.Info.State, model.StateWarning)
+
+	state.PatchTaskPosition(globalVars.CaptureInfo.ID,
+		func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+			return &model.TaskPosition{Warning: &model.RunningError{
+				Addr:    globalVars.CaptureInfo.AdvertiseAddr,
+				Code:    "[CDC:ErrSinkManagerRunError]", // it is fake error
+				Message: "fake error for test",
+			}}, true, nil
+		})
+	tester.MustApplyPatches()
+
+	time.Sleep(time.Second)
+	manager.Tick(201, state.Status, state.Info)
+	tester.MustApplyPatches()
+	require.False(t, manager.ShouldRunning())
+	require.Equal(t, state.Info.State, model.StateFailed)
+}