diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index d5c1598776e..a1d6725bcd8 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -115,7 +115,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 
 	// Used to record the last written position.
 	// We need to use it to update the lower bound of the table sink.
-	var lastPos engine.Position
+	var lastPos engine.Position = lowerBound.Prev()
 
 	// To advance table sink in different cases: splitTxn is true or not.
 	committedTxnSize := uint64(0) // Can be used in `advanceTableSink`.
@@ -130,19 +130,95 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 	// is false, and it won't break transaction atomicity to downstreams.
 	batchID := uint64(1)
 
-	if w.eventCache != nil {
-		drained, err := w.fetchFromCache(task, &lowerBound, &upperBound, &batchID)
-		if err != nil {
-			return errors.Trace(err)
+	allEventSize := uint64(0)
+	allEventCount := 0
+
+	callbackIsPerformed := false
+	performCallback := func(pos engine.Position) {
+		if !callbackIsPerformed {
+			task.callback(pos)
+			callbackIsPerformed = true
 		}
-		if drained {
+	}
+
+	defer func() {
+		w.metricRedoEventCacheMiss.Add(float64(allEventSize))
+		metrics.OutputEventCount.WithLabelValues(
+			task.tableSink.changefeed.Namespace,
+			task.tableSink.changefeed.ID,
+			"kv",
+		).Add(float64(allEventCount))
+
+		if w.eventCache == nil {
+			eventCount := newRangeEventCount(lastPos, allEventCount)
+			task.tableSink.updateRangeEventCounts(eventCount)
+		}
+
+		log.Debug("Sink task finished",
+			zap.String("namespace", w.changefeedID.Namespace),
+			zap.String("changefeed", w.changefeedID.ID),
+			zap.Int64("tableID", task.tableID),
+			zap.Any("lowerBound", lowerBound),
+			zap.Any("upperBound", upperBound),
+			zap.Bool("splitTxn", w.splitTxn),
+			zap.Int("receivedEvents", allEventCount),
+			zap.Any("lastPos", lastPos),
+			zap.Float64("lag", time.Since(oracle.GetTimeFromTS(lastPos.CommitTs)).Seconds()),
+			zap.Error(finalErr))
+
+		if finalErr == nil {
+			// Otherwise we can't ensure all events before `lastPos` are emitted.
+			performCallback(lastPos)
+		} else {
+			switch errors.Cause(finalErr).(type) {
+			// If it's a warning, close the table sink and wait until all pending
+			// events have been reported. Then we can continue the table
+			// at the checkpoint position.
+			case tablesink.SinkInternalError:
+				task.tableSink.closeAndClearTableSink()
+				// After the table sink is cleared, all pending events are sent out or dropped.
+				// So we can re-add the table into sinkMemQuota.
+				w.sinkMemQuota.ClearTable(task.tableSink.tableID)
+
+				// Restart the table sink based on the checkpoint position.
+				if err := task.tableSink.restart(ctx); err == nil {
+					checkpointTs, _, _ := task.tableSink.getCheckpointTs()
+					ckpt := checkpointTs.ResolvedMark()
+					lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
+					performCallback(lastWrittenPos)
+					log.Info("table sink has been restarted",
+						zap.String("namespace", w.changefeedID.Namespace),
+						zap.String("changefeed", w.changefeedID.ID),
+						zap.Int64("tableID", task.tableID),
+						zap.Any("lastWrittenPos", lastWrittenPos),
+						zap.String("sinkError", finalErr.Error()))
+					finalErr = err
+				}
+			default:
+			}
+		}
+
+		// The task is finished and some required memory isn't used.
+		if availableMem > usedMem {
 			w.sinkMemQuota.Refund(availableMem - usedMem)
 			log.Debug("MemoryQuotaTracing: refund memory for table sink task",
 				zap.String("namespace", w.changefeedID.Namespace),
 				zap.String("changefeed", w.changefeedID.ID),
 				zap.Int64("tableID", task.tableID),
 				zap.Uint64("memory", availableMem-usedMem))
-			task.callback(lowerBound.Prev())
+		}
+	}()
+
+	if w.eventCache != nil {
+		drained, err := w.fetchFromCache(task, &lowerBound, &upperBound, &batchID)
+		failpoint.Inject("TableSinkWorkerFetchFromCache", func() {
+			err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected"))
+		})
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if drained {
+			performCallback(lowerBound.Prev())
 			return nil
 		}
 	}
@@ -265,22 +341,8 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 	}
 
 	// lowerBound and upperBound are both closed intervals.
-	allEventSize := uint64(0)
-	allEventCount := 0
 	iter := w.sourceManager.FetchByTable(task.tableID, lowerBound, upperBound, w.sinkMemQuota)
 	defer func() {
-		w.metricRedoEventCacheMiss.Add(float64(allEventSize))
-		metrics.OutputEventCount.WithLabelValues(
-			task.tableSink.changefeed.Namespace,
-			task.tableSink.changefeed.ID,
-			"kv",
-		).Add(float64(allEventCount))
-
-		if w.eventCache == nil {
-			eventCount := newRangeEventCount(lastPos, allEventCount)
-			task.tableSink.updateRangeEventCounts(eventCount)
-		}
-
 		if err := iter.Close(); err != nil {
 			log.Error("Sink worker fails to close iterator",
 				zap.String("namespace", w.changefeedID.Namespace),
@@ -288,58 +350,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 				zap.Int64("tableID", task.tableID),
 				zap.Error(err))
 		}
-
-		log.Debug("Sink task finished",
-			zap.String("namespace", w.changefeedID.Namespace),
-			zap.String("changefeed", w.changefeedID.ID),
-			zap.Int64("tableID", task.tableID),
-			zap.Any("lowerBound", lowerBound),
-			zap.Any("upperBound", upperBound),
-			zap.Bool("splitTxn", w.splitTxn),
-			zap.Int("receivedEvents", allEventCount),
-			zap.Any("lastPos", lastPos),
-			zap.Float64("lag", time.Since(oracle.GetTimeFromTS(lastPos.CommitTs)).Seconds()),
-			zap.Error(finalErr))
-
-		if finalErr == nil {
-			// Otherwise we can't ensure all events before `lastPos` are emitted.
-			task.callback(lastPos)
-		} else {
-			switch errors.Cause(finalErr).(type) {
-			// If it's a warning, close the table sink and wait all pending
-			// events have been reported. Then we can continue the table
-			// at the checkpoint position.
-			case tablesink.SinkInternalError:
-				task.tableSink.closeAndClearTableSink()
-				// After the table sink is cleared all pending events are sent out or dropped.
-				// So we can re-add the table into sinkMemQuota.
-				w.sinkMemQuota.ClearTable(task.tableSink.tableID)
-
-				// Restart the table sink based on the checkpoint position.
-				if finalErr = task.tableSink.restart(ctx); finalErr == nil {
-					checkpointTs, _, _ := task.tableSink.getCheckpointTs()
-					ckpt := checkpointTs.ResolvedMark()
-					lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
-					task.callback(lastWrittenPos)
-					log.Info("table sink has been restarted",
-						zap.String("namespace", w.changefeedID.Namespace),
-						zap.String("changefeed", w.changefeedID.ID),
-						zap.Int64("tableID", task.tableID),
-						zap.Any("lastWrittenPos", lastWrittenPos))
-				}
-			default:
-			}
-		}
-
-		// The task is finished and some required memory isn't used.
-		if availableMem > usedMem {
-			w.sinkMemQuota.Refund(availableMem - usedMem)
-			log.Debug("MemoryQuotaTracing: refund memory for table sink task",
-				zap.String("namespace", w.changefeedID.Namespace),
-				zap.String("changefeed", w.changefeedID.ID),
-				zap.Int64("tableID", task.tableID),
-				zap.Uint64("memory", availableMem-usedMem))
-		}
 	}()
 
 	for availableMem > usedMem && !task.isCanceled() {
diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go
index 6dd88bf9965..964c336b746 100644
--- a/cdc/processor/sinkmanager/table_sink_worker_test.go
+++ b/cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tiflow/cdc/entry"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/memquota"
@@ -59,6 +60,30 @@ func addEventsToSortEngine(t *testing.T, events []*model.PolymorphicEvent, sortE
 	}
 }
 
+func genPolymorphicResolvedEvent(resolvedTs uint64) *model.PolymorphicEvent {
+	return &model.PolymorphicEvent{
+		CRTs: resolvedTs,
+		RawKV: &model.RawKVEntry{
+			OpType: model.OpTypeResolved,
+			CRTs:   resolvedTs,
+		},
+	}
+}
+
+//nolint:all
+func genPolymorphicEvent(startTs, commitTs uint64, tableID model.TableID) *model.PolymorphicEvent {
+	return &model.PolymorphicEvent{
+		StartTs: startTs,
+		CRTs:    commitTs,
+		RawKV: &model.RawKVEntry{
+			OpType:  model.OpTypePut,
+			StartTs: startTs,
+			CRTs:    commitTs,
+		},
+		Row: genRowChangedEvent(startTs, commitTs, tableID),
+	}
+}
+
 // It is ok to use the same tableID in test.
 //
 //nolint:unparam
@@ -1097,3 +1122,112 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNoWorkload(
 func TestWorkerSuite(t *testing.T) {
 	suite.Run(t, new(workerSuite))
 }
+
+func (suite *workerSuite) TestFetchFromCacheWithFailure() {
+	ctx, cancel := context.WithCancel(context.Background())
+	events := []*model.PolymorphicEvent{
+		genPolymorphicEvent(1, 3, 1),
+		genPolymorphicEvent(1, 3, 1),
+		genPolymorphicEvent(1, 3, 1),
+		genPolymorphicResolvedEvent(4),
+	}
+	// Only for three events.
+	w, e := createWorker(model.ChangeFeedID{}, 1024*1024, true, 1)
+	w.eventCache = newRedoEventCache(model.ChangeFeedID{}, 1024*1024)
+	defer w.sinkMemQuota.Close()
+	addEventsToSortEngine(suite.T(), events, e, 1)
+
+	_ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache", "return")
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache")
+	}()
+
+	taskChan := make(chan *sinkTask)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := w.handleTasks(ctx, taskChan)
+		require.Equal(suite.T(), context.Canceled, err)
+	}()
+
+	wrapper, sink := createTableSinkWrapper(model.ChangeFeedID{}, 1)
+	defer sink.Close()
+
+	chShouldBeClosed := make(chan struct{}, 1)
+	callback := func(lastWritePos engine.Position) {
+		close(chShouldBeClosed)
+	}
+	taskChan <- &sinkTask{
+		tableID:       1,
+		lowerBound:    genLowerBound(),
+		getUpperBound: genUpperBoundGetter(4),
+		tableSink:     wrapper,
+		callback:      callback,
+		isCanceled:    func() bool { return false },
+	}
+
+	<-chShouldBeClosed
+	cancel()
+	wg.Wait()
+}
+
+// When the worker starts to handle a task, advancer.lastPos should be set to a correct position.
+// Otherwise, if advancer.lastPos isn't updated during scanning, the callback will get an
+// invalid `advancer.lastPos`.
+func (suite *workerSuite) TestHandleTaskWithoutMemory() {
+	ctx, cancel := context.WithCancel(context.Background())
+	events := []*model.PolymorphicEvent{
+		genPolymorphicEvent(1, 3, 1),
+		genPolymorphicResolvedEvent(4),
+	}
+	w, e := createWorker(model.ChangeFeedID{}, 0, true, 1)
+	defer w.sinkMemQuota.Close()
+	addEventsToSortEngine(suite.T(), events, e, 1)
+
+	taskChan := make(chan *sinkTask)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := w.handleTasks(ctx, taskChan)
+		require.Equal(suite.T(), context.Canceled, err)
+	}()
+
+	wrapper, sink := createTableSinkWrapper(model.ChangeFeedID{}, 1)
+	defer sink.Close()
+
+	chShouldBeClosed := make(chan struct{}, 1)
+	callback := func(lastWritePos engine.Position) {
+		require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos)
+		close(chShouldBeClosed)
+	}
+	taskChan <- &sinkTask{
+		tableID:       1,
+		lowerBound:    genLowerBound(),
+		getUpperBound: genUpperBoundGetter(4),
+		tableSink:     wrapper,
+		callback:      callback,
+		isCanceled:    func() bool { return true },
+	}
+
+	<-chShouldBeClosed
+	cancel()
+	wg.Wait()
+}
+
+func genLowerBound() engine.Position {
+	return engine.Position{
+		StartTs:  0,
+		CommitTs: 1,
+	}
+}
+
+func genUpperBoundGetter(commitTs model.Ts) func(_ model.Ts) engine.Position {
+	return func(_ model.Ts) engine.Position {
+		return engine.Position{
+			StartTs:  commitTs - 1,
+			CommitTs: commitTs,
+		}
+	}
+}