Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config, changefeed (ticdc): add changefeed error stuck duration config (#9872) #9975

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,18 @@

// resolvedTs and initCheckpointTs is for checking whether resolved timestamp
// has been advanced or not.
<<<<<<< HEAD

Check failure on line 66 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected field name or embedded type

Check failure on line 66 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected field name or embedded type
resolvedTs model.Ts
checkpointTs model.Ts
checkpointTsAdvanced time.Time

=======

Check failure on line 71 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ==, expected field name or embedded type

Check failure on line 71 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expected field name or embedded type
resolvedTs model.Ts
initCheckpointTs model.Ts

checkpointTsAdvanced time.Time
lastCheckpointTs model.Ts
>>>>>>> 797dc6dd3a (config, changefeed (ticdc): add changefeed error stuck duration config (#9872))

Check failure on line 77 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected >>, expected field name or embedded type

Check failure on line 77 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 77 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected >>, expected field name or embedded type

Check failure on line 77 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
changefeedErrorStuckDuration time.Duration
}

Expand All @@ -85,6 +93,7 @@
m.changefeedErrorStuckDuration = *cfg.ChangefeedErrorStuckDuration

m.resetErrRetry()
<<<<<<< HEAD

Check failure on line 96 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected }

Check failure on line 96 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected }
m.isRetrying = false
return m
}
Expand All @@ -105,6 +114,9 @@

m.lastErrorRetryTime = time.Now()
return false
=======

Check failure on line 117 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ==, expected }

Check failure on line 117 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expected }
return m
>>>>>>> 797dc6dd3a (config, changefeed (ticdc): add changefeed error stuck duration config (#9872))

Check failure on line 119 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 119 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
}

// resetErrRetry reset the error retry related fields
Expand Down Expand Up @@ -582,9 +594,19 @@
currTime := m.upstream.PDClock.CurrentTime()
ckptTime := oracle.GetTimeFromTS(m.state.Status.CheckpointTs)
m.lastWarningReportCheckpointTs = m.state.Status.CheckpointTs
<<<<<<< HEAD

Check failure on line 597 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected <<, expected }

Check failure on line 597 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected }

checkpointTsStuck := time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration
if checkpointTsStuck {
=======

Check failure on line 601 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected ==, expected }

Check failure on line 601 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected ==, expected }
// Conditions:
// 1. checkpoint lag is large enough;
// 2. checkpoint hasn't been advanced for a long while;
// 3. the changefeed has been initialized.
if currTime.Sub(ckptTime) > m.changefeedErrorStuckDuration &&
time.Since(m.checkpointTsAdvanced) > m.changefeedErrorStuckDuration &&
m.resolvedTs > m.initCheckpointTs {
>>>>>>> 797dc6dd3a (config, changefeed (ticdc): add changefeed error stuck duration config (#9872))

Check failure on line 609 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: unexpected >>, expected }

Check failure on line 609 in cdc/owner/feed_state_manager.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected >>, expected }
log.Info("changefeed retry on warning for a very long time and does not resume, "+
"it will be failed", zap.String("changefeed", m.state.ID.ID),
zap.Uint64("checkpointTs", m.state.Status.CheckpointTs),
Expand Down
1 change: 1 addition & 0 deletions pkg/orchestrator/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er
return errors.Trace(err)
}
if key.Tp == etcd.CDCKeyTypeChangefeedInfo {
log.Info("update changefeed info", zap.Any("info", s.Info))
s.Info.VerifyAndComplete()
}
return nil
Expand Down
Loading