
tools/simulator: Make simulator work with large scale cluster #8269

Merged · 14 commits · Jul 26, 2024
6 changes: 6 additions & 0 deletions tools/pd-simulator/main.go
@@ -148,6 +148,7 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon
}
tickInterval := simConfig.SimTickInterval.Duration

ctx, cancel := context.WithCancel(context.Background())
tick := time.NewTicker(tickInterval)
defer tick.Stop()
sc := make(chan os.Signal, 1)
@@ -161,6 +162,10 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon

simResult := "FAIL"

go driver.StoresHeartbeat(ctx)
go driver.RegionsHeartbeat(ctx)
go driver.StepRegions(ctx)

EXIT:
for {
select {
@@ -175,6 +180,7 @@
}
}

cancel()
driver.Stop()
if len(clean) != 0 && clean[0] != nil {
clean[0]()
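Taken together, the change above moves the per-tick work out of the main loop: simStart now starts three long-lived goroutines (StoresHeartbeat, RegionsHeartbeat, StepRegions) that consume per-tick signals published by Driver.Tick and are torn down via cancel() just before driver.Stop(). Below is a minimal stand-alone sketch of that lifecycle; the names, the fixed run duration, and the drop-on-busy policy are illustrative only, not the driver's actual code.

```go
package example

import (
	"context"
	"time"
)

// runTickLoop sketches the structure used in simStart above: a ticker loop
// publishes tick counts to a buffered channel, a worker goroutine drains
// them, and cancelling the context stops the worker before shutdown.
func runTickLoop(interval, runFor time.Duration, work func(int64)) {
	ctx, cancel := context.WithCancel(context.Background())
	tickc := make(chan int64, 1)
	done := make(chan struct{})

	// Worker: consumes ticks until the context is cancelled.
	go func() {
		defer close(done)
		for {
			select {
			case t := <-tickc:
				work(t)
			case <-ctx.Done():
				return
			}
		}
	}()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	deadline := time.After(runFor)

	var count int64
	for {
		select {
		case <-ticker.C:
			count++
			// Non-blocking hand-off so the loop never stalls; the PR instead
			// publishes from a short-lived goroutine per tick.
			select {
			case tickc <- count:
			default:
			}
		case <-deadline:
			cancel()
			<-done
			return
		}
	}
}
```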
1 change: 1 addition & 0 deletions tools/pd-simulator/simulator/cases/cases.go
@@ -101,6 +101,7 @@ var CaseMap = map[string]func(*config.SimConfig) *Case{
"diagnose-label-not-match1": newLabelNotMatch1,
"diagnose-label-isolation1": newLabelIsolation1,
"diagnose-label-isolation2": newLabelIsolation2,
"stable": newStableEnv,
}

// NewCase creates a new case.
66 changes: 66 additions & 0 deletions tools/pd-simulator/simulator/cases/stable_env.go
@@ -0,0 +1,66 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cases

import (
"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
sc "github.com/tikv/pd/tools/pd-simulator/simulator/config"
"github.com/tikv/pd/tools/pd-simulator/simulator/info"
"github.com/tikv/pd/tools/pd-simulator/simulator/simutil"
)

// newStableEnv provides a stable environment for testing.
func newStableEnv(config *sc.SimConfig) *Case {
Member:
Is it possible to just create a cluster with a fixed set of regions and stores, rather than going through a case?

Member Author:
That may not be easy with the current simulator code structure.

var simCase Case

totalStore := config.TotalStore
totalRegion := config.TotalRegion
allStores := make(map[uint64]struct{}, totalStore)
arrStoresID := make([]uint64, 0, totalStore)
replica := int(config.ServerConfig.Replication.MaxReplicas)
for i := 0; i < totalStore; i++ {
id := simutil.IDAllocator.NextID()
simCase.Stores = append(simCase.Stores, &Store{
ID: id,
Status: metapb.StoreState_Up,
})
allStores[id] = struct{}{}
arrStoresID = append(arrStoresID, id)
}

for i := 0; i < totalRegion; i++ {
peers := make([]*metapb.Peer, 0, replica)
for j := 0; j < replica; j++ {
peers = append(peers, &metapb.Peer{
Id: simutil.IDAllocator.NextID(),
StoreId: arrStoresID[(i+j)%totalStore],
})
}
simCase.Regions = append(simCase.Regions, Region{
ID: simutil.IDAllocator.NextID(),
Peers: peers,
Leader: peers[0],
Size: 96 * units.MiB,
Keys: 960000,
})
}

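// The checker never reports success, so this case keeps the fixed cluster running until the simulator is stopped.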
simCase.Checker = func(_ []*metapb.Store, _ *core.RegionsInfo, _ []info.StoreStats) bool {
return false
}
return &simCase
}
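For quick reference, the newly registered "stable" case can be resolved through the CaseMap entry shown earlier. The sketch below assumes the caller already holds a populated *config.SimConfig (for example the one built in tools/pd-simulator/main.go) and only inspects the generated topology; the helper name and the printing are illustrative and not part of the PR.

```go
package example

import (
	"fmt"

	"github.com/tikv/pd/tools/pd-simulator/simulator/cases"
	sc "github.com/tikv/pd/tools/pd-simulator/simulator/config"
)

// buildStableCase looks up the "stable" entry added to CaseMap and reports the
// size of the cluster it generates: one Up store per TotalStore and one region
// per TotalRegion, with peers spread round-robin across the stores.
func buildStableCase(simConfig *sc.SimConfig) *cases.Case {
	c := cases.CaseMap["stable"](simConfig)
	fmt.Printf("stable case: %d stores, %d regions\n", len(c.Stores), len(c.Regions))
	return c
}
```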
98 changes: 82 additions & 16 deletions tools/pd-simulator/simulator/drive.go
@@ -52,6 +52,10 @@ type Driver struct {
conn *Connection
simConfig *config.SimConfig
pdConfig *config.PDConfig

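// Tick fan-out channels: tickc drives StepRegions; regionTickc and storeTickc drive the region/store heartbeat loops.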
regionTickc chan int64
storeTickc chan int64
tickc chan struct{}
}

// NewDriver returns a driver.
@@ -69,6 +73,9 @@ func NewDriver(pdAddr, statusAddress, caseName string, simConfig *config.SimConf
simCase: simCase,
simConfig: simConfig,
pdConfig: pdConfig,
tickc: make(chan struct{}, 1),
regionTickc: make(chan int64, 1),
storeTickc: make(chan int64, 1),
}, nil
}

@@ -166,14 +173,79 @@ func (d *Driver) updateNodesClient() error {
// Tick invokes nodes' Tick.
func (d *Driver) Tick() {
d.tickCount++
d.raftEngine.stepRegions()
d.eventRunner.Tick(d.tickCount)
for _, n := range d.conn.Nodes {
n.reportRegionChange()
d.wg.Add(1)
go n.Tick(&d.wg)
go func() {
d.tickc <- struct{}{}
}()
go func() {
d.regionTickc <- d.tickCount
}()
go func() {
d.storeTickc <- d.tickCount
}()
}

func (d *Driver) StepRegions(ctx context.Context) {
for {
select {
case <-d.tickc:
d.raftEngine.stepRegions()
d.eventRunner.Tick(d.tickCount)
for _, n := range d.conn.Nodes {
n.reportRegionChange()
d.wg.Add(1)
go n.Tick(&d.wg)
}
d.wg.Wait()
case <-ctx.Done():
return
}
}
}

func (d *Driver) StoresHeartbeat(ctx context.Context) {
config := d.raftEngine.storeConfig
storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration)
var wg sync.WaitGroup
for {
select {
case tick := <-d.storeTickc:
if uint64(tick)%storeInterval == 0 {
for _, n := range d.conn.Nodes {
wg.Add(1)
go n.storeHeartBeat(&wg)
}
wg.Wait()
}
case <-ctx.Done():
return
}
}
}

var schedule sync.Once

func (d *Driver) RegionsHeartbeat(ctx context.Context) {
config := d.raftEngine.storeConfig
regionInterval := uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration)
var wg sync.WaitGroup
for {
select {
case tick := <-d.regionTickc:
if uint64(tick)%regionInterval == 0 {
for _, n := range d.conn.Nodes {
wg.Add(1)
go n.regionHeartBeat(&wg)
}
wg.Wait()
schedule.Do(func() {
// The simulator doesn't need any schedulers until all regions have sent their heartbeats.
ChooseToHaltPDSchedule(false)
})
}
case <-ctx.Done():
return
}
}
d.wg.Wait()
}

var HaltSchedule = false
@@ -183,17 +255,11 @@ func (d *Driver) Check() bool {
if !HaltSchedule {
return false
}
length := uint64(len(d.conn.Nodes) + 1)
var stats []info.StoreStats
var stores []*metapb.Store
for index, s := range d.conn.Nodes {
if index >= length {
length = index + 1
}
for _, s := range d.conn.Nodes {
stores = append(stores, s.Store)
}
stats := make([]info.StoreStats, length)
for index, node := range d.conn.Nodes {
stats[index] = *node.stats
stats = append(stats, *s.stats)
}
return d.simCase.Checker(stores, d.raftEngine.regionsInfo, stats)
}
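As a sanity check on the new heartbeat loops: storeInterval and regionInterval above convert a wall-clock heartbeat interval into a number of simulator ticks, and a heartbeat round fires whenever the tick count is a multiple of that number. A small sketch of the arithmetic, using assumed example durations rather than the project's actual defaults:

```go
package example

import "time"

// ticksPerHeartbeat mirrors the interval arithmetic in StoresHeartbeat and
// RegionsHeartbeat: heartbeats fire on every tick whose count is a multiple
// of (heartbeat interval / simulator tick interval).
func ticksPerHeartbeat(heartbeatInterval, simTickInterval time.Duration) uint64 {
	return uint64(heartbeatInterval / simTickInterval)
}

// With an assumed 100ms simulator tick:
//
//	ticksPerHeartbeat(10*time.Second, 100*time.Millisecond) // 100 ticks per store heartbeat
//	ticksPerHeartbeat(time.Minute, 100*time.Millisecond)    // 600 ticks per region heartbeat
```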
23 changes: 4 additions & 19 deletions tools/pd-simulator/simulator/node.go
@@ -128,7 +128,6 @@ func (n *Node) Tick(wg *sync.WaitGroup) {
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}
n.stepHeartBeat()
n.stepCompaction()
n.stepTask()
n.tick++
@@ -153,29 +152,14 @@ func (n *Node) stepTask() {
}
}

var schedulerCheck sync.Once

func (n *Node) stepHeartBeat() {
config := n.raftEngine.storeConfig

period := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration)
if n.tick%period == 0 {
n.storeHeartBeat()
}
period = uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration)
if n.tick%period == 0 {
n.regionHeartBeat()
schedulerCheck.Do(func() { ChooseToHaltPDSchedule(false) })
}
}

func (n *Node) stepCompaction() {
if n.tick%compactionDelayPeriod == 0 {
n.compaction()
}
}

func (n *Node) storeHeartBeat() {
func (n *Node) storeHeartBeat(wg *sync.WaitGroup) {
defer wg.Done()
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}
@@ -201,7 +185,8 @@ func (n *Node) compaction() {
n.stats.ToCompactionSize = 0
}

func (n *Node) regionHeartBeat() {
func (n *Node) regionHeartBeat(wg *sync.WaitGroup) {
defer wg.Done()
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}