From 7497ea66a8adc339561f3c24936afb380f1b8fc8 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Thu, 29 Jun 2023 14:15:13 +0800 Subject: [PATCH] redo, processor(ticdc): set flushed resolvedTs when start table (#9281) close pingcap/tiflow#9172 --- cdc/processor/manager_test.go | 2 +- cdc/processor/processor.go | 14 +- cdc/processor/processor_test.go | 172 +++++++++++++++--- cdc/processor/sinkmanager/manager.go | 3 +- .../sinkmanager/manager_test_helper.go | 4 +- cdc/redo/manager.go | 31 +++- cdc/redo/meta_manager.go | 4 +- cdc/scheduler/internal/table_executor.go | 5 +- cdc/scheduler/internal/v3/agent/agent.go | 26 +-- cdc/scheduler/internal/v3/agent/agent_test.go | 21 ++- cdc/scheduler/internal/v3/agent/table.go | 16 +- 11 files changed, 215 insertions(+), 83 deletions(-) diff --git a/cdc/processor/manager_test.go b/cdc/processor/manager_test.go index f8df7fe7af4..9941de0df41 100644 --- a/cdc/processor/manager_test.go +++ b/cdc/processor/manager_test.go @@ -55,7 +55,7 @@ func NewManager4Test( changefeedEpoch uint64, cfg *config.SchedulerConfig, ) *processor { - return newProcessor4Test(t, state, captureInfo, m.liveness, cfg) + return newProcessor4Test(t, state, captureInfo, m.liveness, cfg, false) } return m } diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index b3aa26e6793..9581a75fff3 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -108,12 +108,13 @@ var _ scheduler.TableExecutor = (*processor)(nil) // 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true. // 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false func (p *processor) AddTableSpan( - ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, barrier *schedulepb.Barrier, + ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool, ) (bool, error) { if !p.checkReadyForMessages() { return false, nil } + startTs := checkpoint.CheckpointTs if startTs == 0 { log.Panic("table start ts must not be 0", zap.String("captureID", p.captureInfo.ID), @@ -144,14 +145,9 @@ func (p *processor) AddTableSpan( // be stopped on original capture already, it's safe to start replicating data now. if !isPrepare { if p.redo.r.Enabled() { - var redoResolvedTs model.Ts - if barrier != nil { - redoResolvedTs = barrier.GlobalBarrierTs - } else { - stats := p.sinkManager.r.GetTableStats(span) - redoResolvedTs = stats.BarrierTs - } - p.redo.r.StartTable(span, redoResolvedTs) + // ResolvedTs is store in external storage when redo log is enabled, so we need to + // start table with ResolvedTs in redoDMLManager. 
+ p.redo.r.StartTable(span, checkpoint.ResolvedTs) } if err := p.sinkManager.r.StartTable(span, startTs); err != nil { return false, errors.Trace(err) diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index c8bd6478022..ace1ef90008 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -36,6 +36,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" + redoPkg "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" "github.com/stretchr/testify/require" @@ -47,6 +48,7 @@ func newProcessor4Test( captureInfo *model.CaptureInfo, liveness *model.Liveness, cfg *config.SchedulerConfig, + enableRedo bool, ) *processor { changefeedID := model.ChangeFeedID4Test("processor-test", "processor-test") up := upstream.NewUpstream4Test(&sinkmanager.MockPD{}) @@ -61,9 +63,28 @@ func newProcessor4Test( return nil } + if !enableRedo { + p.redo.r = redo.NewDisabledDMLManager() + } else { + tmpDir := t.TempDir() + redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID) + dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{ + Level: string(redoPkg.ConsistentLevelEventual), + MaxLogSize: redoPkg.DefaultMaxLogSize, + FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs, + Storage: "file://" + redoDir, + UseFileBackend: false, + }) + require.NoError(t, err) + p.redo.r = dmlMgr + } + p.redo.name = "RedoManager" + p.redo.changefeedID = changefeedID + p.redo.spawn(ctx) + p.agent = &mockAgent{executor: p, liveness: liveness} p.sinkManager.r, p.sourceManager.r, _ = sinkmanager.NewManagerWithMemEngine( - t, changefeedID, state.Info) + t, changefeedID, state.Info, p.redo.r) p.sinkManager.name = "SinkManager" p.sinkManager.changefeedID = changefeedID p.sinkManager.spawn(ctx) @@ -75,11 +96,6 @@ func newProcessor4Test( // otherwise the sinkManager will not receive the resolvedTs. 
p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs) - p.redo.r = redo.NewDisabledDMLManager() - p.redo.name = "RedoManager" - p.redo.changefeedID = changefeedID - p.redo.spawn(ctx) - p.initialized = true return nil } @@ -91,7 +107,7 @@ func newProcessor4Test( } func initProcessor4Test( - ctx cdcContext.Context, t *testing.T, liveness *model.Liveness, + ctx cdcContext.Context, t *testing.T, liveness *model.Liveness, enableRedo bool, ) (*processor, *orchestrator.ReactorStateTester) { changefeedInfo := ` { @@ -132,7 +148,7 @@ func initProcessor4Test( etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID) captureInfo := &model.CaptureInfo{ID: "capture-test", AdvertiseAddr: "127.0.0.1:0000"} cfg := config.NewDefaultSchedulerConfig() - p := newProcessor4Test(t, changefeed, captureInfo, liveness, cfg) + p := newProcessor4Test(t, changefeed, captureInfo, liveness, cfg, enableRedo) captureID := ctx.GlobalVars().CaptureInfo.ID changefeedID := ctx.ChangefeedVars().ID @@ -194,7 +210,7 @@ func (a *mockAgent) Close() error { func TestTableExecutorAddingTableIndirectly(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) // init tick err := p.Tick(ctx) @@ -213,7 +229,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { // table-1: `preparing` -> `prepared` -> `replicating` span := spanz.TableIDToComparableSpan(1) - ok, err := p.AddTableSpan(ctx, span, 20, true, nil) + ok, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 20}, true) require.NoError(t, err) require.True(t, ok) p.sinkManager.r.UpdateBarrierTs(20, nil) @@ -252,7 +268,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.True(t, ok) require.Equal(t, tablepb.TableStatePrepared, state) - ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, true, nil) + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, true) require.NoError(t, err) require.True(t, ok) stats = p.sinkManager.r.GetTableStats(span) @@ -261,7 +277,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Equal(t, model.Ts(20), stats.BarrierTs) // Start to replicate table-1. 
- ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false, nil) + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, false) require.NoError(t, err) require.True(t, ok) @@ -279,10 +295,108 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Nil(t, p.agent) } +func TestTableExecutorAddingTableIndirectlyWithRedoEnabled(t *testing.T) { + ctx := cdcContext.NewBackendContext4Test(true) + liveness := model.LivenessCaptureAlive + p, tester := initProcessor4Test(ctx, t, &liveness, true) + + // init tick + err := p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + p.changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + status.CheckpointTs = 20 + return status, true, nil + }) + tester.MustApplyPatches() + + // no operation + err = p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + // table-1: `preparing` -> `prepared` -> `replicating` + span := spanz.TableIDToComparableSpan(1) + ok, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 20}, true) + require.NoError(t, err) + require.True(t, ok) + p.sinkManager.r.UpdateBarrierTs(20, nil) + stats := p.sinkManager.r.GetTableStats(span) + require.Equal(t, model.Ts(20), stats.CheckpointTs) + require.Equal(t, model.Ts(20), stats.ResolvedTs) + require.Equal(t, model.Ts(20), stats.BarrierTs) + require.Len(t, p.sinkManager.r.GetAllCurrentTableSpans(), 1) + require.Equal(t, 1, p.sinkManager.r.GetAllCurrentTableSpansCount()) + + done := p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), true) + require.False(t, done) + state, ok := p.sinkManager.r.GetTableState(span) + require.True(t, ok) + require.Equal(t, tablepb.TableStatePreparing, state) + + // Push the resolved ts, mock that sorterNode receive first resolved event. + p.sourceManager.r.Add( + span, + []*model.PolymorphicEvent{{ + CRTs: 101, + RawKV: &model.RawKVEntry{ + OpType: model.OpTypeResolved, + CRTs: 101, + }, + }}..., + ) + + err = p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + done = p.IsAddTableSpanFinished(span, true) + require.True(t, done) + state, ok = p.sinkManager.r.GetTableState(span) + require.True(t, ok) + require.Equal(t, tablepb.TableStatePrepared, state) + + // ignore duplicate add request + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, true) + require.NoError(t, err) + require.True(t, ok) + stats = p.sinkManager.r.GetTableStats(span) + require.Equal(t, model.Ts(20), stats.CheckpointTs) + require.Equal(t, model.Ts(20), stats.ResolvedTs) + require.Equal(t, model.Ts(20), stats.BarrierTs) + + p.sinkManager.r.UpdateBarrierTs(50, nil) + stats = p.sinkManager.r.GetTableStats(span) + require.Equal(t, model.Ts(20), stats.ResolvedTs) + require.Equal(t, model.Ts(50), stats.BarrierTs) + + // Start to replicate table-1. 
+ ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30, ResolvedTs: 60}, false) + require.NoError(t, err) + require.True(t, ok) + + stats = p.sinkManager.r.GetTableStats(span) + require.Equal(t, model.Ts(60), stats.ResolvedTs) + require.Equal(t, model.Ts(50), stats.BarrierTs) + + err = p.Tick(ctx) + require.Nil(t, err) + tester.MustApplyPatches() + + // table-1: `prepared` -> `replicating` + state, ok = p.sinkManager.r.GetTableState(span) + require.True(t, ok) + require.Equal(t, tablepb.TableStateReplicating, state) + + err = p.Close() + require.Nil(t, err) + require.Nil(t, p.agent) +} + func TestProcessorError(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) // init tick err := p.Tick(ctx) require.Nil(t, err) @@ -302,7 +416,7 @@ func TestProcessorError(t *testing.T) { }, }) - p, tester = initProcessor4Test(ctx, t, &liveness) + p, tester = initProcessor4Test(ctx, t, &liveness, false) // init tick err = p.Tick(ctx) require.Nil(t, err) @@ -321,7 +435,7 @@ func TestProcessorError(t *testing.T) { func TestProcessorExit(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) var err error // init tick err = p.Tick(ctx) @@ -345,7 +459,7 @@ func TestProcessorExit(t *testing.T) { func TestProcessorClose(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) // init tick err := p.Tick(ctx) require.Nil(t, err) @@ -357,10 +471,10 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // add tables - done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false, nil) + done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 20}, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false, nil) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 30}, false) require.Nil(t, err) require.True(t, done) @@ -384,7 +498,7 @@ func TestProcessorClose(t *testing.T) { require.Nil(t, p.sourceManager.r) require.Nil(t, p.agent) - p, tester = initProcessor4Test(ctx, t, &liveness) + p, tester = initProcessor4Test(ctx, t, &liveness, false) // init tick err = p.Tick(ctx) require.Nil(t, err) @@ -396,10 +510,10 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // add tables - done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false, nil) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 20}, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false, nil) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 30}, false) require.Nil(t, err) require.True(t, done) err = p.Tick(ctx) @@ -428,7 +542,7 @@ func TestProcessorClose(t *testing.T) { func TestPositionDeleted(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) 
+ p, tester := initProcessor4Test(ctx, t, &liveness, false) // init tick err := p.Tick(ctx) require.Nil(t, err) @@ -441,10 +555,10 @@ func TestPositionDeleted(t *testing.T) { tester.MustApplyPatches() // add table - done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false, nil) + done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 40, false, nil) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 40}, false) require.Nil(t, err) require.True(t, done) @@ -469,7 +583,7 @@ func TestPositionDeleted(t *testing.T) { func TestSchemaGC(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) var err error // init tick @@ -531,7 +645,7 @@ func TestIgnorableError(t *testing.T) { func TestUpdateBarrierTs(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) p.changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { status.CheckpointTs = 5 return status, true, nil @@ -550,7 +664,7 @@ func TestUpdateBarrierTs(t *testing.T) { tester.MustApplyPatches() span := spanz.TableIDToComparableSpan(1) - done, err := p.AddTableSpan(ctx, span, 5, false, nil) + done, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 5}, false) require.True(t, done) require.Nil(t, err) err = p.Tick(ctx) @@ -584,7 +698,7 @@ func TestUpdateBarrierTs(t *testing.T) { func TestProcessorLiveness(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) // First tick for creating position. err := p.Tick(ctx) @@ -619,7 +733,7 @@ func TestProcessorDostNotStuckInInit(t *testing.T) { ctx := cdcContext.NewBackendContext4Test(true) liveness := model.LivenessCaptureAlive - p, tester := initProcessor4Test(ctx, t, &liveness) + p, tester := initProcessor4Test(ctx, t, &liveness, false) // First tick for creating position. 
err := p.Tick(ctx) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 194581b4579..8406e5e1c13 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -940,7 +940,8 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { zap.String("changefeed", m.changefeedID.ID), zap.Stringer("span", &span), zap.Uint64("resolvedTs", resolvedTs), - zap.Any("checkpointTs", checkpointTs)) + zap.Any("checkpointTs", checkpointTs), + zap.Uint64("barrierTs", tableSink.barrierTs.Load())) } return TableStats{ CheckpointTs: checkpointTs.ResolvedMark(), diff --git a/cdc/processor/sinkmanager/manager_test_helper.go b/cdc/processor/sinkmanager/manager_test_helper.go index c6624f7dfcb..7726dbd5b25 100644 --- a/cdc/processor/sinkmanager/manager_test_helper.go +++ b/cdc/processor/sinkmanager/manager_test_helper.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/sourcemanager" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" + "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/pkg/upstream" pd "github.com/tikv/pd/client" ) @@ -82,12 +83,13 @@ func NewManagerWithMemEngine( t *testing.T, changefeedID model.ChangeFeedID, changefeedInfo *model.ChangeFeedInfo, + redoMgr redo.DMLManager, ) (*SinkManager, *sourcemanager.SourceManager, engine.SortEngine) { sortEngine := memory.New(context.Background()) up := upstream.NewUpstream4Test(&MockPD{}) mg := &entry.MockMountGroup{} schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64} sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false) - sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, nil, sourceManager) + sinkManager := New(changefeedID, changefeedInfo, up, schemaStorage, redoMgr, sourceManager) return sinkManager, sourceManager, sortEngine } diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go index f350f45c63e..0fc95448916 100644 --- a/cdc/redo/manager.go +++ b/cdc/redo/manager.go @@ -173,10 +173,6 @@ func (s *statefulRts) getUnflushed() model.Ts { return atomic.LoadUint64(&s.unflushed) } -func (s *statefulRts) setFlushed(flushed model.Ts) { - atomic.StoreUint64(&s.flushed, flushed) -} - func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) { for { old := atomic.LoadUint64(&s.unflushed) @@ -190,6 +186,19 @@ func (s *statefulRts) checkAndSetUnflushed(unflushed model.Ts) (changed bool) { return true } +func (s *statefulRts) checkAndSetFlushed(flushed model.Ts) (changed bool) { + for { + old := atomic.LoadUint64(&s.flushed) + if old > flushed { + return false + } + if atomic.CompareAndSwapUint64(&s.flushed, old, flushed) { + break + } + } + return true +} + // logManager manages redo log writer, buffers un-persistent redo logs, calculates // redo log resolved ts. It implements DDLManager and DMLManager interface. type logManager struct { @@ -309,7 +318,13 @@ func (m *logManager) emitRedoEvents( // StartTable starts a table, which means the table is ready to emit redo events. // Note that this function should only be called once when adding a new table to processor. 
func (m *logManager) StartTable(span tablepb.Span, resolvedTs uint64) { + // advance unflushed resolved ts m.onResolvedTsMsg(span, resolvedTs) + + // advance flushed resolved ts + if value, loaded := m.rtsMap.Load(span); loaded { + value.(*statefulRts).checkAndSetFlushed(resolvedTs) + } } // UpdateResolvedTs asynchronously updates resolved ts of a single table. @@ -375,7 +390,13 @@ func (m *logManager) prepareForFlush() *spanz.HashMap[model.Ts] { func (m *logManager) postFlush(tableRtsMap *spanz.HashMap[model.Ts]) { tableRtsMap.Range(func(span tablepb.Span, flushed uint64) bool { if value, loaded := m.rtsMap.Load(span); loaded { - value.(*statefulRts).setFlushed(flushed) + changed := value.(*statefulRts).checkAndSetFlushed(flushed) + if !changed { + log.Debug("flush redo with regressed resolved ts", + zap.Stringer("span", &span), + zap.Uint64("flushed", flushed), + zap.Uint64("current", value.(*statefulRts).getFlushed())) + } } return true }) diff --git a/cdc/redo/meta_manager.go b/cdc/redo/meta_manager.go index e781596c17f..e0aefe30df8 100644 --- a/cdc/redo/meta_manager.go +++ b/cdc/redo/meta_manager.go @@ -370,8 +370,8 @@ func (m *metaManager) prepareForFlushMeta() (bool, common.LogMeta) { } func (m *metaManager) postFlushMeta(meta common.LogMeta) { - m.metaResolvedTs.setFlushed(meta.ResolvedTs) - m.metaCheckpointTs.setFlushed(meta.CheckpointTs) + m.metaResolvedTs.checkAndSetFlushed(meta.ResolvedTs) + m.metaCheckpointTs.checkAndSetFlushed(meta.CheckpointTs) } func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error { diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index 25bf23e9514..5b2e3c7560f 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" ) // TableExecutor is an abstraction for "Processor". @@ -27,11 +26,11 @@ import ( // to adapt the current Processor implementation to it. // TODO find a way to make the semantics easier to understand. type TableExecutor interface { - // AddTableSpan add a new table span with `startTs` + // AddTableSpan add a new table span with `Checkpoint.CheckpointTs` // if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol. // if `isPrepare` is false, the 2nd phase. 
AddTableSpan( - ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, barrier *schedulepb.Barrier, + ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool, ) (done bool, err error) // IsAddTableSpanFinished make sure the requested table span is in the proper status diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index 9984b2bb5d1..e4dad8bb903 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -211,7 +211,7 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) { return nil, errors.Trace(err) } - responses, err := a.tableM.poll(ctx, barrier) + responses, err := a.tableM.poll(ctx) if err != nil { return nil, errors.Trace(err) } @@ -331,12 +331,12 @@ const ( ) type dispatchTableTask struct { - Span tablepb.Span - StartTs model.Ts - IsRemove bool - IsPrepare bool - Epoch schedulepb.ProcessorEpoch - status dispatchTableTaskStatus + Span tablepb.Span + Checkpoint tablepb.Checkpoint + IsRemove bool + IsPrepare bool + Epoch schedulepb.ProcessorEpoch + status dispatchTableTaskStatus } func (a *agent) handleMessageDispatchTableRequest( @@ -364,12 +364,12 @@ func (a *agent) handleMessageDispatchTableRequest( case *schedulepb.DispatchTableRequest_AddTable: span := req.AddTable.GetSpan() task = &dispatchTableTask{ - Span: span, - StartTs: req.AddTable.GetCheckpoint().CheckpointTs, - IsRemove: false, - IsPrepare: req.AddTable.GetIsSecondary(), - Epoch: epoch, - status: dispatchTableTaskReceived, + Span: span, + Checkpoint: req.AddTable.GetCheckpoint(), + IsRemove: false, + IsPrepare: req.AddTable.GetIsSecondary(), + Epoch: epoch, + status: dispatchTableTaskReceived, } table = a.tableM.addTableSpan(span) case *schedulepb.DispatchTableRequest_RemoveTable: diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 01df322ef34..bc08d491f0c 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -164,7 +164,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { // remove table not exist ctx := context.Background() a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) - responses, err := a.tableM.poll(ctx, &schedulepb.Barrier{}) + responses, err := a.tableM.poll(ctx) require.NoError(t, err) require.Len(t, responses, 0) @@ -183,7 +183,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { mockTableExecutor.On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(false, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) - responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{}) + responses, err = a.tableM.poll(ctx) require.NoError(t, err) require.Len(t, responses, 1) @@ -203,14 +203,14 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) - _, err = a.tableM.poll(ctx, &schedulepb.Barrier{}) + _, err = a.tableM.poll(ctx) require.NoError(t, err) mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) - responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{}) + responses, err 
= a.tableM.poll(ctx) require.NoError(t, err) require.Len(t, responses, 1) @@ -232,7 +232,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { mock.Anything, mock.Anything).Return(false, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) - responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{}) + responses, err = a.tableM.poll(ctx) require.NoError(t, err) require.Len(t, responses, 1) @@ -247,7 +247,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) - responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{}) + responses, err = a.tableM.poll(ctx) require.NoError(t, err) require.Len(t, responses, 1) @@ -262,7 +262,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { Return(false) // remove table in the replicating state failed, should still in replicating. a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) - responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{}) + responses, err = a.tableM.poll(ctx) require.NoError(t, err) require.Len(t, responses, 1) removeTableResponse, ok := responses[0].DispatchTableResponse. @@ -279,7 +279,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { Return(3, false) // remove table in the replicating state failed, should still in replicating. a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) - responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{}) + responses, err = a.tableM.poll(ctx) require.NoError(t, err) require.Len(t, responses, 1) removeTableResponse, ok = responses[0].DispatchTableResponse. @@ -293,7 +293,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { Return(3, true) // remove table in the replicating state success, should in stopped a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) - responses, err = a.tableM.poll(ctx, &schedulepb.Barrier{}) + responses, err = a.tableM.poll(ctx) require.NoError(t, err) require.Len(t, responses, 1) removeTableResponse, ok = responses[0].DispatchTableResponse. @@ -1069,8 +1069,9 @@ func newMockTableExecutor() *MockTableExecutor { // AddTableSpan adds a table span to the executor. 
func (e *MockTableExecutor) AddTableSpan( - ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, _ *schedulepb.Barrier, + ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool, ) (bool, error) { + startTs := checkpoint.CheckpointTs log.Info("AddTableSpan", zap.String("span", span.String()), zap.Any("startTs", startTs), diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index d198f7a227f..be1f7ec929b 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -169,15 +169,13 @@ func (t *tableSpan) handleRemoveTableTask() *schedulepb.Message { return nil } -func (t *tableSpan) handleAddTableTask( - ctx context.Context, barrier *schedulepb.Barrier, -) (result *schedulepb.Message, err error) { +func (t *tableSpan) handleAddTableTask(ctx context.Context) (result *schedulepb.Message, err error) { state, _ := t.getAndUpdateTableSpanState() changed := true for changed { switch state { case tablepb.TableStateAbsent: - done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.StartTs, t.task.IsPrepare, barrier) + done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.Checkpoint, t.task.IsPrepare) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), @@ -208,7 +206,7 @@ func (t *tableSpan) handleAddTableTask( } if t.task.status == dispatchTableTaskReceived { - done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.StartTs, false, barrier) + done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.Checkpoint, false) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), @@ -283,14 +281,14 @@ func (t *tableSpan) injectDispatchTableTask(task *dispatchTableTask) { zap.Any("ignoredTask", task)) } -func (t *tableSpan) poll(ctx context.Context, barrier *schedulepb.Barrier) (*schedulepb.Message, error) { +func (t *tableSpan) poll(ctx context.Context) (*schedulepb.Message, error) { if t.task == nil { return nil, nil } if t.task.IsRemove { return t.handleRemoveTableTask(), nil } - return t.handleAddTableTask(ctx, barrier) + return t.handleAddTableTask(ctx) } type tableSpanManager struct { @@ -310,12 +308,12 @@ func newTableSpanManager( } } -func (tm *tableSpanManager) poll(ctx context.Context, barrier *schedulepb.Barrier) ([]*schedulepb.Message, error) { +func (tm *tableSpanManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { result := make([]*schedulepb.Message, 0) var err error toBeDropped := []tablepb.Span{} tm.tables.Ascend(func(span tablepb.Span, table *tableSpan) bool { - message, err1 := table.poll(ctx, barrier) + message, err1 := table.poll(ctx) if err != nil { err = errors.Trace(err1) return false