diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index bf92f812f2f..7de5e88f2e8 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -777,30 +777,14 @@ func (m *SinkManager) UpdateReceivedSorterResolvedTs(tableID model.TableID, ts m } // UpdateBarrierTs update all tableSink's barrierTs in the SinkManager -func (m *SinkManager) UpdateBarrierTs( - globalBarrierTs model.Ts, - tableBarrier map[model.TableID]model.Ts, -) { - m.tableSinks.Range(func(tableID, value interface{}) bool { - tableSink := value.(*tableSinkWrapper) - lastBarrierTs := tableSink.barrierTs.Load() - // It is safe to do not use compare and swap here. - // Only the processor will update the barrier ts. - // Other goroutines will only read the barrier ts. - // So it is safe to do not use compare and swap here, just Load and Store. - if tableBarrierTs, ok := tableBarrier[tableSink.tableID]; ok { - barrierTs := tableBarrierTs - if barrierTs > globalBarrierTs { - barrierTs = globalBarrierTs - } - if barrierTs > lastBarrierTs { - tableSink.barrierTs.Store(barrierTs) - } - } else { - if globalBarrierTs > lastBarrierTs { - tableSink.barrierTs.Store(globalBarrierTs) - } +func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map[model.TableID]model.Ts) { + m.tableSinks.Range(func(key, value interface{}) bool { + tableID := key.(model.TableID) + barrierTs := globalBarrierTs + if tableBarrierTs, ok := tableBarrier[tableID]; ok && tableBarrierTs < globalBarrierTs { + barrierTs = tableBarrierTs } + value.(*tableSinkWrapper).updateBarrierTs(barrierTs) return true }) } @@ -1016,12 +1000,13 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats { resolvedTs = tableSink.getReceivedSorterResolvedTs() } - if resolvedTs < checkpointTs.ResolvedMark() { - log.Error("sinkManager: resolved ts should not less than checkpoint ts", + sinkUpperBound := tableSink.getUpperBoundTs() + if sinkUpperBound < checkpointTs.ResolvedMark() { + log.Panic("sinkManager: sink upperbound should not less than checkpoint ts", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), zap.Int64("tableID", tableID), - zap.Uint64("resolvedTs", resolvedTs), + zap.Uint64("upperbound", sinkUpperBound), zap.Any("checkpointTs", checkpointTs)) } return TableStats{ diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index c1372d27345..cd58996428a 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -178,6 +178,15 @@ func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEve return nil } +func (t *tableSinkWrapper) updateBarrierTs(ts model.Ts) { + for { + old := t.barrierTs.Load() + if ts <= old || t.barrierTs.CompareAndSwap(old, ts) { + break + } + } +} + func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) { for { old := t.receivedSorterResolvedTs.Load() diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index 6afc3089c62..c7be3d1dfc6 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -183,10 +183,7 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) { return nil, errors.Trace(err) } - outboundMessages, barrier, err := a.handleMessage(inboundMessages) - if err != nil { - return nil, errors.Trace(err) - } + outboundMessages, barrier := 
a.handleMessage(inboundMessages) responses, err := a.tableM.poll(ctx) if err != nil { @@ -214,9 +211,7 @@ func (a *agent) handleLivenessUpdate(liveness model.Liveness) { } } -func (a *agent) handleMessage(msg []*schedulepb.Message) ( - result []*schedulepb.Message, barrier *schedulepb.Barrier, err error, -) { +func (a *agent) handleMessage(msg []*schedulepb.Message) (result []*schedulepb.Message, barrier *schedulepb.Barrier) { for _, message := range msg { ownerCaptureID := message.GetFrom() header := message.GetHeader() @@ -231,7 +226,7 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) ( switch message.GetMsgType() { case schedulepb.MsgHeartbeat: var reMsg *schedulepb.Message - reMsg, barrier, err = a.handleMessageHeartbeat(message.GetHeartbeat()) + reMsg, barrier = a.handleMessageHeartbeat(message.GetHeartbeat()) result = append(result, reMsg) case schedulepb.MsgDispatchTableRequest: a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch) @@ -246,25 +241,22 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) ( return } -func (a *agent) handleMessageHeartbeat( - request *schedulepb.Heartbeat, -) (*schedulepb.Message, *schedulepb.Barrier, error) { +func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (*schedulepb.Message, *schedulepb.Barrier) { allTables := a.tableM.getAllTables() result := make([]tablepb.TableStatus, 0, len(allTables)) - for _, table := range allTables { + for tableID, table := range allTables { status := table.getTableStatus(request.CollectStats) + if status.Checkpoint.CheckpointTs > status.Checkpoint.ResolvedTs { + log.Warn("schedulerv3: CheckpointTs is greater than ResolvedTs", + zap.String("namespace", a.ChangeFeedID.Namespace), + zap.String("changefeed", a.ChangeFeedID.ID), + zap.Int64("tableID", tableID)) + } if table.task != nil && table.task.IsRemove { status.State = tablepb.TableStateStopping } result = append(result, status) - - isValidCheckpointTs := status.Checkpoint.CheckpointTs <= status.Checkpoint.ResolvedTs - if !isValidCheckpointTs { - status := result[len(result)-1] - return nil, nil, errors.ErrInvalidCheckpointTs. 
- GenWithStackByArgs(status.Checkpoint.CheckpointTs, status.Checkpoint.ResolvedTs) - } } for _, tableID := range request.GetTableIDs() { if _, ok := allTables[tableID]; !ok { @@ -292,7 +284,7 @@ func (a *agent) handleMessageHeartbeat( zap.String("changefeed", a.ChangeFeedID.ID), zap.Any("message", message)) - return message, request.GetBarrier(), nil + return message, request.GetBarrier() } type dispatchTableTaskStatus int32 diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index fdf07b5f654..4f950d95975 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -320,7 +320,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { }, } - response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _ := a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) require.Equal(t, model.LivenessCaptureAlive, response[0].GetHeartbeatResponse().Liveness) @@ -340,7 +340,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { } a.tableM.tables[model.TableID(1)].task = &dispatchTableTask{IsRemove: true} - response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) result = response[0].GetHeartbeatResponse().Tables sort.Slice(result, func(i, j int) bool { return result[i].TableID < result[j].TableID @@ -348,13 +348,13 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { require.Equal(t, tablepb.TableStateStopping, result[1].State) a.handleLivenessUpdate(model.LivenessCaptureStopping) - response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness) a.handleLivenessUpdate(model.LivenessCaptureAlive) heartbeat.Heartbeat.IsStopping = true - response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness) require.Equal(t, model.LivenessCaptureStopping, a.liveness.Load()) } @@ -536,7 +536,7 @@ func TestAgentHandleMessage(t *testing.T) { } // handle the first heartbeat, from the known owner. - response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _ := a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) addTableRequest := &schedulepb.Message{ @@ -559,17 +559,17 @@ func TestAgentHandleMessage(t *testing.T) { }, } // wrong epoch, ignored - responses, _, _ := a.handleMessage([]*schedulepb.Message{addTableRequest}) + responses, _ := a.handleMessage([]*schedulepb.Message{addTableRequest}) require.NotContains(t, tableM.tables, model.TableID(1)) require.Len(t, responses, 0) // correct epoch, processing. 
addTableRequest.Header.ProcessorEpoch = a.Epoch - _, _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest}) + _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest}) require.Contains(t, tableM.tables, model.TableID(1)) heartbeat.Header.OwnerRevision.Revision = 2 - response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) // this should never happen in real world @@ -583,12 +583,12 @@ func TestAgentHandleMessage(t *testing.T) { From: a.ownerInfo.CaptureID, } - response, _, _ = a.handleMessage([]*schedulepb.Message{unknownMessage}) + response, _ = a.handleMessage([]*schedulepb.Message{unknownMessage}) require.Len(t, response, 0) // staled message heartbeat.Header.OwnerRevision.Revision = 1 - response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 0) } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index b1f385145fc..04cf71aa802 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -495,6 +495,38 @@ func (r *Manager) AdvanceCheckpoint( barrier *schedulepb.BarrierWithMinTs, redoMetaManager redo.MetaManager, ) (newCheckpointTs, newResolvedTs model.Ts) { + var redoFlushedResolvedTs model.Ts + limitBarrierWithRedo := func(newCheckpointTs, newResolvedTs uint64) (uint64, uint64) { + flushedMeta := redoMetaManager.GetFlushedMeta() + redoFlushedResolvedTs = flushedMeta.ResolvedTs + log.Debug("owner gets flushed redo meta", + zap.String("namespace", r.changefeedID.Namespace), + zap.String("changefeed", r.changefeedID.ID), + zap.Uint64("flushedCheckpointTs", flushedMeta.CheckpointTs), + zap.Uint64("flushedResolvedTs", flushedMeta.ResolvedTs)) + if flushedMeta.ResolvedTs < newResolvedTs { + newResolvedTs = flushedMeta.ResolvedTs + } + + if newCheckpointTs > newResolvedTs { + newCheckpointTs = newResolvedTs + } + + if barrier.GlobalBarrierTs > newResolvedTs { + barrier.GlobalBarrierTs = newResolvedTs + } + return newCheckpointTs, newResolvedTs + } + defer func() { + if redoFlushedResolvedTs != 0 && barrier.GlobalBarrierTs > redoFlushedResolvedTs { + log.Panic("barrierTs should never greater than redo flushed", + zap.String("namespace", r.changefeedID.Namespace), + zap.String("changefeed", r.changefeedID.ID), + zap.Uint64("barrierTs", barrier.GlobalBarrierTs), + zap.Uint64("redoFlushedResolvedTs", redoFlushedResolvedTs)) + } + }() + newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64 slowestTableID := int64(0) for _, tableID := range currentTables { @@ -509,6 +541,11 @@ func (r *Manager) AdvanceCheckpoint( zap.Int64("tableID", tableID)) r.lastLogMissTime = now } + if redoMetaManager.Enabled() { + // If redo is enabled, GlobalBarrierTs should be limited by redo flushed meta. + newResolvedTs = barrier.RedoBarrierTs + limitBarrierWithRedo(newCheckpointTs, newResolvedTs) + } return checkpointCannotProceed, checkpointCannotProceed } // Find the minimum checkpoint ts and resolved ts. 
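
The new limitBarrierWithRedo closure above centralizes the clamping that used to live inline after redoMetaManager.UpdateMeta: the resolved ts may not pass the flushed redo resolved ts, the checkpoint ts may not pass the resolved ts, and the global barrier ts may not pass the resolved ts either, which is exactly what the deferred log.Panic guards. A minimal standalone sketch of that ordering (the function name and the plain uint64 signature here are illustrative, not the patch's API):

package main

import "fmt"

// clampWithRedo applies the same ordering as the limitBarrierWithRedo
// closure in the patch: cap resolvedTs by the flushed redo resolvedTs
// first, then cap checkpointTs by resolvedTs, then cap the global
// barrierTs by resolvedTs. Afterwards barrierTs <= redoFlushed always
// holds, so the deferred panic check cannot fire on this path.
func clampWithRedo(checkpointTs, resolvedTs, barrierTs, redoFlushed uint64) (uint64, uint64, uint64) {
	if redoFlushed < resolvedTs {
		resolvedTs = redoFlushed
	}
	if checkpointTs > resolvedTs {
		checkpointTs = resolvedTs
	}
	if barrierTs > resolvedTs {
		barrierTs = resolvedTs
	}
	return checkpointTs, resolvedTs, barrierTs
}

func main() {
	// Redo has only flushed up to ts 25, so neither the resolved ts nor
	// the global barrier ts may advance past it, mirroring the test added
	// below where NewBarrierWithMinTs(30) ends up with a barrier ts of 25.
	fmt.Println(clampWithRedo(10, 30, 30, 25)) // 10 25 25
}
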
@@ -564,26 +601,12 @@ func (r *Manager) AdvanceCheckpoint( newResolvedTs = barrier.RedoBarrierTs } redoMetaManager.UpdateMeta(newCheckpointTs, newResolvedTs) - flushedMeta := redoMetaManager.GetFlushedMeta() - flushedCheckpointTs, flushedResolvedTs := flushedMeta.CheckpointTs, flushedMeta.ResolvedTs - log.Debug("owner gets flushed meta", - zap.Uint64("flushedResolvedTs", flushedResolvedTs), - zap.Uint64("flushedCheckpointTs", flushedCheckpointTs), - zap.Uint64("newResolvedTs", newResolvedTs), - zap.Uint64("newCheckpointTs", newCheckpointTs), + log.Debug("owner updates redo meta", zap.String("namespace", r.changefeedID.Namespace), - zap.String("changefeed", r.changefeedID.ID)) - if flushedResolvedTs != 0 && flushedResolvedTs < newResolvedTs { - newResolvedTs = flushedResolvedTs - } - - if newCheckpointTs > newResolvedTs { - newCheckpointTs = newResolvedTs - } - - if barrier.GlobalBarrierTs > newResolvedTs { - barrier.GlobalBarrierTs = newResolvedTs - } + zap.String("changefeed", r.changefeedID.ID), + zap.Uint64("newCheckpointTs", newCheckpointTs), + zap.Uint64("newResolvedTs", newResolvedTs)) + return limitBarrierWithRedo(newCheckpointTs, newResolvedTs) } return newCheckpointTs, newResolvedTs diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index 39cc137e00c..c5e7aad2c93 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -714,6 +714,48 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { require.Equal(t, model.Ts(9), barrier.GetGlobalBarrierTs()) } +func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) { + t.Parallel() + r := NewReplicationManager(1, model.ChangeFeedID{}) + rs, err := NewReplicationSet(1, model.Ts(10), + map[model.CaptureID]*tablepb.TableStatus{ + "1": { + TableID: 1, + State: tablepb.TableStateReplicating, + Checkpoint: tablepb.Checkpoint{ + CheckpointTs: model.Ts(10), + ResolvedTs: model.Ts(20), + }, + }, + }, model.ChangeFeedID{}) + require.NoError(t, err) + r.tables[1] = rs + + rs, err = NewReplicationSet(2, model.Ts(15), + map[model.CaptureID]*tablepb.TableStatus{ + "2": { + TableID: 2, + State: tablepb.TableStateReplicating, + Checkpoint: tablepb.Checkpoint{ + CheckpointTs: model.Ts(15), + ResolvedTs: model.Ts(30), + }, + }, + }, model.ChangeFeedID{}) + require.NoError(t, err) + r.tables[2] = rs + + redoMetaManager := &mockRedoMetaManager{enable: true, resolvedTs: 25} + + // some table not exist yet with redo is enabled. + currentTables := []model.TableID{1, 2, 3} + barrier := schedulepb.NewBarrierWithMinTs(30) + checkpoint, resolved := r.AdvanceCheckpoint(currentTables, time.Now(), barrier, redoMetaManager) + require.Equal(t, checkpointCannotProceed, checkpoint) + require.Equal(t, checkpointCannotProceed, resolved) + require.Equal(t, uint64(25), barrier.Barrier.GetGlobalBarrierTs()) +} + func TestReplicationManagerHandleCaptureChanges(t *testing.T) { t.Parallel()
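
Elsewhere in this patch, the table_sink_wrapper.go hunk swaps the old plain Load/Store of barrierTs for a compare-and-swap loop, so the barrier ts can only move forward even when more than one goroutine pushes it. A self-contained sketch of that monotonic-advance pattern, assuming the standard library's sync/atomic.Uint64 (the wrapper's actual atomic type may differ):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// advanceTo raises ts monotonically: it retries the CAS until either the
// stored value is already >= the candidate (so there is nothing to do) or
// the swap succeeds. Concurrent callers can never move the value backwards.
func advanceTo(ts *atomic.Uint64, candidate uint64) {
	for {
		old := ts.Load()
		if candidate <= old || ts.CompareAndSwap(old, candidate) {
			return
		}
	}
}

func main() {
	var barrierTs atomic.Uint64
	var wg sync.WaitGroup
	for _, ts := range []uint64{5, 9, 3, 7} {
		wg.Add(1)
		go func(ts uint64) {
			defer wg.Done()
			advanceTo(&barrierTs, ts)
		}(ts)
	}
	wg.Wait()
	fmt.Println(barrierTs.Load()) // always 9, regardless of scheduling
}
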