owner(ticdc): fix changefeed backoff (#9687) #9698

Merged
50 changes: 11 additions & 39 deletions cdc/owner/feed_state_manager.go
@@ -39,11 +39,6 @@ const (
defaultBackoffMaxElapsedTime = 30 * time.Minute
defaultBackoffRandomizationFactor = 0.1
defaultBackoffMultiplier = 2.0

// If all states recorded in window are 'normal', it can be assumed that the changefeed
// is running steady. And then if we enter a state other than normal at next tick,
// the backoff must be reset.
defaultStateWindowSize = 512
)

// feedStateManager manages the ReactorState of a changefeed
@@ -59,7 +54,7 @@ type feedStateManager struct {
shouldBeRemoved bool

adminJobQueue []*model.AdminJob
stateHistory [defaultStateWindowSize]model.FeedState
isRetrying bool
lastErrorRetryTime time.Time // time of last error for a changefeed
lastErrorRetryCheckpointTs model.Ts // checkpoint ts of last retry
lastWarningReportCheckpointTs model.Ts // checkpoint ts of last warning report
@@ -93,26 +88,6 @@ func newFeedStateManager(up *upstream.Upstream) *feedStateManager {
return f
}

// isChangefeedStable check if there are states other than 'normal' in this sliding window.
func (m *feedStateManager) isChangefeedStable() bool {
for _, val := range m.stateHistory {
if val != model.StateNormal {
return false
}
}

return true
}

// shiftStateWindow shift the sliding window
func (m *feedStateManager) shiftStateWindow(state model.FeedState) {
for i := 0; i < defaultStateWindowSize-1; i++ {
m.stateHistory[i] = m.stateHistory[i+1]
}

m.stateHistory[defaultStateWindowSize-1] = state
}

func (m *feedStateManager) Tick(
state *orchestrator.ChangefeedReactorState,
resolvedTs model.Ts,
@@ -128,7 +103,6 @@ func (m *feedStateManager) Tick(
}
}

m.shiftStateWindow(state.Info.State)
m.checkAndInitLastRetryCheckpointTs(state.Status)

m.state = state
@@ -167,13 +141,13 @@ func (m *feedStateManager) Tick(
// NextBackOff() will return -1 once the MaxElapsedTime has elapsed,
// set the changefeed to failed state.
if m.backoffInterval == m.errBackoff.Stop {
log.Warn("The changefeed won't be restarted "+
"as it has been experiencing failures for "+
log.Error("The changefeed won't be restarted as it has been experiencing failures for "+
"an extended duration",
zap.Duration(
"maxElapsedTime",
m.errBackoff.MaxElapsedTime,
),
zap.Duration("maxElapsedTime", m.errBackoff.MaxElapsedTime),
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Time("lastRetryTime", m.lastErrorRetryTime),
zap.Uint64("lastRetryCheckpointTs", m.lastErrorRetryCheckpointTs),
)
m.shouldBeRunning = false
m.patchState(model.StateFailed)
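
The check against m.errBackoff.Stop above relies on the behavior of the github.com/cenkalti/backoff library: once MaxElapsedTime has elapsed, NextBackOff() returns backoff.Stop (-1). A minimal standalone sketch of that behavior, with illustrative values rather than the defaultBackoff* constants and assuming the v4 module path:

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Illustrative settings only; the owner uses the defaultBackoff* constants above.
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 100 * time.Millisecond
	b.MaxInterval = 400 * time.Millisecond
	b.MaxElapsedTime = time.Second
	b.Multiplier = 2.0
	b.RandomizationFactor = 0
	b.Reset() // pick up the overridden settings and restart the elapsed-time clock

	for {
		d := b.NextBackOff()
		if d == backoff.Stop { // Stop == -1, returned once MaxElapsedTime has elapsed
			fmt.Println("backoff exhausted, give up and mark the changefeed failed")
			return
		}
		fmt.Println("retry after", d)
		time.Sleep(d)
	}
}
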
@@ -563,13 +537,10 @@ func (m *feedStateManager) handleError(errs ...*model.RunningError) {
})
}

// If we enter into an abnormal state 'error' for this changefeed now
// but haven't seen abnormal states in a sliding window (512 ticks),
// it can be assumed that this changefeed meets a sudden change from a stable condition.
// So we can reset the exponential backoff and re-backoff from the InitialInterval.
// TODO: this detection policy should be added into unit test.
if m.isChangefeedStable() {
// The errBackoff needs to be reset before the first retry.
if !m.isRetrying {
m.resetErrRetry()
m.isRetrying = true
}
}

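A hedged sketch of the isRetrying idea the diff introduces (the names and structure below are simplified stand-ins, not the real feedStateManager): reset the exponential backoff only once per failure streak, and clear the flag once the changefeed returns to normal so the next streak starts again from InitialInterval.

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// retryGate is a hypothetical, simplified stand-in for the manager's retry bookkeeping.
type retryGate struct {
	errBackoff *backoff.ExponentialBackOff
	isRetrying bool
}

func (g *retryGate) onError() time.Duration {
	if !g.isRetrying {
		// First error after a healthy period: restart from InitialInterval,
		// mirroring resetErrRetry() in the diff above.
		g.errBackoff.Reset()
		g.isRetrying = true
	}
	return g.errBackoff.NextBackOff()
}

func (g *retryGate) onBackToNormal() {
	g.isRetrying = false // the next failure streak will reset the backoff again
}

func main() {
	g := &retryGate{errBackoff: backoff.NewExponentialBackOff()}
	g.errBackoff.InitialInterval = 100 * time.Millisecond
	g.errBackoff.RandomizationFactor = 0

	fmt.Println(g.onError()) // 100ms: the first error of the streak resets the backoff
	fmt.Println(g.onError()) // larger: the streak continues, no extra reset
	g.onBackToNormal()
	fmt.Println(g.onError()) // 100ms again: a new streak after recovery
}
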
@@ -646,6 +617,7 @@ func (m *feedStateManager) checkAndChangeState() {
zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
zap.Uint64("lastRetryCheckpointTs", m.lastErrorRetryCheckpointTs))
m.patchState(model.StateNormal)
m.isRetrying = false
}
}

210 changes: 203 additions & 7 deletions cdc/owner/feed_state_manager_test.go
@@ -26,6 +26,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
@@ -46,19 +47,18 @@ func (p *mockPD) GetTS(_ context.Context) (int64, int64, error) {

// newFeedStateManager4Test creates feedStateManager for test
func newFeedStateManager4Test(
initialIntervalInMs time.Duration,
maxIntervalInMs time.Duration,
maxElapsedTimeInMs time.Duration,
initialIntervalInMs, maxIntervalInMs, maxElapsedTimeInMs int,
multiplier float64,
) *feedStateManager {
f := new(feedStateManager)
f.upstream = new(upstream.Upstream)
f.upstream.PDClient = &mockPD{}
f.upstream.PDClock = pdutil.NewClock4Test()

f.errBackoff = backoff.NewExponentialBackOff()
f.errBackoff.InitialInterval = initialIntervalInMs * time.Millisecond
f.errBackoff.MaxInterval = maxIntervalInMs * time.Millisecond
f.errBackoff.MaxElapsedTime = maxElapsedTimeInMs * time.Millisecond
f.errBackoff.InitialInterval = time.Duration(initialIntervalInMs) * time.Millisecond
f.errBackoff.MaxInterval = time.Duration(maxIntervalInMs) * time.Millisecond
f.errBackoff.MaxElapsedTime = time.Duration(maxElapsedTimeInMs) * time.Millisecond
f.errBackoff.Multiplier = multiplier
f.errBackoff.RandomizationFactor = 0

@@ -690,7 +690,7 @@ func TestBackoffNeverStops(t *testing.T) {
func TestUpdateChangefeedEpoch(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
// Set a long backoff time
manager := newFeedStateManager4Test(time.Hour, time.Hour, 0, 1.0)
manager := newFeedStateManager4Test(int(time.Hour), int(time.Hour), 0, 1.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
@@ -739,3 +739,199 @@ func TestUpdateChangefeedEpoch(t *testing.T) {
}
}
}

func TestHandleWarning(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 1. test when a warning occurs, the changefeed state will be changed to warning
// and it will still keep running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is a fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 2. test when the changefeed is in warning state, and the checkpointTs is not progressing,
// the changefeed state will remain warning
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 3. test when the changefeed is in warning state, and the checkpointTs is progressing,
// the changefeed state will be changed to normal
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 201,
}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 4. test when the changefeed is in warning state, and the checkpointTs is not progressing
// for defaultBackoffMaxElapsedTime time, the changefeed state will be changed to failed
// and it will stop running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is a fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is a fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
// mock that the checkpointTs has not progressed for longer than defaultBackoffMaxElapsedTime
manager.checkpointTsAdvanced = manager.
checkpointTsAdvanced.Add(-(defaultBackoffMaxElapsedTime + 1))
// resolvedTs = 202 > checkpointTs = 201
manager.Tick(state, 202)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateFailed, state.Info.State)
require.False(t, manager.ShouldRunning())
}
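
What step 4 above exercises, in isolation: a changefeed that keeps reporting warnings while its checkpoint has not advanced for longer than defaultBackoffMaxElapsedTime (30 minutes, per the constant in the first file) is treated as failed. The helper below is only a guess at the shape of that check for illustration, not code from feed_state_manager.go:

package main

import (
	"fmt"
	"time"
)

// stalledTooLong is a hypothetical helper: it answers whether the last checkpoint
// advance happened longer ago than the allowed elapsed time.
func stalledTooLong(checkpointTsAdvanced time.Time, maxElapsedTime time.Duration) bool {
	return time.Since(checkpointTsAdvanced) > maxElapsedTime
}

func main() {
	lastAdvance := time.Now().Add(-31 * time.Minute)         // stalled past the 30m default
	fmt.Println(stalledTooLong(lastAdvance, 30*time.Minute)) // true -> patch the state to failed
}
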

func TestErrorAfterWarning(t *testing.T) {
t.Parallel()

maxElapsedTimeInMs := 2000
ctx := cdcContext.NewBackendContext4Test(true)
manager := newFeedStateManager4Test(200, 1600, maxElapsedTimeInMs, 2.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateNormal, state.Info.State)
require.True(t, manager.ShouldRunning())

// 1. test when a warning occurs, the changefeed state will be changed to warning
// and it will still keep running
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Warning: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is a fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 2. test when the changefeed is in warning state, and the checkpointTs is not progressing,
// the changefeed state will remain warning
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
return &model.ChangeFeedStatus{
CheckpointTs: 200,
}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state, 0)
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())

// 3. Sleep for maxElapsedTimeInMs to let the backoff time out. When an error occurs after a warning,
// the backoff will be reset, the changefeed state will be changed to warning, and it will still
// keep running.
time.Sleep(time.Millisecond * time.Duration(maxElapsedTimeInMs))
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrSinkManagerRunError]", // it is a fake error
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()

manager.Tick(state, 0)
// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StatePending, state.Info.State)
require.False(t, manager.ShouldRunning())
manager.Tick(state, 0)

// some patches will be generated when the manager.Tick is called
// so we need to apply the patches before we check the state
tester.MustApplyPatches()
require.Equal(t, model.StateWarning, state.Info.State)
require.True(t, manager.ShouldRunning())
}
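
A note on why the final tick ends in warning rather than failed even though maxElapsedTimeInMs has already passed: the error is the first of its streak, so the manager resets errBackoff (resetErrRetry via the isRetrying flag), and in the cenkalti/backoff library Reset() also restarts the elapsed-time clock that MaxElapsedTime is measured against. A standalone sketch of that library behavior, assuming the v4 module path:

package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = 200 * time.Millisecond
	b.RandomizationFactor = 0
	b.MaxElapsedTime = 500 * time.Millisecond
	b.Reset()

	time.Sleep(600 * time.Millisecond)           // wait longer than MaxElapsedTime
	fmt.Println(b.NextBackOff() == backoff.Stop) // true: the retry budget has run out

	b.Reset()                    // what resetErrRetry does to errBackoff
	fmt.Println(b.NextBackOff()) // 200ms again: Reset restarts the elapsed-time clock
}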