Skip to content

Commit

Permalink
cherry pick fixes of 10065
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu committed Nov 14, 2023
1 parent 260d671 commit 7c633d4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 49 deletions.
54 changes: 28 additions & 26 deletions cdc/processor/sinkmanager/redo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ type popResult struct {
releaseSize uint64 // size of all released events.
pushCount int
success bool
// If success, boundary is the upperBound of poped events.
// Otherwise, boundary is the lowerBound of cached events.
boundary engine.Position

// If success, upperBoundIfSuccess is the upperBound of poped events.
// The caller should fetch events (upperBoundIfSuccess, upperBound] from engine.
upperBoundIfSuccess engine.Position
// If fail, lowerBoundIfFail is the lowerBound of cached events.
// The caller should fetch events [lowerBound, lowerBoundIfFail) from engine.
lowerBoundIfFail engine.Position
}

// newRedoEventCache creates a redoEventCache instance.
Expand Down Expand Up @@ -142,26 +146,29 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu
// NOTE: the caller will fetch events [lowerBound, res.boundary) from engine.
res.success = false
if e.lowerBound.Compare(upperBound.Next()) <= 0 {
res.boundary = e.lowerBound
res.lowerBoundIfFail = e.lowerBound
} else {
res.boundary = upperBound.Next()
res.lowerBoundIfFail = upperBound.Next()
res.lowerBoundIfFail = upperBound.Next()
}
return
}

if !e.upperBound.Valid() {
// if e.upperBound is invalid, it means there are no resolved transactions
// in the cache.
// It means there are no resolved cached transactions in the required range.
// NOTE: the caller will fetch events [lowerBound, res.boundary) from engine.
res.success = false
res.boundary = upperBound.Next()
res.lowerBoundIfFail = upperBound.Next()
return
}

res.success = true
if upperBound.Compare(e.upperBound) > 0 {
res.boundary = e.upperBound
if lowerBound.Compare(e.upperBound) > 0 {
res.upperBoundIfSuccess = lowerBound.Prev()
} else if upperBound.Compare(e.upperBound) > 0 {
res.upperBoundIfSuccess = e.upperBound
} else {
res.boundary = upperBound
res.upperBoundIfSuccess = upperBound
}

startIdx := sort.Search(e.readyCount, func(i int) bool {
Expand All @@ -173,28 +180,23 @@ func (e *eventAppender) pop(lowerBound, upperBound engine.Position) (res popResu
res.releaseSize += e.sizes[i]
}

var endIdx int
if startIdx == e.readyCount {
endIdx = startIdx
} else {
endIdx = sort.Search(e.readyCount, func(i int) bool {
pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs}
return pos.Compare(res.boundary) > 0
})
res.events = e.events[startIdx:endIdx]
for i := startIdx; i < endIdx; i++ {
res.size += e.sizes[i]
res.pushCount += int(e.pushCounts[i])
}
res.releaseSize += res.size
endIdx := sort.Search(e.readyCount, func(i int) bool {
pos := engine.Position{CommitTs: e.events[i].CommitTs, StartTs: e.events[i].StartTs}
return pos.Compare(res.upperBoundIfSuccess) > 0
})
res.events = e.events[startIdx:endIdx]
for i := startIdx; i < endIdx; i++ {
res.size += e.sizes[i]
res.pushCount += int(e.pushCounts[i])
}
res.releaseSize += res.size

e.events = e.events[endIdx:]
e.sizes = e.sizes[endIdx:]
e.pushCounts = e.pushCounts[endIdx:]
e.readyCount -= endIdx
// Update boundaries. Set upperBound to invalid if the range has been drained.
e.lowerBound = res.boundary.Next()
e.lowerBound = res.upperBoundIfSuccess.Next()
if e.lowerBound.Compare(e.upperBound) > 0 {
e.upperBound = engine.Position{}
}
Expand Down
44 changes: 27 additions & 17 deletions cdc/processor/sinkmanager/redo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,37 @@ func TestRedoEventCache(t *testing.T) {
// Try to pop [{0,1}, {0,4}], shoud fail. And the returned boundary should be {1,4}.
popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 0, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, uint64(1), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(1), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

// Try to pop [{0,2}, {0,4}], shoud fail. And the returned boundary should be {3,4}.
popRes = appender.pop(engine.Position{StartTs: 0, CommitTs: 1}, engine.Position{StartTs: 5, CommitTs: 6})
require.False(t, popRes.success)
require.Equal(t, uint64(3), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(3), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

// Try to pop [{3,4}, {3,4}], should success.
popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4})
require.True(t, popRes.success)
require.Equal(t, 2, len(popRes.events))
require.Equal(t, uint64(300), popRes.size)
require.Equal(t, 2, popRes.pushCount)
require.Equal(t, uint64(3), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(3), popRes.upperBoundIfSuccess.StartTs)
require.Equal(t, uint64(4), popRes.upperBoundIfSuccess.CommitTs)

// Try to pop [{3,4}, {3,4}] again, shoud fail. And the returned boundary should be {4,4}.
popRes = appender.pop(engine.Position{StartTs: 3, CommitTs: 4}, engine.Position{StartTs: 3, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, uint64(4), popRes.boundary.StartTs)
require.Equal(t, uint64(4), popRes.boundary.CommitTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.StartTs)
require.Equal(t, uint64(4), popRes.lowerBoundIfFail.CommitTs)

popRes = appender.pop(engine.Position{StartTs: 4, CommitTs: 4}, engine.Position{StartTs: 9, CommitTs: 10})
require.True(t, popRes.success)
require.Equal(t, 1, len(popRes.events))
require.Equal(t, uint64(300), popRes.size)
require.Equal(t, 1, popRes.pushCount)
require.Equal(t, uint64(5), popRes.boundary.StartTs)
require.Equal(t, uint64(6), popRes.boundary.CommitTs)
require.Equal(t, uint64(5), popRes.upperBoundIfSuccess.StartTs)
require.Equal(t, uint64(6), popRes.upperBoundIfSuccess.CommitTs)
require.Equal(t, 0, len(appender.events))
require.True(t, appender.broken)

Expand All @@ -88,8 +88,7 @@ func TestRedoEventCache(t *testing.T) {

func TestRedoEventCacheAllPopBranches(t *testing.T) {
cache := newRedoEventCache(model.ChangeFeedID{}, 1000)
span := spanz.TableIDToComparableSpan(3)
appender := cache.maybeCreateAppender(span, engine.Position{StartTs: 101, CommitTs: 111})
appender := cache.maybeCreateAppender(1, engine.Position{StartTs: 101, CommitTs: 111})
var batch []*model.RowChangedEvent
var ok bool
var popRes popResult
Expand All @@ -104,15 +103,15 @@ func TestRedoEventCacheAllPopBranches(t *testing.T) {

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 3, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 4, CommitTs: 4}, popRes.boundary)
require.Equal(t, engine.Position{StartTs: 4, CommitTs: 4}, popRes.lowerBoundIfFail)

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 300, CommitTs: 400})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.boundary)
require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.lowerBoundIfFail)

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 11}, engine.Position{StartTs: 2, CommitTs: 12})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 3, CommitTs: 12}, popRes.boundary)
require.Equal(t, engine.Position{StartTs: 3, CommitTs: 12}, popRes.lowerBoundIfFail)

batch = []*model.RowChangedEvent{{StartTs: 101, CommitTs: 111}, {StartTs: 101, CommitTs: 111}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 101, CommitTs: 111})
Expand All @@ -125,13 +124,24 @@ func TestRedoEventCacheAllPopBranches(t *testing.T) {

popRes = appender.pop(engine.Position{StartTs: 101, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112})
require.True(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.boundary)
require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.upperBoundIfSuccess)
require.Equal(t, 2, len(popRes.events))
require.Equal(t, 1, popRes.pushCount)
require.Equal(t, uint64(101), popRes.events[1].StartTs)
require.Equal(t, 0, appender.readyCount)

popRes = appender.pop(engine.Position{StartTs: 102, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 103, CommitTs: 112}, popRes.boundary)
require.Equal(t, engine.Position{StartTs: 103, CommitTs: 112}, popRes.lowerBoundIfFail)

batch = []*model.RowChangedEvent{&model.RowChangedEvent{StartTs: 102, CommitTs: 112}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 102, CommitTs: 102})
require.True(t, ok)
require.Equal(t, 2, appender.readyCount)

popRes = appender.pop(engine.Position{StartTs: 501, CommitTs: 502}, engine.Position{StartTs: 701, CommitTs: 702})
require.True(t, popRes.success)
require.Equal(t, 0, len(popRes.events))
require.Equal(t, engine.Position{StartTs: 500, CommitTs: 502}, popRes.upperBoundIfSuccess)
require.Equal(t, 0, appender.readyCount)
}
12 changes: 6 additions & 6 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (w *sinkWorker) fetchFromCache(
}
popRes := cache.pop(*lowerBound, *upperBound)
if popRes.success {
newLowerBound = popRes.boundary.Next()
newLowerBound = popRes.upperBoundIfSuccess.Next()
if len(popRes.events) > 0 {
metrics.OutputEventCount.WithLabelValues(
task.tableSink.changefeed.Namespace,
Expand All @@ -421,19 +421,19 @@ func (w *sinkWorker) fetchFromCache(

// Get a resolvedTs so that we can record it into sink memory quota.
var resolvedTs model.ResolvedTs
isCommitFence := popRes.boundary.IsCommitFence()
isCommitFence := popRes.upperBoundIfSuccess.IsCommitFence()
if w.splitTxn {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs)
if !isCommitFence {
resolvedTs.Mode = model.BatchResolvedMode
resolvedTs.BatchID = *batchID
*batchID += 1
}
} else {
if isCommitFence {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs)
} else {
resolvedTs = model.NewResolvedTs(popRes.boundary.CommitTs - 1)
resolvedTs = model.NewResolvedTs(popRes.upperBoundIfSuccess.CommitTs - 1)
}
}
// Transfer the memory usage from redoMemQuota to sinkMemQuota.
Expand All @@ -449,7 +449,7 @@ func (w *sinkWorker) fetchFromCache(
zap.Any("resolvedTs", resolvedTs),
zap.Error(err))
} else {
newUpperBound = popRes.boundary.Prev()
newUpperBound = popRes.lowerBoundIfFail.Prev()
}
cacheDrained = newLowerBound.Compare(newUpperBound) > 0
log.Debug("fetchFromCache is performed",
Expand Down

0 comments on commit 7c633d4

Please sign in to comment.