From bdeeffe2d76396bd68035ad2313e1f177b0f1e28 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 16 Nov 2023 13:27:51 +0800 Subject: [PATCH] sink(cdc): avoid sinking redundant events in some rare cases with redo enabled (#10085) (#10093) close pingcap/tiflow#10065 --- cdc/processor/sinkmanager/redo_cache.go | 62 +++++++------- cdc/processor/sinkmanager/redo_cache_test.go | 82 ++++++++++++++++--- .../sinkmanager/table_sink_worker.go | 17 ++-- .../sinkmanager/table_sink_worker_test.go | 48 +++++++++++ 4 files changed, 163 insertions(+), 46 deletions(-) diff --git a/cdc/processor/sinkmanager/redo_cache.go b/cdc/processor/sinkmanager/redo_cache.go index 17572cd34b0..719396bee59 100644 --- a/cdc/processor/sinkmanager/redo_cache.go +++ b/cdc/processor/sinkmanager/redo_cache.go @@ -59,11 +59,20 @@ type popResult struct { events []*model.RowChangedEvent size uint64 // size of events. releaseSize uint64 // size of all released events. - pushCount int - success bool - // If success, boundary is the upperBound of poped events. - // Otherwise, boundary is the lowerBound of cached events. - boundary engine.Position + + // many RowChangedEvent can come from one same PolymorphicEvent. + // pushCount indicates the count of raw PolymorphicEvents. + pushCount int + + // success indicates whether there is a gap between cached events and required events. + success bool + + // If success, upperBoundIfSuccess is the upperBound of poped events. + // The caller should fetch events (upperBoundIfSuccess, upperBound] from engine. + upperBoundIfSuccess engine.Position + // If fail, lowerBoundIfFail is the lowerBound of cached events. + // The caller should fetch events [lowerBound, lowerBoundIfFail) from engine. + lowerBoundIfFail engine.Position } // newRedoEventCache creates a redoEventCache instance. @@ -150,26 +159,28 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu // NOTE: the caller will fetch events [lowerBound, res.boundary) from engine. res.success = false if e.lowerBound.Compare(upperBound.Next()) <= 0 { - res.boundary = e.lowerBound + res.lowerBoundIfFail = e.lowerBound } else { - res.boundary = upperBound.Next() + res.lowerBoundIfFail = upperBound.Next() } return } + if !e.upperBound.Valid() { - // if e.upperBound is invalid, it means there are no resolved transactions - // in the cache. + // It means there are no resolved cached transactions in the required range. // NOTE: the caller will fetch events [lowerBound, res.boundary) from engine. res.success = false - res.boundary = upperBound.Next() + res.lowerBoundIfFail = upperBound.Next() return } res.success = true - if upperBound.Compare(e.upperBound) > 0 { - res.boundary = e.upperBound + if lowerBound.Compare(e.upperBound) > 0 { + res.upperBoundIfSuccess = lowerBound.Prev() + } else if upperBound.Compare(e.upperBound) > 0 { + res.upperBoundIfSuccess = e.upperBound } else { - res.boundary = upperBound + res.upperBoundIfSuccess = upperBound } startIdx := sort.Search(e.readyCount, func(i int) bool { @@ -181,28 +192,23 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu res.releaseSize += e.sizes[i] } - var endIdx int - if startIdx == e.readyCount { - endIdx = startIdx - } else { - endIdx = sort.Search(e.readyCount, func(i int) bool { - pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs} - return pos.Compare(res.boundary) > 0 - }) - res.events = e.events[startIdx:endIdx] - for i := startIdx; i < endIdx; i++ { - res.size += e.sizes[i] - res.pushCount += int(e.pushCounts[i]) - } - res.releaseSize += res.size + endIdx := sort.Search(e.readyCount, func(i int) bool { + pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs} + return pos.Compare(res.upperBoundIfSuccess) > 0 + }) + res.events = e.events[startIdx:endIdx] + for i := startIdx; i < endIdx; i++ { + res.size += e.sizes[i] + res.pushCount += int(e.pushCounts[i]) } + res.releaseSize += res.size e.events = e.events[endIdx:] e.sizes = e.sizes[endIdx:] e.pushCounts = e.pushCounts[endIdx:] e.readyCount -= endIdx // Update boundaries. Set upperBound to invalid if the range has been drained. - e.lowerBound = res.boundary.Next() + e.lowerBound = res.upperBoundIfSuccess.Next() if e.lowerBound.Compare(e.upperBound) > 0 { e.upperBound = engine.Position{} } diff --git a/cdc/processor/sinkmanager/redo_cache_test.go b/cdc/processor/sinkmanager/redo_cache_test.go index 005971e57e2..3de5958a03a 100644 --- a/cdc/processor/sinkmanager/redo_cache_test.go +++ b/cdc/processor/sinkmanager/redo_cache_test.go @@ -48,14 +48,14 @@ func TestRedoEventCache(t *testing.T) { // Try to pop [{0,1}, {0,4}], shoud fail. And the returned boundary should be {1,4}. popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 0, CommitTs: 4}) require.False(t, popRes.success) - require.Equal(t, uint64(1), popRes.boundary.StartTs) - require.Equal(t, uint64(4), popRes.boundary.CommitTs) + require.Equal(t, uint64(1), popRes.lowerBoundIfFail.StartTs) + require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs) // Try to pop [{0,2}, {0,4}], shoud fail. And the returned boundary should be {3,4}. popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 5, CommitTs: 6}) require.False(t, popRes.success) - require.Equal(t, uint64(3), popRes.boundary.StartTs) - require.Equal(t, uint64(4), popRes.boundary.CommitTs) + require.Equal(t, uint64(3), popRes.lowerBoundIfFail.StartTs) + require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs) // Try to pop [{3,4}, {3,4}], should success. popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4}) @@ -63,22 +63,22 @@ func TestRedoEventCache(t *testing.T) { require.Equal(t, 2, len(popRes.events)) require.Equal(t, uint64(300), popRes.size) require.Equal(t, 2, popRes.pushCount) - require.Equal(t, uint64(3), popRes.boundary.StartTs) - require.Equal(t, uint64(4), popRes.boundary.CommitTs) + require.Equal(t, uint64(3), popRes.upperBoundIfSuccess.StartTs) + require.Equal(t, uint64(4), popRes.upperBoundIfSuccess.CommitTs) // Try to pop [{3,4}, {3,4}] again, shoud fail. And the returned boundary should be {4,4}. popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4}) require.False(t, popRes.success) - require.Equal(t, uint64(4), popRes.boundary.StartTs) - require.Equal(t, uint64(4), popRes.boundary.CommitTs) + require.Equal(t, uint64(4), popRes.lowerBoundIfFail.StartTs) + require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs) popRes = appender.pop(engine.Position{StartTs: 4, CommitTs: 4}, engine.Position{StartTs: 9, CommitTs: 10}) require.True(t, popRes.success) require.Equal(t, 1, len(popRes.events)) require.Equal(t, uint64(300), popRes.size) require.Equal(t, 1, popRes.pushCount) - require.Equal(t, uint64(5), popRes.boundary.StartTs) - require.Equal(t, uint64(6), popRes.boundary.CommitTs) + require.Equal(t, uint64(5), popRes.upperBoundIfSuccess.StartTs) + require.Equal(t, uint64(6), popRes.upperBoundIfSuccess.CommitTs) require.Equal(t, 0, len(appender.events)) require.True(t, appender.broken) @@ -87,3 +87,65 @@ func TestRedoEventCache(t *testing.T) { require.Equal(t, uint64(0), appender.upperBound.StartTs) require.Equal(t, uint64(0), appender.upperBound.CommitTs) } + +func TestRedoEventCacheAllPopBranches(t *testing.T) { + cache := newRedoEventCache(model.ChangeFeedID{}, 1000) + span := spanz.TableIDToComparableSpan(3) + appender := cache.maybeCreateAppender(span, engine.Position{StartTs: 101, CommitTs: 111}) + var batch []*model.RowChangedEvent + var ok bool + var popRes popResult + + batch = []*model.RowChangedEvent{{StartTs: 1, CommitTs: 11}, {StartTs: 1, CommitTs: 11}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{}) + require.True(t, ok) + + batch = []*model.RowChangedEvent{{StartTs: 2, CommitTs: 12}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{}) + require.True(t, ok) + + popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 3, CommitTs: 4}) + require.False(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 4, CommitTs: 4}, popRes.lowerBoundIfFail) + + popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 300, CommitTs: 400}) + require.False(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.lowerBoundIfFail) + + popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 11}, engine.Position{StartTs: 2, CommitTs: 12}) + require.False(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 3, CommitTs: 12}, popRes.lowerBoundIfFail) + + batch = []*model.RowChangedEvent{{StartTs: 101, CommitTs: 111}, {StartTs: 101, CommitTs: 111}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 101, CommitTs: 111}) + require.True(t, ok) + + batch = []*model.RowChangedEvent{{StartTs: 102, CommitTs: 112}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{}) + require.True(t, ok) + require.Equal(t, 5, appender.readyCount) + + popRes = appender.pop(engine.Position{StartTs: 101, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112}) + require.True(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.upperBoundIfSuccess) + require.Equal(t, 2, len(popRes.events)) + require.Equal(t, 1, popRes.pushCount) + require.Equal(t, uint64(101), popRes.events[1].StartTs) + require.Equal(t, 0, appender.readyCount) + + popRes = appender.pop(engine.Position{StartTs: 102, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112}) + require.False(t, popRes.success) + require.Equal(t, engine.Position{StartTs: 103, CommitTs: 112}, popRes.lowerBoundIfFail) + + batch = []*model.RowChangedEvent{{StartTs: 102, CommitTs: 112}} + ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 102, CommitTs: 102}) + require.True(t, ok) + require.Equal(t, 2, appender.readyCount) + + popRes = appender.pop(engine.Position{StartTs: 501, CommitTs: 502}, engine.Position{StartTs: 701, CommitTs: 702}) + require.True(t, popRes.success) + require.Equal(t, 0, len(popRes.events)) + require.Equal(t, engine.Position{StartTs: 500, CommitTs: 502}, popRes.upperBoundIfSuccess) + require.Equal(t, 0, appender.readyCount) + require.Equal(t, 0, len(appender.events)) +} diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 9d7de864397..b04ce8f7c40 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -201,13 +201,14 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e if err != nil { return errors.Trace(err) } + // NOTE: lowerBound can be updated by `fetchFromCache`, so `lastPos` should also be updated. + advancer.lastPos = lowerBound.Prev() if drained { // If drained is true it means we have drained all events from the cache, // we can return directly instead of get events from the source manager again. - performCallback(lowerBound.Prev()) + performCallback(advancer.lastPos) return nil } - advancer.lastPos = lowerBound.Prev() } // lowerBound and upperBound are both closed intervals. @@ -277,7 +278,7 @@ func (w *sinkWorker) fetchFromCache( } popRes := cache.pop(*lowerBound, *upperBound) if popRes.success { - newLowerBound = popRes.boundary.Next() + newLowerBound = popRes.upperBoundIfSuccess.Next() if len(popRes.events) > 0 { w.metricOutputEventCountKV.Add(float64(popRes.pushCount)) w.metricRedoEventCacheHit.Add(float64(popRes.size)) @@ -288,9 +289,9 @@ func (w *sinkWorker) fetchFromCache( // Get a resolvedTs so that we can record it into sink memory quota. var resolvedTs model.ResolvedTs - isCommitFence := popRes.boundary.IsCommitFence() + isCommitFence := popRes.upperBoundIfSuccess.IsCommitFence() if w.splitTxn { - resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs) + resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs) if !isCommitFence { resolvedTs.Mode = model.BatchResolvedMode resolvedTs.BatchID = batchID.Load() @@ -298,9 +299,9 @@ func (w *sinkWorker) fetchFromCache( } } else { if isCommitFence { - resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs) + resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs) } else { - resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs - 1) + resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs - 1) } } // Transfer the memory usage from redoMemQuota to sinkMemQuota. @@ -316,7 +317,7 @@ func (w *sinkWorker) fetchFromCache( zap.Any("resolvedTs", resolvedTs), zap.Error(err)) } else { - newUpperBound = popRes.boundary.Prev() + newUpperBound = popRes.lowerBoundIfFail.Prev() } cacheDrained = newLowerBound.Compare(newUpperBound) > 0 log.Debug("fetchFromCache is performed", diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 3a8c0667753..0bb42a1f9ee 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -760,3 +760,51 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() { cancel() wg.Wait() } + +func (suite *tableSinkWorkerSuite) TestHandleTaskWithCache() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(2, 4, suite.testSpan), + genPolymorphicEvent(2, 4, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + w, e := suite.createWorker(ctx, 0, true) + w.eventCache = newRedoEventCache(suite.testChangefeedID, 1024*1024) + appender := w.eventCache.maybeCreateAppender(suite.testSpan, engine.Position{StartTs: 1, CommitTs: 3}) + appender.pushBatch( + []*model.RowChangedEvent{events[0].Row, events[1].Row}, + uint64(0), engine.Position{StartTs: 2, CommitTs: 4}, + ) + defer w.sinkMemQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *sinkTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + defer sink.Close() + + chShouldBeClosed := make(chan struct{}, 1) + callback := func(lastWrittenPos engine.Position) { + require.Equal(suite.T(), engine.Position{StartTs: 2, CommitTs: 4}, lastWrittenPos) + close(chShouldBeClosed) + } + taskChan <- &sinkTask{ + span: suite.testSpan, + lowerBound: engine.Position{StartTs: 1, CommitTs: 3}, + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return true }, + } + + <-chShouldBeClosed + cancel() + wg.Wait() +}