sink(cdc): fix "dead dmlSink" error in sink workers (#9686) (#9714)
close #9685
ti-chi-bot authored Sep 11, 2023
1 parent 963c08f commit 5fe84bb
Showing 2 changed files with 217 additions and 73 deletions.
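In the new table_sink_worker.go code below, `lastPos` is initialized to `lowerBound.Prev()`, `task.callback` is wrapped in a `performCallback` closure guarded by a flag so it runs at most once, and the metrics/refund/callback handling moves into a `defer` registered before the redo-cache fast path and the new `TableSinkWorkerFetchFromCache` failpoint; on a `SinkInternalError` the deferred handler also restarts the table sink and reports a checkpoint-derived position. The standalone Go sketch below illustrates that invoke-once-callback-plus-defer shape in isolation; the `position` type, the `fetch` stand-in, and the literal values are simplified placeholders, not the tiflow definitions.

package main

import (
	"errors"
	"fmt"
)

// position is a simplified stand-in for tiflow's engine.Position.
type position struct{ StartTs, CommitTs uint64 }

// handleTask mirrors the shape of the fixed worker: the callback fires exactly
// once on every exit path, via a guarded closure plus a defer that is
// registered before any early return.
func handleTask(startPos position, callback func(position), fetch func() (drained bool, err error)) (finalErr error) {
	lastPos := startPos // the real code starts from lowerBound.Prev()

	callbackIsPerformed := false
	performCallback := func(pos position) {
		if !callbackIsPerformed {
			callback(pos)
			callbackIsPerformed = true
		}
	}

	defer func() {
		// The real defer also reports metrics, refunds unused memory quota,
		// and on tablesink.SinkInternalError restarts the table sink before
		// reporting a checkpoint-derived position. This sketch only keeps the
		// invoke-once guarantee.
		performCallback(lastPos)
	}()

	drained, err := fetch()
	if err != nil {
		return fmt.Errorf("fetch from cache: %w", err)
	}
	if drained {
		// Everything was served from the cache; nothing left to scan.
		return nil
	}

	// A normal scan would advance lastPos as events are emitted downstream.
	lastPos = position{StartTs: 3, CommitTs: 4}
	return nil
}

func main() {
	cb := func(pos position) { fmt.Println("callback fired at", pos) }
	_ = handleTask(position{0, 1}, cb, func() (bool, error) { return false, nil })
	_ = handleTask(position{0, 1}, cb, func() (bool, error) { return false, errors.New("injected") })
}

Run both calls and the callback fires exactly once each time; drop the defer and the error path returns without ever reporting a position, which is the kind of gap this patch closes.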
156 changes: 83 additions & 73 deletions cdc/processor/sinkmanager/table_sink_worker.go
@@ -115,7 +115,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e

// Used to record the last written position.
// We need to use it to update the lower bound of the table sink.
var lastPos engine.Position
var lastPos engine.Position = lowerBound.Prev()

// To advance table sink in different cases: splitTxn is true or not.
committedTxnSize := uint64(0) // Can be used in `advanceTableSink`.
@@ -130,19 +130,95 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// is false, and it won't break transaction atomicity to downstreams.
batchID := uint64(1)

if w.eventCache != nil {
drained, err := w.fetchFromCache(task, &lowerBound, &upperBound, &batchID)
if err != nil {
return errors.Trace(err)
allEventSize := uint64(0)
allEventCount := 0

callbackIsPerformed := false
performCallback := func(pos engine.Position) {
if !callbackIsPerformed {
task.callback(pos)
callbackIsPerformed = true
}
if drained {
}

defer func() {
w.metricRedoEventCacheMiss.Add(float64(allEventSize))
metrics.OutputEventCount.WithLabelValues(
task.tableSink.changefeed.Namespace,
task.tableSink.changefeed.ID,
"kv",
).Add(float64(allEventCount))

if w.eventCache == nil {
eventCount := newRangeEventCount(lastPos, allEventCount)
task.tableSink.updateRangeEventCounts(eventCount)
}

log.Debug("Sink task finished",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Int64("tableID", task.tableID),
zap.Any("lowerBound", lowerBound),
zap.Any("upperBound", upperBound),
zap.Bool("splitTxn", w.splitTxn),
zap.Int("receivedEvents", allEventCount),
zap.Any("lastPos", lastPos),
zap.Float64("lag", time.Since(oracle.GetTimeFromTS(lastPos.CommitTs)).Seconds()),
zap.Error(finalErr))

if finalErr == nil {
// Otherwise we can't ensure all events before `lastPos` are emitted.
performCallback(lastPos)
} else {
switch errors.Cause(finalErr).(type) {
// If it's a warning, close the table sink and wait until all pending
// events have been reported. Then we can continue the table
// at the checkpoint position.
case tablesink.SinkInternalError:
task.tableSink.closeAndClearTableSink()
// After the table sink is cleared, all pending events are sent out or dropped.
// So we can re-add the table into sinkMemQuota.
w.sinkMemQuota.ClearTable(task.tableSink.tableID)

// Restart the table sink based on the checkpoint position.
if err := task.tableSink.restart(ctx); err == nil {
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
ckpt := checkpointTs.ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
performCallback(lastWrittenPos)
log.Info("table sink has been restarted",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Int64("tableID", task.tableID),
zap.Any("lastWrittenPos", lastWrittenPos),
zap.String("sinkError", finalErr.Error()))
finalErr = err
}
default:
}
}

// The task is finished and some required memory isn't used.
if availableMem > usedMem {
w.sinkMemQuota.Refund(availableMem - usedMem)
log.Debug("MemoryQuotaTracing: refund memory for table sink task",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Int64("tableID", task.tableID),
zap.Uint64("memory", availableMem-usedMem))
task.callback(lowerBound.Prev())
}
}()

if w.eventCache != nil {
drained, err := w.fetchFromCache(task, &lowerBound, &upperBound, &batchID)
failpoint.Inject("TableSinkWorkerFetchFromCache", func() {
err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected"))
})
if err != nil {
return errors.Trace(err)
}
if drained {
performCallback(lowerBound.Prev())
return nil
}
}
@@ -265,81 +341,15 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
}

// lowerBound and upperBound are both closed intervals.
allEventSize := uint64(0)
allEventCount := 0
iter := w.sourceManager.FetchByTable(task.tableID, lowerBound, upperBound, w.sinkMemQuota)
defer func() {
w.metricRedoEventCacheMiss.Add(float64(allEventSize))
metrics.OutputEventCount.WithLabelValues(
task.tableSink.changefeed.Namespace,
task.tableSink.changefeed.ID,
"kv",
).Add(float64(allEventCount))

if w.eventCache == nil {
eventCount := newRangeEventCount(lastPos, allEventCount)
task.tableSink.updateRangeEventCounts(eventCount)
}

if err := iter.Close(); err != nil {
log.Error("Sink worker fails to close iterator",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Int64("tableID", task.tableID),
zap.Error(err))
}

log.Debug("Sink task finished",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Int64("tableID", task.tableID),
zap.Any("lowerBound", lowerBound),
zap.Any("upperBound", upperBound),
zap.Bool("splitTxn", w.splitTxn),
zap.Int("receivedEvents", allEventCount),
zap.Any("lastPos", lastPos),
zap.Float64("lag", time.Since(oracle.GetTimeFromTS(lastPos.CommitTs)).Seconds()),
zap.Error(finalErr))

if finalErr == nil {
// Otherwise we can't ensure all events before `lastPos` are emitted.
task.callback(lastPos)
} else {
switch errors.Cause(finalErr).(type) {
// If it's a warning, close the table sink and wait until all pending
// events have been reported. Then we can continue the table
// at the checkpoint position.
case tablesink.SinkInternalError:
task.tableSink.closeAndClearTableSink()
// After the table sink is cleared, all pending events are sent out or dropped.
// So we can re-add the table into sinkMemQuota.
w.sinkMemQuota.ClearTable(task.tableSink.tableID)

// Restart the table sink based on the checkpoint position.
if finalErr = task.tableSink.restart(ctx); finalErr == nil {
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
ckpt := checkpointTs.ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
task.callback(lastWrittenPos)
log.Info("table sink has been restarted",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Int64("tableID", task.tableID),
zap.Any("lastWrittenPos", lastWrittenPos))
}
default:
}
}

// The task is finished and some required memory isn't used.
if availableMem > usedMem {
w.sinkMemQuota.Refund(availableMem - usedMem)
log.Debug("MemoryQuotaTracing: refund memory for table sink task",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
zap.Int64("tableID", task.tableID),
zap.Uint64("memory", availableMem-usedMem))
}
}()

for availableMem > usedMem && !task.isCanceled() {
134 changes: 134 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/memquota"
Expand Down Expand Up @@ -59,6 +60,30 @@ func addEventsToSortEngine(t *testing.T, events []*model.PolymorphicEvent, sortE
}
}

func genPolymorphicResolvedEvent(resolvedTs uint64) *model.PolymorphicEvent {
return &model.PolymorphicEvent{
CRTs: resolvedTs,
RawKV: &model.RawKVEntry{
OpType: model.OpTypeResolved,
CRTs: resolvedTs,
},
}
}

//nolint:all
func genPolymorphicEvent(startTs, commitTs uint64, tableID model.TableID) *model.PolymorphicEvent {
return &model.PolymorphicEvent{
StartTs: startTs,
CRTs: commitTs,
RawKV: &model.RawKVEntry{
OpType: model.OpTypePut,
StartTs: startTs,
CRTs: commitTs,
},
Row: genRowChangedEvent(startTs, commitTs, tableID),
}
}

// It is ok to use the same tableID in test.
//
//nolint:unparam
@@ -1097,3 +1122,112 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNoWorkload(
func TestWorkerSuite(t *testing.T) {
suite.Run(t, new(workerSuite))
}

func (suite *workerSuite) TestFetchFromCacheWithFailure() {
ctx, cancel := context.WithCancel(context.Background())
events := []*model.PolymorphicEvent{
genPolymorphicEvent(1, 3, 1),
genPolymorphicEvent(1, 3, 1),
genPolymorphicEvent(1, 3, 1),
genPolymorphicResolvedEvent(4),
}
// Only for three events.
w, e := createWorker(model.ChangeFeedID{}, 1024*1024, true, 1)
w.eventCache = newRedoEventCache(model.ChangeFeedID{}, 1024*1024)
defer w.sinkMemQuota.Close()
addEventsToSortEngine(suite.T(), events, e, 1)

_ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache", "return")
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache")
}()

taskChan := make(chan *sinkTask)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.handleTasks(ctx, taskChan)
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(model.ChangeFeedID{}, 1)
defer sink.Close()

chShouldBeClosed := make(chan struct{}, 1)
callback := func(lastWritePos engine.Position) {
close(chShouldBeClosed)
}
taskChan <- &sinkTask{
tableID: 1,
lowerBound: genLowerBound(),
getUpperBound: genUpperBoundGetter(4),
tableSink: wrapper,
callback: callback,
isCanceled: func() bool { return false },
}

<-chShouldBeClosed
cancel()
wg.Wait()
}

// When starting to handle a task, advancer.lastPos should be set to a correct position.
// Otherwise, if advancer.lastPos isn't updated during scanning, the callback will get an
// invalid `advancer.lastPos`.
func (suite *workerSuite) TestHandleTaskWithoutMemory() {
ctx, cancel := context.WithCancel(context.Background())
events := []*model.PolymorphicEvent{
genPolymorphicEvent(1, 3, 1),
genPolymorphicResolvedEvent(4),
}
w, e := createWorker(model.ChangeFeedID{}, 0, true, 1)
defer w.sinkMemQuota.Close()
addEventsToSortEngine(suite.T(), events, e, 1)

taskChan := make(chan *sinkTask)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := w.handleTasks(ctx, taskChan)
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(model.ChangeFeedID{}, 1)
defer sink.Close()

chShouldBeClosed := make(chan struct{}, 1)
callback := func(lastWritePos engine.Position) {
require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos)
close(chShouldBeClosed)
}
taskChan <- &sinkTask{
tableID: 1,
lowerBound: genLowerBound(),
getUpperBound: genUpperBoundGetter(4),
tableSink: wrapper,
callback: callback,
isCanceled: func() bool { return true },
}

<-chShouldBeClosed
cancel()
wg.Wait()
}

func genLowerBound() engine.Position {
return engine.Position{
StartTs: 0,
CommitTs: 1,
}
}

func genUpperBoundGetter(commitTs model.Ts) func(_ model.Ts) engine.Position {
return func(_ model.Ts) engine.Position {
return engine.Position{
StartTs: commitTs - 1,
CommitTs: commitTs,
}
}
}
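The new TestFetchFromCacheWithFailure drives the error path through pingcap/failpoint: the worker marks the injection point with `failpoint.Inject`, and the test toggles it via `failpoint.Enable`/`failpoint.Disable` using the failpoint's full path (package import path plus name). The minimal sketch below shows that pattern under assumed names — the `example.com/sketch` path and the `FetchFromCacheInjected` name are placeholders — and keep in mind that `failpoint.Inject` is an inert marker until the sources are rewritten with failpoint-ctl, so under a plain `go test` the injected body never runs.

// failpoint_sketch_test.go
package sketch

import (
	"errors"
	"testing"

	"github.com/pingcap/failpoint"
)

// fetchFromCache marks an injection point, mirroring the shape of the
// TableSinkWorkerFetchFromCache failpoint added in this commit.
func fetchFromCache() (err error) {
	failpoint.Inject("FetchFromCacheInjected", func() {
		err = errors.New("injected cache failure")
	})
	return err
}

func TestFetchFailureInjection(t *testing.T) {
	// Full failpoint path = import path of the package + "/" + failpoint name.
	fp := "example.com/sketch/FetchFromCacheInjected"
	if err := failpoint.Enable(fp, "return"); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()

	// With failpoints compiled in (failpoint-ctl enable), this returns the
	// injected error so the caller's recovery path can be exercised, just as
	// TestFetchFromCacheWithFailure drives the SinkInternalError branch above.
	if err := fetchFromCache(); err == nil {
		t.Log("failpoint markers not rewritten; injection skipped")
	}
}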
