Skip to content

Commit

Permalink
sink(cdc): improve table sink advance timeout machanism (#9666) (#9724)
Browse files Browse the repository at this point in the history
close #9695
  • Loading branch information
ti-chi-bot authored Sep 11, 2023
1 parent 4a31086 commit 4547916
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 45 deletions.
26 changes: 10 additions & 16 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
zap.Error(err))
m.clearSinkFactory()

// To release memory quota ASAP, close all table sinks manually.
start := time.Now()
log.Info("Sink manager is closing all table sinks",
zap.String("namespace", m.changefeedID.Namespace),
Expand Down Expand Up @@ -381,22 +382,17 @@ func (m *SinkManager) clearSinkFactory() {
}
}

func (m *SinkManager) putSinkFactoryError(err error, version uint64) {
func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
skipped := true
if version == m.sinkFactory.version {
select {
case m.sinkFactory.errors <- err:
skipped = false
default:
}
return true
}
log.Info("Sink manager tries to put an sink error",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Bool("skipped", skipped),
zap.String("error", err.Error()))
return false
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) {
Expand Down Expand Up @@ -445,7 +441,7 @@ func (m *SinkManager) backgroundGC(errors chan<- error) {
if time.Since(sink.lastCleanTime) < cleanTableInterval {
continue
}
checkpointTs, _, _ := sink.getCheckpointTs()
checkpointTs := sink.getCheckpointTs()
resolvedMark := checkpointTs.ResolvedMark()
if resolvedMark == 0 {
continue
Expand Down Expand Up @@ -916,7 +912,7 @@ func (m *SinkManager) RemoveTable(tableID model.TableID) {
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
checkpointTs, _, _ := value.(*tableSinkWrapper).getCheckpointTs()
checkpointTs := value.(*tableSinkWrapper).getCheckpointTs()
log.Info("Remove table sink successfully",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand Down Expand Up @@ -984,27 +980,25 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
}
tableSink := value.(*tableSinkWrapper)

checkpointTs, version, advanced := tableSink.getCheckpointTs()
checkpointTs := tableSink.getCheckpointTs()
m.sinkMemQuota.Release(tableID, checkpointTs)
m.redoMemQuota.Release(tableID, checkpointTs)

advanceTimeoutInSec := m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec
if advanceTimeoutInSec <= 0 {
log.Warn("AdvanceTimeoutInSec is not set, use default value", zap.Any("sinkConfig", m.changefeedInfo.Config.Sink))
advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second
if version > 0 && time.Since(advanced) > stuckCheck &&
oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck {

isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
tableSink.updateTableSinkAdvanced()
m.putSinkFactoryError(errors.New("table sink stuck"), version)
}

var resolvedTs model.Ts
Expand Down
6 changes: 3 additions & 3 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(tableID)
require.True(t, ok)
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
return checkpointTS.ResolvedMark() == 4
}, 5*time.Second, 10*time.Millisecond)
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(tableID)
require.True(t, ok)
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
return checkpointTS.ResolvedMark() == 3
}, 5*time.Second, 10*time.Millisecond)
}
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
tableSink, ok := manager.tableSinks.Load(tableID)
require.True(t, ok)
require.NotNil(t, tableSink)
checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs()
checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs()
require.Equal(t, uint64(1), checkpointTS.Ts)
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e

// Restart the table sink based on the checkpoint position.
if err := task.tableSink.restart(ctx); err == nil {
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
checkpointTs := task.tableSink.getCheckpointTs()
ckpt := checkpointTs.ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
performCallback(lastWrittenPos)
Expand Down
6 changes: 3 additions & 3 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndDoNotAdvanceTableUntilMee
receivedEvents[2].Callback()
require.Len(suite.T(), sink.GetEvents(), 3, "No more events should be sent to sink")

checkpointTs, _, _ := wrapper.getCheckpointTs()
checkpointTs := wrapper.getCheckpointTs()
require.Equal(suite.T(), uint64(2), checkpointTs.ResolvedMark(), "Only can advance resolved mark to 2")
}

Expand Down Expand Up @@ -1046,7 +1046,7 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableUntilTaskIsFi
receivedEvents := sink.GetEvents()
receivedEvents[0].Callback()
require.Len(suite.T(), sink.GetEvents(), 1, "No more events should be sent to sink")
checkpointTs, _, _ := wrapper.getCheckpointTs()
checkpointTs := wrapper.getCheckpointTs()
require.Equal(suite.T(), uint64(4), checkpointTs.ResolvedMark())
}

Expand Down Expand Up @@ -1112,7 +1112,7 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNoWorkload(
isCanceled: func() bool { return false },
}
require.Eventually(suite.T(), func() bool {
checkpointTs, _, _ := wrapper.getCheckpointTs()
checkpointTs := wrapper.getCheckpointTs()
return checkpointTs.ResolvedMark() == 4
}, 5*time.Second, 10*time.Millisecond, "Directly advance resolved mark to 4")
cancel()
Expand Down
65 changes: 43 additions & 22 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,13 @@ type tableSinkWrapper struct {
// tableSink is the underlying sink.
tableSink struct {
sync.RWMutex
s tablesink.TableSink
version uint64 // it's generated by `tableSinkCreater`.
s tablesink.TableSink
version uint64 // it's generated by `tableSinkCreater`.

innerMu sync.Mutex
advanced time.Time
resolvedTs model.ResolvedTs
checkpointTs model.ResolvedTs
advanced atomic.Int64
}

// state used to control the lifecycle of the table.
Expand Down Expand Up @@ -119,7 +122,8 @@ func newTableSinkWrapper(

res.tableSink.version = 0
res.tableSink.checkpointTs = model.NewResolvedTs(startTs)
res.updateTableSinkAdvanced()
res.tableSink.resolvedTs = model.NewResolvedTs(startTs)
res.tableSink.advanced = time.Now()

res.receivedSorterResolvedTs.Store(startTs)
res.barrierTs.Store(startTs)
Expand Down Expand Up @@ -196,33 +200,28 @@ func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
// If it's nil it means it's closed.
return tablesink.NewSinkInternalError(errors.New("table sink cleared"))
}
t.tableSink.innerMu.Lock()
defer t.tableSink.innerMu.Unlock()
t.tableSink.resolvedTs = ts
return t.tableSink.s.UpdateResolvedTs(ts)
}

// getCheckpointTs returns
// 1. checkpoint timestamp of the table;
// 2. the table sink version, which comes from `tableSinkCreater`;
// 3. recent time of the table is advanced.
func (t *tableSinkWrapper) getCheckpointTs() (model.ResolvedTs, uint64, time.Time) {
func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs {
t.tableSink.RLock()
defer t.tableSink.RUnlock()
t.tableSink.innerMu.Lock()
defer t.tableSink.innerMu.Unlock()

if t.tableSink.s != nil {
checkpointTs := t.tableSink.s.GetCheckpointTs()
if t.tableSink.checkpointTs.Less(checkpointTs) {
t.tableSink.checkpointTs = checkpointTs
t.updateTableSinkAdvanced()
t.tableSink.advanced = time.Now()
} else if !checkpointTs.Less(t.tableSink.resolvedTs) {
t.tableSink.advanced = time.Now()
}
}
advanced := time.Unix(t.tableSink.advanced.Load(), 0)
return t.tableSink.checkpointTs, t.tableSink.version, advanced
}

func (t *tableSinkWrapper) updateTableSinkAdvanced() {
curr := t.tableSink.advanced.Load()
now := time.Now().Unix()
if now > curr {
t.tableSink.advanced.CompareAndSwap(curr, now)
}
return t.tableSink.checkpointTs
}

func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts {
Expand Down Expand Up @@ -295,7 +294,7 @@ func (t *tableSinkWrapper) initTableSink() bool {
if t.tableSink.s == nil {
t.tableSink.s, t.tableSink.version = t.tableSinkCreater()
if t.tableSink.s != nil {
t.updateTableSinkAdvanced()
t.tableSink.advanced = time.Now()
return true
}
return false
Expand Down Expand Up @@ -341,12 +340,15 @@ func (t *tableSinkWrapper) doTableSinkClear() {
return
}
checkpointTs := t.tableSink.s.GetCheckpointTs()
t.tableSink.innerMu.Lock()
if t.tableSink.checkpointTs.Less(checkpointTs) {
t.tableSink.checkpointTs = checkpointTs
}
t.tableSink.resolvedTs = checkpointTs
t.tableSink.advanced = time.Now()
t.tableSink.innerMu.Unlock()
t.tableSink.s = nil
t.tableSink.version = 0
t.tableSink.advanced.Store(time.Now().Unix())
}

// When the attached sink fail, there can be some events that have already been
Expand Down Expand Up @@ -416,6 +418,25 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound engine.Position, min
return shouldClean
}

func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) {
t.getCheckpointTs()

t.tableSink.RLock()
defer t.tableSink.RUnlock()
t.tableSink.innerMu.Lock()
defer t.tableSink.innerMu.Unlock()

// What these conditions mean:
// 1. the table sink has been associated with a valid sink;
// 2. its checkpoint hasn't been advanced for a while;
version := t.tableSink.version
advanced := t.tableSink.advanced
if version > 0 && time.Since(advanced) > stuckCheck {
return true, version
}
return false, uint64(0)
}

// convertRowChangedEvents uses to convert RowChangedEvents to TableSinkRowChangedEvents.
// It will deal with the old value compatibility.
func convertRowChangedEvents(
Expand Down
106 changes: 106 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"math"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

type mockSink struct {
Expand Down Expand Up @@ -329,3 +331,107 @@ func TestGetUpperBoundTs(t *testing.T) {
wrapper.barrierTs.Store(uint64(12))
require.Equal(t, uint64(11), wrapper.getUpperBoundTs())
}

func TestTableSinkWrapperSinkVersion(t *testing.T) {
t.Parallel()

innerTableSink := tablesink.New[*model.RowChangedEvent](
model.ChangeFeedID{}, 1, model.Ts(0),
newMockSink(), &eventsink.RowChangeEventAppender{},
prometheus.NewCounter(prometheus.CounterOpts{}),
)
version := new(uint64)

wrapper := newTableSinkWrapper(
model.DefaultChangeFeedID("1"),
1,
func() (tablesink.TableSink, uint64) { return nil, 0 },
tablepb.TableStatePrepared,
model.Ts(10),
model.Ts(20),
func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil },
)

require.False(t, wrapper.initTableSink())

wrapper.tableSinkCreater = func() (tablesink.TableSink, uint64) {
*version += 1
return innerTableSink, *version
}

require.True(t, wrapper.initTableSink())
require.Equal(t, wrapper.tableSink.version, uint64(1))

require.True(t, wrapper.asyncCloseTableSink())

wrapper.doTableSinkClear()
require.Nil(t, wrapper.tableSink.s)
require.Equal(t, wrapper.tableSink.version, uint64(0))

require.True(t, wrapper.initTableSink())
require.Equal(t, wrapper.tableSink.version, uint64(2))

wrapper.closeTableSink()

wrapper.doTableSinkClear()
require.Nil(t, wrapper.tableSink.s)
require.Equal(t, wrapper.tableSink.version, uint64(0))
}

func TestTableSinkWrapperSinkInner(t *testing.T) {
t.Parallel()

innerTableSink := tablesink.New[*model.RowChangedEvent](
model.ChangeFeedID{}, 1, model.Ts(0),
newMockSink(), &eventsink.RowChangeEventAppender{},
prometheus.NewCounter(prometheus.CounterOpts{}),
)
version := new(uint64)

wrapper := newTableSinkWrapper(
model.DefaultChangeFeedID("1"),
1,
func() (tablesink.TableSink, uint64) {
*version += 1
return innerTableSink, *version
},
tablepb.TableStatePrepared,
oracle.GoTimeToTS(time.Now()),
oracle.GoTimeToTS(time.Now().Add(10000*time.Second)),
func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil },
)

require.True(t, wrapper.initTableSink())

wrapper.closeAndClearTableSink()

// Shouldn't be stuck because version is 0.
require.Equal(t, wrapper.tableSink.version, uint64(0))
isStuck, _ := wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because tableSink.advanced is just updated.
require.True(t, wrapper.initTableSink())
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because upperbound hasn't been advanced.
time.Sleep(200 * time.Millisecond)
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because `getCheckpointTs` will update tableSink.advanced.
nowTs := oracle.GoTimeToTS(time.Now())
wrapper.updateReceivedSorterResolvedTs(nowTs)
wrapper.barrierTs.Store(nowTs)
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

time.Sleep(200 * time.Millisecond)
nowTs = oracle.GoTimeToTS(time.Now())
wrapper.updateReceivedSorterResolvedTs(nowTs)
wrapper.barrierTs.Store(nowTs)
wrapper.updateResolvedTs(model.NewResolvedTs(nowTs))
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.True(t, isStuck)
}

0 comments on commit 4547916

Please sign in to comment.