diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 8da86ded230..b08204831f0 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -432,6 +432,16 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) { } } + // changefeed state from stopped to failed is allowed + // but stopped to error or normal is not allowed + if m.state.Info != nil && m.state.Info.State == model.StateStopped { + log.Warn("changefeed is stopped, ignore errors", + zap.String("changefeed", m.state.ID.ID), + zap.String("namespace", m.state.ID.Namespace), + zap.Any("errors", errs)) + return + } + // we need to patch changefeed unretryable error to the changefeed info, // so we have to iterate all errs here to check wether it is a unretryable // error in errs diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 6900f9f14ae..3b8a0243a76 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -361,6 +361,23 @@ func TestHandleFastFailError(t *testing.T) { tester.MustApplyPatches() } +func TestHandleErrorWhenChangefeedIsPaused(t *testing.T) { + ctx := cdcContext.NewBackendContext4Test(true) + manager := newFeedStateManager4Test(0, 0, 0, 0) + manager.state = orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + ctx.ChangefeedVars().ID) + err := &model.RunningError{ + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, + Code: "CDC:ErrReachMaxTry", + Message: "fake error for test", + } + manager.state.Info = &model.ChangeFeedInfo{ + State: model.StateStopped, + } + manager.handleError(err) + require.Equal(t, model.StateStopped, manager.state.Info.State) +} + func TestChangefeedStatusNotExist(t *testing.T) { changefeedInfo := ` {