diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 4f23b89289a..1e237afd2b1 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -278,6 +278,7 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er zap.Error(err)) m.clearSinkFactory() + // To release memory quota ASAP, close all table sinks manually. start := time.Now() log.Info("Sink manager is closing all table sinks", zap.String("namespace", m.changefeedID.Namespace), @@ -381,22 +382,17 @@ func (m *SinkManager) clearSinkFactory() { } } -func (m *SinkManager) putSinkFactoryError(err error, version uint64) { +func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) { m.sinkFactory.Lock() defer m.sinkFactory.Unlock() - skipped := true if version == m.sinkFactory.version { select { case m.sinkFactory.errors <- err: - skipped = false default: } + return true } - log.Info("Sink manager tries to put an sink error", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - zap.Bool("skipped", skipped), - zap.String("error", err.Error())) + return false } func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) { @@ -445,7 +441,7 @@ func (m *SinkManager) backgroundGC(errors chan<- error) { if time.Since(sink.lastCleanTime) < cleanTableInterval { continue } - checkpointTs, _, _ := sink.getCheckpointTs() + checkpointTs := sink.getCheckpointTs() resolvedMark := checkpointTs.ResolvedMark() if resolvedMark == 0 { continue @@ -916,7 +912,7 @@ func (m *SinkManager) RemoveTable(tableID model.TableID) { zap.String("changefeed", m.changefeedID.ID), zap.Int64("tableID", tableID)) } - checkpointTs, _, _ := value.(*tableSinkWrapper).getCheckpointTs() + checkpointTs := value.(*tableSinkWrapper).getCheckpointTs() log.Info("Remove table sink successfully", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), @@ -984,18 +980,18 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats { } tableSink := value.(*tableSinkWrapper) - checkpointTs, version, advanced := tableSink.getCheckpointTs() + checkpointTs := tableSink.getCheckpointTs() m.sinkMemQuota.Release(tableID, checkpointTs) m.redoMemQuota.Release(tableID, checkpointTs) advanceTimeoutInSec := m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec if advanceTimeoutInSec <= 0 { - log.Warn("AdvanceTimeoutInSec is not set, use default value", zap.Any("sinkConfig", m.changefeedInfo.Config.Sink)) advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec } stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second - if version > 0 && time.Since(advanced) > stuckCheck && - oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck { + + isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck) + if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) { log.Warn("Table checkpoint is stuck too long, will restart the sink backend", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), @@ -1003,8 +999,6 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats { zap.Any("checkpointTs", checkpointTs), zap.Float64("stuckCheck", stuckCheck.Seconds()), zap.Uint64("factoryVersion", version)) - tableSink.updateTableSinkAdvanced() - m.putSinkFactoryError(errors.New("table sink stuck"), version) } var resolvedTs model.Ts diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 6f17d71e775..63d79185c2f 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -222,7 +222,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) { require.Eventually(t, func() bool { tableSink, ok := manager.tableSinks.Load(tableID) require.True(t, ok) - checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs() + checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() return checkpointTS.ResolvedMark() == 4 }, 5*time.Second, 10*time.Millisecond) } @@ -250,7 +250,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) { require.Eventually(t, func() bool { tableSink, ok := manager.tableSinks.Load(tableID) require.True(t, ok) - checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs() + checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() return checkpointTS.ResolvedMark() == 3 }, 5*time.Second, 10*time.Millisecond) } @@ -299,7 +299,7 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) { tableSink, ok := manager.tableSinks.Load(tableID) require.True(t, ok) require.NotNil(t, tableSink) - checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs() + checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() require.Equal(t, uint64(1), checkpointTS.Ts) } diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index a1d6725bcd8..50609f8645c 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -182,7 +182,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // Restart the table sink based on the checkpoint position. if err := task.tableSink.restart(ctx); err == nil { - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() ckpt := checkpointTs.ResolvedMark() lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} performCallback(lastWrittenPos) diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 964c336b746..984855649c2 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -963,7 +963,7 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndDoNotAdvanceTableUntilMee receivedEvents[2].Callback() require.Len(suite.T(), sink.GetEvents(), 3, "No more events should be sent to sink") - checkpointTs, _, _ := wrapper.getCheckpointTs() + checkpointTs := wrapper.getCheckpointTs() require.Equal(suite.T(), uint64(2), checkpointTs.ResolvedMark(), "Only can advance resolved mark to 2") } @@ -1046,7 +1046,7 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableUntilTaskIsFi receivedEvents := sink.GetEvents() receivedEvents[0].Callback() require.Len(suite.T(), sink.GetEvents(), 1, "No more events should be sent to sink") - checkpointTs, _, _ := wrapper.getCheckpointTs() + checkpointTs := wrapper.getCheckpointTs() require.Equal(suite.T(), uint64(4), checkpointTs.ResolvedMark()) } @@ -1112,7 +1112,7 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNoWorkload( isCanceled: func() bool { return false }, } require.Eventually(suite.T(), func() bool { - checkpointTs, _, _ := wrapper.getCheckpointTs() + checkpointTs := wrapper.getCheckpointTs() return checkpointTs.ResolvedMark() == 4 }, 5*time.Second, 10*time.Millisecond, "Directly advance resolved mark to 4") cancel() diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 90c3fc38d59..0cefd7e6db0 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -50,10 +50,13 @@ type tableSinkWrapper struct { // tableSink is the underlying sink. tableSink struct { sync.RWMutex - s tablesink.TableSink - version uint64 // it's generated by `tableSinkCreater`. + s tablesink.TableSink + version uint64 // it's generated by `tableSinkCreater`. + + innerMu sync.Mutex + advanced time.Time + resolvedTs model.ResolvedTs checkpointTs model.ResolvedTs - advanced atomic.Int64 } // state used to control the lifecycle of the table. @@ -119,7 +122,8 @@ func newTableSinkWrapper( res.tableSink.version = 0 res.tableSink.checkpointTs = model.NewResolvedTs(startTs) - res.updateTableSinkAdvanced() + res.tableSink.resolvedTs = model.NewResolvedTs(startTs) + res.tableSink.advanced = time.Now() res.receivedSorterResolvedTs.Store(startTs) res.barrierTs.Store(startTs) @@ -196,33 +200,28 @@ func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error { // If it's nil it means it's closed. return tablesink.NewSinkInternalError(errors.New("table sink cleared")) } + t.tableSink.innerMu.Lock() + defer t.tableSink.innerMu.Unlock() + t.tableSink.resolvedTs = ts return t.tableSink.s.UpdateResolvedTs(ts) } -// getCheckpointTs returns -// 1. checkpoint timestamp of the table; -// 2. the table sink version, which comes from `tableSinkCreater`; -// 3. recent time of the table is advanced. -func (t *tableSinkWrapper) getCheckpointTs() (model.ResolvedTs, uint64, time.Time) { +func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs { t.tableSink.RLock() defer t.tableSink.RUnlock() + t.tableSink.innerMu.Lock() + defer t.tableSink.innerMu.Unlock() + if t.tableSink.s != nil { checkpointTs := t.tableSink.s.GetCheckpointTs() if t.tableSink.checkpointTs.Less(checkpointTs) { t.tableSink.checkpointTs = checkpointTs - t.updateTableSinkAdvanced() + t.tableSink.advanced = time.Now() + } else if !checkpointTs.Less(t.tableSink.resolvedTs) { + t.tableSink.advanced = time.Now() } } - advanced := time.Unix(t.tableSink.advanced.Load(), 0) - return t.tableSink.checkpointTs, t.tableSink.version, advanced -} - -func (t *tableSinkWrapper) updateTableSinkAdvanced() { - curr := t.tableSink.advanced.Load() - now := time.Now().Unix() - if now > curr { - t.tableSink.advanced.CompareAndSwap(curr, now) - } + return t.tableSink.checkpointTs } func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts { @@ -295,7 +294,7 @@ func (t *tableSinkWrapper) initTableSink() bool { if t.tableSink.s == nil { t.tableSink.s, t.tableSink.version = t.tableSinkCreater() if t.tableSink.s != nil { - t.updateTableSinkAdvanced() + t.tableSink.advanced = time.Now() return true } return false @@ -341,12 +340,15 @@ func (t *tableSinkWrapper) doTableSinkClear() { return } checkpointTs := t.tableSink.s.GetCheckpointTs() + t.tableSink.innerMu.Lock() if t.tableSink.checkpointTs.Less(checkpointTs) { t.tableSink.checkpointTs = checkpointTs } + t.tableSink.resolvedTs = checkpointTs + t.tableSink.advanced = time.Now() + t.tableSink.innerMu.Unlock() t.tableSink.s = nil t.tableSink.version = 0 - t.tableSink.advanced.Store(time.Now().Unix()) } // When the attached sink fail, there can be some events that have already been @@ -416,6 +418,25 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound engine.Position, min return shouldClean } +func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) { + t.getCheckpointTs() + + t.tableSink.RLock() + defer t.tableSink.RUnlock() + t.tableSink.innerMu.Lock() + defer t.tableSink.innerMu.Unlock() + + // What these conditions mean: + // 1. the table sink has been associated with a valid sink; + // 2. its checkpoint hasn't been advanced for a while; + version := t.tableSink.version + advanced := t.tableSink.advanced + if version > 0 && time.Since(advanced) > stuckCheck { + return true, version + } + return false, uint64(0) +} + // convertRowChangedEvents uses to convert RowChangedEvents to TableSinkRowChangedEvents. // It will deal with the old value compatibility. func convertRowChangedEvents( diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index e3314ac1df1..5cb65c5f609 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -18,6 +18,7 @@ import ( "math" "sync" "testing" + "time" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/tiflow/cdc/sinkv2/tablesink" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) type mockSink struct { @@ -329,3 +331,107 @@ func TestGetUpperBoundTs(t *testing.T) { wrapper.barrierTs.Store(uint64(12)) require.Equal(t, uint64(11), wrapper.getUpperBoundTs()) } + +func TestTableSinkWrapperSinkVersion(t *testing.T) { + t.Parallel() + + innerTableSink := tablesink.New[*model.RowChangedEvent]( + model.ChangeFeedID{}, 1, model.Ts(0), + newMockSink(), &eventsink.RowChangeEventAppender{}, + prometheus.NewCounter(prometheus.CounterOpts{}), + ) + version := new(uint64) + + wrapper := newTableSinkWrapper( + model.DefaultChangeFeedID("1"), + 1, + func() (tablesink.TableSink, uint64) { return nil, 0 }, + tablepb.TableStatePrepared, + model.Ts(10), + model.Ts(20), + func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil }, + ) + + require.False(t, wrapper.initTableSink()) + + wrapper.tableSinkCreater = func() (tablesink.TableSink, uint64) { + *version += 1 + return innerTableSink, *version + } + + require.True(t, wrapper.initTableSink()) + require.Equal(t, wrapper.tableSink.version, uint64(1)) + + require.True(t, wrapper.asyncCloseTableSink()) + + wrapper.doTableSinkClear() + require.Nil(t, wrapper.tableSink.s) + require.Equal(t, wrapper.tableSink.version, uint64(0)) + + require.True(t, wrapper.initTableSink()) + require.Equal(t, wrapper.tableSink.version, uint64(2)) + + wrapper.closeTableSink() + + wrapper.doTableSinkClear() + require.Nil(t, wrapper.tableSink.s) + require.Equal(t, wrapper.tableSink.version, uint64(0)) +} + +func TestTableSinkWrapperSinkInner(t *testing.T) { + t.Parallel() + + innerTableSink := tablesink.New[*model.RowChangedEvent]( + model.ChangeFeedID{}, 1, model.Ts(0), + newMockSink(), &eventsink.RowChangeEventAppender{}, + prometheus.NewCounter(prometheus.CounterOpts{}), + ) + version := new(uint64) + + wrapper := newTableSinkWrapper( + model.DefaultChangeFeedID("1"), + 1, + func() (tablesink.TableSink, uint64) { + *version += 1 + return innerTableSink, *version + }, + tablepb.TableStatePrepared, + oracle.GoTimeToTS(time.Now()), + oracle.GoTimeToTS(time.Now().Add(10000*time.Second)), + func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil }, + ) + + require.True(t, wrapper.initTableSink()) + + wrapper.closeAndClearTableSink() + + // Shouldn't be stuck because version is 0. + require.Equal(t, wrapper.tableSink.version, uint64(0)) + isStuck, _ := wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.False(t, isStuck) + + // Shouldn't be stuck because tableSink.advanced is just updated. + require.True(t, wrapper.initTableSink()) + isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.False(t, isStuck) + + // Shouldn't be stuck because upperbound hasn't been advanced. + time.Sleep(200 * time.Millisecond) + isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.False(t, isStuck) + + // Shouldn't be stuck because `getCheckpointTs` will update tableSink.advanced. + nowTs := oracle.GoTimeToTS(time.Now()) + wrapper.updateReceivedSorterResolvedTs(nowTs) + wrapper.barrierTs.Store(nowTs) + isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.False(t, isStuck) + + time.Sleep(200 * time.Millisecond) + nowTs = oracle.GoTimeToTS(time.Now()) + wrapper.updateReceivedSorterResolvedTs(nowTs) + wrapper.barrierTs.Store(nowTs) + wrapper.updateResolvedTs(model.NewResolvedTs(nowTs)) + isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.True(t, isStuck) +}