Skip to content

Commit

Permalink
improve region statistics observe
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 16, 2024
1 parent e105836 commit b0488a6
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 31 deletions.
75 changes: 44 additions & 31 deletions pkg/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type RegionInfoProvider interface {
}

// RegionStatisticType represents the type of the region's status.
type RegionStatisticType uint32
type RegionStatisticType uint16

const emptyStatistic = RegionStatisticType(0)

Expand Down Expand Up @@ -81,7 +81,6 @@ var (

// RegionInfoWithTS is used to record the extra timestamp status of a region.
type RegionInfoWithTS struct {
id uint64
startMissVoterPeerTS int64
startDownPeerTS int64
}
Expand All @@ -91,7 +90,7 @@ type RegionStatistics struct {
syncutil.RWMutex
rip RegionInfoProvider
conf sc.CheckerConfigProvider
stats map[RegionStatisticType]map[uint64]*RegionInfoWithTS
stats map[RegionStatisticType]map[uint64]any
index map[uint64]RegionStatisticType
ruleManager *placement.RuleManager
}
Expand All @@ -106,11 +105,11 @@ func NewRegionStatistics(
rip: rip,
conf: conf,
ruleManager: ruleManager,
stats: make(map[RegionStatisticType]map[uint64]*RegionInfoWithTS),
stats: make(map[RegionStatisticType]map[uint64]any),
index: make(map[uint64]RegionStatisticType),
}
for _, typ := range regionStatisticTypes {
r.stats[typ] = make(map[uint64]*RegionInfoWithTS)
r.stats[typ] = make(map[uint64]any)
}
return r
}
Expand Down Expand Up @@ -207,14 +206,27 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
}
}
}

peers := region.GetPeers()
downPeers := region.GetDownPeers()
pendingPeers := region.GetPendingPeers()
learners := region.GetLearners()
voters := region.GetVoters()
regionSize := region.GetApproximateSize()
regionMaxSize := int64(r.conf.GetRegionMaxSize())
regionMaxKeys := int64(r.conf.GetRegionMaxKeys())
maxMergeRegionSize := int64(r.conf.GetMaxMergeRegionSize())
maxMergeRegionKeys := int64(r.conf.GetMaxMergeRegionKeys())
leaderIsWitness := region.GetLeader().GetIsWitness()

// Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`.
// Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP.
// For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and `UndersizedRegion` are updated.
conditions := map[RegionStatisticType]bool{
MissPeer: len(region.GetPeers()) < desiredReplicas,
ExtraPeer: len(region.GetPeers()) > desiredReplicas,
DownPeer: len(region.GetDownPeers()) > 0,
PendingPeer: len(region.GetPendingPeers()) > 0,
MissPeer: len(peers) < desiredReplicas,
ExtraPeer: len(peers) > desiredReplicas,
DownPeer: len(downPeers) > 0,
PendingPeer: len(pendingPeers) > 0,
OfflinePeer: func() bool {
for _, store := range stores {
if store.IsRemoving() {
Expand All @@ -226,39 +238,40 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
}
return false
}(),
LearnerPeer: len(region.GetLearners()) > 0,
EmptyRegion: region.GetApproximateSize() <= core.EmptyRegionApproximateSize,
OversizedRegion: region.IsOversized(
int64(r.conf.GetRegionMaxSize()),
int64(r.conf.GetRegionMaxKeys()),
),
UndersizedRegion: region.NeedMerge(
int64(r.conf.GetMaxMergeRegionSize()),
int64(r.conf.GetMaxMergeRegionKeys()),
),
WitnessLeader: region.GetLeader().GetIsWitness(),
LearnerPeer: len(learners) > 0,
EmptyRegion: regionSize <= core.EmptyRegionApproximateSize,
OversizedRegion: region.IsOversized(regionMaxSize, regionMaxKeys),
UndersizedRegion: region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys),
WitnessLeader: leaderIsWitness,
}
// Check if the region meets any of the conditions and update the corresponding info.
regionID := region.GetID()
for typ, c := range conditions {
if c {
info := r.stats[typ][regionID]
if info == nil {
info = &RegionInfoWithTS{id: regionID}
}
if typ == DownPeer {
if info.startDownPeerTS != 0 {
regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.startDownPeerTS))
if info == nil {
info = &RegionInfoWithTS{}
}
if info.(*RegionInfoWithTS).startDownPeerTS != 0 {
regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startDownPeerTS))
} else {
info.startDownPeerTS = time.Now().Unix()
info.(*RegionInfoWithTS).startDownPeerTS = time.Now().Unix()
logDownPeerWithNoDisconnectedStore(region, stores)
}
} else if typ == MissPeer && len(region.GetVoters()) < desiredVoters {
if info.startMissVoterPeerTS != 0 {
regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.startMissVoterPeerTS))
} else {
info.startMissVoterPeerTS = time.Now().Unix()
} else if typ == MissPeer {
if info == nil {
info = &RegionInfoWithTS{}
}
if len(voters) < desiredVoters {
if info.(*RegionInfoWithTS).startMissVoterPeerTS != 0 {
regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startMissVoterPeerTS))
} else {
info.(*RegionInfoWithTS).startMissVoterPeerTS = time.Now().Unix()
}
}
} else {
info = struct{}{}
}

r.stats[typ][regionID] = info
Expand Down
40 changes: 40 additions & 0 deletions pkg/statistics/region_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,43 @@ func TestRegionLabelIsolationLevel(t *testing.T) {
re.Equal(res, labelLevelStats.labelCounter[i])
}
}

func BenchmarkObserve(b *testing.B) {
// Setup
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
manager.Initialize(3, []string{"zone", "rack", "host"}, "")
opt := mockconfig.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
peers := []*metapb.Peer{
{Id: 4, StoreId: 1},
{Id: 5, StoreId: 2},
{Id: 6, StoreId: 3},
}

metaStores := []*metapb.Store{
{Id: 1, Address: "mock://tikv-1"},
{Id: 2, Address: "mock://tikv-2"},
{Id: 3, Address: "mock://tikv-3"},
}

stores := make([]*core.StoreInfo, 0, len(metaStores))
for _, m := range metaStores {
s := core.NewStoreInfo(m)
stores = append(stores, s)
}

regionNum := uint64(1000000)
regions := make([]*core.RegionInfo, 0, regionNum)
for i := uint64(1); i <= regionNum; i++ {
r := &metapb.Region{Id: i, Peers: peers, StartKey: []byte{byte(i)}, EndKey: []byte{byte(i + 1)}}
regions = append(regions, core.NewRegionInfo(r, peers[0]))
}
regionStats := NewRegionStatistics(nil, opt, manager)

b.ResetTimer()
// Run the Observe function b.N times
for i := 0; i < b.N; i++ {
regionStats.Observe(regions[i%int(regionNum)], stores)
}
}

0 comments on commit b0488a6

Please sign in to comment.