Skip to content

Commit

Permalink
scheduler (ticdc): does not return error when resolvedTs less than ch…
Browse files Browse the repository at this point in the history
…eckpoint (#9828)

close #9769
  • Loading branch information
asddongmen authored Oct 5, 2023
1 parent 5dc93e4 commit cf9fb66
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 25 deletions.
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/agent/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (t *table) getTableStatus(collectStat bool) tablepb.TableStatus {

func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message {
if status.Checkpoint.ResolvedTs < status.Checkpoint.CheckpointTs {
log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
zap.Any("tableStatus", status),
zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
zap.Any("resolved", status.Checkpoint.ResolvedTs))
Expand All @@ -99,7 +99,7 @@ func newRemoveTableResponseMessage(status tablepb.TableStatus) *schedulepb.Messa
// Advance resolved ts to checkpoint ts if table is removed.
status.Checkpoint.ResolvedTs = status.Checkpoint.CheckpointTs
} else {
log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
zap.Any("tableStatus", status),
zap.Any("checkpoint", status.Checkpoint.CheckpointTs),
zap.Any("resolved", status.Checkpoint.ResolvedTs))
Expand Down
34 changes: 11 additions & 23 deletions cdc/scheduler/internal/v3/replication/replication_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,7 @@ func NewReplicationSet(
return nil, r.inconsistentError(table, captureID,
"schedulerv3: table id inconsistent")
}
if err := r.updateCheckpointAndStats(table.Checkpoint, table.Stats); err != nil {
return nil, errors.Trace(err)
}
r.updateCheckpointAndStats(table.Checkpoint, table.Stats)

switch table.State {
case tablepb.TableStateReplicating:
Expand Down Expand Up @@ -485,8 +483,8 @@ func (r *ReplicationSet) pollOnPrepare(
}
case tablepb.TableStateReplicating:
if r.Primary == captureID {
err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, errors.Trace(err)
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, nil
}
case tablepb.TableStateStopping, tablepb.TableStateStopped:
if r.Primary == captureID {
Expand Down Expand Up @@ -587,9 +585,7 @@ func (r *ReplicationSet) pollOnCommit(

case tablepb.TableStateStopped, tablepb.TableStateAbsent:
if r.Primary == captureID {
if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
return nil, false, errors.Trace(err)
}
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
original := r.Primary
r.clearPrimary()
if !r.hasRole(RoleSecondary) {
Expand Down Expand Up @@ -653,9 +649,7 @@ func (r *ReplicationSet) pollOnCommit(

case tablepb.TableStateReplicating:
if r.Primary == captureID {
if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
return nil, false, errors.Trace(err)
}
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
if r.hasRole(RoleSecondary) {
// Original primary is not stopped, ask for stopping.
return &schedulepb.Message{
Expand Down Expand Up @@ -688,8 +682,8 @@ func (r *ReplicationSet) pollOnCommit(

case tablepb.TableStateStopping:
if r.Primary == captureID && r.hasRole(RoleSecondary) {
err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, errors.Trace(err)
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, nil
} else if r.isInRole(captureID, RoleUndetermined) {
log.Info("schedulerv3: capture is stopping during Commit",
zap.Stringer("tableState", input),
Expand All @@ -714,8 +708,8 @@ func (r *ReplicationSet) pollOnReplicating(
switch input.State {
case tablepb.TableStateReplicating:
if r.Primary == captureID {
err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, errors.Trace(err)
r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
return nil, false, nil
}
return nil, false, r.multiplePrimaryError(
input, captureID, "schedulerv3: multiple primary")
Expand All @@ -726,10 +720,7 @@ func (r *ReplicationSet) pollOnReplicating(
case tablepb.TableStateStopping:
case tablepb.TableStateStopped:
if r.Primary == captureID {
if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
return nil, false, errors.Trace(err)
}

r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
// Primary is stopped, but we still has secondary.
// Clear primary and promote secondary when it's prepared.
log.Info("schedulerv3: primary is stopped during Replicating",
Expand Down Expand Up @@ -919,7 +910,7 @@ func (r *ReplicationSet) handleCaptureShutdown(

func (r *ReplicationSet) updateCheckpointAndStats(
checkpoint tablepb.Checkpoint, stats tablepb.Stats,
) error {
) {
if checkpoint.ResolvedTs < checkpoint.CheckpointTs {
log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
zap.Any("replicationSet", r),
Expand All @@ -941,11 +932,8 @@ func (r *ReplicationSet) updateCheckpointAndStats(
zap.Any("replicationSet", r),
zap.Any("checkpointTs", r.Checkpoint.CheckpointTs),
zap.Any("resolvedTs", r.Checkpoint.ResolvedTs))
return errors.ErrInvalidCheckpointTs.GenWithStackByArgs(r.Checkpoint.CheckpointTs,
r.Checkpoint.ResolvedTs)
}
r.Stats = stats
return nil
}

// SetHeap is a max-heap, it implements heap.Interface.
Expand Down
26 changes: 26 additions & 0 deletions cdc/scheduler/internal/v3/replication/replication_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1426,3 +1426,29 @@ func TestReplicationSetHeap_MinK(t *testing.T) {
require.Equal(t, expectedTables, tables)
require.Equal(t, 0, h.Len())
}

func TestUpdateCheckpointAndStats(t *testing.T) {
cases := []struct {
checkpoint tablepb.Checkpoint
stats tablepb.Stats
}{
{
checkpoint: tablepb.Checkpoint{
CheckpointTs: 1,
ResolvedTs: 2,
},
stats: tablepb.Stats{},
},
{
checkpoint: tablepb.Checkpoint{
CheckpointTs: 2,
ResolvedTs: 1,
},
stats: tablepb.Stats{},
},
}
r := &ReplicationSet{}
for _, c := range cases {
r.updateCheckpointAndStats(c.checkpoint, c.stats)
}
}

0 comments on commit cf9fb66

Please sign in to comment.