Skip to content

Commit

Permalink
core: check stats healthy by reference
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Apr 28, 2024
1 parent de5bad3 commit 6998201
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 13 deletions.
47 changes: 36 additions & 11 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type RegionInfo struct {
buckets unsafe.Pointer
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
source RegionSource
// ref is used to indicate the reference count of the region in root-tree and sub-tree.
ref atomic.Int32
}

// RegionSource is the source of region.
Expand All @@ -106,6 +108,21 @@ func (r *RegionInfo) LoadedFromSync() bool {
return r.source == Sync
}

// IncRef increases the reference count.
func (r *RegionInfo) IncRef() {
r.ref.Add(1)
}

// DecRef decreases the reference count.
func (r *RegionInfo) DecRef() {
r.ref.Add(-1)
}

// GetRef returns the reference count.
func (r *RegionInfo) GetRef() int32 {
return r.ref.Load()
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
regionInfo := &RegionInfo{
Expand Down Expand Up @@ -928,7 +945,7 @@ type RegionsInfo struct {
// NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
func NewRegionsInfo() *RegionsInfo {
return &RegionsInfo{
tree: newRegionTree(),
tree: newRegionTreeWithRecord(),
regions: make(map[uint64]*regionItem),
subRegions: make(map[uint64]*regionItem),
leaders: make(map[uint64]*regionTree),
Expand Down Expand Up @@ -1117,10 +1134,14 @@ func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, isRecord bool) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
if !isRecord {
store = newRegionTree()
} else {
store = newRegionTreeWithRecord()
}
peersMap[storeID] = store
}
store.update(item, false)
Expand All @@ -1131,17 +1152,17 @@ func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
setPeer(r.leaders, storeID, item, true)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
setPeer(r.followers, storeID, item, false)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
setPeer(peersMap, storeID, item, false)
}
}
// Add to learners.
Expand Down Expand Up @@ -1309,10 +1330,14 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
r.subRegions[region.GetID()] = item
// It has been removed and all information needs to be updated again.
// Set peers then.
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, isRecord bool) {
store, ok := peersMap[storeID]
if !ok {
store = newRegionTree()
if !isRecord {
store = newRegionTree()
} else {
store = newRegionTreeWithRecord()
}
peersMap[storeID] = store
}
store.update(item, false)
Expand All @@ -1323,17 +1348,17 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi
storeID := peer.GetStoreId()
if peer.GetId() == region.leader.GetId() {
// Add leader peer to leaders.
setPeer(r.leaders, storeID, item)
setPeer(r.leaders, storeID, item, true)
} else {
// Add follower peer to followers.
setPeer(r.followers, storeID, item)
setPeer(r.followers, storeID, item, false)
}
}

setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
for _, peer := range peers {
storeID := peer.GetStoreId()
setPeer(peersMap, storeID, item)
setPeer(peersMap, storeID, item, false)
}
}
// Add to learners.
Expand Down
87 changes: 85 additions & 2 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,16 +874,24 @@ func TestUpdateRegionEquivalence(t *testing.T) {
ctx := ContextTODO()
regionsOld.AtomicCheckAndPutRegion(ctx, item)
// new way
newItem := item.Clone()
ctx = ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, item)
regionsNew.CheckAndPutSubTree(item)
regionsNew.CheckAndPutRootTree(ctx, newItem)
regionsNew.CheckAndPutSubTree(newItem)
}
checksEquivalence := func() {
re.Equal(regionsOld.GetRegionCount([]byte(""), []byte("")), regionsNew.GetRegionCount([]byte(""), []byte("")))
re.Equal(regionsOld.GetRegionSizeByRange([]byte(""), []byte("")), regionsNew.GetRegionSizeByRange([]byte(""), []byte("")))
checkRegions(re, regionsOld)
checkRegions(re, regionsNew)

for _, r := range regionsOld.GetRegions() {
re.Equal(int32(2), r.GetRef(), r.GetID())
}
for _, r := range regionsNew.GetRegions() {
re.Equal(int32(2), r.GetRef())
}

for i := 1; i <= storeNums; i++ {
re.Equal(regionsOld.GetStoreRegionCount(uint64(i)), regionsNew.GetStoreRegionCount(uint64(i)))
re.Equal(regionsOld.GetStoreLeaderCount(uint64(i)), regionsNew.GetStoreLeaderCount(uint64(i)))
Expand All @@ -899,19 +907,22 @@ func TestUpdateRegionEquivalence(t *testing.T) {
for _, item := range items {
updateRegion(item)
}
fmt.Println("==1===")
checksEquivalence()

// Merge regions.
itemA, itemB := items[10], items[11]
itemMergedAB := itemA.Clone(WithEndKey(itemB.GetEndKey()), WithIncVersion())
updateRegion(itemMergedAB)
fmt.Println("==2===")
checksEquivalence()

// Split
itemA = itemA.Clone(WithIncVersion(), WithIncVersion())
itemB = itemB.Clone(WithIncVersion(), WithIncVersion())
updateRegion(itemA)
updateRegion(itemB)
fmt.Println("==3===")
checksEquivalence()
}

Expand All @@ -938,3 +949,75 @@ func generateTestRegions(count int, storeNum int) []*RegionInfo {
}
return items
}

func TestUpdateRegionEventualConsistency(t *testing.T) {
re := require.New(t)
regionsOld := NewRegionsInfo()
regionsNew := NewRegionsInfo()
i := 1
storeNum := 5
peer1 := &metapb.Peer{StoreId: uint64(i%storeNum + 1), Id: uint64(i*storeNum + 1)}
peer2 := &metapb.Peer{StoreId: uint64((i+1)%storeNum + 1), Id: uint64(i*storeNum + 2)}
peer3 := &metapb.Peer{StoreId: uint64((i+2)%storeNum + 1), Id: uint64(i*storeNum + 3)}
item := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer1, peer2, peer3},
StartKey: []byte(fmt.Sprintf("%20d", i*10)),
EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100},
},
peer1,
SetApproximateKeys(10),
SetApproximateSize(10),
)
regionItemA := item
regionPendingItemA := regionItemA.Clone(WithPendingPeers([]*metapb.Peer{peer3}))

regionItemB := regionItemA.Clone()
regionPendingItemB := regionItemB.Clone(WithPendingPeers([]*metapb.Peer{peer3}))
regionGuide := GenerateRegionGuideFunc(true)

// Old way
{
ctx := ContextTODO()
regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA)
re.Equal(int32(2), regionPendingItemA.GetRef())
// check new item
saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA)
re.True(needSync)
re.True(saveCache)
re.False(saveKV)
// update cache
regionsOld.AtomicCheckAndPutRegion(ctx, regionItemA)
re.Equal(int32(2), regionItemA.GetRef())
}

// New way
{
// root tree part in order, and updated in order
ctx := ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, regionPendingItemB)
re.Equal(int32(1), regionPendingItemB.GetRef())
ctx = ContextTODO()
regionsNew.CheckAndPutRootTree(ctx, regionItemB)
re.Equal(int32(1), regionItemB.GetRef())

// subtree part missing order, updated
regionsNew.CheckAndPutSubTree(regionItemB)
re.Equal(int32(2), regionItemB.GetRef())
re.Equal(int32(0), regionPendingItemB.GetRef())
regionsNew.CheckAndPutSubTree(regionPendingItemB)
re.Equal(int32(2), regionItemB.GetRef())
re.Equal(int32(0), regionPendingItemB.GetRef())

// heartbeat again, no need updates root tree
saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB)
re.False(needSync)
re.False(saveCache)
re.False(saveKV)

// but need update sub tree again
item := regionsNew.GetRegion(regionItemB.GetID())
re.Equal(int32(2), item.GetRef())
}
}
24 changes: 24 additions & 0 deletions pkg/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type regionTree struct {
totalWriteKeysRate float64
// count the number of regions that not loaded from storage.
notFromStorageRegionsCnt int
// record ref
record bool
}

func newRegionTree() *regionTree {
Expand All @@ -81,6 +83,17 @@ func newRegionTree() *regionTree {
}
}

func newRegionTreeWithRecord() *regionTree {
return &regionTree{
tree: btree.NewG[*regionItem](defaultBTreeDegree),
totalSize: 0,
totalWriteBytesRate: 0,
totalWriteKeysRate: 0,
notFromStorageRegionsCnt: 0,
record: true,
}
}

func (t *regionTree) length() int {
if t == nil {
return 0
Expand Down Expand Up @@ -140,6 +153,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
t.tree.Delete(old)
}
t.tree.ReplaceOrInsert(item)
if t.record {
item.RegionInfo.IncRef()
}
result := make([]*RegionInfo, len(overlaps))
for i, overlap := range overlaps {
old := overlap.RegionInfo
Expand All @@ -155,6 +171,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re
if !old.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
if t.record {
old.DecRef()
}
}

return result
Expand All @@ -180,6 +199,8 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
if !origin.LoadedFromStorage() && region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
origin.DecRef()
region.IncRef()
}

// remove removes a region if the region is in the tree.
Expand All @@ -199,6 +220,9 @@ func (t *regionTree) remove(region *RegionInfo) {
regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
if t.record {
result.RegionInfo.DecRef()
}
if !region.LoadedFromStorage() {
t.notFromStorageRegionsCnt--
}
Expand Down
1 change: 1 addition & 0 deletions pkg/core/region_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestRegionTree(t *testing.T) {
updateNewItem(tree, regionA)
updateNewItem(tree, regionC)
re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c"))))
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0].RegionInfo)
re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo)
re.Nil(tree.search([]byte{}))
re.Equal(regionA, tree.search([]byte("a")))
Expand Down
10 changes: 10 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,16 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
},
)
}
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
)
}
return nil
}
tracer.OnSaveCacheBegin()
Expand Down
10 changes: 10 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,16 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
},
)
}
// region is not updated to the subtree.
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
core.ExtraTaskOpts(ctx, core.UpdateSubTree),
func(_ context.Context) {
c.CheckAndPutSubTree(region)
},
)
}
return nil
}
failpoint.Inject("concurrentRegionHeartbeat", func() {
Expand Down

0 comments on commit 6998201

Please sign in to comment.