From f5c62d899dd87c88c4dda9ea4d7829db7080751c Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 23 Nov 2023 16:11:42 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #10132 Signed-off-by: ti-chi-bot --- cdc/processor/sinkmanager/manager.go | 45 +++++++++++++++++-- cdc/processor/sinkmanager/manager_test.go | 42 +++++++++++++++++ .../sinkmanager/table_sink_worker.go | 8 +++- .../sinkmanager/table_sink_wrapper.go | 14 ++++++ .../blackhole/black_hole_dml_sink.go | 16 ++++++- cdc/sinkv2/tablesink/table_sink.go | 2 + cdc/sinkv2/tablesink/table_sink_impl.go | 12 +++++ 7 files changed, 132 insertions(+), 7 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 7de5e88f2e8..90015087cc3 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -540,13 +540,50 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error { slowestTableProgress := progs[i] lowerBound := slowestTableProgress.nextLowerBoundPos upperBound := m.getUpperBound(tableSink.getUpperBoundTs()) - // The table has no available progress. - if lowerBound.Compare(upperBound) >= 0 { + + if !tableSink.initTableSink() { + // The table hasn't been attached to a sink. m.sinkProgressHeap.push(slowestTableProgress) continue } - // The table hasn't been attached to a sink. - if !tableSink.initTableSink() { + + if sinkErr := tableSink.checkTableSinkHealth(); sinkErr != nil { + switch errors.Cause(sinkErr).(type) { + case tablesink.SinkInternalError: + tableSink.closeAndClearTableSink() + if restartErr := tableSink.restart(ctx); restartErr == nil { + // Restart the table sink based on the checkpoint position. + ckpt := tableSink.getCheckpointTs().ResolvedMark() + lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} + p := &progress{ + span: tableSink.span, + nextLowerBoundPos: lastWrittenPos.Next(), + version: slowestTableProgress.version, + } + m.sinkProgressHeap.push(p) + log.Info("table sink has been restarted", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Stringer("span", &tableSink.span), + zap.Any("lastWrittenPos", lastWrittenPos), + zap.String("sinkError", sinkErr.Error())) + } else { + m.sinkProgressHeap.push(slowestTableProgress) + log.Warn("table sink restart fail", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Stringer("span", &tableSink.span), + zap.String("sinkError", sinkErr.Error()), + zap.Error(restartErr)) + } + default: + return sinkErr + } + continue + } + + // The table has no available progress. + if lowerBound.Compare(upperBound) >= 0 { m.sinkProgressHeap.push(slowestTableProgress) continue } diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index d33fcb5aeae..740d6f3442c 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -380,3 +380,45 @@ func TestSinkManagerNeedsStuckCheck(t *testing.T) { require.False(t, manager.needsStuckCheck()) } + +func TestSinkManagerRestartTableSinks(t *testing.T) { + failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return") + defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause") + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 16) + changefeedInfo := getChangefeedInfo() + manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.ChangeFeedID{}, changefeedInfo, errCh) + defer func() { + cancel() + manager.Close() + }() + + span := tablepb.Span{TableID: 1} + manager.AddTable(span, 1, 100) + require.Nil(t, manager.StartTable(span, 2)) + table, exists := manager.tableSinks.Load(span) + require.True(t, exists) + + table.(*tableSinkWrapper).updateReceivedSorterResolvedTs(4) + table.(*tableSinkWrapper).updateBarrierTs(4) + select { + case task := <-manager.sinkTaskChan: + require.Equal(t, engine.Position{StartTs: 0, CommitTs: 3}, task.lowerBound) + task.callback(engine.Position{StartTs: 3, CommitTs: 4}) + case <-time.After(2 * time.Second): + panic("should always get a sink task") + } + + // With the failpoint blackhole/WriteEventsFail enabled, sink manager should restarts + // the table sink at its checkpoint. + failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail", "1*return") + defer failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail") + select { + case task := <-manager.sinkTaskChan: + require.Equal(t, engine.Position{StartTs: 2, CommitTs: 2}, task.lowerBound) + task.callback(engine.Position{StartTs: 3, CommitTs: 4}) + case <-time.After(2 * time.Second): + panic("should always get a sink task") + } +} diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 0ed6d47c930..a9c0d905b19 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -67,6 +67,7 @@ func newSinkWorker( } func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) error { + failpoint.Inject("SinkWorkerTaskHandlePause", func() { <-ctx.Done() }) for { select { case <-ctx.Done(): @@ -170,9 +171,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // events have been reported. Then we can continue the table // at the checkpoint position. case tablesink.SinkInternalError: - task.tableSink.closeAndClearTableSink() // After the table sink is cleared all pending events are sent out or dropped. // So we can re-add the table into sinkMemQuota. +<<<<<<< HEAD w.sinkMemQuota.ClearTable(task.tableSink.tableID) // Restart the table sink based on the checkpoint position. @@ -189,6 +190,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e zap.String("sinkError", finalErr.Error())) finalErr = err } +======= + w.sinkMemQuota.ClearTable(task.tableSink.span) + performCallback(advancer.lastPos) + finalErr = nil +>>>>>>> f35b76a1fe (sink(cdc): always handle sink failures for cases with sync-point enabled (#10132)) default: } } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 7961957906f..c7119117bd0 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -165,6 +165,11 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err break } } + if model.NewResolvedTs(startTs).Greater(t.tableSink.checkpointTs) { + t.tableSink.checkpointTs = model.NewResolvedTs(startTs) + t.tableSink.resolvedTs = model.NewResolvedTs(startTs) + t.tableSink.advanced = time.Now() + } t.state.Store(tablepb.TableStateReplicating) return nil } @@ -362,6 +367,15 @@ func (t *tableSinkWrapper) doTableSinkClear() { t.tableSink.version = 0 } +func (t *tableSinkWrapper) checkTableSinkHealth() (err error) { + t.tableSink.RLock() + defer t.tableSink.RUnlock() + if t.tableSink.s != nil { + err = t.tableSink.s.CheckHealth() + } + return +} + // When the attached sink fail, there can be some events that have already been // committed at downstream but we don't know. So we need to update `replicateTs` // of the table so that we can re-send those events later. diff --git a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go index 740e1ac7cef..82f4228a980 100644 --- a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go @@ -14,6 +14,8 @@ package blackhole import ( + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" @@ -33,14 +35,24 @@ func New() *Sink { } // WriteEvents log the events. +<<<<<<< HEAD:cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChangedEvent]) error { for _, row := range rows { // NOTE: don't change the log, some tests depend on it. log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event)) row.Callback() +======= +func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) (err error) { + failpoint.Inject("WriteEventsFail", func() { err = errors.New("InjectedErrorForWriteEventsFail") }) + if err == nil { + for _, row := range rows { + // NOTE: don't change the log, some tests depend on it. + log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event)) + row.Callback() + } +>>>>>>> f35b76a1fe (sink(cdc): always handle sink failures for cases with sync-point enabled (#10132)):cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go } - - return nil + return } // Scheme returns the sink scheme. diff --git a/cdc/sinkv2/tablesink/table_sink.go b/cdc/sinkv2/tablesink/table_sink.go index 588b30aabf9..69f6f61db11 100644 --- a/cdc/sinkv2/tablesink/table_sink.go +++ b/cdc/sinkv2/tablesink/table_sink.go @@ -37,6 +37,8 @@ type TableSink interface { Close() // AsyncClose closes the table sink asynchronously. Returns true if it's closed. AsyncClose() bool + // CheckHealth checks whether the associated sink backend is healthy or not. + CheckHealth() error } // SinkInternalError means the error comes from sink internal. diff --git a/cdc/sinkv2/tablesink/table_sink_impl.go b/cdc/sinkv2/tablesink/table_sink_impl.go index 13f9afc3a50..dfecd80231a 100644 --- a/cdc/sinkv2/tablesink/table_sink_impl.go +++ b/cdc/sinkv2/tablesink/table_sink_impl.go @@ -158,7 +158,19 @@ func (e *EventTableSink[E]) AsyncClose() bool { return false } +<<<<<<< HEAD:cdc/sinkv2/tablesink/table_sink_impl.go func (e *EventTableSink[E]) freeze() { +======= +// CheckHealth checks whether the associated sink backend is healthy or not. +func (e *EventTableSink[E, P]) CheckHealth() error { + if err := e.backendSink.WriteEvents(); err != nil { + return SinkInternalError{err} + } + return nil +} + +func (e *EventTableSink[E, P]) freeze() { +>>>>>>> f35b76a1fe (sink(cdc): always handle sink failures for cases with sync-point enabled (#10132)):cdc/sink/tablesink/table_sink_impl.go // Notice: We have to set the state to stopping first, // otherwise the progressTracker may be advanced incorrectly. // For example, if we do not freeze it and set the state to stooping From ab027f5ecc0b1ff7f7d3cddab800dc377d760461 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 24 Nov 2023 13:43:05 +0800 Subject: [PATCH 2/2] fix conflicts Signed-off-by: qupeng --- cdc/processor/sinkmanager/manager.go | 6 +++--- cdc/processor/sinkmanager/manager_test.go | 13 ++++++------ .../sinkmanager/table_sink_worker.go | 21 +------------------ .../blackhole/black_hole_dml_sink.go | 10 +-------- cdc/sinkv2/tablesink/table_sink_impl.go | 8 ++----- 5 files changed, 13 insertions(+), 45 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 90015087cc3..4e308b2775b 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -556,7 +556,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error { ckpt := tableSink.getCheckpointTs().ResolvedMark() lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} p := &progress{ - span: tableSink.span, + tableID: tableSink.tableID, nextLowerBoundPos: lastWrittenPos.Next(), version: slowestTableProgress.version, } @@ -564,7 +564,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error { log.Info("table sink has been restarted", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), - zap.Stringer("span", &tableSink.span), + zap.Int64("tableID", tableSink.tableID), zap.Any("lastWrittenPos", lastWrittenPos), zap.String("sinkError", sinkErr.Error())) } else { @@ -572,7 +572,7 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error { log.Warn("table sink restart fail", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), - zap.Stringer("span", &tableSink.span), + zap.Int64("tableID", tableSink.tableID), zap.String("sinkError", sinkErr.Error()), zap.Error(restartErr)) } diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 740d6f3442c..7c66cb17f99 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -388,16 +388,15 @@ func TestSinkManagerRestartTableSinks(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) errCh := make(chan error, 16) changefeedInfo := getChangefeedInfo() - manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.ChangeFeedID{}, changefeedInfo, errCh) + manager, _ := createManagerWithMemEngine(t, ctx, model.ChangeFeedID{}, changefeedInfo, errCh) defer func() { cancel() manager.Close() }() - span := tablepb.Span{TableID: 1} - manager.AddTable(span, 1, 100) - require.Nil(t, manager.StartTable(span, 2)) - table, exists := manager.tableSinks.Load(span) + manager.AddTable(1, 1, 100) + require.Nil(t, manager.StartTable(1, 2)) + table, exists := manager.tableSinks.Load(model.TableID(1)) require.True(t, exists) table.(*tableSinkWrapper).updateReceivedSorterResolvedTs(4) @@ -412,8 +411,8 @@ func TestSinkManagerRestartTableSinks(t *testing.T) { // With the failpoint blackhole/WriteEventsFail enabled, sink manager should restarts // the table sink at its checkpoint. - failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail", "1*return") - defer failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail") + failpoint.Enable("github.com/pingcap/tiflow/cdc/sinkv2/eventsink/blackhole/WriteEventsFail", "1*return") + defer failpoint.Disable("github.com/pingcap/tiflow/cdc/sinkv2/eventsink/blackhole/WriteEventsFail") select { case task := <-manager.sinkTaskChan: require.Equal(t, engine.Position{StartTs: 2, CommitTs: 2}, task.lowerBound) diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index a9c0d905b19..4a55b92983f 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -173,28 +173,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e case tablesink.SinkInternalError: // After the table sink is cleared all pending events are sent out or dropped. // So we can re-add the table into sinkMemQuota. -<<<<<<< HEAD w.sinkMemQuota.ClearTable(task.tableSink.tableID) - - // Restart the table sink based on the checkpoint position. - if err := task.tableSink.restart(ctx); err == nil { - checkpointTs := task.tableSink.getCheckpointTs() - ckpt := checkpointTs.ResolvedMark() - lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} - performCallback(lastWrittenPos) - log.Info("table sink has been restarted", - zap.String("namespace", w.changefeedID.Namespace), - zap.String("changefeed", w.changefeedID.ID), - zap.Int64("tableID", task.tableID), - zap.Any("lastWrittenPos", lastWrittenPos), - zap.String("sinkError", finalErr.Error())) - finalErr = err - } -======= - w.sinkMemQuota.ClearTable(task.tableSink.span) - performCallback(advancer.lastPos) + performCallback(lastPos) finalErr = nil ->>>>>>> f35b76a1fe (sink(cdc): always handle sink failures for cases with sync-point enabled (#10132)) default: } } diff --git a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go index 82f4228a980..d324ceb5c86 100644 --- a/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go @@ -35,14 +35,7 @@ func New() *Sink { } // WriteEvents log the events. -<<<<<<< HEAD:cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go -func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChangedEvent]) error { - for _, row := range rows { - // NOTE: don't change the log, some tests depend on it. - log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event)) - row.Callback() -======= -func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) (err error) { +func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChangedEvent]) (err error) { failpoint.Inject("WriteEventsFail", func() { err = errors.New("InjectedErrorForWriteEventsFail") }) if err == nil { for _, row := range rows { @@ -50,7 +43,6 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event)) row.Callback() } ->>>>>>> f35b76a1fe (sink(cdc): always handle sink failures for cases with sync-point enabled (#10132)):cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go } return } diff --git a/cdc/sinkv2/tablesink/table_sink_impl.go b/cdc/sinkv2/tablesink/table_sink_impl.go index dfecd80231a..236227ba6e9 100644 --- a/cdc/sinkv2/tablesink/table_sink_impl.go +++ b/cdc/sinkv2/tablesink/table_sink_impl.go @@ -158,19 +158,15 @@ func (e *EventTableSink[E]) AsyncClose() bool { return false } -<<<<<<< HEAD:cdc/sinkv2/tablesink/table_sink_impl.go -func (e *EventTableSink[E]) freeze() { -======= // CheckHealth checks whether the associated sink backend is healthy or not. -func (e *EventTableSink[E, P]) CheckHealth() error { +func (e *EventTableSink[E]) CheckHealth() error { if err := e.backendSink.WriteEvents(); err != nil { return SinkInternalError{err} } return nil } -func (e *EventTableSink[E, P]) freeze() { ->>>>>>> f35b76a1fe (sink(cdc): always handle sink failures for cases with sync-point enabled (#10132)):cdc/sink/tablesink/table_sink_impl.go +func (e *EventTableSink[E]) freeze() { // Notice: We have to set the state to stopping first, // otherwise the progressTracker may be advanced incorrectly. // For example, if we do not freeze it and set the state to stooping