
tools/simulator: Make simulator work with large scale cluster #8269

Merged · 14 commits · Jul 26, 2024
10 changes: 0 additions & 10 deletions pkg/core/region.go
@@ -2264,13 +2264,3 @@ func NewTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...Regi
}
return NewRegionInfo(metaRegion, leader, opts...)
}

// TraverseRegions executes a function on all regions.
// ONLY for simulator now and only for READ.
func (r *RegionsInfo) TraverseRegions(lockedFunc func(*RegionInfo)) {
r.t.RLock()
defer r.t.RUnlock()
for _, item := range r.regions {
lockedFunc(item.RegionInfo)
}
}
6 changes: 6 additions & 0 deletions tools/pd-simulator/main.go
@@ -148,6 +148,7 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon
}
tickInterval := simConfig.SimTickInterval.Duration

ctx, cancel := context.WithCancel(context.Background())
tick := time.NewTicker(tickInterval)
defer tick.Stop()
sc := make(chan os.Signal, 1)
@@ -161,6 +162,10 @@ func simStart(pdAddr, statusAddress string, simCase string, simConfig *sc.SimCon

simResult := "FAIL"

go driver.StoresHeartbeat(ctx)
go driver.RegionsHeartbeat(ctx)
go driver.StepRegions(ctx)

EXIT:
for {
select {
@@ -175,6 +180,7 @@ EXIT:
}
}

cancel()
driver.Stop()
if len(clean) != 0 && clean[0] != nil {
clean[0]()
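The main.go change above starts three long-running loops (StoresHeartbeat, RegionsHeartbeat, StepRegions) and ties them to a shared context so the main loop can stop them with cancel() before driver.Stop(). Below is a minimal, self-contained sketch of that pattern; tickLoop and ticks are illustrative names, not the simulator's actual identifiers.

package main

import (
	"context"
	"fmt"
	"time"
)

// tickLoop mimics one of the driver's background loops: it consumes tick
// numbers from a channel and returns once the context is cancelled.
func tickLoop(ctx context.Context, name string, ticks <-chan int64) {
	for {
		select {
		case t := <-ticks:
			fmt.Printf("%s handled tick %d\n", name, t)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ticks := make(chan int64, 1)
	go tickLoop(ctx, "regions", ticks)

	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for i := int64(1); i <= 3; i++ {
		<-ticker.C
		ticks <- i
	}

	cancel() // mirrors main.go cancelling the context before driver.Stop()
}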
1 change: 1 addition & 0 deletions tools/pd-simulator/simulator/cases/cases.go
@@ -101,6 +101,7 @@ var CaseMap = map[string]func(*config.SimConfig) *Case{
"diagnose-label-not-match1": newLabelNotMatch1,
"diagnose-label-isolation1": newLabelIsolation1,
"diagnose-label-isolation2": newLabelIsolation2,
"stable": newStableEnv,
}

// NewCase creates a new case.
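cases.go registers the new case under the name "stable" in CaseMap, the name-to-constructor registry that NewCase consults. A toy sketch of that lookup pattern; the types here are stand-ins, not the simulator's real ones.

package main

import "fmt"

type Case struct{ Name string }
type SimConfig struct{}

// caseMap mirrors the CaseMap registry: case names map to constructors.
var caseMap = map[string]func(*SimConfig) *Case{
	"stable": func(*SimConfig) *Case { return &Case{Name: "stable"} },
}

func newCase(name string, cfg *SimConfig) *Case {
	if create, ok := caseMap[name]; ok {
		return create(cfg)
	}
	return nil
}

func main() {
	fmt.Println(newCase("stable", &SimConfig{}).Name) // prints "stable"
}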
66 changes: 66 additions & 0 deletions tools/pd-simulator/simulator/cases/stable_env.go
@@ -0,0 +1,66 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cases

import (
"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
sc "github.com/tikv/pd/tools/pd-simulator/simulator/config"
"github.com/tikv/pd/tools/pd-simulator/simulator/info"
"github.com/tikv/pd/tools/pd-simulator/simulator/simutil"
)

// newStableEnv provides a stable environment for test.
func newStableEnv(config *sc.SimConfig) *Case {
Review comment (Member): Is it possible that we just create a cluster with a fixed region and store rather than cases?

Reply (Member, author): It may not be an easy thing for the current simulator code structure.

var simCase Case

totalStore := config.TotalStore
totalRegion := config.TotalRegion
allStores := make(map[uint64]struct{}, totalStore)
arrStoresID := make([]uint64, 0, totalStore)
replica := int(config.ServerConfig.Replication.MaxReplicas)
for i := 0; i < totalStore; i++ {
id := simutil.IDAllocator.NextID()
simCase.Stores = append(simCase.Stores, &Store{
ID: id,
Status: metapb.StoreState_Up,
})
allStores[id] = struct{}{}
arrStoresID = append(arrStoresID, id)
}

for i := 0; i < totalRegion; i++ {
peers := make([]*metapb.Peer, 0, replica)
for j := 0; j < replica; j++ {
peers = append(peers, &metapb.Peer{
Id: simutil.IDAllocator.NextID(),
StoreId: arrStoresID[(i+j)%totalStore],
})
}
simCase.Regions = append(simCase.Regions, Region{
ID: simutil.IDAllocator.NextID(),
Peers: peers,
Leader: peers[0],
Size: 96 * units.MiB,
Keys: 960000,
})
}

simCase.Checker = func(_ []*metapb.Store, _ *core.RegionsInfo, _ []info.StoreStats) bool {
return false
}
return &simCase
}
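newStableEnv places replica j of region i on store (i+j) % totalStore and always uses peers[0] as the leader, so both peers and leaders end up spread evenly across stores; the checker always returns false, so the case never self-terminates. A small standalone sketch of that placement arithmetic (not part of the PR):

package main

import "fmt"

func main() {
	totalStore, totalRegion, replica := 3, 9, 3

	peersPerStore := make([]int, totalStore)
	leadersPerStore := make([]int, totalStore)
	for i := 0; i < totalRegion; i++ {
		for j := 0; j < replica; j++ {
			store := (i + j) % totalStore // placement rule used by newStableEnv
			peersPerStore[store]++
			if j == 0 {
				leadersPerStore[store]++ // peers[0] becomes the region leader
			}
		}
	}
	fmt.Println("peers per store:  ", peersPerStore)   // [9 9 9]
	fmt.Println("leaders per store:", leadersPerStore) // [3 3 3]
}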
3 changes: 3 additions & 0 deletions tools/pd-simulator/simulator/config/config.go
@@ -36,6 +36,7 @@ const (
defaultTotalStore = 3
defaultTotalRegion = 1000
defaultEnableTransferRegionCounter = false
defaultHibernatePercent = 0
// store
defaultStoreIOMBPerSecond = 40
defaultStoreHeartbeat = 10 * time.Second
@@ -62,6 +63,7 @@ type SimConfig struct {
TotalRegion int `toml:"total-region"`
EnableTransferRegionCounter bool `toml:"enable-transfer-region-counter"`
SimTickInterval typeutil.Duration `toml:"sim-tick-interval"`
HibernatePercent int `toml:"hibernate-percent"`
// store
StoreIOMBPerSecond int64 `toml:"store-io-per-second"`
StoreVersion string `toml:"store-version"`
@@ -107,6 +109,7 @@ func (sc *SimConfig) Adjust(meta *toml.MetaData) error {
configutil.AdjustDuration(&sc.SimTickInterval, defaultSimTickInterval)
configutil.AdjustInt(&sc.TotalStore, defaultTotalStore)
configutil.AdjustInt(&sc.TotalRegion, defaultTotalRegion)
configutil.AdjustInt(&sc.HibernatePercent, defaultHibernatePercent)
configutil.AdjustBool(&sc.EnableTransferRegionCounter, defaultEnableTransferRegionCounter)
configutil.AdjustInt64(&sc.StoreIOMBPerSecond, defaultStoreIOMBPerSecond)
configutil.AdjustString(&sc.StoreVersion, versioninfo.PDReleaseVersion)
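The new hibernate-percent option defaults to 0 and is consumed by RegionsHeartbeat in drive.go below: after the first full report, each heartbeat round skips roughly that percentage of regions, imitating hibernated regions in a large cluster. A hedged sketch of the sampling, using an illustrative value of 70:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	hibernatePercent := 70 // illustrative value; the config default is 0
	totalRegion := 100000

	reported := 0
	for i := 0; i < totalRegion; i++ {
		// Same idea as RegionsHeartbeat: rand.Intn(100) < hibernatePercent
		// treats the region as hibernating and skips it this round.
		if rand.Intn(100) < hibernatePercent {
			continue
		}
		reported++
	}
	fmt.Printf("reported %d of %d regions (about %d%%)\n", reported, totalRegion, 100-hibernatePercent)
}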
156 changes: 145 additions & 11 deletions tools/pd-simulator/simulator/drive.go
@@ -16,6 +16,7 @@ package simulator

import (
"context"
"math/rand"
"net/http"
"net/http/pprof"
"path"
@@ -47,12 +48,18 @@ type Driver struct {
pdAddr string
statusAddress string
simCase *cases.Case
tickCount int64
eventRunner *EventRunner
raftEngine *RaftEngine
conn *Connection
simConfig *config.SimConfig
pdConfig *config.PDConfig

tick struct {
count int64
region chan int64
store chan int64
stepRegion chan int64
}
}

// NewDriver returns a driver.
@@ -64,17 +71,22 @@ func NewDriver(pdAddr, statusAddress, caseName string, simConfig *config.SimConf
pdConfig := &config.PDConfig{}
pdConfig.PlacementRules = simCase.Rules
pdConfig.LocationLabels = simCase.Labels
return &Driver{
driver := Driver{
pdAddr: pdAddr,
statusAddress: statusAddress,
simCase: simCase,
simConfig: simConfig,
pdConfig: pdConfig,
}, nil
}
driver.tick.stepRegion = make(chan int64, 1)
driver.tick.region = make(chan int64, 1)
driver.tick.store = make(chan int64, 1)
return &driver, nil
}

// Prepare initializes cluster information, bootstraps cluster and starts nodes.
func (d *Driver) Prepare() error {
simutil.Logger.Info("prepare cluster")
conn, err := NewConnection(d.simCase, d.simConfig)
if err != nil {
return err
@@ -166,15 +178,136 @@ func (d *Driver) updateNodesClient() error {

// Tick invokes nodes' Tick.
func (d *Driver) Tick() {
d.tickCount++
d.raftEngine.stepRegions()
d.eventRunner.Tick(d.tickCount)
d.tick.count++
curTick := d.tick.count
go func() {
d.tick.stepRegion <- curTick
}()
go func() {
d.tick.region <- curTick
}()
go func() {
d.tick.store <- curTick
}()
}

func (d *Driver) StepRegions(ctx context.Context) {
for {
select {
case tick := <-d.tick.stepRegion:
d.raftEngine.stepRegions()
d.eventRunner.Tick(tick)
for _, n := range d.conn.Nodes {
n.reportRegionChange()
d.wg.Add(1)
go n.Tick(&d.wg)
}
d.wg.Wait()
case <-ctx.Done():
return
}
}
}

func (d *Driver) StoresHeartbeat(ctx context.Context) {
config := d.raftEngine.storeConfig
storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration)
var wg sync.WaitGroup
for {
select {
case tick := <-d.tick.store:
if uint64(tick)%storeInterval == 0 {
for _, n := range d.conn.Nodes {
wg.Add(1)
go n.storeHeartBeat(&wg)
}
wg.Wait()
}
case <-ctx.Done():
return
}
}
}

func (d *Driver) RegionsHeartbeat(ctx context.Context) {
// firstReport ensures we only wait for the first round of region heartbeats to finish
firstReport := true
config := d.raftEngine.storeConfig
regionInterval := uint64(config.RaftStore.RegionHeartBeatInterval.Duration / config.SimTickInterval.Duration)
nodesChannel := make(map[uint64]chan *core.RegionInfo, len(d.conn.Nodes))
for _, n := range d.conn.Nodes {
n.reportRegionChange()
d.wg.Add(1)
go n.Tick(&d.wg)
nodesChannel[n.Store.GetId()] = make(chan *core.RegionInfo, d.simConfig.TotalRegion)
go func(storeID uint64, ch chan *core.RegionInfo) {
for {
select {
case region := <-ch:
d.conn.Nodes[storeID].regionHeartBeat(region)
case <-ctx.Done():
close(ch)
return
}
}
}(n.Store.GetId(), nodesChannel[n.Store.GetId()])
}

for {
select {
case tick := <-d.tick.region:
if uint64(tick)%regionInterval == 0 {
regions := d.raftEngine.GetRegions()
healthyNodes := make(map[uint64]bool)
for _, n := range d.conn.Nodes {
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
healthyNodes[n.Store.GetId()] = false
} else {
healthyNodes[n.Store.GetId()] = true
}
}
report := 0
for _, region := range regions {
hibernatePercent := d.simConfig.HibernatePercent
// use rand(0,100) so roughly hibernatePercent of regions are skipped each round after the first full report
if !firstReport && rand.Intn(100) < hibernatePercent {
continue
}

if region.GetLeader() != nil {
storeID := region.GetLeader().GetStoreId()
if healthy, ok := healthyNodes[storeID]; !ok || !healthy {
continue
}
nodesChannel[storeID] <- region.Clone()
report++
}
}

// Only set HaltSchedule to false once the leader count reaches 80% of the total region count.
// firstReport ensures this runs only for the initial report, so a HaltSchedule set to true manually is not overridden.
if HaltSchedule.Load() && firstReport {
storeInterval := uint64(config.RaftStore.StoreHeartBeatInterval.Duration / config.SimTickInterval.Duration)
ticker := time.NewTicker(time.Duration(storeInterval))
for range ticker.C {
// wait for the first round of heartbeats to finish
stores, _ := PDHTTPClient.GetStores(ctx)
var leaderCount int64
for _, store := range stores.Stores {
leaderCount += store.Status.LeaderCount
}
// Also check HaltSchedule so this loop does not spin forever if the leader count never reaches 80%.
if leaderCount > int64(float64(d.simConfig.TotalRegion)*0.8) || !HaltSchedule.Load() {
ChooseToHaltPDSchedule(false)
firstReport = false
ticker.Stop()
simutil.Logger.Info("first region heartbeat done", zap.Int64("leaderCount", leaderCount), zap.Int("checkRegions", len(regions)))
break
}
}
}
}
case <-ctx.Done():
return
}
}
d.wg.Wait()
}

var HaltSchedule atomic.Bool
@@ -197,6 +330,7 @@ func (d *Driver) Check() bool {

// Start starts all nodes.
func (d *Driver) Start() error {
simutil.Logger.Info("init nodes")
if err := d.updateNodesClient(); err != nil {
return err
}
@@ -221,7 +355,7 @@ func (d *Driver) Stop() {

// TickCount returns the simulation's tick count.
func (d *Driver) TickCount() int64 {
return d.tickCount
return d.tick.count
}

// GetBootstrapInfo returns a valid bootstrap store and region.
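RegionsHeartbeat in drive.go above also gates the first report: scheduling stays halted until the leader count reported by PD exceeds 80% of the total region count (or HaltSchedule is cleared), after which ChooseToHaltPDSchedule(false) re-enables it. A simplified, self-contained sketch of that wait loop; getLeaderCount and enableSchedule are stand-ins for PDHTTPClient.GetStores and ChooseToHaltPDSchedule.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitForLeaders polls until more than 80% of regions report a leader,
// then re-enables scheduling, mirroring the first-report check in drive.go.
func waitForLeaders(totalRegion int64, getLeaderCount func() int64, enableSchedule func()) {
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if count := getLeaderCount(); count > int64(float64(totalRegion)*0.8) {
			enableSchedule()
			fmt.Println("first region heartbeat done, leader count:", count)
			return
		}
	}
}

func main() {
	var reported atomic.Int64
	go func() {
		for i := 0; i < 100; i++ {
			time.Sleep(5 * time.Millisecond)
			reported.Add(10) // simulate leaders arriving over time
		}
	}()
	waitForLeaders(1000, reported.Load, func() {})
}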