sink(cdc): fix the check about resolvedTs and checkpointTs (#9772) (#9798)

close #9769
hicqu authored Sep 23, 2023
1 parent 11b023a commit a01df72
Showing 6 changed files with 126 additions and 75 deletions.
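
Reviewer note: the core invariant this patch tightens is that a table sink's upper bound (the highest ts the sink may have emitted) must never fall behind the changefeed checkpoint. A minimal, self-contained sketch of that check follows; the function and names are illustrative assumptions, not the actual sinkmanager code.

```go
package main

import "fmt"

// checkSinkBound sketches the invariant enforced in GetTableStats below:
// the checkpoint (data already considered durable downstream) must not run
// ahead of what the sink has been allowed to emit.
func checkSinkBound(sinkUpperBound, checkpointTs uint64) error {
	if sinkUpperBound < checkpointTs {
		return fmt.Errorf("sink upper bound %d is less than checkpoint ts %d",
			sinkUpperBound, checkpointTs)
	}
	return nil
}

func main() {
	fmt.Println(checkSinkBound(105, 100)) // <nil>: invariant holds
	fmt.Println(checkSinkBound(95, 100))  // error: the real code panics here
}
```
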
37 changes: 11 additions & 26 deletions cdc/processor/sinkmanager/manager.go
@@ -777,30 +777,14 @@ func (m *SinkManager) UpdateReceivedSorterResolvedTs(tableID model.TableID, ts m
}

// UpdateBarrierTs update all tableSink's barrierTs in the SinkManager
func (m *SinkManager) UpdateBarrierTs(
globalBarrierTs model.Ts,
tableBarrier map[model.TableID]model.Ts,
) {
m.tableSinks.Range(func(tableID, value interface{}) bool {
tableSink := value.(*tableSinkWrapper)
lastBarrierTs := tableSink.barrierTs.Load()
// It is safe to do not use compare and swap here.
// Only the processor will update the barrier ts.
// Other goroutines will only read the barrier ts.
// So it is safe to do not use compare and swap here, just Load and Store.
if tableBarrierTs, ok := tableBarrier[tableSink.tableID]; ok {
barrierTs := tableBarrierTs
if barrierTs > globalBarrierTs {
barrierTs = globalBarrierTs
}
if barrierTs > lastBarrierTs {
tableSink.barrierTs.Store(barrierTs)
}
} else {
if globalBarrierTs > lastBarrierTs {
tableSink.barrierTs.Store(globalBarrierTs)
}
func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map[model.TableID]model.Ts) {
m.tableSinks.Range(func(key, value interface{}) bool {
tableID := key.(model.TableID)
barrierTs := globalBarrierTs
if tableBarrierTs, ok := tableBarrier[tableID]; ok && tableBarrierTs < globalBarrierTs {
barrierTs = tableBarrierTs
}
value.(*tableSinkWrapper).updateBarrierTs(barrierTs)
return true
})
}
@@ -1016,12 +1000,13 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
resolvedTs = tableSink.getReceivedSorterResolvedTs()
}

if resolvedTs < checkpointTs.ResolvedMark() {
log.Error("sinkManager: resolved ts should not less than checkpoint ts",
sinkUpperBound := tableSink.getUpperBoundTs()
if sinkUpperBound < checkpointTs.ResolvedMark() {
log.Panic("sinkManager: sink upperbound should not less than checkpoint ts",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("upperbound", sinkUpperBound),
zap.Any("checkpointTs", checkpointTs))
}
return TableStats{
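
Side note on the rewritten UpdateBarrierTs above: the per-table barrier is now simply the smaller of the table barrier and the global barrier, with the monotonic store delegated to the table sink wrapper. A hedged sketch of just the selection step (names are illustrative):

```go
package main

import "fmt"

// effectiveBarrier mirrors the selection in the new UpdateBarrierTs (sketch):
// a table uses its own barrier only when it is stricter (smaller) than the
// global barrier; otherwise the global barrier applies.
func effectiveBarrier(global uint64, perTable map[int64]uint64, tableID int64) uint64 {
	barrier := global
	if ts, ok := perTable[tableID]; ok && ts < global {
		barrier = ts
	}
	return barrier
}

func main() {
	perTable := map[int64]uint64{1: 90, 2: 120}
	fmt.Println(effectiveBarrier(100, perTable, 1)) // 90: table barrier is stricter
	fmt.Println(effectiveBarrier(100, perTable, 2)) // 100: global barrier wins
	fmt.Println(effectiveBarrier(100, perTable, 3)) // 100: no table barrier
}
```
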
9 changes: 9 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -178,6 +178,15 @@ func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEve
return nil
}

func (t *tableSinkWrapper) updateBarrierTs(ts model.Ts) {
for {
old := t.barrierTs.Load()
if ts <= old || t.barrierTs.CompareAndSwap(old, ts) {
break
}
}
}

func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
for {
old := t.receivedSorterResolvedTs.Load()
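
The new updateBarrierTs above uses the same retry-on-CAS pattern as updateReceivedSorterResolvedTs: the timestamp only moves forward, even with concurrent writers. A standalone sketch of that pattern, under the assumption that the field behaves like an atomic.Uint64 (illustrative, not the tiflow types):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// advance moves ts forward to v but never backwards; the load and the store
// form a single CompareAndSwap, so concurrent callers cannot regress it.
func advance(ts *atomic.Uint64, v uint64) {
	for {
		old := ts.Load()
		if v <= old || ts.CompareAndSwap(old, v) {
			return
		}
	}
}

func main() {
	var ts atomic.Uint64
	var wg sync.WaitGroup
	for _, v := range []uint64{5, 3, 9, 7} {
		wg.Add(1)
		go func(v uint64) { defer wg.Done(); advance(&ts, v) }(v)
	}
	wg.Wait()
	fmt.Println(ts.Load()) // always 9, regardless of interleaving
}
```
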
32 changes: 12 additions & 20 deletions cdc/scheduler/internal/v3/agent/agent.go
@@ -183,10 +183,7 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) {
return nil, errors.Trace(err)
}

outboundMessages, barrier, err := a.handleMessage(inboundMessages)
if err != nil {
return nil, errors.Trace(err)
}
outboundMessages, barrier := a.handleMessage(inboundMessages)

responses, err := a.tableM.poll(ctx)
if err != nil {
@@ -214,9 +211,7 @@ func (a *agent) handleLivenessUpdate(liveness model.Liveness) {
}
}

func (a *agent) handleMessage(msg []*schedulepb.Message) (
result []*schedulepb.Message, barrier *schedulepb.Barrier, err error,
) {
func (a *agent) handleMessage(msg []*schedulepb.Message) (result []*schedulepb.Message, barrier *schedulepb.Barrier) {
for _, message := range msg {
ownerCaptureID := message.GetFrom()
header := message.GetHeader()
@@ -231,7 +226,7 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
switch message.GetMsgType() {
case schedulepb.MsgHeartbeat:
var reMsg *schedulepb.Message
reMsg, barrier, err = a.handleMessageHeartbeat(message.GetHeartbeat())
reMsg, barrier = a.handleMessageHeartbeat(message.GetHeartbeat())
result = append(result, reMsg)
case schedulepb.MsgDispatchTableRequest:
a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch)
@@ -246,25 +241,22 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
return
}

func (a *agent) handleMessageHeartbeat(
request *schedulepb.Heartbeat,
) (*schedulepb.Message, *schedulepb.Barrier, error) {
func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) (*schedulepb.Message, *schedulepb.Barrier) {
allTables := a.tableM.getAllTables()
result := make([]tablepb.TableStatus, 0, len(allTables))

for _, table := range allTables {
for tableID, table := range allTables {
status := table.getTableStatus(request.CollectStats)
if status.Checkpoint.CheckpointTs > status.Checkpoint.ResolvedTs {
log.Warn("schedulerv3: CheckpointTs is greater than ResolvedTs",
zap.String("namespace", a.ChangeFeedID.Namespace),
zap.String("changefeed", a.ChangeFeedID.ID),
zap.Int64("tableID", tableID))
}
if table.task != nil && table.task.IsRemove {
status.State = tablepb.TableStateStopping
}
result = append(result, status)

isValidCheckpointTs := status.Checkpoint.CheckpointTs <= status.Checkpoint.ResolvedTs
if !isValidCheckpointTs {
status := result[len(result)-1]
return nil, nil, errors.ErrInvalidCheckpointTs.
GenWithStackByArgs(status.Checkpoint.CheckpointTs, status.Checkpoint.ResolvedTs)
}
}
for _, tableID := range request.GetTableIDs() {
if _, ok := allTables[tableID]; !ok {
@@ -292,7 +284,7 @@ func (a *agent) handleMessageHeartbeat(
zap.String("changefeed", a.ChangeFeedID.ID),
zap.Any("message", message))

return message, request.GetBarrier(), nil
return message, request.GetBarrier()
}

type dispatchTableTaskStatus int32
20 changes: 10 additions & 10 deletions cdc/scheduler/internal/v3/agent/agent_test.go
@@ -320,7 +320,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) {
},
}

response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.Equal(t, model.LivenessCaptureAlive, response[0].GetHeartbeatResponse().Liveness)

@@ -340,21 +340,21 @@ }
}

a.tableM.tables[model.TableID(1)].task = &dispatchTableTask{IsRemove: true}
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
result = response[0].GetHeartbeatResponse().Tables
sort.Slice(result, func(i, j int) bool {
return result[i].TableID < result[j].TableID
})
require.Equal(t, tablepb.TableStateStopping, result[1].State)

a.handleLivenessUpdate(model.LivenessCaptureStopping)
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)

a.handleLivenessUpdate(model.LivenessCaptureAlive)
heartbeat.Heartbeat.IsStopping = true
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness)
require.Equal(t, model.LivenessCaptureStopping, a.liveness.Load())
}
@@ -536,7 +536,7 @@ func TestAgentHandleMessage(t *testing.T) {
}

// handle the first heartbeat, from the known owner.
response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
response, _ := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)

addTableRequest := &schedulepb.Message{
@@ -559,17 +559,17 @@ func TestAgentHandleMessage(t *testing.T) {
},
}
// wrong epoch, ignored
responses, _, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
responses, _ := a.handleMessage([]*schedulepb.Message{addTableRequest})
require.NotContains(t, tableM.tables, model.TableID(1))
require.Len(t, responses, 0)

// correct epoch, processing.
addTableRequest.Header.ProcessorEpoch = a.Epoch
_, _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
_, _ = a.handleMessage([]*schedulepb.Message{addTableRequest})
require.Contains(t, tableM.tables, model.TableID(1))

heartbeat.Header.OwnerRevision.Revision = 2
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)

// this should never happen in real world
@@ -583,12 +583,12 @@ func TestAgentHandleMessage(t *testing.T) {
From: a.ownerInfo.CaptureID,
}

response, _, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
response, _ = a.handleMessage([]*schedulepb.Message{unknownMessage})
require.Len(t, response, 0)

// staled message
heartbeat.Header.OwnerRevision.Revision = 1
response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
response, _ = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 0)
}

61 changes: 42 additions & 19 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
@@ -495,6 +495,38 @@ func (r *Manager) AdvanceCheckpoint(
barrier *schedulepb.BarrierWithMinTs,
redoMetaManager redo.MetaManager,
) (newCheckpointTs, newResolvedTs model.Ts) {
var redoFlushedResolvedTs model.Ts
limitBarrierWithRedo := func(newCheckpointTs, newResolvedTs uint64) (uint64, uint64) {
flushedMeta := redoMetaManager.GetFlushedMeta()
redoFlushedResolvedTs = flushedMeta.ResolvedTs
log.Debug("owner gets flushed redo meta",
zap.String("namespace", r.changefeedID.Namespace),
zap.String("changefeed", r.changefeedID.ID),
zap.Uint64("flushedCheckpointTs", flushedMeta.CheckpointTs),
zap.Uint64("flushedResolvedTs", flushedMeta.ResolvedTs))
if flushedMeta.ResolvedTs < newResolvedTs {
newResolvedTs = flushedMeta.ResolvedTs
}

if newCheckpointTs > newResolvedTs {
newCheckpointTs = newResolvedTs
}

if barrier.GlobalBarrierTs > newResolvedTs {
barrier.GlobalBarrierTs = newResolvedTs
}
return newCheckpointTs, newResolvedTs
}
defer func() {
if redoFlushedResolvedTs != 0 && barrier.GlobalBarrierTs > redoFlushedResolvedTs {
log.Panic("barrierTs should never greater than redo flushed",
zap.String("namespace", r.changefeedID.Namespace),
zap.String("changefeed", r.changefeedID.ID),
zap.Uint64("barrierTs", barrier.GlobalBarrierTs),
zap.Uint64("redoFlushedResolvedTs", redoFlushedResolvedTs))
}
}()

newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
slowestTableID := int64(0)
for _, tableID := range currentTables {
@@ -509,6 +541,11 @@ func (r *Manager) AdvanceCheckpoint(
zap.Int64("tableID", tableID))
r.lastLogMissTime = now
}
if redoMetaManager.Enabled() {
// If redo is enabled, GlobalBarrierTs should be limited by redo flushed meta.
newResolvedTs = barrier.RedoBarrierTs
limitBarrierWithRedo(newCheckpointTs, newResolvedTs)
}
return checkpointCannotProceed, checkpointCannotProceed
}
// Find the minimum checkpoint ts and resolved ts.
@@ -564,26 +601,12 @@ func (r *Manager) AdvanceCheckpoint(
newResolvedTs = barrier.RedoBarrierTs
}
redoMetaManager.UpdateMeta(newCheckpointTs, newResolvedTs)
flushedMeta := redoMetaManager.GetFlushedMeta()
flushedCheckpointTs, flushedResolvedTs := flushedMeta.CheckpointTs, flushedMeta.ResolvedTs
log.Debug("owner gets flushed meta",
zap.Uint64("flushedResolvedTs", flushedResolvedTs),
zap.Uint64("flushedCheckpointTs", flushedCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs),
log.Debug("owner updates redo meta",
zap.String("namespace", r.changefeedID.Namespace),
zap.String("changefeed", r.changefeedID.ID))
if flushedResolvedTs != 0 && flushedResolvedTs < newResolvedTs {
newResolvedTs = flushedResolvedTs
}

if newCheckpointTs > newResolvedTs {
newCheckpointTs = newResolvedTs
}

if barrier.GlobalBarrierTs > newResolvedTs {
barrier.GlobalBarrierTs = newResolvedTs
}
zap.String("changefeed", r.changefeedID.ID),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs))
return limitBarrierWithRedo(newCheckpointTs, newResolvedTs)
}

return newCheckpointTs, newResolvedTs
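
For readers of the AdvanceCheckpoint change above: limitBarrierWithRedo caps everything by what the redo log has durably flushed, so the checkpoint, resolved ts, and global barrier can never outrun redo. A hedged sketch of that clamping order (parameter names are illustrative, not the manager's fields):

```go
package main

import "fmt"

// limitWithRedo sketches the clamping done by limitBarrierWithRedo: resolved ts
// is capped by the redo flushed resolved ts, the checkpoint is capped by the
// resolved ts, and the global barrier is capped by the resolved ts as well.
func limitWithRedo(checkpointTs, resolvedTs, redoFlushedResolvedTs, globalBarrierTs uint64) (uint64, uint64, uint64) {
	if redoFlushedResolvedTs < resolvedTs {
		resolvedTs = redoFlushedResolvedTs
	}
	if checkpointTs > resolvedTs {
		checkpointTs = resolvedTs
	}
	if globalBarrierTs > resolvedTs {
		globalBarrierTs = resolvedTs
	}
	return checkpointTs, resolvedTs, globalBarrierTs
}

func main() {
	// Loosely mirrors the new test below: redo has only flushed up to 25
	// while the barrier starts at 30, so the barrier is pulled back to 25.
	cp, rts, barrier := limitWithRedo(10, 30, 25, 30)
	fmt.Println(cp, rts, barrier) // 10 25 25
}
```
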
42 changes: 42 additions & 0 deletions cdc/scheduler/internal/v3/replication/replication_manager_test.go
@@ -714,6 +714,48 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) {
require.Equal(t, model.Ts(9), barrier.GetGlobalBarrierTs())
}

func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) {
t.Parallel()
r := NewReplicationManager(1, model.ChangeFeedID{})
rs, err := NewReplicationSet(1, model.Ts(10),
map[model.CaptureID]*tablepb.TableStatus{
"1": {
TableID: 1,
State: tablepb.TableStateReplicating,
Checkpoint: tablepb.Checkpoint{
CheckpointTs: model.Ts(10),
ResolvedTs: model.Ts(20),
},
},
}, model.ChangeFeedID{})
require.NoError(t, err)
r.tables[1] = rs

rs, err = NewReplicationSet(2, model.Ts(15),
map[model.CaptureID]*tablepb.TableStatus{
"2": {
TableID: 2,
State: tablepb.TableStateReplicating,
Checkpoint: tablepb.Checkpoint{
CheckpointTs: model.Ts(15),
ResolvedTs: model.Ts(30),
},
},
}, model.ChangeFeedID{})
require.NoError(t, err)
r.tables[2] = rs

redoMetaManager := &mockRedoMetaManager{enable: true, resolvedTs: 25}

// some table not exist yet with redo is enabled.
currentTables := []model.TableID{1, 2, 3}
barrier := schedulepb.NewBarrierWithMinTs(30)
checkpoint, resolved := r.AdvanceCheckpoint(currentTables, time.Now(), barrier, redoMetaManager)
require.Equal(t, checkpointCannotProceed, checkpoint)
require.Equal(t, checkpointCannotProceed, resolved)
require.Equal(t, uint64(25), barrier.Barrier.GetGlobalBarrierTs())
}

func TestReplicationManagerHandleCaptureChanges(t *testing.T) {
t.Parallel()

