From 0c2904081451619ac1f65d3039583c2f336b736b Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Thu, 26 Oct 2023 16:06:34 +0800 Subject: [PATCH] scheduler(ticdc): revert 3b8d55 and do not return error when resolvedTs less than checkpoint (#9953) ref pingcap/tiflow#9830, ref pingcap/tiflow#9926 --- cdc/owner/changefeed.go | 6 +- cdc/processor/processor.go | 5 +- cdc/processor/sinkmanager/manager.go | 8 +- cdc/processor/sinkmanager/manager_test.go | 12 +- cdc/scheduler/internal/v3/agent/main_test.go | 24 ---- cdc/scheduler/internal/v3/agent/table.go | 4 +- cdc/scheduler/internal/v3/compat/main_test.go | 24 ---- cdc/scheduler/internal/v3/coordinator.go | 2 +- .../internal/v3/coordinator_bench_test.go | 2 +- .../internal/v3/keyspan/main_test.go | 24 ---- cdc/scheduler/internal/v3/member/main_test.go | 24 ---- .../internal/v3/replication/main_test.go | 24 ---- .../v3/replication/replication_manager.go | 19 ++- .../replication/replication_manager_test.go | 111 ++++-------------- .../v3/replication/replication_set.go | 41 +++---- .../v3/replication/replication_set_test.go | 53 ++++++--- .../internal/v3/scheduler/main_test.go | 24 ---- .../internal/v3/scheduler/scheduler.go | 2 +- .../v3/scheduler/scheduler_balance.go | 2 +- .../v3/scheduler/scheduler_balance_test.go | 15 +-- .../internal/v3/scheduler/scheduler_basic.go | 12 +- .../v3/scheduler/scheduler_basic_test.go | 25 ++-- .../v3/scheduler/scheduler_drain_capture.go | 2 +- .../scheduler/scheduler_drain_capture_test.go | 32 ++--- .../v3/scheduler/scheduler_manager.go | 18 +-- .../v3/scheduler/scheduler_manager_test.go | 11 +- .../v3/scheduler/scheduler_move_table.go | 2 +- .../v3/scheduler/scheduler_move_table_test.go | 18 +-- .../v3/scheduler/scheduler_rebalance.go | 2 +- .../v3/scheduler/scheduler_rebalance_test.go | 18 +-- .../internal/v3/transport/main_test.go | 24 ---- 31 files changed, 170 insertions(+), 420 deletions(-) delete mode 100644 cdc/scheduler/internal/v3/agent/main_test.go delete mode 100644 cdc/scheduler/internal/v3/compat/main_test.go delete mode 100644 cdc/scheduler/internal/v3/keyspan/main_test.go delete mode 100644 cdc/scheduler/internal/v3/member/main_test.go delete mode 100644 cdc/scheduler/internal/v3/replication/main_test.go delete mode 100644 cdc/scheduler/internal/v3/scheduler/main_test.go delete mode 100644 cdc/scheduler/internal/v3/transport/main_test.go diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 2370380ebdb..d7e5e18af1a 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -481,7 +481,9 @@ LOOP2: } checkpointTs := c.latestStatus.CheckpointTs - c.resolvedTs = checkpointTs + if c.resolvedTs == 0 { + c.resolvedTs = checkpointTs + } minTableBarrierTs := c.latestStatus.MinTableBarrierTs failpoint.Inject("NewChangefeedNoRetryError", func() { @@ -630,7 +632,6 @@ LOOP2: return err } if c.redoMetaMgr.Enabled() { - c.resolvedTs = c.redoMetaMgr.GetFlushedMeta().ResolvedTs c.wg.Add(1) go func() { defer c.wg.Done() @@ -748,7 +749,6 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) { c.barriers = nil c.initialized = false c.isReleased = true - c.resolvedTs = 0 log.Info("changefeed closed", zap.String("namespace", c.id.Namespace), diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 5a148240f80..3c279758881 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -166,13 +166,12 @@ func (p *processor) AddTableSpan( // table is `prepared`, and a `isPrepare = false` request 
indicate that old table should // be stopped on original capture already, it's safe to start replicating data now. if !isPrepare { - redoStartTs := checkpoint.ResolvedTs if p.redo.r.Enabled() { // ResolvedTs is store in external storage when redo log is enabled, so we need to // start table with ResolvedTs in redoDMLManager. - p.redo.r.StartTable(span, redoStartTs) + p.redo.r.StartTable(span, checkpoint.ResolvedTs) } - if err := p.sinkManager.r.StartTable(span, startTs, redoStartTs); err != nil { + if err := p.sinkManager.r.StartTable(span, startTs); err != nil { return false, errors.Trace(err) } } diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 76cf1685479..74c5513888c 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -821,11 +821,7 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod } // StartTable sets the table(TableSink) state to replicating. -func (m *SinkManager) StartTable( - span tablepb.Span, - startTs model.Ts, - redoStartTs model.Ts, -) error { +func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error { log.Info("Start table sink", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), @@ -852,7 +848,7 @@ func (m *SinkManager) StartTable( if m.redoDMLMgr != nil { m.redoProgressHeap.push(&progress{ span: span, - nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: redoStartTs + 1}, + nextLowerBoundPos: engine.Position{StartTs: 0, CommitTs: startTs + 1}, version: tableSink.(*tableSinkWrapper).version, }) } diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 16806580c02..b70e17fc3ab 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -117,7 +117,7 @@ func TestAddTable(t *testing.T) { require.True(t, ok) require.NotNil(t, tableSink) require.Equal(t, 0, manager.sinkProgressHeap.len(), "Not started table shout not in progress heap") - err := manager.StartTable(span, 1, 1) + err := manager.StartTable(span, 1) require.NoError(t, err) require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs) @@ -144,7 +144,7 @@ func TestRemoveTable(t *testing.T) { tableSink, ok := manager.tableSinks.Load(span) require.True(t, ok) require.NotNil(t, tableSink) - err := manager.StartTable(span, 0, 0) + err := manager.StartTable(span, 0) require.NoError(t, err) addTableAndAddEventsToSortEngine(t, e, span) manager.UpdateBarrierTs(4, nil) @@ -191,7 +191,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) { manager.UpdateBarrierTs(4, nil) manager.UpdateReceivedSorterResolvedTs(span, 5) manager.schemaStorage.AdvanceResolvedTs(5) - err := manager.StartTable(span, 0, 0) + err := manager.StartTable(span, 0) require.NoError(t, err) require.Eventually(t, func() bool { @@ -222,7 +222,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) { manager.UpdateBarrierTs(4, nil) manager.UpdateReceivedSorterResolvedTs(span, 3) manager.schemaStorage.AdvanceResolvedTs(4) - err := manager.StartTable(span, 0, 0) + err := manager.StartTable(span, 0) require.NoError(t, err) require.Eventually(t, func() bool { @@ -252,7 +252,7 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) { manager.UpdateBarrierTs(4, nil) manager.UpdateReceivedSorterResolvedTs(span, 5) manager.schemaStorage.AdvanceResolvedTs(5) - err := manager.StartTable(span, 0, 0) + err := manager.StartTable(span, 0) 
require.NoError(t, err) require.Eventually(t, func() bool { @@ -341,7 +341,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) { source.AddTable(span, "test", 100) manager.AddTable(span, 100, math.MaxUint64) - manager.StartTable(span, 100, 0) + manager.StartTable(span, 100) source.Add(span, model.NewResolvedPolymorphicEvent(0, 101)) manager.UpdateReceivedSorterResolvedTs(span, 101) manager.UpdateBarrierTs(101, nil) diff --git a/cdc/scheduler/internal/v3/agent/main_test.go b/cdc/scheduler/internal/v3/agent/main_test.go deleted file mode 100644 index cb7775cac71..00000000000 --- a/cdc/scheduler/internal/v3/agent/main_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package agent - -import ( - "testing" - - "github.com/pingcap/tiflow/pkg/leakutil" -) - -func TestMain(m *testing.M) { - leakutil.SetUpLeakTest(m) -} diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index be1f7ec929b..506bef29a54 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -75,7 +75,7 @@ func (t *tableSpan) getTableSpanStatus(collectStat bool) tablepb.TableStatus { func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message { if status.Checkpoint.ResolvedTs < status.Checkpoint.CheckpointTs { - log.Panic("schedulerv3: resolved ts should not less than checkpoint ts", + log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", zap.Any("tableStatus", status), zap.Any("checkpoint", status.Checkpoint.CheckpointTs), zap.Any("resolved", status.Checkpoint.ResolvedTs)) @@ -100,7 +100,7 @@ func newRemoveTableResponseMessage(status tablepb.TableStatus) *schedulepb.Messa // Advance resolved ts to checkpoint ts if table is removed. status.Checkpoint.ResolvedTs = status.Checkpoint.CheckpointTs } else { - log.Panic("schedulerv3: resolved ts should not less than checkpoint ts", + log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", zap.Any("tableStatus", status), zap.Any("checkpoint", status.Checkpoint.CheckpointTs), zap.Any("resolved", status.Checkpoint.ResolvedTs)) diff --git a/cdc/scheduler/internal/v3/compat/main_test.go b/cdc/scheduler/internal/v3/compat/main_test.go deleted file mode 100644 index d8ad350ef4d..00000000000 --- a/cdc/scheduler/internal/v3/compat/main_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package compat - -import ( - "testing" - - "github.com/pingcap/tiflow/pkg/leakutil" -) - -func TestMain(m *testing.M) { - leakutil.SetUpLeakTest(m) -} diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go index 2d536eb23d5..e27bdf49c97 100644 --- a/cdc/scheduler/internal/v3/coordinator.go +++ b/cdc/scheduler/internal/v3/coordinator.go @@ -334,7 +334,7 @@ func (c *coordinator) poll( currentSpans := c.reconciler.Reconcile( ctx, &c.tableRanges, replications, c.captureM.Captures, c.compat) allTasks := c.schedulerM.Schedule( - checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks, c.redoMetaManager) + checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks) // Handle generated schedule tasks. msgs, err = c.replicationM.HandleTasks(allTasks) diff --git a/cdc/scheduler/internal/v3/coordinator_bench_test.go b/cdc/scheduler/internal/v3/coordinator_bench_test.go index cdf3adc86d3..75094ba1d91 100644 --- a/cdc/scheduler/internal/v3/coordinator_bench_test.go +++ b/cdc/scheduler/internal/v3/coordinator_bench_test.go @@ -149,7 +149,7 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) { captureID := fmt.Sprint(i % captureCount) span := tablepb.Span{TableID: tableID} rep, err := replication.NewReplicationSet( - span, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{ + span, 0, map[string]*tablepb.TableStatus{ captureID: { Span: tablepb.Span{TableID: tableID}, State: tablepb.TableStateReplicating, diff --git a/cdc/scheduler/internal/v3/keyspan/main_test.go b/cdc/scheduler/internal/v3/keyspan/main_test.go deleted file mode 100644 index da122358394..00000000000 --- a/cdc/scheduler/internal/v3/keyspan/main_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package keyspan - -import ( - "testing" - - "github.com/pingcap/tiflow/pkg/leakutil" -) - -func TestMain(m *testing.M) { - leakutil.SetUpLeakTest(m) -} diff --git a/cdc/scheduler/internal/v3/member/main_test.go b/cdc/scheduler/internal/v3/member/main_test.go deleted file mode 100644 index 02b77a22186..00000000000 --- a/cdc/scheduler/internal/v3/member/main_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package member - -import ( - "testing" - - "github.com/pingcap/tiflow/pkg/leakutil" -) - -func TestMain(m *testing.M) { - leakutil.SetUpLeakTest(m) -} diff --git a/cdc/scheduler/internal/v3/replication/main_test.go b/cdc/scheduler/internal/v3/replication/main_test.go deleted file mode 100644 index cefa385ed62..00000000000 --- a/cdc/scheduler/internal/v3/replication/main_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package replication - -import ( - "testing" - - "github.com/pingcap/tiflow/pkg/leakutil" -) - -func TestMain(m *testing.M) { - leakutil.SetUpLeakTest(m) -} diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 416aaf4b29d..0ec7c7a671b 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -78,14 +78,14 @@ func (t MoveTable) String() string { // AddTable is a schedule task for adding a table. type AddTable struct { - Span tablepb.Span - CaptureID model.CaptureID - Checkpoint tablepb.Checkpoint + Span tablepb.Span + CaptureID model.CaptureID + CheckpointTs model.Ts } func (t AddTable) String() string { - return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d, resolvedTs: %d", - t.Span.String(), t.CaptureID, t.Checkpoint.CheckpointTs, t.Checkpoint.ResolvedTs) + return fmt.Sprintf("AddTable, span: %s, capture: %s, checkpointTs: %d", + t.Span.String(), t.CaptureID, t.CheckpointTs) } // RemoveTable is a schedule task for removing a table. @@ -200,12 +200,7 @@ func (r *Manager) HandleCaptureChanges( } var err error spanStatusMap.Ascend(func(span tablepb.Span, status map[string]*tablepb.TableStatus) bool { - checkpoint := tablepb.Checkpoint{ - CheckpointTs: checkpointTs, - // Note that the real resolved ts is stored in the status. 
- ResolvedTs: checkpointTs, - } - table, err1 := NewReplicationSet(span, checkpoint, status, r.changefeedID) + table, err1 := NewReplicationSet(span, checkpointTs, status, r.changefeedID) if err1 != nil { err = errors.Trace(err1) return false @@ -442,7 +437,7 @@ func (r *Manager) handleAddTableTask( var err error table, ok := r.spans.Get(task.Span) if !ok { - table, err = NewReplicationSet(task.Span, task.Checkpoint, nil, r.changefeedID) + table, err = NewReplicationSet(task.Span, task.CheckpointTs, nil, r.changefeedID) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager_test.go b/cdc/scheduler/internal/v3/replication/replication_manager_test.go index 6cbfab08396..0584e62972c 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager_test.go @@ -35,12 +35,7 @@ func TestReplicationManagerHandleAddTableTask(t *testing.T) { // Absent -> Prepare msgs, err := r.HandleTasks([]*ScheduleTask{{ AddTable: &AddTable{ - Span: spanz.TableIDToComparableSpan(1), - CaptureID: "1", - Checkpoint: tablepb.Checkpoint{ - CheckpointTs: 1, - ResolvedTs: 1, - }, + Span: spanz.TableIDToComparableSpan(1), CaptureID: "1", CheckpointTs: 1, }, Accept: func() { addTableCh <- 1 @@ -155,15 +150,9 @@ func TestReplicationManagerRemoveTable(t *testing.T) { // Add the table. span := spanz.TableIDToComparableSpan(1) - tbl, err := NewReplicationSet(span, - tablepb.Checkpoint{ - CheckpointTs: 0, - ResolvedTs: 0, - }, - map[string]*tablepb.TableStatus{ - "1": {Span: span, State: tablepb.TableStateReplicating}, - }, - model.ChangeFeedID{}) + tbl, err := NewReplicationSet(span, 0, map[string]*tablepb.TableStatus{ + "1": {Span: span, State: tablepb.TableStateReplicating}, + }, model.ChangeFeedID{}) require.Nil(t, err) require.Equal(t, ReplicationSetStateReplicating, tbl.State) r.spans.ReplaceOrInsert(spanz.TableIDToComparableSpan(1), tbl) @@ -257,15 +246,9 @@ func TestReplicationManagerMoveTable(t *testing.T) { // Add the table. span := spanz.TableIDToComparableSpan(1) - tbl, err := NewReplicationSet(span, - tablepb.Checkpoint{ - CheckpointTs: 0, - ResolvedTs: 0, - }, - map[string]*tablepb.TableStatus{ - source: {Span: span, State: tablepb.TableStateReplicating}, - }, - model.ChangeFeedID{}) + tbl, err := NewReplicationSet(span, 0, map[string]*tablepb.TableStatus{ + source: {Span: span, State: tablepb.TableStateReplicating}, + }, model.ChangeFeedID{}) require.Nil(t, err) require.Equal(t, ReplicationSetStateReplicating, tbl.State) r.spans.ReplaceOrInsert(spanz.TableIDToComparableSpan(1), tbl) @@ -394,23 +377,19 @@ func TestReplicationManagerBurstBalance(t *testing.T) { r := NewReplicationManager(1, model.ChangeFeedID{}) balanceTableCh := make(chan int, 1) - checkpoint := tablepb.Checkpoint{ - CheckpointTs: 1, - ResolvedTs: 1, - } // Burst balance is not limited by maxTaskConcurrency. 
msgs, err := r.HandleTasks([]*ScheduleTask{{ AddTable: &AddTable{ - Span: spanz.TableIDToComparableSpan(1), CaptureID: "0", Checkpoint: checkpoint, + Span: spanz.TableIDToComparableSpan(1), CaptureID: "0", CheckpointTs: 1, }, }, { BurstBalance: &BurstBalance{ AddTables: []AddTable{{ - Span: spanz.TableIDToComparableSpan(1), CaptureID: "1", Checkpoint: checkpoint, + Span: spanz.TableIDToComparableSpan(1), CaptureID: "1", CheckpointTs: 1, }, { - Span: spanz.TableIDToComparableSpan(2), CaptureID: "2", Checkpoint: checkpoint, + Span: spanz.TableIDToComparableSpan(2), CaptureID: "2", CheckpointTs: 1, }, { - Span: spanz.TableIDToComparableSpan(3), CaptureID: "3", Checkpoint: checkpoint, + Span: spanz.TableIDToComparableSpan(3), CaptureID: "3", CheckpointTs: 1, }}, }, Accept: func() { @@ -445,25 +424,19 @@ func TestReplicationManagerBurstBalance(t *testing.T) { // Add a new table. span := spanz.TableIDToComparableSpan(5) - table5, err := NewReplicationSet(span, - tablepb.Checkpoint{}, - map[string]*tablepb.TableStatus{ - "5": {Span: span, State: tablepb.TableStateReplicating}, - }, model.ChangeFeedID{}) + table5, err := NewReplicationSet(span, 0, map[string]*tablepb.TableStatus{ + "5": {Span: span, State: tablepb.TableStateReplicating}, + }, model.ChangeFeedID{}) require.Nil(t, err) r.spans.ReplaceOrInsert(span, table5) - checkpoint = tablepb.Checkpoint{ - CheckpointTs: 2, - ResolvedTs: 2, - } // More burst balance is still allowed. msgs, err = r.HandleTasks([]*ScheduleTask{{ BurstBalance: &BurstBalance{ AddTables: []AddTable{{ - Span: spanz.TableIDToComparableSpan(4), CaptureID: "4", Checkpoint: checkpoint, + Span: spanz.TableIDToComparableSpan(4), CaptureID: "4", CheckpointTs: 2, }, { - Span: spanz.TableIDToComparableSpan(1), CaptureID: "0", Checkpoint: checkpoint, + Span: spanz.TableIDToComparableSpan(1), CaptureID: "0", CheckpointTs: 2, }}, RemoveTables: []RemoveTable{{ Span: spanz.TableIDToComparableSpan(5), CaptureID: "5", @@ -516,13 +489,13 @@ func TestReplicationManagerBurstBalanceMoveTables(t *testing.T) { var err error // Two tables in "1". 
span := spanz.TableIDToComparableSpan(1) - table, err := NewReplicationSet(span, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{ + table, err := NewReplicationSet(span, 0, map[string]*tablepb.TableStatus{ "1": {Span: span, State: tablepb.TableStateReplicating}, }, model.ChangeFeedID{}) require.Nil(t, err) r.spans.ReplaceOrInsert(span, table) span2 := spanz.TableIDToComparableSpan(2) - table2, err := NewReplicationSet(span2, tablepb.Checkpoint{}, map[string]*tablepb.TableStatus{ + table2, err := NewReplicationSet(span2, 0, map[string]*tablepb.TableStatus{ "1": { Span: span2, State: tablepb.TableStateReplicating, Checkpoint: tablepb.Checkpoint{CheckpointTs: 1, ResolvedTs: 1}, @@ -635,11 +608,7 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { t.Parallel() r := NewReplicationManager(1, model.ChangeFeedID{}) span := spanz.TableIDToComparableSpan(1) - rs, err := NewReplicationSet(span, - tablepb.Checkpoint{ - CheckpointTs: 10, - ResolvedTs: 10, - }, + rs, err := NewReplicationSet(span, model.Ts(10), map[model.CaptureID]*tablepb.TableStatus{ "1": { Span: spanz.TableIDToComparableSpan(1), @@ -654,11 +623,7 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { r.spans.ReplaceOrInsert(span, rs) span2 := spanz.TableIDToComparableSpan(2) - rs, err = NewReplicationSet(span2, - tablepb.Checkpoint{ - CheckpointTs: 15, - ResolvedTs: 15, - }, + rs, err = NewReplicationSet(span2, model.Ts(15), map[model.CaptureID]*tablepb.TableStatus{ "2": { Span: spanz.TableIDToComparableSpan(2), @@ -694,11 +659,7 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { require.Equal(t, checkpointCannotProceed, resolved) span3 := spanz.TableIDToComparableSpan(3) - rs, err = NewReplicationSet(span3, - tablepb.Checkpoint{ - CheckpointTs: 5, - ResolvedTs: 5, - }, + rs, err = NewReplicationSet(span3, model.Ts(5), map[model.CaptureID]*tablepb.TableStatus{ "1": { Span: spanz.TableIDToComparableSpan(3), @@ -725,11 +686,7 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { currentTables.UpdateTables([]model.TableID{1, 2, 3, 4}) span4 := spanz.TableIDToComparableSpan(4) - rs, err = NewReplicationSet(span4, - tablepb.Checkpoint{ - CheckpointTs: 3, - ResolvedTs: 3, - }, + rs, err = NewReplicationSet(span4, model.Ts(3), map[model.CaptureID]*tablepb.TableStatus{ "1": { Span: spanz.TableIDToComparableSpan(4), @@ -753,11 +710,7 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { span5_2 := spanz.TableIDToComparableSpan(5) span5_2.StartKey = append(span5_2.StartKey, 0) for _, span := range []tablepb.Span{span5_1, span5_2} { - rs, err = NewReplicationSet(span, - tablepb.Checkpoint{ - CheckpointTs: 3, - ResolvedTs: 3, - }, + rs, err = NewReplicationSet(span, model.Ts(3), map[model.CaptureID]*tablepb.TableStatus{ "1": { Span: span, @@ -791,11 +744,7 @@ func TestReplicationManagerAdvanceCheckpoint(t *testing.T) { // redo is enabled currentTables.UpdateTables([]model.TableID{4}) spanRedo := spanz.TableIDToComparableSpan(4) - rs, err = NewReplicationSet(spanRedo, - tablepb.Checkpoint{ - CheckpointTs: 3, - ResolvedTs: 3, - }, + rs, err = NewReplicationSet(spanRedo, model.Ts(3), map[model.CaptureID]*tablepb.TableStatus{ "1": { Span: spanz.TableIDToComparableSpan(4), @@ -821,11 +770,7 @@ func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) { t.Parallel() r := NewReplicationManager(1, model.ChangeFeedID{}) span := spanz.TableIDToComparableSpan(1) - rs, err := NewReplicationSet(span, - tablepb.Checkpoint{ - CheckpointTs: 10, - ResolvedTs: 10, - }, + rs, err := 
NewReplicationSet(span, model.Ts(10), map[model.CaptureID]*tablepb.TableStatus{ "1": { Span: spanz.TableIDToComparableSpan(1), @@ -840,11 +785,7 @@ func TestReplicationManagerAdvanceCheckpointWithRedoEnabled(t *testing.T) { r.spans.ReplaceOrInsert(span, rs) span2 := spanz.TableIDToComparableSpan(2) - rs, err = NewReplicationSet(span2, - tablepb.Checkpoint{ - CheckpointTs: 15, - ResolvedTs: 15, - }, + rs, err = NewReplicationSet(span2, model.Ts(15), map[model.CaptureID]*tablepb.TableStatus{ "2": { Span: spanz.TableIDToComparableSpan(2), diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index f62752f27f2..fec432499a8 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -141,7 +141,7 @@ type ReplicationSet struct { //nolint:revive // NewReplicationSet returns a new replication set. func NewReplicationSet( span tablepb.Span, - checkpoint tablepb.Checkpoint, + checkpoint model.Ts, tableStatus map[model.CaptureID]*tablepb.TableStatus, changefeed model.ChangeFeedID, ) (*ReplicationSet, error) { @@ -149,7 +149,10 @@ func NewReplicationSet( Changefeed: changefeed, Span: span, Captures: make(map[string]Role), - Checkpoint: checkpoint, + Checkpoint: tablepb.Checkpoint{ + CheckpointTs: checkpoint, + ResolvedTs: checkpoint, + }, } // Count of captures that is in Stopping states. stoppingCount := 0 @@ -159,9 +162,7 @@ func NewReplicationSet( return nil, r.inconsistentError(table, captureID, "schedulerv3: table id inconsistent") } - if err := r.updateCheckpointAndStats(table.Checkpoint, table.Stats); err != nil { - return nil, errors.Trace(err) - } + r.updateCheckpointAndStats(table.Checkpoint, table.Stats) switch table.State { case tablepb.TableStateReplicating: @@ -497,8 +498,8 @@ func (r *ReplicationSet) pollOnPrepare( } case tablepb.TableStateReplicating: if r.Primary == captureID { - err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats) - return nil, false, err + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + return nil, false, nil } case tablepb.TableStateStopping, tablepb.TableStateStopped: if r.Primary == captureID { @@ -611,9 +612,7 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateStopped, tablepb.TableStateAbsent: if r.Primary == captureID { - if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil { - return nil, false, errors.Trace(err) - } + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) original := r.Primary r.clearPrimary() if !r.hasRole(RoleSecondary) { @@ -685,9 +684,7 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateReplicating: if r.Primary == captureID { - if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil { - return nil, false, errors.Trace(err) - } + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) if r.hasRole(RoleSecondary) { // Original primary is not stopped, ask for stopping. 
return &schedulepb.Message{ @@ -722,8 +719,8 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateStopping: if r.Primary == captureID && r.hasRole(RoleSecondary) { - err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats) - return nil, false, err + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + return nil, false, nil } else if r.isInRole(captureID, RoleUndetermined) { log.Info("schedulerv3: capture is stopping during Commit", zap.String("namespace", r.Changefeed.Namespace), @@ -752,8 +749,8 @@ func (r *ReplicationSet) pollOnReplicating( switch input.State { case tablepb.TableStateReplicating: if r.Primary == captureID { - err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats) - return nil, false, err + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + return nil, false, nil } return nil, false, r.multiplePrimaryError( input, captureID, "schedulerv3: multiple primary") @@ -764,10 +761,7 @@ func (r *ReplicationSet) pollOnReplicating( case tablepb.TableStateStopping: case tablepb.TableStateStopped: if r.Primary == captureID { - if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil { - return nil, false, errors.Trace(err) - } - + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) // Primary is stopped, but we still has secondary. // Clear primary and promote secondary when it's prepared. log.Info("schedulerv3: primary is stopped during Replicating", @@ -997,7 +991,7 @@ func (r *ReplicationSet) handleCaptureShutdown( func (r *ReplicationSet) updateCheckpointAndStats( checkpoint tablepb.Checkpoint, stats tablepb.Stats, -) error { +) { if checkpoint.ResolvedTs < checkpoint.CheckpointTs { log.Warn("schedulerv3: resolved ts should not less than checkpoint ts", zap.String("namespace", r.Changefeed.Namespace), @@ -1025,11 +1019,8 @@ func (r *ReplicationSet) updateCheckpointAndStats( zap.Any("replicationSet", r), zap.Any("checkpointTs", r.Checkpoint.CheckpointTs), zap.Any("resolvedTs", r.Checkpoint.ResolvedTs)) - return errors.ErrInvalidCheckpointTs.GenWithStackByArgs(r.Checkpoint.CheckpointTs, - r.Checkpoint.ResolvedTs) } r.Stats = stats - return nil } // SetHeap is a max-heap, it implements heap.Interface. 
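The hunks above revert updateCheckpointAndStats to tolerating an inconsistent report (resolved ts behind checkpoint ts) with a warning instead of surfacing ErrInvalidCheckpointTs to its callers. Below is a minimal standalone sketch of that intent; Checkpoint, replicationSet, and updateCheckpoint are simplified stand-in names, not the actual tiflow implementation (which also carries tablepb.Stats and logs through zap).

package main

import "log"

// Checkpoint mirrors the shape of tablepb.Checkpoint for illustration only.
type Checkpoint struct {
	CheckpointTs uint64
	ResolvedTs   uint64
}

// replicationSet is a stripped-down stand-in for the scheduler's ReplicationSet.
type replicationSet struct {
	Checkpoint Checkpoint
}

// updateCheckpoint sketches the post-revert behavior: an inconsistent input
// (ResolvedTs < CheckpointTs) is logged and tolerated instead of being
// returned as an error to the caller.
func (r *replicationSet) updateCheckpoint(in Checkpoint) {
	if in.ResolvedTs < in.CheckpointTs {
		log.Printf("warn: resolved ts %d less than checkpoint ts %d, continuing",
			in.ResolvedTs, in.CheckpointTs)
	}
	// Stored watermarks only ever advance; a stale report is simply ignored.
	if r.Checkpoint.CheckpointTs < in.CheckpointTs {
		r.Checkpoint.CheckpointTs = in.CheckpointTs
	}
	if r.Checkpoint.ResolvedTs < in.ResolvedTs {
		r.Checkpoint.ResolvedTs = in.ResolvedTs
	}
}

func main() {
	r := &replicationSet{Checkpoint: Checkpoint{CheckpointTs: 10, ResolvedTs: 10}}
	// An out-of-order report no longer produces ErrInvalidCheckpointTs.
	r.updateCheckpoint(Checkpoint{CheckpointTs: 12, ResolvedTs: 11})
	log.Printf("checkpoint now: %+v", r.Checkpoint)
}

In this sketch a bad report degrades to a log line rather than failing the changefeed, which is the behavior exercised by the TestUpdateCheckpointAndStats case added in the test file that follows.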
diff --git a/cdc/scheduler/internal/v3/replication/replication_set_test.go b/cdc/scheduler/internal/v3/replication/replication_set_test.go index 1da88a55753..da417438bc0 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set_test.go +++ b/cdc/scheduler/internal/v3/replication/replication_set_test.go @@ -220,10 +220,7 @@ func TestNewReplicationSet(t *testing.T) { for id, tc := range testcases { set := tc.set status := tc.tableStatus - checkpoint := tablepb.Checkpoint{ - CheckpointTs: tc.checkpoint, - ResolvedTs: tc.checkpoint, - } + checkpoint := tc.checkpoint span := tablepb.Span{TableID: 0} output, err := NewReplicationSet(span, checkpoint, status, model.ChangeFeedID{}) @@ -271,7 +268,7 @@ func TestReplicationSetPoll(t *testing.T) { } } span := tablepb.Span{TableID: 1} - r, _ := NewReplicationSet(span, tablepb.Checkpoint{}, status, model.ChangeFeedID{}) + r, _ := NewReplicationSet(span, 0, status, model.ChangeFeedID{}) var tableStates []int for state := range tablepb.TableState_name { tableStates = append(tableStates, int(state)) @@ -303,7 +300,7 @@ func TestReplicationSetPollUnknownCapture(t *testing.T) { tableID := model.TableID(1) span := tablepb.Span{TableID: tableID} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, map[model.CaptureID]*tablepb.TableStatus{ + r, err := NewReplicationSet(span, 0, map[model.CaptureID]*tablepb.TableStatus{ "1": { Span: tablepb.Span{TableID: tableID}, State: tablepb.TableStateReplicating, @@ -340,7 +337,7 @@ func TestReplicationSetAddTable(t *testing.T) { from := "1" tableID := model.TableID(1) span := tablepb.Span{TableID: tableID} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{}) require.Nil(t, err) // Absent -> Prepare @@ -485,7 +482,7 @@ func TestReplicationSetRemoveTable(t *testing.T) { from := "1" tableID := model.TableID(1) span := tablepb.Span{TableID: tableID} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{}) require.Nil(t, err) // Ignore removing table if it's not in replicating. 
@@ -566,7 +563,7 @@ func TestReplicationSetMoveTable(t *testing.T) { tableID := model.TableID(1) span := tablepb.Span{TableID: tableID} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{}) require.Nil(t, err) source := "1" @@ -798,7 +795,7 @@ func TestReplicationSetCaptureShutdown(t *testing.T) { from := "1" tableID := model.TableID(1) span := tablepb.Span{TableID: tableID} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{}) require.Nil(t, err) // Add table, Absent -> Prepare @@ -1104,7 +1101,7 @@ func TestReplicationSetCaptureShutdownAfterReconstructCommitState(t *testing.T) from: {Span: tablepb.Span{TableID: tableID}, State: tablepb.TableStatePrepared}, } span := tablepb.Span{TableID: tableID} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, tableStatus, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, tableStatus, model.ChangeFeedID{}) require.Nil(t, err) require.Equal(t, ReplicationSetStateCommit, r.State) require.Equal(t, "", r.Primary) @@ -1125,7 +1122,7 @@ func TestReplicationSetMoveTableWithHeartbeatResponse(t *testing.T) { tableID := model.TableID(1) span := tablepb.Span{TableID: tableID} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{}) require.Nil(t, err) source := "1" @@ -1213,7 +1210,7 @@ func TestReplicationSetMoveTableSameDestCapture(t *testing.T) { tableID := model.TableID(1) span := tablepb.Span{TableID: tableID} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, nil, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, nil, model.ChangeFeedID{}) require.Nil(t, err) source := "1" @@ -1246,7 +1243,7 @@ func TestReplicationSetCommitRestart(t *testing.T) { }, } span := tablepb.Span{TableID: 0} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, tableStatus, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, tableStatus, model.ChangeFeedID{}) require.Nil(t, err) require.Equal(t, ReplicationSetStateCommit, r.State) require.EqualValues(t, RoleSecondary, r.Captures["1"]) @@ -1329,7 +1326,7 @@ func TestReplicationSetRemoveRestart(t *testing.T) { }, } span := tablepb.Span{TableID: 0} - r, err := NewReplicationSet(span, tablepb.Checkpoint{}, tableStatus, model.ChangeFeedID{}) + r, err := NewReplicationSet(span, 0, tableStatus, model.ChangeFeedID{}) require.Nil(t, err) require.Equal(t, ReplicationSetStateRemoving, r.State) require.False(t, r.hasRole(RoleSecondary)) @@ -1458,3 +1455,29 @@ func TestReplicationSetHeap_MinK(t *testing.T) { require.Equal(t, expectedTables, tables) require.Equal(t, 0, h.Len()) } + +func TestUpdateCheckpointAndStats(t *testing.T) { + cases := []struct { + checkpoint tablepb.Checkpoint + stats tablepb.Stats + }{ + { + checkpoint: tablepb.Checkpoint{ + CheckpointTs: 1, + ResolvedTs: 2, + }, + stats: tablepb.Stats{}, + }, + { + checkpoint: tablepb.Checkpoint{ + CheckpointTs: 2, + ResolvedTs: 1, + }, + stats: tablepb.Stats{}, + }, + } + r := &ReplicationSet{} + for _, c := range cases { + r.updateCheckpointAndStats(c.checkpoint, c.stats) + } +} diff --git a/cdc/scheduler/internal/v3/scheduler/main_test.go b/cdc/scheduler/internal/v3/scheduler/main_test.go deleted file mode 100644 index 47da658bfe6..00000000000 --- a/cdc/scheduler/internal/v3/scheduler/main_test.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 
PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package scheduler - -import ( - "testing" - - "github.com/pingcap/tiflow/pkg/leakutil" -) - -func TestMain(m *testing.M) { - leakutil.SetUpLeakTest(m) -} diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler.go b/cdc/scheduler/internal/v3/scheduler/scheduler.go index fd117849bc3..96088ec52ff 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler.go @@ -24,7 +24,7 @@ import ( type scheduler interface { Name() string Schedule( - checkpoint tablepb.Checkpoint, + checkpointTs model.Ts, currentSpans []tablepb.Span, aliveCaptures map[model.CaptureID]*member.CaptureStatus, replications *spanz.BtreeMap[*replication.ReplicationSet], diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_balance.go b/cdc/scheduler/internal/v3/scheduler/scheduler_balance.go index 26dce7da249..21c642a9178 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_balance.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_balance.go @@ -56,7 +56,7 @@ func (b *balanceScheduler) Name() string { } func (b *balanceScheduler) Schedule( - _ tablepb.Checkpoint, + _ model.Ts, _ []tablepb.Span, captures map[model.CaptureID]*member.CaptureStatus, replications *spanz.BtreeMap[*replication.ReplicationSet], diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_balance_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_balance_test.go index a66b52b328e..a8efb07ad35 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_balance_test.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_balance_test.go @@ -18,7 +18,6 @@ import ( "time" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication" "github.com/pingcap/tiflow/pkg/spanz" @@ -28,7 +27,6 @@ import ( func TestSchedulerBalanceCaptureOnline(t *testing.T) { t.Parallel() - var checkpoint tablepb.Checkpoint sched := newBalanceScheduler(time.Duration(0), 3) sched.random = nil @@ -39,14 +37,14 @@ func TestSchedulerBalanceCaptureOnline(t *testing.T) { 1: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, 2: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, }) - tasks := sched.Schedule(checkpoint, currentTables, captures, replications) + tasks := sched.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 1) require.NotNil(t, tasks[0].MoveTable) require.Equal(t, tasks[0].MoveTable.Span.TableID, model.TableID(1)) // New capture "b" online, but this time has capture is stopping captures["a"].State = member.CaptureStateStopping - tasks = sched.Schedule(checkpoint, currentTables, captures, replications) + tasks = sched.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 0) // New capture "b" online, it keeps balancing, even though it has not pass @@ -58,7 +56,7 @@ func TestSchedulerBalanceCaptureOnline(t *testing.T) { 1: {State: replication.ReplicationSetStateReplicating, Primary: 
"a"}, 2: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, }) - tasks = sched.Schedule(checkpoint, currentTables, captures, replications) + tasks = sched.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 1) // New capture "b" online, but this time it not pass check balance interval. @@ -70,14 +68,13 @@ func TestSchedulerBalanceCaptureOnline(t *testing.T) { 1: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, 2: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, }) - tasks = sched.Schedule(checkpoint, currentTables, captures, replications) + tasks = sched.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 0) } func TestSchedulerBalanceTaskLimit(t *testing.T) { t.Parallel() - var checkpoint tablepb.Checkpoint sched := newBalanceScheduler(time.Duration(0), 2) sched.random = nil @@ -90,10 +87,10 @@ func TestSchedulerBalanceTaskLimit(t *testing.T) { 3: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, 4: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, }) - tasks := sched.Schedule(checkpoint, currentTables, captures, replications) + tasks := sched.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 2) sched = newBalanceScheduler(time.Duration(0), 1) - tasks = sched.Schedule(checkpoint, currentTables, captures, replications) + tasks = sched.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 1) } diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go index bb6e613f9de..2efffdbda7a 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_basic.go @@ -49,7 +49,7 @@ func (b *basicScheduler) Name() string { } func (b *basicScheduler) Schedule( - checkpoint tablepb.Checkpoint, + checkpointTs model.Ts, currentSpans []tablepb.Span, captures map[model.CaptureID]*member.CaptureStatus, replications *spanz.BtreeMap[*replication.ReplicationSet], @@ -102,7 +102,7 @@ func (b *basicScheduler) Schedule( return tasks } tasks = append( - tasks, newBurstAddTables(b.changefeedID, checkpoint, newSpans, captureIDs)) + tasks, newBurstAddTables(b.changefeedID, checkpointTs, newSpans, captureIDs)) } // Build remove table tasks. @@ -140,16 +140,16 @@ func (b *basicScheduler) Schedule( // newBurstAddTables add each new table to captures in a round-robin way. 
func newBurstAddTables( changefeedID model.ChangeFeedID, - checkpoint tablepb.Checkpoint, newSpans []tablepb.Span, captureIDs []model.CaptureID, + checkpointTs model.Ts, newSpans []tablepb.Span, captureIDs []model.CaptureID, ) *replication.ScheduleTask { idx := 0 tables := make([]replication.AddTable, 0, len(newSpans)) for _, span := range newSpans { targetCapture := captureIDs[idx] tables = append(tables, replication.AddTable{ - Span: span, - CaptureID: targetCapture, - Checkpoint: checkpoint, + Span: span, + CaptureID: targetCapture, + CheckpointTs: checkpointTs, }) log.Info("schedulerv3: burst add table", zap.String("namespace", changefeedID.Namespace), diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_basic_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_basic_test.go index ad8ecd63a3d..83d00ed36b0 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_basic_test.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_basic_test.go @@ -36,7 +36,6 @@ func mapToSpanMap[T any](in map[model.TableID]T) *spanz.BtreeMap[T] { func TestSchedulerBasic(t *testing.T) { t.Parallel() - var checkpoint tablepb.Checkpoint captures := map[model.CaptureID]*member.CaptureStatus{"a": {}, "b": {}} currentTables := spanz.ArrayToSpan([]model.TableID{1, 2, 3, 4}) @@ -47,7 +46,7 @@ func TestSchedulerBasic(t *testing.T) { // one capture stopping, another one is initialized captures["a"].State = member.CaptureStateStopping - tasks := b.Schedule(checkpoint, currentTables, captures, replications) + tasks := b.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 1) require.Len(t, tasks[0].BurstBalance.AddTables, 2) require.Equal(t, tasks[0].BurstBalance.AddTables[0].CaptureID, "b") @@ -55,12 +54,12 @@ func TestSchedulerBasic(t *testing.T) { // all capture's stopping, cannot add table captures["b"].State = member.CaptureStateStopping - tasks = b.Schedule(checkpoint, currentTables, captures, replications) + tasks = b.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 0) captures["a"].State = member.CaptureStateInitialized captures["b"].State = member.CaptureStateInitialized - tasks = b.Schedule(checkpoint, currentTables, captures, replications) + tasks = b.Schedule(0, currentTables, captures, replications) require.Len(t, tasks, 1) require.Len(t, tasks[0].BurstBalance.AddTables, 2) require.Equal(t, tasks[0].BurstBalance.AddTables[0].Span.TableID, model.TableID(1)) @@ -89,11 +88,10 @@ func TestSchedulerBasic(t *testing.T) { }, 4: {State: replication.ReplicationSetStateAbsent}, }) - checkpoint1 := tablepb.Checkpoint{CheckpointTs: 1, ResolvedTs: 1} - tasks = b.Schedule(checkpoint1, currentTables, captures, replications) + tasks = b.Schedule(1, currentTables, captures, replications) require.Len(t, tasks, 1) require.Equal(t, tasks[0].BurstBalance.AddTables[0].Span.TableID, model.TableID(4)) - require.Equal(t, tasks[0].BurstBalance.AddTables[0].Checkpoint, checkpoint1) + require.Equal(t, tasks[0].BurstBalance.AddTables[0].CheckpointTs, model.Ts(1)) // DDL CREATE/DROP/TRUNCATE TABLE. // AddTable 4, and RemoveTable 5. 
@@ -123,16 +121,15 @@ func TestSchedulerBasic(t *testing.T) { }, }, }) - checkpoint2 := tablepb.Checkpoint{CheckpointTs: 2, ResolvedTs: 2} - tasks = b.Schedule(checkpoint2, currentTables, captures, replications) + tasks = b.Schedule(2, currentTables, captures, replications) require.Len(t, tasks, 2) if tasks[0].BurstBalance.AddTables != nil { require.Equal(t, tasks[0].BurstBalance.AddTables[0].Span.TableID, model.TableID(4)) - require.Equal(t, tasks[0].BurstBalance.AddTables[0].Checkpoint, checkpoint2) + require.Equal(t, tasks[0].BurstBalance.AddTables[0].CheckpointTs, model.Ts(2)) require.Equal(t, tasks[1].BurstBalance.RemoveTables[0].Span.TableID, model.TableID(5)) } else { require.Equal(t, tasks[1].BurstBalance.AddTables[0].Span.TableID, model.TableID(4)) - require.Equal(t, tasks[0].BurstBalance.AddTables[0].Checkpoint, checkpoint2) + require.Equal(t, tasks[0].BurstBalance.AddTables[0].CheckpointTs, model.Ts(2)) require.Equal(t, tasks[0].BurstBalance.RemoveTables[0].Span.TableID, model.TableID(5)) } @@ -169,8 +166,7 @@ func TestSchedulerBasic(t *testing.T) { }, }, }) - checkpoint3 := tablepb.Checkpoint{CheckpointTs: 3, ResolvedTs: 3} - tasks = b.Schedule(checkpoint3, currentTables, captures, replications) + tasks = b.Schedule(3, currentTables, captures, replications) require.Len(t, tasks, 1) require.Equal(t, tasks[0].BurstBalance.RemoveTables[0].Span.TableID, model.TableID(5)) } @@ -197,13 +193,12 @@ func benchmarkSchedulerBalance( ), ) { size := 16384 - var checkpoint tablepb.Checkpoint for total := 1; total <= size; total *= 2 { name, currentTables, captures, replications, sched := factory(total) b.ResetTimer() b.Run(name, func(b *testing.B) { for i := 0; i < b.N; i++ { - sched.Schedule(checkpoint, currentTables, captures, replications) + sched.Schedule(0, currentTables, captures, replications) } }) b.StopTimer() diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go b/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go index a1995a58506..66de141b7a4 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture.go @@ -71,7 +71,7 @@ func (d *drainCaptureScheduler) setTarget(target model.CaptureID) bool { } func (d *drainCaptureScheduler) Schedule( - _ tablepb.Checkpoint, + _ model.Ts, _ []tablepb.Span, captures map[model.CaptureID]*member.CaptureStatus, replications *spanz.BtreeMap[*replication.ReplicationSet], diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture_test.go index 81af850e20a..03f1a99197e 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture_test.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_drain_capture_test.go @@ -30,18 +30,18 @@ func TestDrainCapture(t *testing.T) { scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{}) require.Equal(t, "drain-capture-scheduler", scheduler.Name()) - var checkpoint tablepb.Checkpoint + var checkpointTs model.Ts captures := make(map[model.CaptureID]*member.CaptureStatus) currentTables := make([]tablepb.Span, 0) replications := mapToSpanMap(make(map[model.TableID]*replication.ReplicationSet)) - tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Len(t, tasks, 0) ok := scheduler.setTarget("a") require.True(t, ok) - tasks = scheduler.Schedule(checkpoint, currentTables, captures, 
replications) + tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Len(t, tasks, 0) // the target capture has no table at the beginning, so reset the target require.Equal(t, captureIDNotDraining, scheduler.target) @@ -50,7 +50,7 @@ func TestDrainCapture(t *testing.T) { ok = scheduler.setTarget("b") require.True(t, ok) - tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Len(t, tasks, 0) // the target capture cannot be found in the latest captures require.Equal(t, captureIDNotDraining, scheduler.target) @@ -105,7 +105,7 @@ func TestDrainCapture(t *testing.T) { ok = scheduler.setTarget("a") require.True(t, ok) // not all table is replicating, skip this tick. - tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Equal(t, "a", scheduler.target) require.Len(t, tasks, 0) @@ -118,13 +118,13 @@ func TestDrainCapture(t *testing.T) { 7: {State: replication.ReplicationSetStateReplicating, Primary: "b"}, }) - tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Equal(t, "a", scheduler.target) require.Len(t, tasks, 3) scheduler = newDrainCaptureScheduler(1, model.ChangeFeedID{}) require.True(t, scheduler.setTarget("a")) - tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Equal(t, "a", scheduler.target) require.Len(t, tasks, 1) } @@ -132,13 +132,13 @@ func TestDrainCapture(t *testing.T) { func TestDrainStoppingCapture(t *testing.T) { t.Parallel() - var checkpoint tablepb.Checkpoint + var checkpointTs model.Ts captures := make(map[model.CaptureID]*member.CaptureStatus) currentTables := make([]tablepb.Span, 0) replications := mapToSpanMap(make(map[model.TableID]*replication.ReplicationSet)) scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{}) - tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Empty(t, tasks) captures["a"] = &member.CaptureStatus{} @@ -147,7 +147,7 @@ func TestDrainStoppingCapture(t *testing.T) { 1: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, 2: {State: replication.ReplicationSetStateReplicating, Primary: "b"}, }) - tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Len(t, tasks, 1) require.EqualValues(t, 2, tasks[0].MoveTable.Span.TableID) require.EqualValues(t, "a", tasks[0].MoveTable.DestCapture) @@ -157,7 +157,7 @@ func TestDrainStoppingCapture(t *testing.T) { func TestDrainSkipOwner(t *testing.T) { t.Parallel() - var checkpoint tablepb.Checkpoint + var checkpointTs model.Ts currentTables := make([]tablepb.Span, 0) captures := map[model.CaptureID]*member.CaptureStatus{ "a": {}, @@ -168,7 +168,7 @@ func TestDrainSkipOwner(t *testing.T) { 2: {State: replication.ReplicationSetStateReplicating, Primary: "b"}, }) scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{}) - tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks := scheduler.Schedule(checkpointTs, currentTables, captures, 
replications) require.Len(t, tasks, 0) require.EqualValues(t, captureIDNotDraining, scheduler.getTarget()) } @@ -176,7 +176,7 @@ func TestDrainSkipOwner(t *testing.T) { func TestDrainImbalanceCluster(t *testing.T) { t.Parallel() - var checkpoint tablepb.Checkpoint + var checkpointTs model.Ts currentTables := make([]tablepb.Span, 0) captures := map[model.CaptureID]*member.CaptureStatus{ "a": {State: member.CaptureStateInitialized}, @@ -188,7 +188,7 @@ func TestDrainImbalanceCluster(t *testing.T) { }) scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{}) scheduler.setTarget("a") - tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Len(t, tasks, 2) require.EqualValues(t, "a", scheduler.getTarget()) } @@ -196,7 +196,7 @@ func TestDrainImbalanceCluster(t *testing.T) { func TestDrainEvenlyDistributedTables(t *testing.T) { t.Parallel() - var checkpoint tablepb.Checkpoint + var checkpointTs model.Ts currentTables := make([]tablepb.Span, 0) captures := map[model.CaptureID]*member.CaptureStatus{ "a": {State: member.CaptureStateInitialized}, @@ -211,7 +211,7 @@ func TestDrainEvenlyDistributedTables(t *testing.T) { }) scheduler := newDrainCaptureScheduler(10, model.ChangeFeedID{}) scheduler.setTarget("a") - tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications) + tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications) require.Len(t, tasks, 3) taskMap := make(map[model.CaptureID]int) for _, t := range tasks { diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go index 21ab13b3ef5..76817b344f6 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_manager.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication" "github.com/pingcap/tiflow/pkg/config" @@ -70,22 +69,7 @@ func (sm *Manager) Schedule( aliveCaptures map[model.CaptureID]*member.CaptureStatus, replications *spanz.BtreeMap[*replication.ReplicationSet], runTasking *spanz.BtreeMap[*replication.ScheduleTask], - redoMetaManager redo.MetaManager, ) []*replication.ScheduleTask { - checkpoint := tablepb.Checkpoint{ - CheckpointTs: checkpointTs, - ResolvedTs: checkpointTs, - } - if redoMetaManager != nil && redoMetaManager.Enabled() { - flushedMeta := redoMetaManager.GetFlushedMeta() - if flushedMeta.ResolvedTs < checkpointTs { - log.Panic("schedulerv3: flushed resolved ts is less than checkpoint ts", - zap.Uint64("checkpointTs", checkpointTs), - zap.Any("flushedMeta", flushedMeta)) - } - checkpoint.ResolvedTs = flushedMeta.ResolvedTs - } - for sid, scheduler := range sm.schedulers { // Basic scheduler bypasses max task check, because it handles the most // critical scheduling, e.g. add table via CREATE TABLE DDL. 
@@ -96,7 +80,7 @@ func (sm *Manager) Schedule( return nil } } - tasks := scheduler.Schedule(checkpoint, currentSpans, aliveCaptures, replications) + tasks := scheduler.Schedule(checkpointTs, currentSpans, aliveCaptures, replications) for _, t := range tasks { name := struct { scheduler, task string diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_manager_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_manager_test.go index 613273b81c6..e7b3fb83a64 100644 --- a/cdc/scheduler/internal/v3/scheduler/scheduler_manager_test.go +++ b/cdc/scheduler/internal/v3/scheduler/scheduler_manager_test.go @@ -18,7 +18,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" - "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication" "github.com/pingcap/tiflow/pkg/config" @@ -54,16 +53,14 @@ func TestSchedulerManagerScheduler(t *testing.T) { // schedulerPriorityBasic bypasses task check. replications := mapToSpanMap(map[model.TableID]*replication.ReplicationSet{}) runningTasks := mapToSpanMap(map[model.TableID]*replication.ScheduleTask{1: {}}) - - redoMetaManager := redo.NewDisabledMetaManager() - tasks := m.Schedule(0, currentSpans, captures, replications, runningTasks, redoMetaManager) + tasks := m.Schedule(0, currentSpans, captures, replications, runningTasks) require.Len(t, tasks, 1) // No more task. replications = mapToSpanMap(map[model.TableID]*replication.ReplicationSet{ 1: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, }) - tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks, redoMetaManager) + tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks) require.Len(t, tasks, 0) // Move table is dropped because of running tasks. @@ -71,7 +68,7 @@ func TestSchedulerManagerScheduler(t *testing.T) { replications = mapToSpanMap(map[model.TableID]*replication.ReplicationSet{ 1: {State: replication.ReplicationSetStateReplicating, Primary: "a"}, }) - tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks, redoMetaManager) + tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks) require.Len(t, tasks, 0) // Move table can proceed after clean up tasks. 
@@ -80,6 +77,6 @@ func TestSchedulerManagerScheduler(t *testing.T) {
 		1: {State: replication.ReplicationSetStateReplicating, Primary: "a"},
 	})
 	runningTasks = spanz.NewBtreeMap[*replication.ScheduleTask]()
-	tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks, redoMetaManager)
+	tasks = m.Schedule(0, currentSpans, captures, replications, runningTasks)
 	require.Len(t, tasks, 1)
 }
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go b/cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go
index bf9ef491f57..10b44de77e5 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go
@@ -67,7 +67,7 @@ func (m *moveTableScheduler) addTask(span tablepb.Span, target model.CaptureID)
 }
 
 func (m *moveTableScheduler) Schedule(
-	_ tablepb.Checkpoint,
+	_ model.Ts,
 	currentSpans []tablepb.Span,
 	captures map[model.CaptureID]*member.CaptureStatus,
 	replications *spanz.BtreeMap[*replication.ReplicationSet],
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_move_table_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_move_table_test.go
index 3adc047bfbc..e485db9ebed 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_move_table_test.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_move_table_test.go
@@ -27,7 +27,7 @@ import (
 func TestSchedulerMoveTable(t *testing.T) {
 	t.Parallel()
 
-	var checkpoint tablepb.Checkpoint
+	var checkpointTs model.Ts
 	captures := map[model.CaptureID]*member.CaptureStatus{"a": {
 		State: member.CaptureStateInitialized,
 	}, "b": {
@@ -43,38 +43,38 @@ func TestSchedulerMoveTable(t *testing.T) {
 	require.Equal(t, "move-table-scheduler", scheduler.Name())
 
 	tasks := scheduler.Schedule(
-		checkpoint, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
+		checkpointTs, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
 	require.Len(t, tasks, 0)
 
 	scheduler.addTask(tablepb.Span{TableID: 0}, "a")
 	tasks = scheduler.Schedule(
-		checkpoint, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
+		checkpointTs, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
 	require.Len(t, tasks, 0)
 
 	// move a not exist table
 	scheduler.addTask(tablepb.Span{TableID: 0}, "a")
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 
 	// move table to a not exist capture
 	scheduler.addTask(tablepb.Span{TableID: 1}, "c")
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 
 	// move table not replicating
 	scheduler.addTask(tablepb.Span{TableID: 1}, "b")
 	tasks = scheduler.Schedule(
-		checkpoint, currentTables, captures, spanz.NewBtreeMap[*replication.ReplicationSet]())
+		checkpointTs, currentTables, captures, spanz.NewBtreeMap[*replication.ReplicationSet]())
 	require.Len(t, tasks, 0)
 
 	scheduler.addTask(tablepb.Span{TableID: 1}, "b")
 	replications.GetV(tablepb.Span{TableID: 1}).State = replication.ReplicationSetStatePrepare
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 
 	scheduler.addTask(tablepb.Span{TableID: 1}, "b")
 	replications.GetV(tablepb.Span{TableID: 1}).State = replication.ReplicationSetStateReplicating
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 1)
 	require.Equal(t, model.TableID(1), tasks[0].MoveTable.Span.TableID)
 	require.Equal(t, "b", tasks[0].MoveTable.DestCapture)
@@ -83,7 +83,7 @@ func TestSchedulerMoveTable(t *testing.T) {
 	// the target capture is stopping
 	scheduler.addTask(tablepb.Span{TableID: 1}, "b")
 	captures["b"].State = member.CaptureStateStopping
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 	require.False(t, scheduler.tasks.Has(tablepb.Span{TableID: 1}))
 }
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go b/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go
index 2bb4a4ecbda..ecc02d37c20 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go
@@ -51,7 +51,7 @@ func (r *rebalanceScheduler) Name() string {
 }
 
 func (r *rebalanceScheduler) Schedule(
-	_ tablepb.Checkpoint,
+	_ model.Ts,
 	currentSpans []tablepb.Span,
 	captures map[model.CaptureID]*member.CaptureStatus,
 	replications *spanz.BtreeMap[*replication.ReplicationSet],
diff --git a/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance_test.go b/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance_test.go
index c2b3ea9c878..e6b1ff53d83 100644
--- a/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance_test.go
+++ b/cdc/scheduler/internal/v3/scheduler/scheduler_rebalance_test.go
@@ -28,7 +28,7 @@ import (
 func TestSchedulerRebalance(t *testing.T) {
 	t.Parallel()
 
-	var checkpoint tablepb.Checkpoint
+	var checkpointTs model.Ts
 	captures := map[model.CaptureID]*member.CaptureStatus{"a": {}, "b": {}}
 	currentTables := spanz.ArrayToSpan([]model.TableID{1, 2, 3, 4})
 
@@ -57,22 +57,22 @@ func TestSchedulerRebalance(t *testing.T) {
 	scheduler := newRebalanceScheduler(model.ChangeFeedID{})
 	require.Equal(t, "rebalance-scheduler", scheduler.Name())
 	// rebalance is not triggered
-	tasks := scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 
 	atomic.StoreInt32(&scheduler.rebalance, 1)
 	// no captures
 	tasks = scheduler.Schedule(
-		checkpoint, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
+		checkpointTs, currentTables, map[model.CaptureID]*member.CaptureStatus{}, replications)
 	require.Len(t, tasks, 0)
 
 	// table not in the replication set,
 	tasks = scheduler.Schedule(
-		checkpoint, spanz.ArrayToSpan([]model.TableID{0}), captures, replications)
+		checkpointTs, spanz.ArrayToSpan([]model.TableID{0}), captures, replications)
 	require.Len(t, tasks, 0)
 
 	// not all tables are replicating,
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 
 	// table distribution is balanced, should have no task.
@@ -82,7 +82,7 @@ func TestSchedulerRebalance(t *testing.T) {
 		3: {State: replication.ReplicationSetStateReplicating, Primary: "b"},
 		4: {State: replication.ReplicationSetStateReplicating, Primary: "b"},
 	})
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 
 	// Imbalance.
@@ -97,14 +97,14 @@ func TestSchedulerRebalance(t *testing.T) {
 
 	// capture is stopping, ignore the request
 	captures["a"].State = member.CaptureStateStopping
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 	require.Equal(t, atomic.LoadInt32(&scheduler.rebalance), int32(0))
 
 	captures["a"].State = member.CaptureStateInitialized
 	atomic.StoreInt32(&scheduler.rebalance, 1)
 	scheduler.random = nil // disable random to make test easier.
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 1)
 	require.Contains(t, tasks[0].BurstBalance.MoveTables, replication.MoveTable{
 		Span: tablepb.Span{TableID: 1}, DestCapture: "b",
@@ -115,6 +115,6 @@ func TestSchedulerRebalance(t *testing.T) {
 	require.EqualValues(t, 0, atomic.LoadInt32(&scheduler.rebalance))
 
 	// pending task is not consumed yet, this turn should have no tasks.
-	tasks = scheduler.Schedule(checkpoint, currentTables, captures, replications)
+	tasks = scheduler.Schedule(checkpointTs, currentTables, captures, replications)
 	require.Len(t, tasks, 0)
 }
diff --git a/cdc/scheduler/internal/v3/transport/main_test.go b/cdc/scheduler/internal/v3/transport/main_test.go
deleted file mode 100644
index bdb388e4cbc..00000000000
--- a/cdc/scheduler/internal/v3/transport/main_test.go
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright 2022 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package transport
-
-import (
-	"testing"
-
-	"github.com/pingcap/tiflow/pkg/leakutil"
-)
-
-func TestMain(m *testing.M) {
-	leakutil.SetUpLeakTest(m)
-}