diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index be1f7ec929b..506bef29a54 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -75,7 +75,7 @@ func (t *tableSpan) getTableSpanStatus(collectStat bool) tablepb.TableStatus { func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message { if status.Checkpoint.ResolvedTs < status.Checkpoint.CheckpointTs { - log.Panic("schedulerv3: resolved ts should not less than checkpoint ts", + log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", zap.Any("tableStatus", status), zap.Any("checkpoint", status.Checkpoint.CheckpointTs), zap.Any("resolved", status.Checkpoint.ResolvedTs)) @@ -100,7 +100,7 @@ func newRemoveTableResponseMessage(status tablepb.TableStatus) *schedulepb.Messa // Advance resolved ts to checkpoint ts if table is removed. status.Checkpoint.ResolvedTs = status.Checkpoint.CheckpointTs } else { - log.Panic("schedulerv3: resolved ts should not less than checkpoint ts", + log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", zap.Any("tableStatus", status), zap.Any("checkpoint", status.Checkpoint.CheckpointTs), zap.Any("resolved", status.Checkpoint.ResolvedTs)) diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index c88ee162e2e..fec432499a8 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -162,9 +162,7 @@ func NewReplicationSet( return nil, r.inconsistentError(table, captureID, "schedulerv3: table id inconsistent") } - if err := r.updateCheckpointAndStats(table.Checkpoint, table.Stats); err != nil { - return nil, errors.Trace(err) - } + r.updateCheckpointAndStats(table.Checkpoint, table.Stats) switch table.State { case tablepb.TableStateReplicating: @@ -500,8 +498,8 @@ func (r *ReplicationSet) pollOnPrepare( } case tablepb.TableStateReplicating: if r.Primary == captureID { - err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats) - return nil, false, err + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + return nil, false, nil } case tablepb.TableStateStopping, tablepb.TableStateStopped: if r.Primary == captureID { @@ -614,9 +612,7 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateStopped, tablepb.TableStateAbsent: if r.Primary == captureID { - if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil { - return nil, false, errors.Trace(err) - } + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) original := r.Primary r.clearPrimary() if !r.hasRole(RoleSecondary) { @@ -688,9 +684,7 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateReplicating: if r.Primary == captureID { - if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil { - return nil, false, errors.Trace(err) - } + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) if r.hasRole(RoleSecondary) { // Original primary is not stopped, ask for stopping. return &schedulepb.Message{ @@ -725,8 +719,8 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateStopping: if r.Primary == captureID && r.hasRole(RoleSecondary) { - err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats) - return nil, false, err + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + return nil, false, nil } else if r.isInRole(captureID, RoleUndetermined) { log.Info("schedulerv3: capture is stopping during Commit", zap.String("namespace", r.Changefeed.Namespace), @@ -755,8 +749,8 @@ func (r *ReplicationSet) pollOnReplicating( switch input.State { case tablepb.TableStateReplicating: if r.Primary == captureID { - err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats) - return nil, false, err + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + return nil, false, nil } return nil, false, r.multiplePrimaryError( input, captureID, "schedulerv3: multiple primary") @@ -767,10 +761,7 @@ func (r *ReplicationSet) pollOnReplicating( case tablepb.TableStateStopping: case tablepb.TableStateStopped: if r.Primary == captureID { - if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil { - return nil, false, errors.Trace(err) - } - + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) // Primary is stopped, but we still has secondary. // Clear primary and promote secondary when it's prepared. log.Info("schedulerv3: primary is stopped during Replicating", @@ -1000,7 +991,7 @@ func (r *ReplicationSet) handleCaptureShutdown( func (r *ReplicationSet) updateCheckpointAndStats( checkpoint tablepb.Checkpoint, stats tablepb.Stats, -) error { +) { if checkpoint.ResolvedTs < checkpoint.CheckpointTs { log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", zap.String("namespace", r.Changefeed.Namespace), @@ -1028,11 +1019,8 @@ func (r *ReplicationSet) updateCheckpointAndStats( zap.Any("replicationSet", r), zap.Any("checkpointTs", r.Checkpoint.CheckpointTs), zap.Any("resolvedTs", r.Checkpoint.ResolvedTs)) - return errors.ErrInvalidCheckpointTs.GenWithStackByArgs(r.Checkpoint.CheckpointTs, - r.Checkpoint.ResolvedTs) } r.Stats = stats - return nil } // SetHeap is a max-heap, it implements heap.Interface. diff --git a/cdc/scheduler/internal/v3/replication/replication_set_test.go b/cdc/scheduler/internal/v3/replication/replication_set_test.go index bf1b67bd498..da417438bc0 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_set_test.go @@ -1455,3 +1455,29 @@ func TestReplicationSetHeap_MinK(t *testing.T) { require.Equal(t, expectedTables, tables) require.Equal(t, 0, h.Len()) } + +func TestUpdateCheckpointAndStats(t *testing.T) { + cases := []struct { + checkpoint tablepb.Checkpoint + stats tablepb.Stats + }{ + { + checkpoint: tablepb.Checkpoint{ + CheckpointTs: 1, + ResolvedTs: 2, + }, + stats: tablepb.Stats{}, + }, + { + checkpoint: tablepb.Checkpoint{ + CheckpointTs: 2, + ResolvedTs: 1, + }, + stats: tablepb.Stats{}, + }, + } + r := &ReplicationSet{} + for _, c := range cases { + r.updateCheckpointAndStats(c.checkpoint, c.stats) + } +}