Skip to content

Commit

Permalink
cherry pick fixes of 10065
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Nov 14, 2023
1 parent c8eed98 commit 260d671
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
50 changes: 50 additions & 0 deletions cdc/processor/sinkmanager/redo_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,53 @@ func TestRedoEventCache(t *testing.T) {
require.Equal(t, uint64(0), appender.upperBound.StartTs)
require.Equal(t, uint64(0), appender.upperBound.CommitTs)
}

func TestRedoEventCacheAllPopBranches(t *testing.T) {
cache := newRedoEventCache(model.ChangeFeedID{}, 1000)
span := spanz.TableIDToComparableSpan(3)
appender := cache.maybeCreateAppender(span, engine.Position{StartTs: 101, CommitTs: 111})
var batch []*model.RowChangedEvent
var ok bool
var popRes popResult

batch = []*model.RowChangedEvent{{StartTs: 1, CommitTs: 11}, {StartTs: 1, CommitTs: 11}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{})
require.True(t, ok)

batch = []*model.RowChangedEvent{{StartTs: 2, CommitTs: 12}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{})
require.True(t, ok)

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 3, CommitTs: 4})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 4, CommitTs: 4}, popRes.boundary)

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 2}, engine.Position{StartTs: 300, CommitTs: 400})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.boundary)

popRes = appender.pop(engine.Position{StartTs: 1, CommitTs: 11}, engine.Position{StartTs: 2, CommitTs: 12})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 3, CommitTs: 12}, popRes.boundary)

batch = []*model.RowChangedEvent{{StartTs: 101, CommitTs: 111}, {StartTs: 101, CommitTs: 111}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{StartTs: 101, CommitTs: 111})
require.True(t, ok)

batch = []*model.RowChangedEvent{{StartTs: 102, CommitTs: 112}}
ok, _ = appender.pushBatch(batch, 0, engine.Position{})
require.True(t, ok)
require.Equal(t, 5, appender.readyCount)

popRes = appender.pop(engine.Position{StartTs: 101, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112})
require.True(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 101, CommitTs: 111}, popRes.boundary)
require.Equal(t, 2, len(popRes.events))
require.Equal(t, 1, popRes.pushCount)
require.Equal(t, uint64(101), popRes.events[1].StartTs)
require.Equal(t, 0, appender.readyCount)

popRes = appender.pop(engine.Position{StartTs: 102, CommitTs: 111}, engine.Position{StartTs: 102, CommitTs: 112})
require.False(t, popRes.success)
require.Equal(t, engine.Position{StartTs: 103, CommitTs: 112}, popRes.boundary)
}
3 changes: 2 additions & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
if err != nil {
return errors.Trace(err)
}
lastPos = lowerBound.Prev()
if drained {
performCallback(lowerBound.Prev())
performCallback(lastPos)
return nil
}
}
Expand Down

0 comments on commit 260d671

Please sign in to comment.