From 7ca07efb31983dd190ec4627d77339a47a689190 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 8 Sep 2023 19:28:36 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #9686 Signed-off-by: ti-chi-bot --- cdc/processor/sinkmanager/redo_log_worker.go | 24 ++ .../sinkmanager/redo_log_worker_test.go | 330 ++++++++++++++++++ .../sinkmanager/table_sink_worker.go | 74 +++- .../sinkmanager/table_sink_worker_test.go | 95 +++++ 4 files changed, 521 insertions(+), 2 deletions(-) create mode 100644 cdc/processor/sinkmanager/redo_log_worker_test.go diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 5b674ba568c..709f1fee076 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -72,6 +72,7 @@ func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask) } func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) { +<<<<<<< HEAD lowerBound := task.lowerBound upperBound := task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs()) lowerPhs := oracle.GetTimeFromTS(lowerBound.CommitTs) @@ -91,16 +92,34 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e zap.Int64("tableID", task.tableID), zap.Any("upperBound", upperBound)) } +======= + advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager) + // The task is finished and some required memory isn't used. + defer advancer.cleanup() + + lowerBound, upperBound := validateAndAdjustBound( + w.changefeedID, + &task.span, + task.lowerBound, + task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs()), + ) + advancer.lastPos = lowerBound.Prev() +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) var cache *eventAppender if w.eventCache != nil { cache = w.eventCache.maybeCreateAppender(task.tableID, lowerBound) } +<<<<<<< HEAD // Events are pushed into redoEventCache if possible. Otherwise, their memory will // be released after they are written into redo files. Then we need to release their // memory quota, which can be calculated based on rowsSize and cachedSize. rowsSize := uint64(0) +======= + iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.memQuota) + allEventCount := 0 +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) cachedSize := uint64(0) rows := make([]*model.RowChangedEvent, 0, 1024) @@ -260,7 +279,12 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e // Still need to update cache upper boundary even if no events. cache.pushBatch(nil, 0, lastPos) } +<<<<<<< HEAD return maybeEmitBatchEvents(true, true) +======= + + return advancer.finish(ctx, cachedSize, upperBound) +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) } allEventCount += 1 diff --git a/cdc/processor/sinkmanager/redo_log_worker_test.go b/cdc/processor/sinkmanager/redo_log_worker_test.go new file mode 100644 index 00000000000..98e107d067d --- /dev/null +++ b/cdc/processor/sinkmanager/redo_log_worker_test.go @@ -0,0 +1,330 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sinkmanager + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/memquota" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/pingcap/tiflow/pkg/upstream" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type redoLogWorkerSuite struct { + suite.Suite + testChangefeedID model.ChangeFeedID + testSpan tablepb.Span +} + +func (suite *redoLogWorkerSuite) SetupSuite() { + requestMemSize = testEventSize + // For one batch size. + // Advance table sink per 2 events. + maxUpdateIntervalSize = testEventSize * 2 + suite.testChangefeedID = model.DefaultChangeFeedID("1") + suite.testSpan = spanz.TableIDToComparableSpan(1) +} + +func (suite *redoLogWorkerSuite) TearDownSuite() { + requestMemSize = defaultRequestMemSize + maxUpdateIntervalSize = defaultMaxUpdateIntervalSize +} + +func TestRedoLogWorkerSuite(t *testing.T) { + suite.Run(t, new(redoLogWorkerSuite)) +} + +//nolint:unparam +func (suite *redoLogWorkerSuite) createWorker( + ctx context.Context, memQuota uint64, +) (*redoWorker, engine.SortEngine, *mockRedoDMLManager) { + sortEngine := memory.New(context.Background()) + // Only sourcemanager.FetcyByTable is used, so NewForTest is fine. + sm := sourcemanager.NewForTest(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}), + &entry.MockMountGroup{}, sortEngine, false) + go func() { _ = sm.Run(ctx) }() + + // To avoid refund or release panics. + quota := memquota.NewMemQuota(suite.testChangefeedID, memQuota, "sink") + // NOTICE: Do not forget the initial memory quota in the worker first time running. + quota.ForceAcquire(testEventSize) + quota.AddTable(suite.testSpan) + redoDMLManager := newMockRedoDMLManager() + eventCache := newRedoEventCache(suite.testChangefeedID, 1024) + + return newRedoWorker(suite.testChangefeedID, sm, quota, + redoDMLManager, eventCache), sortEngine, redoDMLManager +} + +func (suite *redoLogWorkerSuite) addEventsToSortEngine( + events []*model.PolymorphicEvent, + sortEngine engine.SortEngine, +) { + sortEngine.AddTable(suite.testSpan, 0) + for _, event := range events { + sortEngine.Add(suite.testSpan, event) + } +} + +func (suite *redoLogWorkerSuite) TestHandleTaskGotSomeFilteredEvents() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 2, suite.testSpan), + // This event will be filtered, so its Row will be nil. + genPolymorphicEventWithNilRow(1, 2), + genPolymorphicEventWithNilRow(1, 2), + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicEvent(1, 4, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + + // Only for three events. + eventSize := uint64(testEventSize * 3) + w, e, m := suite.createWorker(ctx, eventSize) + defer w.memQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *redoTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ + StartTs: 1, + CommitTs: 4, + }, lastWritePos) + require.Equal(suite.T(), engine.Position{ + StartTs: 2, + CommitTs: 4, + }, lastWritePos.Next()) + cancel() + } + wrapper, _ := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + taskChan <- &redoTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return false }, + } + wg.Wait() + require.Len(suite.T(), m.getEvents(suite.testSpan), 3) + require.Len(suite.T(), w.eventCache.getAppender(suite.testSpan).getEvents(), 3) +} + +func (suite *redoLogWorkerSuite) TestHandleTaskAbortWhenNoMemAndOneTxnFinished() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 2, suite.testSpan), + genPolymorphicEvent(1, 2, suite.testSpan), + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicEvent(2, 4, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + + // Only for three events. + eventSize := uint64(testEventSize * 3) + w, e, m := suite.createWorker(ctx, eventSize) + defer w.memQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *redoTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ + StartTs: 1, + CommitTs: 3, + }, lastWritePos, "we only write 3 events because of the memory quota") + require.Equal(suite.T(), engine.Position{ + StartTs: 2, + CommitTs: 3, + }, lastWritePos.Next()) + cancel() + } + wrapper, _ := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + taskChan <- &redoTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return false }, + } + wg.Wait() + require.Len(suite.T(), m.getEvents(suite.testSpan), 3) + require.Len(suite.T(), w.eventCache.getAppender(suite.testSpan).getEvents(), 3) +} + +func (suite *redoLogWorkerSuite) TestHandleTaskAbortWhenNoMemAndBlocked() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 10, suite.testSpan), + genPolymorphicEvent(1, 10, suite.testSpan), + genPolymorphicEvent(1, 10, suite.testSpan), + genPolymorphicEvent(1, 10, suite.testSpan), + genPolymorphicResolvedEvent(14), + } + // Only for three events. + eventSize := uint64(testEventSize * 3) + w, e, m := suite.createWorker(ctx, eventSize) + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *redoTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.ErrorIs(suite.T(), err, context.Canceled) + }() + + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ + StartTs: 0, + CommitTs: 0, + }, lastWritePos) + } + wrapper, _ := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + taskChan <- &redoTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(14), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return false }, + } + require.Eventually(suite.T(), func() bool { + return len(m.getEvents(suite.testSpan)) == 2 + }, 5*time.Second, 10*time.Millisecond) + // Abort the task when no memory quota and blocked. + w.memQuota.Close() + cancel() + wg.Wait() + require.Len(suite.T(), w.eventCache.getAppender(suite.testSpan).getEvents(), 3) + require.Len(suite.T(), m.getEvents(suite.testSpan), 2, "Only two events should be sent to sink") +} + +func (suite *redoLogWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceIfNoWorkload() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicResolvedEvent(4), + } + // Only for three events. + eventSize := uint64(testEventSize * 3) + w, e, m := suite.createWorker(ctx, eventSize) + defer w.memQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *redoTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.ErrorIs(suite.T(), err, context.Canceled) + }() + + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), engine.Position{ + StartTs: 3, + CommitTs: 4, + }, lastWritePos) + require.Equal(suite.T(), engine.Position{ + StartTs: 4, + CommitTs: 4, + }, lastWritePos.Next()) + } + wrapper, _ := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + taskChan <- &redoTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return false }, + } + require.Eventually(suite.T(), func() bool { + return m.GetResolvedTs(suite.testSpan) == 4 + }, 5*time.Second, 10*time.Millisecond, "Directly advance resolved mark to 4") + cancel() + wg.Wait() +} + +// When starts to handle a task, advancer.lastPos should be set to a correct position. +// Otherwise if advancer.lastPos isn't updated during scanning, callback will get an +// invalid `advancer.lastPos`. +func (suite *redoLogWorkerSuite) TestHandleTaskWithoutMemory() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + w, e, _ := suite.createWorker(ctx, 0) + defer w.memQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *redoTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + defer sink.Close() + + chShouldBeClosed := make(chan struct{}, 1) + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos) + close(chShouldBeClosed) + } + taskChan <- &redoTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return true }, + } + + <-chShouldBeClosed + cancel() + wg.Wait() +} diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index d5c1598776e..c3a9c4538bd 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -130,6 +130,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // is false, and it won't break transaction atomicity to downstreams. batchID := uint64(1) +<<<<<<< HEAD if w.eventCache != nil { drained, err := w.fetchFromCache(task, &lowerBound, &upperBound, &batchID) if err != nil { @@ -146,6 +147,14 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e return nil } } +======= + lowerBound, upperBound := validateAndAdjustBound( + w.changefeedID, + &task.span, + task.lowerBound, + task.getUpperBound(task.tableSink.getUpperBoundTs())) + advancer.lastPos = lowerBound.Prev() +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) needEmitAndAdvance := func() bool { // For splitTxn is enabled or not, sizes of events can be advanced will be different. @@ -267,7 +276,19 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // lowerBound and upperBound are both closed intervals. allEventSize := uint64(0) allEventCount := 0 +<<<<<<< HEAD iter := w.sourceManager.FetchByTable(task.tableID, lowerBound, upperBound, w.sinkMemQuota) +======= + + callbackIsPerformed := false + performCallback := func(pos engine.Position) { + if !callbackIsPerformed { + task.callback(pos) + callbackIsPerformed = true + } + } + +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) defer func() { w.metricRedoEventCacheMiss.Add(float64(allEventSize)) metrics.OutputEventCount.WithLabelValues( @@ -281,6 +302,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e task.tableSink.updateRangeEventCounts(eventCount) } +<<<<<<< HEAD if err := iter.Close(); err != nil { log.Error("Sink worker fails to close iterator", zap.String("namespace", w.changefeedID.Namespace), @@ -289,6 +311,8 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e zap.Error(err)) } +======= +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) log.Debug("Sink task finished", zap.String("namespace", w.changefeedID.Namespace), zap.String("changefeed", w.changefeedID.ID), @@ -302,8 +326,12 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e zap.Error(finalErr)) if finalErr == nil { +<<<<<<< HEAD // Otherwise we can't ensure all events before `lastPos` are emitted. task.callback(lastPos) +======= + performCallback(advancer.lastPos) +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) } else { switch errors.Cause(finalErr).(type) { // If it's a warning, close the table sink and wait all pending @@ -316,16 +344,23 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e w.sinkMemQuota.ClearTable(task.tableSink.tableID) // Restart the table sink based on the checkpoint position. - if finalErr = task.tableSink.restart(ctx); finalErr == nil { + if err := task.tableSink.restart(ctx); err == nil { checkpointTs, _, _ := task.tableSink.getCheckpointTs() ckpt := checkpointTs.ResolvedMark() lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} - task.callback(lastWrittenPos) + performCallback(lastWrittenPos) log.Info("table sink has been restarted", zap.String("namespace", w.changefeedID.Namespace), zap.String("changefeed", w.changefeedID.ID), +<<<<<<< HEAD zap.Int64("tableID", task.tableID), zap.Any("lastWrittenPos", lastWrittenPos)) +======= + zap.Stringer("span", &task.span), + zap.Any("lastWrittenPos", lastWrittenPos), + zap.String("sinkError", finalErr.Error())) + finalErr = err +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) } default: } @@ -342,7 +377,42 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e } }() +<<<<<<< HEAD for availableMem > usedMem && !task.isCanceled() { +======= + if w.eventCache != nil { + drained, err := w.fetchFromCache(task, &lowerBound, &upperBound) + failpoint.Inject("TableSinkWorkerFetchFromCache", func() { + err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected")) + }) + if err != nil { + return errors.Trace(err) + } + if drained { + // If drained is true it means we have drained all events from the cache, + // we can return directly instead of get events from the source manager again. + performCallback(lowerBound.Prev()) + return nil + } + advancer.lastPos = lowerBound.Prev() + } + + // lowerBound and upperBound are both closed intervals. + iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota) + defer func() { + if err := iter.Close(); err != nil { + log.Error("Sink worker fails to close iterator", + zap.String("namespace", w.changefeedID.Namespace), + zap.String("changefeed", w.changefeedID.ID), + zap.Stringer("span", &task.span), + zap.Error(err)) + } + }() + + // 1. We have enough memory to collect events. + // 2. The task is not canceled. + for advancer.hasEnoughMem() && !task.isCanceled() { +>>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) e, pos, err := iter.Next(ctx) if err != nil { return errors.Trace(err) diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 6dd88bf9965..052c66a9724 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/memquota" @@ -1097,3 +1098,97 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNoWorkload( func TestWorkerSuite(t *testing.T) { suite.Run(t, new(workerSuite)) } + +func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + // Only for three events. + eventSize := uint64(testEventSize * 3) + w, e := suite.createWorker(ctx, eventSize, true) + w.eventCache = newRedoEventCache(suite.testChangefeedID, 1024*1024) + defer w.sinkMemQuota.Close() + suite.addEventsToSortEngine(events, e) + + _ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache", "return") + defer func() { + _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache") + }() + + taskChan := make(chan *sinkTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + defer sink.Close() + + chShouldBeClosed := make(chan struct{}, 1) + callback := func(lastWritePos engine.Position) { + close(chShouldBeClosed) + } + taskChan <- &sinkTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return false }, + } + + <-chShouldBeClosed + cancel() + wg.Wait() +} + +// When starts to handle a task, advancer.lastPos should be set to a correct position. +// Otherwise if advancer.lastPos isn't updated during scanning, callback will get an +// invalid `advancer.lastPos`. +func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + w, e := suite.createWorker(ctx, 0, true) + defer w.sinkMemQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *sinkTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + defer sink.Close() + + chShouldBeClosed := make(chan struct{}, 1) + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos) + close(chShouldBeClosed) + } + taskChan <- &sinkTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return true }, + } + + <-chShouldBeClosed + cancel() + wg.Wait() +} From 59a90b63a42139ac88c9e033f72244df508b6dd7 Mon Sep 17 00:00:00 2001 From: qupeng Date: Sun, 10 Sep 2023 22:08:12 +0800 Subject: [PATCH 2/4] fix conflicts Signed-off-by: qupeng --- .../sinkmanager/redo_log_worker_test.go | 330 ------------------ .../sinkmanager/table_sink_worker.go | 222 +++++------- .../sinkmanager/table_sink_worker_test.go | 70 +++- 3 files changed, 138 insertions(+), 484 deletions(-) delete mode 100644 cdc/processor/sinkmanager/redo_log_worker_test.go diff --git a/cdc/processor/sinkmanager/redo_log_worker_test.go b/cdc/processor/sinkmanager/redo_log_worker_test.go deleted file mode 100644 index 98e107d067d..00000000000 --- a/cdc/processor/sinkmanager/redo_log_worker_test.go +++ /dev/null @@ -1,330 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package sinkmanager - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/pingcap/tiflow/cdc/entry" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/processor/memquota" - "github.com/pingcap/tiflow/cdc/processor/sourcemanager" - "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" - "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" - "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/pkg/spanz" - "github.com/pingcap/tiflow/pkg/upstream" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" -) - -type redoLogWorkerSuite struct { - suite.Suite - testChangefeedID model.ChangeFeedID - testSpan tablepb.Span -} - -func (suite *redoLogWorkerSuite) SetupSuite() { - requestMemSize = testEventSize - // For one batch size. - // Advance table sink per 2 events. - maxUpdateIntervalSize = testEventSize * 2 - suite.testChangefeedID = model.DefaultChangeFeedID("1") - suite.testSpan = spanz.TableIDToComparableSpan(1) -} - -func (suite *redoLogWorkerSuite) TearDownSuite() { - requestMemSize = defaultRequestMemSize - maxUpdateIntervalSize = defaultMaxUpdateIntervalSize -} - -func TestRedoLogWorkerSuite(t *testing.T) { - suite.Run(t, new(redoLogWorkerSuite)) -} - -//nolint:unparam -func (suite *redoLogWorkerSuite) createWorker( - ctx context.Context, memQuota uint64, -) (*redoWorker, engine.SortEngine, *mockRedoDMLManager) { - sortEngine := memory.New(context.Background()) - // Only sourcemanager.FetcyByTable is used, so NewForTest is fine. - sm := sourcemanager.NewForTest(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}), - &entry.MockMountGroup{}, sortEngine, false) - go func() { _ = sm.Run(ctx) }() - - // To avoid refund or release panics. - quota := memquota.NewMemQuota(suite.testChangefeedID, memQuota, "sink") - // NOTICE: Do not forget the initial memory quota in the worker first time running. - quota.ForceAcquire(testEventSize) - quota.AddTable(suite.testSpan) - redoDMLManager := newMockRedoDMLManager() - eventCache := newRedoEventCache(suite.testChangefeedID, 1024) - - return newRedoWorker(suite.testChangefeedID, sm, quota, - redoDMLManager, eventCache), sortEngine, redoDMLManager -} - -func (suite *redoLogWorkerSuite) addEventsToSortEngine( - events []*model.PolymorphicEvent, - sortEngine engine.SortEngine, -) { - sortEngine.AddTable(suite.testSpan, 0) - for _, event := range events { - sortEngine.Add(suite.testSpan, event) - } -} - -func (suite *redoLogWorkerSuite) TestHandleTaskGotSomeFilteredEvents() { - ctx, cancel := context.WithCancel(context.Background()) - events := []*model.PolymorphicEvent{ - genPolymorphicEvent(1, 2, suite.testSpan), - // This event will be filtered, so its Row will be nil. - genPolymorphicEventWithNilRow(1, 2), - genPolymorphicEventWithNilRow(1, 2), - genPolymorphicEvent(1, 3, suite.testSpan), - genPolymorphicEvent(1, 4, suite.testSpan), - genPolymorphicResolvedEvent(4), - } - - // Only for three events. - eventSize := uint64(testEventSize * 3) - w, e, m := suite.createWorker(ctx, eventSize) - defer w.memQuota.Close() - suite.addEventsToSortEngine(events, e) - - taskChan := make(chan *redoTask) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := w.handleTasks(ctx, taskChan) - require.Equal(suite.T(), context.Canceled, err) - }() - - callback := func(lastWritePos engine.Position) { - require.Equal(suite.T(), engine.Position{ - StartTs: 1, - CommitTs: 4, - }, lastWritePos) - require.Equal(suite.T(), engine.Position{ - StartTs: 2, - CommitTs: 4, - }, lastWritePos.Next()) - cancel() - } - wrapper, _ := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) - taskChan <- &redoTask{ - span: suite.testSpan, - lowerBound: genLowerBound(), - getUpperBound: genUpperBoundGetter(4), - tableSink: wrapper, - callback: callback, - isCanceled: func() bool { return false }, - } - wg.Wait() - require.Len(suite.T(), m.getEvents(suite.testSpan), 3) - require.Len(suite.T(), w.eventCache.getAppender(suite.testSpan).getEvents(), 3) -} - -func (suite *redoLogWorkerSuite) TestHandleTaskAbortWhenNoMemAndOneTxnFinished() { - ctx, cancel := context.WithCancel(context.Background()) - events := []*model.PolymorphicEvent{ - genPolymorphicEvent(1, 2, suite.testSpan), - genPolymorphicEvent(1, 2, suite.testSpan), - genPolymorphicEvent(1, 3, suite.testSpan), - genPolymorphicEvent(2, 4, suite.testSpan), - genPolymorphicResolvedEvent(4), - } - - // Only for three events. - eventSize := uint64(testEventSize * 3) - w, e, m := suite.createWorker(ctx, eventSize) - defer w.memQuota.Close() - suite.addEventsToSortEngine(events, e) - - taskChan := make(chan *redoTask) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := w.handleTasks(ctx, taskChan) - require.Equal(suite.T(), context.Canceled, err) - }() - - callback := func(lastWritePos engine.Position) { - require.Equal(suite.T(), engine.Position{ - StartTs: 1, - CommitTs: 3, - }, lastWritePos, "we only write 3 events because of the memory quota") - require.Equal(suite.T(), engine.Position{ - StartTs: 2, - CommitTs: 3, - }, lastWritePos.Next()) - cancel() - } - wrapper, _ := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) - taskChan <- &redoTask{ - span: suite.testSpan, - lowerBound: genLowerBound(), - getUpperBound: genUpperBoundGetter(4), - tableSink: wrapper, - callback: callback, - isCanceled: func() bool { return false }, - } - wg.Wait() - require.Len(suite.T(), m.getEvents(suite.testSpan), 3) - require.Len(suite.T(), w.eventCache.getAppender(suite.testSpan).getEvents(), 3) -} - -func (suite *redoLogWorkerSuite) TestHandleTaskAbortWhenNoMemAndBlocked() { - ctx, cancel := context.WithCancel(context.Background()) - events := []*model.PolymorphicEvent{ - genPolymorphicEvent(1, 10, suite.testSpan), - genPolymorphicEvent(1, 10, suite.testSpan), - genPolymorphicEvent(1, 10, suite.testSpan), - genPolymorphicEvent(1, 10, suite.testSpan), - genPolymorphicResolvedEvent(14), - } - // Only for three events. - eventSize := uint64(testEventSize * 3) - w, e, m := suite.createWorker(ctx, eventSize) - suite.addEventsToSortEngine(events, e) - - taskChan := make(chan *redoTask) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := w.handleTasks(ctx, taskChan) - require.ErrorIs(suite.T(), err, context.Canceled) - }() - - callback := func(lastWritePos engine.Position) { - require.Equal(suite.T(), engine.Position{ - StartTs: 0, - CommitTs: 0, - }, lastWritePos) - } - wrapper, _ := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) - taskChan <- &redoTask{ - span: suite.testSpan, - lowerBound: genLowerBound(), - getUpperBound: genUpperBoundGetter(14), - tableSink: wrapper, - callback: callback, - isCanceled: func() bool { return false }, - } - require.Eventually(suite.T(), func() bool { - return len(m.getEvents(suite.testSpan)) == 2 - }, 5*time.Second, 10*time.Millisecond) - // Abort the task when no memory quota and blocked. - w.memQuota.Close() - cancel() - wg.Wait() - require.Len(suite.T(), w.eventCache.getAppender(suite.testSpan).getEvents(), 3) - require.Len(suite.T(), m.getEvents(suite.testSpan), 2, "Only two events should be sent to sink") -} - -func (suite *redoLogWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceIfNoWorkload() { - ctx, cancel := context.WithCancel(context.Background()) - events := []*model.PolymorphicEvent{ - genPolymorphicResolvedEvent(4), - } - // Only for three events. - eventSize := uint64(testEventSize * 3) - w, e, m := suite.createWorker(ctx, eventSize) - defer w.memQuota.Close() - suite.addEventsToSortEngine(events, e) - - taskChan := make(chan *redoTask) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := w.handleTasks(ctx, taskChan) - require.ErrorIs(suite.T(), err, context.Canceled) - }() - - callback := func(lastWritePos engine.Position) { - require.Equal(suite.T(), engine.Position{ - StartTs: 3, - CommitTs: 4, - }, lastWritePos) - require.Equal(suite.T(), engine.Position{ - StartTs: 4, - CommitTs: 4, - }, lastWritePos.Next()) - } - wrapper, _ := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) - taskChan <- &redoTask{ - span: suite.testSpan, - lowerBound: genLowerBound(), - getUpperBound: genUpperBoundGetter(4), - tableSink: wrapper, - callback: callback, - isCanceled: func() bool { return false }, - } - require.Eventually(suite.T(), func() bool { - return m.GetResolvedTs(suite.testSpan) == 4 - }, 5*time.Second, 10*time.Millisecond, "Directly advance resolved mark to 4") - cancel() - wg.Wait() -} - -// When starts to handle a task, advancer.lastPos should be set to a correct position. -// Otherwise if advancer.lastPos isn't updated during scanning, callback will get an -// invalid `advancer.lastPos`. -func (suite *redoLogWorkerSuite) TestHandleTaskWithoutMemory() { - ctx, cancel := context.WithCancel(context.Background()) - events := []*model.PolymorphicEvent{ - genPolymorphicEvent(1, 3, suite.testSpan), - genPolymorphicResolvedEvent(4), - } - w, e, _ := suite.createWorker(ctx, 0) - defer w.memQuota.Close() - suite.addEventsToSortEngine(events, e) - - taskChan := make(chan *redoTask) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err := w.handleTasks(ctx, taskChan) - require.Equal(suite.T(), context.Canceled, err) - }() - - wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) - defer sink.Close() - - chShouldBeClosed := make(chan struct{}, 1) - callback := func(lastWritePos engine.Position) { - require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos) - close(chShouldBeClosed) - } - taskChan <- &redoTask{ - span: suite.testSpan, - lowerBound: genLowerBound(), - getUpperBound: genUpperBoundGetter(4), - tableSink: wrapper, - callback: callback, - isCanceled: func() bool { return true }, - } - - <-chShouldBeClosed - cancel() - wg.Wait() -} diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index c3a9c4538bd..d9d313b44a4 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -115,7 +115,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // Used to record the last written position. // We need to use it to update the lower bound of the table sink. - var lastPos engine.Position + var lastPos engine.Position = lowerBound.Prev() // To advance table sink in different cases: splitTxn is true or not. committedTxnSize := uint64(0) // Can be used in `advanceTableSink`. @@ -130,9 +130,90 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // is false, and it won't break transaction atomicity to downstreams. batchID := uint64(1) -<<<<<<< HEAD + allEventSize := uint64(0) + allEventCount := 0 + + callbackIsPerformed := false + performCallback := func(pos engine.Position) { + if !callbackIsPerformed { + task.callback(pos) + callbackIsPerformed = true + } + } + + defer func() { + w.metricRedoEventCacheMiss.Add(float64(allEventSize)) + metrics.OutputEventCount.WithLabelValues( + task.tableSink.changefeed.Namespace, + task.tableSink.changefeed.ID, + "kv", + ).Add(float64(allEventCount)) + + if w.eventCache == nil { + eventCount := newRangeEventCount(lastPos, allEventCount) + task.tableSink.updateRangeEventCounts(eventCount) + } + + log.Debug("Sink task finished", + zap.String("namespace", w.changefeedID.Namespace), + zap.String("changefeed", w.changefeedID.ID), + zap.Int64("tableID", task.tableID), + zap.Any("lowerBound", lowerBound), + zap.Any("upperBound", upperBound), + zap.Bool("splitTxn", w.splitTxn), + zap.Int("receivedEvents", allEventCount), + zap.Any("lastPos", lastPos), + zap.Float64("lag", time.Since(oracle.GetTimeFromTS(lastPos.CommitTs)).Seconds()), + zap.Error(finalErr)) + + if finalErr == nil { + // Otherwise we can't ensure all events before `lastPos` are emitted. + performCallback(lastPos) + } else { + switch errors.Cause(finalErr).(type) { + // If it's a warning, close the table sink and wait all pending + // events have been reported. Then we can continue the table + // at the checkpoint position. + case tablesink.SinkInternalError: + task.tableSink.closeAndClearTableSink() + // After the table sink is cleared all pending events are sent out or dropped. + // So we can re-add the table into sinkMemQuota. + w.sinkMemQuota.ClearTable(task.tableSink.tableID) + + // Restart the table sink based on the checkpoint position. + if err := task.tableSink.restart(ctx); err == nil { + checkpointTs, _, _ := task.tableSink.getCheckpointTs() + ckpt := checkpointTs.ResolvedMark() + lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} + performCallback(lastWrittenPos) + log.Info("table sink has been restarted", + zap.String("namespace", w.changefeedID.Namespace), + zap.String("changefeed", w.changefeedID.ID), + zap.Int64("tableID", task.tableID), + zap.Any("lastWrittenPos", lastWrittenPos), + zap.String("sinkError", finalErr.Error())) + finalErr = err + } + default: + } + } + + // The task is finished and some required memory isn't used. + if availableMem > usedMem { + w.sinkMemQuota.Refund(availableMem - usedMem) + log.Debug("MemoryQuotaTracing: refund memory for table sink task", + zap.String("namespace", w.changefeedID.Namespace), + zap.String("changefeed", w.changefeedID.ID), + zap.Int64("tableID", task.tableID), + zap.Uint64("memory", availableMem-usedMem)) + } + }() + if w.eventCache != nil { drained, err := w.fetchFromCache(task, &lowerBound, &upperBound, &batchID) + failpoint.Inject("TableSinkWorkerFetchFromCache", func() { + err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected")) + }) if err != nil { return errors.Trace(err) } @@ -143,18 +224,10 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e zap.String("changefeed", w.changefeedID.ID), zap.Int64("tableID", task.tableID), zap.Uint64("memory", availableMem-usedMem)) - task.callback(lowerBound.Prev()) + performCallback(lowerBound.Prev()) return nil } } -======= - lowerBound, upperBound := validateAndAdjustBound( - w.changefeedID, - &task.span, - task.lowerBound, - task.getUpperBound(task.tableSink.getUpperBoundTs())) - advancer.lastPos = lowerBound.Prev() ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) needEmitAndAdvance := func() bool { // For splitTxn is enabled or not, sizes of events can be advanced will be different. @@ -274,35 +347,8 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e } // lowerBound and upperBound are both closed intervals. - allEventSize := uint64(0) - allEventCount := 0 -<<<<<<< HEAD iter := w.sourceManager.FetchByTable(task.tableID, lowerBound, upperBound, w.sinkMemQuota) -======= - - callbackIsPerformed := false - performCallback := func(pos engine.Position) { - if !callbackIsPerformed { - task.callback(pos) - callbackIsPerformed = true - } - } - ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) defer func() { - w.metricRedoEventCacheMiss.Add(float64(allEventSize)) - metrics.OutputEventCount.WithLabelValues( - task.tableSink.changefeed.Namespace, - task.tableSink.changefeed.ID, - "kv", - ).Add(float64(allEventCount)) - - if w.eventCache == nil { - eventCount := newRangeEventCount(lastPos, allEventCount) - task.tableSink.updateRangeEventCounts(eventCount) - } - -<<<<<<< HEAD if err := iter.Close(); err != nil { log.Error("Sink worker fails to close iterator", zap.String("namespace", w.changefeedID.Namespace), @@ -310,109 +356,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e zap.Int64("tableID", task.tableID), zap.Error(err)) } - -======= ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) - log.Debug("Sink task finished", - zap.String("namespace", w.changefeedID.Namespace), - zap.String("changefeed", w.changefeedID.ID), - zap.Int64("tableID", task.tableID), - zap.Any("lowerBound", lowerBound), - zap.Any("upperBound", upperBound), - zap.Bool("splitTxn", w.splitTxn), - zap.Int("receivedEvents", allEventCount), - zap.Any("lastPos", lastPos), - zap.Float64("lag", time.Since(oracle.GetTimeFromTS(lastPos.CommitTs)).Seconds()), - zap.Error(finalErr)) - - if finalErr == nil { -<<<<<<< HEAD - // Otherwise we can't ensure all events before `lastPos` are emitted. - task.callback(lastPos) -======= - performCallback(advancer.lastPos) ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) - } else { - switch errors.Cause(finalErr).(type) { - // If it's a warning, close the table sink and wait all pending - // events have been reported. Then we can continue the table - // at the checkpoint position. - case tablesink.SinkInternalError: - task.tableSink.closeAndClearTableSink() - // After the table sink is cleared all pending events are sent out or dropped. - // So we can re-add the table into sinkMemQuota. - w.sinkMemQuota.ClearTable(task.tableSink.tableID) - - // Restart the table sink based on the checkpoint position. - if err := task.tableSink.restart(ctx); err == nil { - checkpointTs, _, _ := task.tableSink.getCheckpointTs() - ckpt := checkpointTs.ResolvedMark() - lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} - performCallback(lastWrittenPos) - log.Info("table sink has been restarted", - zap.String("namespace", w.changefeedID.Namespace), - zap.String("changefeed", w.changefeedID.ID), -<<<<<<< HEAD - zap.Int64("tableID", task.tableID), - zap.Any("lastWrittenPos", lastWrittenPos)) -======= - zap.Stringer("span", &task.span), - zap.Any("lastWrittenPos", lastWrittenPos), - zap.String("sinkError", finalErr.Error())) - finalErr = err ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) - } - default: - } - } - - // The task is finished and some required memory isn't used. - if availableMem > usedMem { - w.sinkMemQuota.Refund(availableMem - usedMem) - log.Debug("MemoryQuotaTracing: refund memory for table sink task", - zap.String("namespace", w.changefeedID.Namespace), - zap.String("changefeed", w.changefeedID.ID), - zap.Int64("tableID", task.tableID), - zap.Uint64("memory", availableMem-usedMem)) - } }() -<<<<<<< HEAD for availableMem > usedMem && !task.isCanceled() { -======= - if w.eventCache != nil { - drained, err := w.fetchFromCache(task, &lowerBound, &upperBound) - failpoint.Inject("TableSinkWorkerFetchFromCache", func() { - err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected")) - }) - if err != nil { - return errors.Trace(err) - } - if drained { - // If drained is true it means we have drained all events from the cache, - // we can return directly instead of get events from the source manager again. - performCallback(lowerBound.Prev()) - return nil - } - advancer.lastPos = lowerBound.Prev() - } - - // lowerBound and upperBound are both closed intervals. - iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota) - defer func() { - if err := iter.Close(); err != nil { - log.Error("Sink worker fails to close iterator", - zap.String("namespace", w.changefeedID.Namespace), - zap.String("changefeed", w.changefeedID.ID), - zap.Stringer("span", &task.span), - zap.Error(err)) - } - }() - - // 1. We have enough memory to collect events. - // 2. The task is not canceled. - for advancer.hasEnoughMem() && !task.isCanceled() { ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) e, pos, err := iter.Next(ctx) if err != nil { return errors.Trace(err) diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 052c66a9724..5328f7cebe0 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -60,6 +60,29 @@ func addEventsToSortEngine(t *testing.T, events []*model.PolymorphicEvent, sortE } } +func genPolymorphicResolvedEvent(resolvedTs uint64) *model.PolymorphicEvent { + return &model.PolymorphicEvent{ + CRTs: resolvedTs, + RawKV: &model.RawKVEntry{ + OpType: model.OpTypeResolved, + CRTs: resolvedTs, + }, + } +} + +func genPolymorphicEvent(startTs, commitTs uint64, tableID model.TableID) *model.PolymorphicEvent { + return &model.PolymorphicEvent{ + StartTs: startTs, + CRTs: commitTs, + RawKV: &model.RawKVEntry{ + OpType: model.OpTypePut, + StartTs: startTs, + CRTs: commitTs, + }, + Row: genRowChangedEvent(startTs, commitTs, tableID), + } +} + // It is ok to use the same tableID in test. // //nolint:unparam @@ -1099,20 +1122,19 @@ func TestWorkerSuite(t *testing.T) { suite.Run(t, new(workerSuite)) } -func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() { +func (suite *workerSuite) TestFetchFromCacheWithFailure() { ctx, cancel := context.WithCancel(context.Background()) events := []*model.PolymorphicEvent{ - genPolymorphicEvent(1, 3, suite.testSpan), - genPolymorphicEvent(1, 3, suite.testSpan), - genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicEvent(1, 3, 1), + genPolymorphicEvent(1, 3, 1), + genPolymorphicEvent(1, 3, 1), genPolymorphicResolvedEvent(4), } // Only for three events. - eventSize := uint64(testEventSize * 3) - w, e := suite.createWorker(ctx, eventSize, true) - w.eventCache = newRedoEventCache(suite.testChangefeedID, 1024*1024) + w, e := createWorker(model.ChangeFeedID{}, 1024*1024, true, 1) + w.eventCache = newRedoEventCache(model.ChangeFeedID{}, 1024*1024) defer w.sinkMemQuota.Close() - suite.addEventsToSortEngine(events, e) + addEventsToSortEngine(suite.T(), events, e, 1) _ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache", "return") defer func() { @@ -1128,7 +1150,7 @@ func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() { require.Equal(suite.T(), context.Canceled, err) }() - wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + wrapper, sink := createTableSinkWrapper(model.ChangeFeedID{}, 1) defer sink.Close() chShouldBeClosed := make(chan struct{}, 1) @@ -1136,7 +1158,7 @@ func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() { close(chShouldBeClosed) } taskChan <- &sinkTask{ - span: suite.testSpan, + tableID: 1, lowerBound: genLowerBound(), getUpperBound: genUpperBoundGetter(4), tableSink: wrapper, @@ -1152,15 +1174,15 @@ func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() { // When starts to handle a task, advancer.lastPos should be set to a correct position. // Otherwise if advancer.lastPos isn't updated during scanning, callback will get an // invalid `advancer.lastPos`. -func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() { +func (suite *workerSuite) TestHandleTaskWithoutMemory() { ctx, cancel := context.WithCancel(context.Background()) events := []*model.PolymorphicEvent{ - genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicEvent(1, 3, 1), genPolymorphicResolvedEvent(4), } - w, e := suite.createWorker(ctx, 0, true) + w, e := createWorker(model.ChangeFeedID{}, 0, true, 1) defer w.sinkMemQuota.Close() - suite.addEventsToSortEngine(events, e) + addEventsToSortEngine(suite.T(), events, e, 1) taskChan := make(chan *sinkTask) var wg sync.WaitGroup @@ -1171,7 +1193,7 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() { require.Equal(suite.T(), context.Canceled, err) }() - wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + wrapper, sink := createTableSinkWrapper(model.ChangeFeedID{}, 1) defer sink.Close() chShouldBeClosed := make(chan struct{}, 1) @@ -1180,7 +1202,7 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() { close(chShouldBeClosed) } taskChan <- &sinkTask{ - span: suite.testSpan, + tableID: 1, lowerBound: genLowerBound(), getUpperBound: genUpperBoundGetter(4), tableSink: wrapper, @@ -1192,3 +1214,19 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() { cancel() wg.Wait() } + +func genLowerBound() engine.Position { + return engine.Position{ + StartTs: 0, + CommitTs: 1, + } +} + +func genUpperBoundGetter(commitTs model.Ts) func(_ model.Ts) engine.Position { + return func(_ model.Ts) engine.Position { + return engine.Position{ + StartTs: commitTs - 1, + CommitTs: commitTs, + } + } +} From 6424d365117254541ea59f1246b49b4077da9b7a Mon Sep 17 00:00:00 2001 From: qupeng Date: Sun, 10 Sep 2023 22:09:08 +0800 Subject: [PATCH 3/4] fix Signed-off-by: qupeng --- cdc/processor/sinkmanager/redo_log_worker.go | 24 -------------------- 1 file changed, 24 deletions(-) diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 709f1fee076..5b674ba568c 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -72,7 +72,6 @@ func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask) } func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) { -<<<<<<< HEAD lowerBound := task.lowerBound upperBound := task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs()) lowerPhs := oracle.GetTimeFromTS(lowerBound.CommitTs) @@ -92,34 +91,16 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e zap.Int64("tableID", task.tableID), zap.Any("upperBound", upperBound)) } -======= - advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager) - // The task is finished and some required memory isn't used. - defer advancer.cleanup() - - lowerBound, upperBound := validateAndAdjustBound( - w.changefeedID, - &task.span, - task.lowerBound, - task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs()), - ) - advancer.lastPos = lowerBound.Prev() ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) var cache *eventAppender if w.eventCache != nil { cache = w.eventCache.maybeCreateAppender(task.tableID, lowerBound) } -<<<<<<< HEAD // Events are pushed into redoEventCache if possible. Otherwise, their memory will // be released after they are written into redo files. Then we need to release their // memory quota, which can be calculated based on rowsSize and cachedSize. rowsSize := uint64(0) -======= - iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.memQuota) - allEventCount := 0 ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) cachedSize := uint64(0) rows := make([]*model.RowChangedEvent, 0, 1024) @@ -279,12 +260,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e // Still need to update cache upper boundary even if no events. cache.pushBatch(nil, 0, lastPos) } -<<<<<<< HEAD return maybeEmitBatchEvents(true, true) -======= - - return advancer.finish(ctx, cachedSize, upperBound) ->>>>>>> 85dcc860d3 (sink(cdc): fix "dead dmlSink" error in sink workers (#9686)) } allEventCount += 1 From 78a7b29266e34f6f87ad7548cd5ac5ba57806d8b Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 11 Sep 2023 10:28:56 +0800 Subject: [PATCH 4/4] fix Signed-off-by: qupeng --- cdc/processor/sinkmanager/table_sink_worker.go | 6 ------ cdc/processor/sinkmanager/table_sink_worker_test.go | 1 + 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index d9d313b44a4..a1d6725bcd8 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -218,12 +218,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e return errors.Trace(err) } if drained { - w.sinkMemQuota.Refund(availableMem - usedMem) - log.Debug("MemoryQuotaTracing: refund memory for table sink task", - zap.String("namespace", w.changefeedID.Namespace), - zap.String("changefeed", w.changefeedID.ID), - zap.Int64("tableID", task.tableID), - zap.Uint64("memory", availableMem-usedMem)) performCallback(lowerBound.Prev()) return nil } diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 5328f7cebe0..964c336b746 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -70,6 +70,7 @@ func genPolymorphicResolvedEvent(resolvedTs uint64) *model.PolymorphicEvent { } } +//nolint:all func genPolymorphicEvent(startTs, commitTs uint64, tableID model.TableID) *model.PolymorphicEvent { return &model.PolymorphicEvent{ StartTs: startTs,