From 71be9652ac0f60ad558b3d20313a3eef1bb444ad Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Fri, 24 Nov 2023 16:01:18 +0800
Subject: [PATCH] sink(cdc): always handle sink failures for cases with
 sync-point enabled (#10132) (#10140)

Signed-off-by: ti-chi-bot
Signed-off-by: qupeng
Co-authored-by: qupeng
---
 cdc/processor/sinkmanager/manager.go      | 45 +++++++++++++++++--
 cdc/processor/sinkmanager/manager_test.go | 41 +++++++++++++++++
 .../sinkmanager/table_sink_worker.go      | 19 ++------
 .../sinkmanager/table_sink_wrapper.go     | 14 ++++++
 .../blackhole/black_hole_dml_sink.go      | 18 +++++---
 cdc/sinkv2/tablesink/table_sink.go        |  2 +
 cdc/sinkv2/tablesink/table_sink_impl.go   |  8 ++++
 7 files changed, 120 insertions(+), 27 deletions(-)

diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 7de5e88f2e8..4e308b2775b 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -540,13 +540,50 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
 			slowestTableProgress := progs[i]
 			lowerBound := slowestTableProgress.nextLowerBoundPos
 			upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
-			// The table has no available progress.
-			if lowerBound.Compare(upperBound) >= 0 {
+
+			if !tableSink.initTableSink() {
+				// The table hasn't been attached to a sink.
 				m.sinkProgressHeap.push(slowestTableProgress)
 				continue
 			}
-			// The table hasn't been attached to a sink.
-			if !tableSink.initTableSink() {
+
+			if sinkErr := tableSink.checkTableSinkHealth(); sinkErr != nil {
+				switch errors.Cause(sinkErr).(type) {
+				case tablesink.SinkInternalError:
+					tableSink.closeAndClearTableSink()
+					if restartErr := tableSink.restart(ctx); restartErr == nil {
+						// Restart the table sink based on the checkpoint position.
+						ckpt := tableSink.getCheckpointTs().ResolvedMark()
+						lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
+						p := &progress{
+							tableID:           tableSink.tableID,
+							nextLowerBoundPos: lastWrittenPos.Next(),
+							version:           slowestTableProgress.version,
+						}
+						m.sinkProgressHeap.push(p)
+						log.Info("table sink has been restarted",
+							zap.String("namespace", m.changefeedID.Namespace),
+							zap.String("changefeed", m.changefeedID.ID),
+							zap.Int64("tableID", tableSink.tableID),
+							zap.Any("lastWrittenPos", lastWrittenPos),
+							zap.String("sinkError", sinkErr.Error()))
+					} else {
+						m.sinkProgressHeap.push(slowestTableProgress)
+						log.Warn("table sink restart fail",
+							zap.String("namespace", m.changefeedID.Namespace),
+							zap.String("changefeed", m.changefeedID.ID),
+							zap.Int64("tableID", tableSink.tableID),
+							zap.String("sinkError", sinkErr.Error()),
+							zap.Error(restartErr))
+					}
+				default:
+					return sinkErr
+				}
+				continue
+			}
+
+			// The table has no available progress.
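+			// (lowerBound >= upperBound means every resolved event up to the
+			// current upper bound has already been dispatched, so the progress
+			// item is simply re-queued until new events arrive.)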
+			if lowerBound.Compare(upperBound) >= 0 {
 				m.sinkProgressHeap.push(slowestTableProgress)
 				continue
 			}
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index d33fcb5aeae..7c66cb17f99 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -380,3 +380,44 @@ func TestSinkManagerNeedsStuckCheck(t *testing.T) {
 	require.False(t, manager.needsStuckCheck())
 }
+
+func TestSinkManagerRestartTableSinks(t *testing.T) {
+	failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
+	defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	errCh := make(chan error, 16)
+	changefeedInfo := getChangefeedInfo()
+	manager, _ := createManagerWithMemEngine(t, ctx, model.ChangeFeedID{}, changefeedInfo, errCh)
+	defer func() {
+		cancel()
+		manager.Close()
+	}()
+
+	manager.AddTable(1, 1, 100)
+	require.Nil(t, manager.StartTable(1, 2))
+	table, exists := manager.tableSinks.Load(model.TableID(1))
+	require.True(t, exists)
+
+	table.(*tableSinkWrapper).updateReceivedSorterResolvedTs(4)
+	table.(*tableSinkWrapper).updateBarrierTs(4)
+	select {
+	case task := <-manager.sinkTaskChan:
+		require.Equal(t, engine.Position{StartTs: 0, CommitTs: 3}, task.lowerBound)
+		task.callback(engine.Position{StartTs: 3, CommitTs: 4})
+	case <-time.After(2 * time.Second):
+		panic("should always get a sink task")
+	}
+
+	// With the failpoint blackhole/WriteEventsFail enabled, the sink manager
+	// should restart the table sink at its checkpoint.
+	failpoint.Enable("github.com/pingcap/tiflow/cdc/sinkv2/eventsink/blackhole/WriteEventsFail", "1*return")
+	defer failpoint.Disable("github.com/pingcap/tiflow/cdc/sinkv2/eventsink/blackhole/WriteEventsFail")
+	select {
+	case task := <-manager.sinkTaskChan:
+		require.Equal(t, engine.Position{StartTs: 2, CommitTs: 2}, task.lowerBound)
+		task.callback(engine.Position{StartTs: 3, CommitTs: 4})
+	case <-time.After(2 * time.Second):
+		panic("should always get a sink task")
+	}
+}
diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index 0ed6d47c930..4a55b92983f 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -67,6 +67,7 @@ func newSinkWorker(
 }
 
 func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) error {
+	failpoint.Inject("SinkWorkerTaskHandlePause", func() { <-ctx.Done() })
 	for {
 		select {
 		case <-ctx.Done():
@@ -170,25 +171,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			// events have been reported. Then we can continue the table
 			// at the checkpoint position.
 			case tablesink.SinkInternalError:
-				task.tableSink.closeAndClearTableSink()
 				// After the table sink is cleared, all pending events are sent out or dropped.
 				// So we can re-add the table into sinkMemQuota.
 				w.sinkMemQuota.ClearTable(task.tableSink.tableID)
-
-				// Restart the table sink based on the checkpoint position.
-				if err := task.tableSink.restart(ctx); err == nil {
-					checkpointTs := task.tableSink.getCheckpointTs()
-					ckpt := checkpointTs.ResolvedMark()
-					lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
-					performCallback(lastWrittenPos)
-					log.Info("table sink has been restarted",
-						zap.String("namespace", w.changefeedID.Namespace),
-						zap.String("changefeed", w.changefeedID.ID),
-						zap.Int64("tableID", task.tableID),
-						zap.Any("lastWrittenPos", lastWrittenPos),
-						zap.String("sinkError", finalErr.Error()))
-					finalErr = err
-				}
+				performCallback(lastPos)
+				finalErr = nil
 			default:
 			}
 		}
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 7961957906f..c7119117bd0 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -165,6 +165,11 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err
 			break
 		}
 	}
+	if model.NewResolvedTs(startTs).Greater(t.tableSink.checkpointTs) {
+		t.tableSink.checkpointTs = model.NewResolvedTs(startTs)
+		t.tableSink.resolvedTs = model.NewResolvedTs(startTs)
+		t.tableSink.advanced = time.Now()
+	}
 	t.state.Store(tablepb.TableStateReplicating)
 	return nil
 }
@@ -362,6 +367,15 @@ func (t *tableSinkWrapper) doTableSinkClear() {
 	t.tableSink.version = 0
 }
 
+func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
+	t.tableSink.RLock()
+	defer t.tableSink.RUnlock()
+	if t.tableSink.s != nil {
+		err = t.tableSink.s.CheckHealth()
+	}
+	return
+}
+
 // When the attached sink fails, there can be some events that have already been
 // committed downstream, but we don't know which ones. So we need to update
 // `replicateTs` of the table so that we can re-send those events later.
diff --git a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go
index 740e1ac7cef..d324ceb5c86 100644
--- a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go
+++ b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go
@@ -14,6 +14,8 @@
 package blackhole
 
 import (
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
@@ -33,14 +35,16 @@ func New() *Sink {
 }
 
 // WriteEvents logs the events.
-func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChangedEvent]) error {
-	for _, row := range rows {
-		// NOTE: don't change the log, some tests depend on it.
-		log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event))
-		row.Callback()
+func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChangedEvent]) (err error) {
+	failpoint.Inject("WriteEventsFail", func() { err = errors.New("InjectedErrorForWriteEventsFail") })
+	if err == nil {
+		for _, row := range rows {
+			// NOTE: don't change the log, some tests depend on it.
+			log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event))
+			row.Callback()
+		}
 	}
-
-	return nil
+	return
 }
 
 // Scheme returns the sink scheme.
diff --git a/cdc/sinkv2/tablesink/table_sink.go b/cdc/sinkv2/tablesink/table_sink.go
index 588b30aabf9..69f6f61db11 100644
--- a/cdc/sinkv2/tablesink/table_sink.go
+++ b/cdc/sinkv2/tablesink/table_sink.go
@@ -37,6 +37,8 @@ type TableSink interface {
 	Close()
 	// AsyncClose closes the table sink asynchronously. Returns true if it's closed.
 	AsyncClose() bool
+	// CheckHealth checks whether the associated sink backend is healthy or not.
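+	// In this patch the check is implemented by issuing an empty write to the
+	// backend (see EventTableSink.CheckHealth below), so a broken backend
+	// surfaces as a SinkInternalError that the sink manager can recover from.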
+	CheckHealth() error
 }
 
 // SinkInternalError means the error comes from the sink's internal logic.
diff --git a/cdc/sinkv2/tablesink/table_sink_impl.go b/cdc/sinkv2/tablesink/table_sink_impl.go
index 13f9afc3a50..236227ba6e9 100644
--- a/cdc/sinkv2/tablesink/table_sink_impl.go
+++ b/cdc/sinkv2/tablesink/table_sink_impl.go
@@ -158,6 +158,14 @@ func (e *EventTableSink[E]) AsyncClose() bool {
 	return false
 }
 
+// CheckHealth checks whether the associated sink backend is healthy or not.
+func (e *EventTableSink[E]) CheckHealth() error {
+	if err := e.backendSink.WriteEvents(); err != nil {
+		return SinkInternalError{err}
+	}
+	return nil
+}
+
 func (e *EventTableSink[E]) freeze() {
 	// Notice: We have to set the state to stopping first,
 	// otherwise the progressTracker may be advanced incorrectly.
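
Taken together, the hunks above form a single recovery loop: before generating
a task for a table, the sink manager probes the table sink's health; if the
backend reports a sink-internal failure, the manager closes the sink, restarts
it, and resumes from the checkpoint instead of failing the whole changefeed.
The sketch below condenses that flow into a self-contained program. It is a
minimal illustration, not part of the patch: toySink, writeEvents, and
checkHealth are invented names (only the shapes of SinkInternalError,
CheckHealth, and the one-shot WriteEventsFail failpoint are imitated), and it
uses a plain type switch where the patch uses errors.Cause from pingcap/errors.

	package main

	import (
		"errors"
		"fmt"
	)

	// sinkInternalError mirrors tablesink.SinkInternalError from the patch: it
	// marks an error as originating inside the sink, telling the manager the
	// table sink can be restarted instead of failing the changefeed.
	type sinkInternalError struct{ cause error }

	func (e sinkInternalError) Error() string { return e.cause.Error() }

	// toySink is a hypothetical stand-in for a sink backend. failNext plays the
	// role of the WriteEventsFail failpoint in the blackhole sink.
	type toySink struct {
		failNext     bool
		checkpointTs uint64
	}

	// writeEvents fails exactly once while failNext is set, mimicking the
	// "1*return" failpoint term used by the new test.
	func (s *toySink) writeEvents(events ...string) error {
		if s.failNext {
			s.failNext = false
			return errors.New("injected write failure")
		}
		return nil
	}

	// checkHealth follows the shape of EventTableSink.CheckHealth: probe the
	// backend with an empty write and wrap any failure as a sink-internal error.
	func (s *toySink) checkHealth() error {
		if err := s.writeEvents(); err != nil {
			return sinkInternalError{err}
		}
		return nil
	}

	func main() {
		s := &toySink{failNext: true, checkpointTs: 2}

		// The manager-side loop reduced to its skeleton: probe the sink before
		// generating a task; on a sink-internal error, restart from the
		// checkpoint instead of returning the error to the caller.
		if err := s.checkHealth(); err != nil {
			switch err.(type) {
			case sinkInternalError:
				fmt.Printf("sink unhealthy (%v), restarting from checkpoint %d\n",
					err, s.checkpointTs)
			default:
				panic(err) // anything else is fatal, as in generateSinkTasks
			}
		}

		if err := s.checkHealth(); err == nil {
			fmt.Println("sink healthy again, resuming task generation")
		}
	}

The restart in the real patch also re-seeds the progress heap from
getCheckpointTs().ResolvedMark(): with a checkpoint of 2, lastWrittenPos is
{StartTs: 1, CommitTs: 2} and its Next() is {StartTs: 2, CommitTs: 2}, which is
exactly the lower bound the new test expects for the first task after the
injected failure.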