From 69982018de13cb89b32e3d8643f41cc214ba6074 Mon Sep 17 00:00:00 2001 From: nolouch Date: Sun, 28 Apr 2024 11:49:08 +0800 Subject: [PATCH] core: check stats healthy by reference Signed-off-by: nolouch --- pkg/core/region.go | 47 +++++++++++---- pkg/core/region_test.go | 87 +++++++++++++++++++++++++++- pkg/core/region_tree.go | 24 ++++++++ pkg/core/region_tree_test.go | 1 + pkg/mcs/scheduling/server/cluster.go | 10 ++++ server/cluster/cluster.go | 10 ++++ 6 files changed, 166 insertions(+), 13 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 713e82cc36d8..849d62d6ea9d 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -81,6 +81,8 @@ type RegionInfo struct { buckets unsafe.Pointer // source is used to indicate region's source, such as Storage/Sync/Heartbeat. source RegionSource + // ref is used to indicate the reference count of the region in root-tree and sub-tree. + ref atomic.Int32 } // RegionSource is the source of region. @@ -106,6 +108,21 @@ func (r *RegionInfo) LoadedFromSync() bool { return r.source == Sync } +// IncRef increases the reference count. +func (r *RegionInfo) IncRef() { + r.ref.Add(1) +} + +// DecRef decreases the reference count. +func (r *RegionInfo) DecRef() { + r.ref.Add(-1) +} + +// GetRef returns the reference count. +func (r *RegionInfo) GetRef() int32 { + return r.ref.Load() +} + // NewRegionInfo creates RegionInfo with region's meta and leader peer. func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo { regionInfo := &RegionInfo{ @@ -928,7 +945,7 @@ type RegionsInfo struct { // NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers func NewRegionsInfo() *RegionsInfo { return &RegionsInfo{ - tree: newRegionTree(), + tree: newRegionTreeWithRecord(), regions: make(map[uint64]*regionItem), subRegions: make(map[uint64]*regionItem), leaders: make(map[uint64]*regionTree), @@ -1117,10 +1134,14 @@ func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) { r.subRegions[region.GetID()] = item // It has been removed and all information needs to be updated again. // Set peers then. - setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) { + setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, isRecord bool) { store, ok := peersMap[storeID] if !ok { - store = newRegionTree() + if !isRecord { + store = newRegionTree() + } else { + store = newRegionTreeWithRecord() + } peersMap[storeID] = store } store.update(item, false) @@ -1131,17 +1152,17 @@ func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) { storeID := peer.GetStoreId() if peer.GetId() == region.leader.GetId() { // Add leader peer to leaders. - setPeer(r.leaders, storeID, item) + setPeer(r.leaders, storeID, item, true) } else { // Add follower peer to followers. - setPeer(r.followers, storeID, item) + setPeer(r.followers, storeID, item, false) } } setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) { for _, peer := range peers { storeID := peer.GetStoreId() - setPeer(peersMap, storeID, item) + setPeer(peersMap, storeID, item, false) } } // Add to learners. @@ -1309,10 +1330,14 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi r.subRegions[region.GetID()] = item // It has been removed and all information needs to be updated again. // Set peers then. - setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) { + setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem, isRecord bool) { store, ok := peersMap[storeID] if !ok { - store = newRegionTree() + if !isRecord { + store = newRegionTree() + } else { + store = newRegionTreeWithRecord() + } peersMap[storeID] = store } store.update(item, false) @@ -1323,17 +1348,17 @@ func (r *RegionsInfo) UpdateSubTree(region, origin *RegionInfo, overlaps []*Regi storeID := peer.GetStoreId() if peer.GetId() == region.leader.GetId() { // Add leader peer to leaders. - setPeer(r.leaders, storeID, item) + setPeer(r.leaders, storeID, item, true) } else { // Add follower peer to followers. - setPeer(r.followers, storeID, item) + setPeer(r.followers, storeID, item, false) } } setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) { for _, peer := range peers { storeID := peer.GetStoreId() - setPeer(peersMap, storeID, item) + setPeer(peersMap, storeID, item, false) } } // Add to learners. diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 88683968f3fd..ca5ece69d4f3 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -874,9 +874,10 @@ func TestUpdateRegionEquivalence(t *testing.T) { ctx := ContextTODO() regionsOld.AtomicCheckAndPutRegion(ctx, item) // new way + newItem := item.Clone() ctx = ContextTODO() - regionsNew.CheckAndPutRootTree(ctx, item) - regionsNew.CheckAndPutSubTree(item) + regionsNew.CheckAndPutRootTree(ctx, newItem) + regionsNew.CheckAndPutSubTree(newItem) } checksEquivalence := func() { re.Equal(regionsOld.GetRegionCount([]byte(""), []byte("")), regionsNew.GetRegionCount([]byte(""), []byte(""))) @@ -884,6 +885,13 @@ func TestUpdateRegionEquivalence(t *testing.T) { checkRegions(re, regionsOld) checkRegions(re, regionsNew) + for _, r := range regionsOld.GetRegions() { + re.Equal(int32(2), r.GetRef(), r.GetID()) + } + for _, r := range regionsNew.GetRegions() { + re.Equal(int32(2), r.GetRef()) + } + for i := 1; i <= storeNums; i++ { re.Equal(regionsOld.GetStoreRegionCount(uint64(i)), regionsNew.GetStoreRegionCount(uint64(i))) re.Equal(regionsOld.GetStoreLeaderCount(uint64(i)), regionsNew.GetStoreLeaderCount(uint64(i))) @@ -899,12 +907,14 @@ func TestUpdateRegionEquivalence(t *testing.T) { for _, item := range items { updateRegion(item) } + fmt.Println("==1===") checksEquivalence() // Merge regions. itemA, itemB := items[10], items[11] itemMergedAB := itemA.Clone(WithEndKey(itemB.GetEndKey()), WithIncVersion()) updateRegion(itemMergedAB) + fmt.Println("==2===") checksEquivalence() // Split @@ -912,6 +922,7 @@ func TestUpdateRegionEquivalence(t *testing.T) { itemB = itemB.Clone(WithIncVersion(), WithIncVersion()) updateRegion(itemA) updateRegion(itemB) + fmt.Println("==3===") checksEquivalence() } @@ -938,3 +949,75 @@ func generateTestRegions(count int, storeNum int) []*RegionInfo { } return items } + +func TestUpdateRegionEventualConsistency(t *testing.T) { + re := require.New(t) + regionsOld := NewRegionsInfo() + regionsNew := NewRegionsInfo() + i := 1 + storeNum := 5 + peer1 := &metapb.Peer{StoreId: uint64(i%storeNum + 1), Id: uint64(i*storeNum + 1)} + peer2 := &metapb.Peer{StoreId: uint64((i+1)%storeNum + 1), Id: uint64(i*storeNum + 2)} + peer3 := &metapb.Peer{StoreId: uint64((i+2)%storeNum + 1), Id: uint64(i*storeNum + 3)} + item := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer1, peer2, peer3}, + StartKey: []byte(fmt.Sprintf("%20d", i*10)), + EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100}, + }, + peer1, + SetApproximateKeys(10), + SetApproximateSize(10), + ) + regionItemA := item + regionPendingItemA := regionItemA.Clone(WithPendingPeers([]*metapb.Peer{peer3})) + + regionItemB := regionItemA.Clone() + regionPendingItemB := regionItemB.Clone(WithPendingPeers([]*metapb.Peer{peer3})) + regionGuide := GenerateRegionGuideFunc(true) + + // Old way + { + ctx := ContextTODO() + regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA) + re.Equal(int32(2), regionPendingItemA.GetRef()) + // check new item + saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA) + re.True(needSync) + re.True(saveCache) + re.False(saveKV) + // update cache + regionsOld.AtomicCheckAndPutRegion(ctx, regionItemA) + re.Equal(int32(2), regionItemA.GetRef()) + } + + // New way + { + // root tree part in order, and updated in order + ctx := ContextTODO() + regionsNew.CheckAndPutRootTree(ctx, regionPendingItemB) + re.Equal(int32(1), regionPendingItemB.GetRef()) + ctx = ContextTODO() + regionsNew.CheckAndPutRootTree(ctx, regionItemB) + re.Equal(int32(1), regionItemB.GetRef()) + + // subtree part missing order, updated + regionsNew.CheckAndPutSubTree(regionItemB) + re.Equal(int32(2), regionItemB.GetRef()) + re.Equal(int32(0), regionPendingItemB.GetRef()) + regionsNew.CheckAndPutSubTree(regionPendingItemB) + re.Equal(int32(2), regionItemB.GetRef()) + re.Equal(int32(0), regionPendingItemB.GetRef()) + + // heartbeat again, no need updates root tree + saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB) + re.False(needSync) + re.False(saveCache) + re.False(saveKV) + + // but need update sub tree again + item := regionsNew.GetRegion(regionItemB.GetID()) + re.Equal(int32(2), item.GetRef()) + } +} diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index 8c928f391eb3..096d4f242a6e 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -69,6 +69,8 @@ type regionTree struct { totalWriteKeysRate float64 // count the number of regions that not loaded from storage. notFromStorageRegionsCnt int + // record ref + record bool } func newRegionTree() *regionTree { @@ -81,6 +83,17 @@ func newRegionTree() *regionTree { } } +func newRegionTreeWithRecord() *regionTree { + return ®ionTree{ + tree: btree.NewG[*regionItem](defaultBTreeDegree), + totalSize: 0, + totalWriteBytesRate: 0, + totalWriteKeysRate: 0, + notFromStorageRegionsCnt: 0, + record: true, + } +} + func (t *regionTree) length() int { if t == nil { return 0 @@ -140,6 +153,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re t.tree.Delete(old) } t.tree.ReplaceOrInsert(item) + if t.record { + item.RegionInfo.IncRef() + } result := make([]*RegionInfo, len(overlaps)) for i, overlap := range overlaps { old := overlap.RegionInfo @@ -155,6 +171,9 @@ func (t *regionTree) update(item *regionItem, withOverlaps bool, overlaps ...*re if !old.LoadedFromStorage() { t.notFromStorageRegionsCnt-- } + if t.record { + old.DecRef() + } } return result @@ -180,6 +199,8 @@ func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) { if !origin.LoadedFromStorage() && region.LoadedFromStorage() { t.notFromStorageRegionsCnt-- } + origin.DecRef() + region.IncRef() } // remove removes a region if the region is in the tree. @@ -199,6 +220,9 @@ func (t *regionTree) remove(region *RegionInfo) { regionWriteBytesRate, regionWriteKeysRate := result.GetWriteRate() t.totalWriteBytesRate -= regionWriteBytesRate t.totalWriteKeysRate -= regionWriteKeysRate + if t.record { + result.RegionInfo.DecRef() + } if !region.LoadedFromStorage() { t.notFromStorageRegionsCnt-- } diff --git a/pkg/core/region_tree_test.go b/pkg/core/region_tree_test.go index f4ef6cb67b3b..3f2ca0c1fb8f 100644 --- a/pkg/core/region_tree_test.go +++ b/pkg/core/region_tree_test.go @@ -159,6 +159,7 @@ func TestRegionTree(t *testing.T) { updateNewItem(tree, regionA) updateNewItem(tree, regionC) re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c")))) + re.Equal(regionC, tree.overlaps(newRegionItem([]byte("c"), []byte("d")))[0].RegionInfo) re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo) re.Nil(tree.search([]byte{})) re.Equal(regionA, tree.search([]byte("a"))) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 94b24f4ca16e..6e6df8e3775f 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -615,6 +615,16 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c }, ) } + // region is not updated to the subtree. + if origin.GetRef() < 2 { + ctx.TaskRunner.RunTask( + ctx, + core.ExtraTaskOpts(ctx, core.UpdateSubTree), + func(_ context.Context) { + c.CheckAndPutSubTree(region) + }, + ) + } return nil } tracer.OnSaveCacheBegin() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index dbc6a6cadf33..b62ace92a740 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1044,6 +1044,16 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio }, ) } + // region is not updated to the subtree. + if origin.GetRef() < 2 { + ctx.TaskRunner.RunTask( + ctx, + core.ExtraTaskOpts(ctx, core.UpdateSubTree), + func(_ context.Context) { + c.CheckAndPutSubTree(region) + }, + ) + } return nil } failpoint.Inject("concurrentRegionHeartbeat", func() {