Skip to content

Commit

Permalink
sink(cdc): avoid sinking redundant events in some rare cases with red…
Browse files Browse the repository at this point in the history
…o enabled (#10085)

close #10065
  • Loading branch information
hicqu authored Nov 14, 2023
1 parent a609411 commit c34ac30
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 46 deletions.
62 changes: 34 additions & 28 deletions cdc/processor/sinkmanager/redo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,20 @@ type popResult struct {
events []*model.RowChangedEvent
size uint64 // size of events.
releaseSize uint64 // size of all released events.
pushCount int
success bool
// If success, boundary is the upperBound of poped events.
// Otherwise, boundary is the lowerBound of cached events.
boundary engine.Position

// many RowChangedEvent can come from one same PolymorphicEvent.
// pushCount indicates the count of raw PolymorphicEvents.
pushCount int

// success indicates whether there is a gap between cached events and required events.
success bool

// If success, upperBoundIfSuccess is the upperBound of poped events.
// The caller should fetch events (upperBoundIfSuccess, upperBound] from engine.
upperBoundIfSuccess engine.Position
// If fail, lowerBoundIfFail is the lowerBound of cached events.
// The caller should fetch events [lowerBound, lowerBoundIfFail) from engine.
lowerBoundIfFail engine.Position
}

// newRedoEventCache creates a redoEventCache instance.
Expand Down Expand Up @@ -150,26 +159,28 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu
// NOTE: the caller will fetch events [lowerBound, res.boundary) from engine.
res.success = false
if e.lowerBound.Compare(upperBound.Next()) <= 0 {
res.boundary = e.lowerBound
res.lowerBoundIfFail = e.lowerBound
} else {
res.boundary = upperBound.Next()
res.lowerBoundIfFail = upperBound.Next()
}
return
}

if !e.upperBound.Valid() {
// if e.upperBound is invalid, it means there are no resolved transactions
// in the cache.
// It means there are no resolved cached transactions in the required range.
// NOTE: the caller will fetch events [lowerBound, res.boundary) from engine.
res.success = false
res.boundary = upperBound.Next()
res.lowerBoundIfFail = upperBound.Next()
return
}

res.success = true
if upperBound.Compare(e.upperBound) > 0 {
res.boundary = e.upperBound
if lowerBound.Compare(e.upperBound) > 0 {
res.upperBoundIfSuccess = lowerBound.Prev()
} else if upperBound.Compare(e.upperBound) > 0 {
res.upperBoundIfSuccess = e.upperBound
} else {
res.boundary = upperBound
res.upperBoundIfSuccess = upperBound
}

startIdx := sort.Search(e.readyCount, func(i int) bool {
Expand All @@ -181,28 +192,23 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu
res.releaseSize += e.sizes[i]
}

var endIdx int
if startIdx == e.readyCount {
endIdx = startIdx
} else {
endIdx = sort.Search(e.readyCount, func(i int) bool {
pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs}
return pos.Compare(res.boundary) > 0
})
res.events = e.events[startIdx:endIdx]
for i := startIdx; i < endIdx; i++ {
res.size += e.sizes[i]
res.pushCount += int(e.pushCounts[i])
}
res.releaseSize += res.size
endIdx := sort.Search(e.readyCount, func(i int) bool {
pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs}
return pos.Compare(res.upperBoundIfSuccess) > 0
})
res.events = e.events[startIdx:endIdx]
for i := startIdx; i < endIdx; i++ {
res.size += e.sizes[i]
res.pushCount += int(e.pushCounts[i])
}
res.releaseSize += res.size

e.events = e.events[endIdx:]
e.sizes = e.sizes[endIdx:]
e.pushCounts = e.pushCounts[endIdx:]
e.readyCount -= endIdx
// Update boundaries. Set upperBound to invalid if the range has been drained.
e.lowerBound = res.boundary.Next()
e.lowerBound = res.upperBoundIfSuccess.Next()
if e.lowerBound.Compare(e.upperBound) > 0 {
e.upperBound = engine.Position{}
}
Expand Down
82 changes: 72 additions & 10 deletions cdc/processor/sinkmanager/redo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,37 @@ func TestRedoEventCache(t *testing.T) {
// Try to pop [{0,1}, {0,4}], shoud fail. And the returned boundary should be {1,4}.
popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 0, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, uint64(1), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(1), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

// Try to pop [{0,2}, {0,4}], shoud fail. And the returned boundary should be {3,4}.
popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 5, CommitTs: 6})
require.False(t, popRes.success)
require.Equal(t, uint64(3), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(3), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

// Try to pop [{3,4}, {3,4}], should success.
popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4})
require.True(t, popRes.success)
require.Equal(t, 2, len(popRes.events))
require.Equal(t, uint64(300), popRes.size)
require.Equal(t, 2, popRes.pushCount)
require.Equal(t, uint64(3), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(3), popRes.upperBoundIfSuccess.StartTs)
require.Equal(t, uint64(4), popRes.upperBoundIfSuccess.CommitTs)

// Try to pop [{3,4}, {3,4}] again, shoud fail. And the returned boundary should be {4,4}.
popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, uint64(4), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

popRes = appender.pop(engine.Position{StartTs: 4, CommitTs: 4}, engine.Position{StartTs: 9, CommitTs: 10})
require.True(t, popRes.success)
require.Equal(t, 1, len(popRes.events))
require.Equal(t, uint64(300), popRes.size)
require.Equal(t, 1, popRes.pushCount)
require.Equal(t, uint64(5), popRes.boundary.StartTs)
require.Equal(t, uint64(6), popRes.boundary.CommitTs)
require.Equal(t, uint64(5), popRes.upperBoundIfSuccess.StartTs)
require.Equal(t, uint64(6), popRes.upperBoundIfSuccess.CommitTs)
require.Equal(t, 0, len(appender.events))
require.True(t, appender.broken)

Expand All @@ -87,3 +87,65 @@ func TestRedoEventCache(t *testing.T) {
require.Equal(t, uint64(0), appender.upperBound.StartTs)
require.Equal(t, uint64(0), appender.upperBound.CommitTs)
}

func TestRedoEventCacheAllPopBranches(t *testing.T) {
cache := newRedoEventCache(model.ChangeFeedID{}, 1000)
span := spanz.TableIDToComparableSpan(3)
appender := cache.maybeCreateAppender(span, engine.Position{StartTs: 101, CommitTs: 111})
var batch []*model.RowChangedEvent
var ok bool
var popRes popResult

batch = []*model.RowChangedEvent{{StartTs: 1, CommitTs: 11}, {StartTs: 1, CommitTs: 11}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{})
require.True(t, ok)

batch = []*model.RowChangedEvent{{StartTs: 2, CommitTs: 12}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{})
require.True(t, ok)

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 3, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 4, CommitTs: 4}, popRes.lowerBoundIfFail)

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 300, CommitTs: 400})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.lowerBoundIfFail)

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 11}, engine.Position{StartTs: 2, CommitTs: 12})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 3, CommitTs: 12}, popRes.lowerBoundIfFail)

batch = []*model.RowChangedEvent{{StartTs: 101, CommitTs: 111}, {StartTs: 101, CommitTs: 111}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 101, CommitTs: 111})
require.True(t, ok)

batch = []*model.RowChangedEvent{{StartTs: 102, CommitTs: 112}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{})
require.True(t, ok)
require.Equal(t, 5, appender.readyCount)

popRes = appender.pop(engine.Position{StartTs: 101, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112})
require.True(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.upperBoundIfSuccess)
require.Equal(t, 2, len(popRes.events))
require.Equal(t, 1, popRes.pushCount)
require.Equal(t, uint64(101), popRes.events[1].StartTs)
require.Equal(t, 0, appender.readyCount)

popRes = appender.pop(engine.Position{StartTs: 102, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 103, CommitTs: 112}, popRes.lowerBoundIfFail)

batch = []*model.RowChangedEvent{{StartTs: 102, CommitTs: 112}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 102, CommitTs: 102})
require.True(t, ok)
require.Equal(t, 2, appender.readyCount)

popRes = appender.pop(engine.Position{StartTs: 501, CommitTs: 502}, engine.Position{StartTs: 701, CommitTs: 702})
require.True(t, popRes.success)
require.Equal(t, 0, len(popRes.events))
require.Equal(t, engine.Position{StartTs: 500, CommitTs: 502}, popRes.upperBoundIfSuccess)
require.Equal(t, 0, appender.readyCount)
require.Equal(t, 0, len(appender.events))
}
17 changes: 9 additions & 8 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,14 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if err != nil {
return errors.Trace(err)
}
// NOTE: lowerBound can be updated by `fetchFromCache`, so `lastPos` should also be updated.
advancer.lastPos = lowerBound.Prev()
if drained {
// If drained is true it means we have drained all events from the cache,
// we can return directly instead of get events from the source manager again.
performCallback(lowerBound.Prev())
performCallback(advancer.lastPos)
return nil
}
advancer.lastPos = lowerBound.Prev()
}

// lowerBound and upperBound are both closed intervals.
Expand Down Expand Up @@ -277,7 +278,7 @@ func (w *sinkWorker) fetchFromCache(
}
popRes := cache.pop(*lowerBound, *upperBound)
if popRes.success {
newLowerBound = popRes.boundary.Next()
newLowerBound = popRes.upperBoundIfSuccess.Next()
if len(popRes.events) > 0 {
w.metricOutputEventCountKV.Add(float64(popRes.pushCount))
w.metricRedoEventCacheHit.Add(float64(popRes.size))
Expand All @@ -288,19 +289,19 @@ func (w *sinkWorker) fetchFromCache(

// Get a resolvedTs so that we can record it into sink memory quota.
var resolvedTs model.ResolvedTs
isCommitFence := popRes.boundary.IsCommitFence()
isCommitFence := popRes.upperBoundIfSuccess.IsCommitFence()
if w.splitTxn {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs)
if !isCommitFence {
resolvedTs.Mode = model.BatchResolvedMode
resolvedTs.BatchID = batchID.Load()
batchID.Add(1)
}
} else {
if isCommitFence {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs)
} else {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs - 1)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs - 1)
}
}
// Transfer the memory usage from redoMemQuota to sinkMemQuota.
Expand All @@ -316,7 +317,7 @@ func (w *sinkWorker) fetchFromCache(
zap.Any("resolvedTs", resolvedTs),
zap.Error(err))
} else {
newUpperBound = popRes.boundary.Prev()
newUpperBound = popRes.lowerBoundIfFail.Prev()
}
cacheDrained = newLowerBound.Compare(newUpperBound) > 0
log.Debug("fetchFromCache is performed",
Expand Down
48 changes: 48 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,3 +761,51 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() {
cancel()
wg.Wait()
}

func (suite *tableSinkWorkerSuite) TestHandleTaskWithCache() {
ctx, cancel := context.WithCancel(context.Background())
events := []*model.PolymorphicEvent{
genPolymorphicEvent(2, 4, suite.testSpan),
genPolymorphicEvent(2, 4, suite.testSpan),
genPolymorphicResolvedEvent(4),
}
w, e := suite.createWorker(ctx, 0, true)
w.eventCache = newRedoEventCache(suite.testChangefeedID, 1024*1024)
appender := w.eventCache.maybeCreateAppender(suite.testSpan, engine.Position{StartTs: 1, CommitTs: 3})
appender.pushBatch(
[]*model.RowChangedEvent{events[0].Row, events[1].Row},
uint64(0), engine.Position{StartTs: 2, CommitTs: 4},
)
defer w.sinkMemQuota.Close()
suite.addEventsToSortEngine(events, e)

taskChan := make(chan *sinkTask)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.handleTasks(ctx, taskChan)
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
defer sink.Close()

chShouldBeClosed := make(chan struct{}, 1)
callback := func(lastWrittenPos engine.Position) {
require.Equal(suite.T(), engine.Position{StartTs: 2, CommitTs: 4}, lastWrittenPos)
close(chShouldBeClosed)
}
taskChan <- &sinkTask{
span: suite.testSpan,
lowerBound: engine.Position{StartTs: 1, CommitTs: 3},
getUpperBound: genUpperBoundGetter(4),
tableSink: wrapper,
callback: callback,
isCanceled: func() bool { return true },
}

<-chShouldBeClosed
cancel()
wg.Wait()
}

0 comments on commit c34ac30

Please sign in to comment.