Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(cdc): always handle sink failures for cases with sync-point enabled (#10132) #10141

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,13 +530,50 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
slowestTableProgress := progs[i]
lowerBound := slowestTableProgress.nextLowerBoundPos
upperBound := m.getUpperBound(tableSink.getUpperBoundTs())
// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {

if !tableSink.initTableSink() {
// The table hasn't been attached to a sink.
m.sinkProgressHeap.push(slowestTableProgress)
continue
}
// The table hasn't been attached to a sink.
if !tableSink.initTableSink() {

if sinkErr := tableSink.checkTableSinkHealth(); sinkErr != nil {
switch errors.Cause(sinkErr).(type) {
case tablesink.SinkInternalError:
tableSink.closeAndClearTableSink()
if restartErr := tableSink.restart(ctx); restartErr == nil {
// Restart the table sink based on the checkpoint position.
ckpt := tableSink.getCheckpointTs().ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
p := &progress{
span: tableSink.span,
nextLowerBoundPos: lastWrittenPos.Next(),
version: slowestTableProgress.version,
}
m.sinkProgressHeap.push(p)
log.Info("table sink has been restarted",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &tableSink.span),
zap.Any("lastWrittenPos", lastWrittenPos),
zap.String("sinkError", sinkErr.Error()))
} else {
m.sinkProgressHeap.push(slowestTableProgress)
log.Warn("table sink restart fail",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &tableSink.span),
zap.String("sinkError", sinkErr.Error()),
zap.Error(restartErr))
}
default:
return sinkErr
}
continue
}

// The table has no available progress.
if lowerBound.Compare(upperBound) >= 0 {
m.sinkProgressHeap.push(slowestTableProgress)
continue
}
Expand Down
42 changes: 42 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,45 @@ func TestSinkManagerNeedsStuckCheck(t *testing.T) {

require.False(t, manager.needsStuckCheck())
}

func TestSinkManagerRestartTableSinks(t *testing.T) {
failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.ChangeFeedID{}, changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

span := tablepb.Span{TableID: 1}
manager.AddTable(span, 1, 100)
require.Nil(t, manager.StartTable(span, 2))
table, exists := manager.tableSinks.Load(span)
require.True(t, exists)

table.(*tableSinkWrapper).updateReceivedSorterResolvedTs(4)
table.(*tableSinkWrapper).updateBarrierTs(4)
select {
case task := <-manager.sinkTaskChan:
require.Equal(t, engine.Position{StartTs: 0, CommitTs: 3}, task.lowerBound)
task.callback(engine.Position{StartTs: 3, CommitTs: 4})
case <-time.After(2 * time.Second):
panic("should always get a sink task")
}

// With the failpoint blackhole/WriteEventsFail enabled, sink manager should restarts
// the table sink at its checkpoint.
failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail", "1*return")
defer failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/dmlsink/blackhole/WriteEventsFail")
select {
case task := <-manager.sinkTaskChan:
require.Equal(t, engine.Position{StartTs: 2, CommitTs: 2}, task.lowerBound)
task.callback(engine.Position{StartTs: 3, CommitTs: 4})
case <-time.After(2 * time.Second):
panic("should always get a sink task")
}
}
19 changes: 3 additions & 16 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func newSinkWorker(
}

func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) error {
failpoint.Inject("SinkWorkerTaskHandlePause", func() { <-ctx.Done() })
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -169,25 +170,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// events have been reported. Then we can continue the table
// at the checkpoint position.
case tablesink.SinkInternalError:
task.tableSink.closeAndClearTableSink()
// After the table sink is cleared all pending events are sent out or dropped.
// So we can re-add the table into sinkMemQuota.
w.sinkMemQuota.ClearTable(task.tableSink.span)

// Restart the table sink based on the checkpoint position.
if err := task.tableSink.restart(ctx); err == nil {
checkpointTs := task.tableSink.getCheckpointTs()
ckpt := checkpointTs.ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
performCallback(lastWrittenPos)
log.Info("table sink has been restarted",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Stringer("span", &task.span),
zap.Any("lastWrittenPos", lastWrittenPos),
zap.String("sinkError", finalErr.Error()))
finalErr = err
}
performCallback(advancer.lastPos)
finalErr = nil
default:
}
}
Expand Down
14 changes: 14 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err
break
}
}
if model.NewResolvedTs(startTs).Greater(t.tableSink.checkpointTs) {
t.tableSink.checkpointTs = model.NewResolvedTs(startTs)
t.tableSink.resolvedTs = model.NewResolvedTs(startTs)
t.tableSink.advanced = time.Now()
}
t.state.Store(tablepb.TableStateReplicating)
return nil
}
Expand Down Expand Up @@ -363,6 +368,15 @@ func (t *tableSinkWrapper) doTableSinkClear() {
t.tableSink.version = 0
}

func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
t.tableSink.RLock()
defer t.tableSink.RUnlock()
if t.tableSink.s != nil {
err = t.tableSink.s.CheckHealth()
}
return
}

// When the attached sink fail, there can be some events that have already been
// committed at downstream but we don't know. So we need to update `replicateTs`
// of the table so that we can re-send those events later.
Expand Down
18 changes: 11 additions & 7 deletions cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package blackhole

import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
Expand All @@ -33,14 +35,16 @@ func NewDMLSink() *DMLSink {
}

// WriteEvents log the events.
func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) error {
for _, row := range rows {
// NOTE: don't change the log, some tests depend on it.
log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event))
row.Callback()
func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChangedEvent]) (err error) {
failpoint.Inject("WriteEventsFail", func() { err = errors.New("InjectedErrorForWriteEventsFail") })
if err == nil {
for _, row := range rows {
// NOTE: don't change the log, some tests depend on it.
log.Debug("BlackHoleSink: WriteEvents", zap.Any("row", row.Event))
row.Callback()
}
}

return nil
return
}

// Close do nothing.
Expand Down
2 changes: 2 additions & 0 deletions cdc/sink/tablesink/table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type TableSink interface {
Close()
// AsyncClose closes the table sink asynchronously. Returns true if it's closed.
AsyncClose() bool
// CheckHealth checks whether the associated sink backend is healthy or not.
CheckHealth() error
}

// SinkInternalError means the error comes from sink internal.
Expand Down
8 changes: 8 additions & 0 deletions cdc/sink/tablesink/table_sink_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ func (e *EventTableSink[E, P]) AsyncClose() bool {
return false
}

// CheckHealth checks whether the associated sink backend is healthy or not.
func (e *EventTableSink[E, P]) CheckHealth() error {
if err := e.backendSink.WriteEvents(); err != nil {
return SinkInternalError{err}
}
return nil
}

func (e *EventTableSink[E, P]) freeze() {
// Notice: We have to set the state to stopping first,
// otherwise the progressTracker may be advanced incorrectly.
Expand Down
Loading