diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index b70e17fc3ab..8d23ee5b47e 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -372,3 +372,47 @@ func TestSinkManagerNeedsStuckCheck(t *testing.T) { require.False(t, manager.needsStuckCheck()) } + +func TestSinkManagerRestartTableSinks(t *testing.T) { + t.Parallel() + + failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return") + defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause") + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 16) + changefeedInfo := getChangefeedInfo() + manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.ChangeFeedID{}, changefeedInfo, errCh) + defer func() { + cancel() + manager.Close() + }() + + span := tablepb.Span{TableID: 1} + manager.AddTable(span, 1, 100) + require.Nil(t, manager.StartTable(span, 2)) + table, exists := manager.tableSinks.Load(span) + require.True(t, exists) + + table.(*tableSinkWrapper).updateReceivedSorterResolvedTs(4) + table.(*tableSinkWrapper).updateBarrierTs(4) + select { + case task := <-manager.sinkTaskChan: + require.Equal(t, engine.Position{StartTs: 0, CommitTs: 3}, task.lowerBound) + task.callback(engine.Position{StartTs: 3, CommitTs: 4}) + case <-time.After(2 * time.Second): + panic("should always get a sink task") + } + + // With the failpoint blackhole/WriteEventsFail enabled, sink manager should restarts + // the table sink at its checkpoint. + failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail", "1*return") + defer failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail") + select { + case task := <-manager.sinkTaskChan: + require.Equal(t, engine.Position{StartTs: 2, CommitTs: 2}, task.lowerBound) + task.callback(engine.Position{StartTs: 3, CommitTs: 4}) + case <-time.After(2 * time.Second): + panic("should always get a sink task") + } +} diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 980a31db7b3..42c493f3918 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -96,6 +96,7 @@ func newSinkWorker( } func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) error { + failpoint.Inject("SinkWorkerTaskHandlePause", func() { <-ctx.Done() }) for { select { case <-ctx.Done(): diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 68c7463a76c..45ce8c55e19 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -166,6 +166,11 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err break } } + if model.NewResolvedTs(startTs).Greater(t.tableSink.checkpointTs) { + t.tableSink.checkpointTs = model.NewResolvedTs(startTs) + t.tableSink.resolvedTs = model.NewResolvedTs(startTs) + t.tableSink.advanced = time.Now() + } t.state.Store(tablepb.TableStateReplicating) return nil } diff --git a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go index 6cc1d1f35dd..be470b6d357 100644 --- a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go @@ -14,6 +14,8 @@ package blackhole import ( + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" @@ -33,14 +35,16 @@ func NewDMLSink() *DMLSink { } // WriteEvents log the events. -func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) error { - for _, row := range rows { - // NOTE: don't change the log, some tests depend on it. - log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event)) - row.Callback() +func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) (err error) { + failpoint.Inject("WriteEventsFail", func() { err = errors.New("InjectedErrorForWriteEventsFail") }) + if err == nil { + for _, row := range rows { + // NOTE: don't change the log, some tests depend on it. + log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event)) + row.Callback() + } } - - return nil + return } // Scheme return the scheme of the sink.