From 1628bed871078c4a65ebc4a3d383d6df00e4d74c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 22 Sep 2023 10:48:37 +0800 Subject: [PATCH] Revert "cluster: handle region after report split (#6867)" This reverts commit a9973165571504f1f9091e9887d0ea9e68517485. --- pkg/core/region.go | 40 ++++----- pkg/core/region_test.go | 4 +- pkg/core/store.go | 39 ++++---- pkg/core/store_option.go | 7 -- pkg/mcs/scheduling/server/cluster.go | 8 +- pkg/schedule/filter/counter.go | 2 - pkg/schedule/filter/counter_test.go | 2 +- pkg/schedule/filter/filters.go | 13 +-- pkg/schedule/filter/region_filters.go | 45 ++++------ pkg/schedule/filter/status.go | 5 +- pkg/schedule/plan/status.go | 5 +- pkg/schedule/schedulers/balance_leader.go | 13 ++- pkg/schedule/schedulers/balance_test.go | 8 -- pkg/syncer/client.go | 4 +- server/cluster/cluster.go | 24 +++-- server/cluster/cluster_worker.go | 105 ++++++++-------------- server/cluster/cluster_worker_test.go | 101 ++------------------- server/grpc_service.go | 32 +------ 18 files changed, 126 insertions(+), 331 deletions(-) diff --git a/pkg/core/region.go b/pkg/core/region.go index 2fec30de1326..4540f7aafb30 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -682,14 +682,9 @@ func (r *RegionInfo) isRegionRecreated() bool { return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0) } -// RegionChanged is a struct that records the changes of the region. -type RegionChanged struct { - IsNew, SaveKV, SaveCache, NeedSync bool -} - // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(region, origin *RegionInfo) *RegionChanged +type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -702,19 +697,18 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - // Mark IsNew if the region in cache does not have leader. - return func(region, origin *RegionInfo) (changed *RegionChanged) { - changed = &RegionChanged{} + // Mark isNew if the region in cache does not have leader. + return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) { if origin == nil { if log.GetLevel() <= zap.DebugLevel { debug("insert new region", zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - changed.SaveKV, changed.SaveCache, changed.IsNew = true, true, true + saveKV, saveCache, isNew = true, true, true } else { if !origin.IsFromHeartbeat() { - changed.IsNew = true + isNew = true } r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -727,7 +721,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - changed.SaveKV, changed.SaveCache = true, true + saveKV, saveCache = true, true } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -738,11 +732,11 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - changed.SaveCache, changed.SaveKV = true, true + saveKV, saveCache = true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() == 0 { - changed.IsNew = true + isNew = true } else if log.GetLevel() <= zap.InfoLevel { info("leader changed", zap.Uint64("region-id", region.GetID()), @@ -751,17 +745,17 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { ) } // We check it first and do not return because the log is important for us to investigate, - changed.SaveCache, changed.NeedSync = true, true + saveCache, needSync = true, true } if len(region.GetPeers()) != len(origin.GetPeers()) { - changed.SaveCache, changed.SaveKV = true, true + saveKV, saveCache = true, true return } if len(region.GetBuckets().GetKeys()) != len(origin.GetBuckets().GetKeys()) { if log.GetLevel() <= zap.DebugLevel { debug("bucket key changed", zap.Uint64("region-id", region.GetID())) } - changed.SaveCache, changed.SaveKV = true, true + saveKV, saveCache = true, true return } // Once flow has changed, will update the cache. @@ -769,39 +763,39 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || region.GetRoundBytesRead() != origin.GetRoundBytesRead() || region.flowRoundDivisor < origin.flowRoundDivisor { - changed.SaveCache, changed.NeedSync = true, true + saveCache, needSync = true, true return } if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) { if log.GetLevel() <= zap.DebugLevel { debug("down-peers changed", zap.Uint64("region-id", region.GetID())) } - changed.SaveCache, changed.NeedSync = true, true + saveCache, needSync = true, true return } if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) { if log.GetLevel() <= zap.DebugLevel { debug("pending-peers changed", zap.Uint64("region-id", region.GetID())) } - changed.SaveCache, changed.NeedSync = true, true + saveCache, needSync = true, true return } if region.GetApproximateSize() != origin.GetApproximateSize() || region.GetApproximateKeys() != origin.GetApproximateKeys() { - changed.SaveCache = true + saveCache = true return } if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN && (region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() || region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) { - changed.SaveCache = true + saveCache = true return } // Do not save to kv, because 1) flashback will be eventually set to // false, 2) flashback changes almost all regions in a cluster. // Saving kv may downgrade PD performance when there are many regions. if region.IsFlashbackChanged(origin) { - changed.SaveCache = true + saveCache = true return } } diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 3b58f5ee15a3..1e6b43fbf964 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -333,8 +333,8 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...) - changed := RegionGuide(regionA, regionB) - re.Equal(testCase.needSync, changed.NeedSync) + _, _, _, needSync := RegionGuide(regionA, regionB) + re.Equal(testCase.needSync, needSync) } } diff --git a/pkg/core/store.go b/pkg/core/store.go index cafb443bb7dd..1d3362cac0e4 100644 --- a/pkg/core/store.go +++ b/pkg/core/store.go @@ -36,7 +36,6 @@ const ( initialMinSpace = 8 * units.GiB // 2^33=8GB slowStoreThreshold = 80 awakenStoreInterval = 10 * time.Minute // 2 * slowScoreRecoveryTime - splitStoreWait = time.Minute // EngineKey is the label key used to indicate engine. EngineKey = "engine" @@ -51,23 +50,22 @@ const ( type StoreInfo struct { meta *metapb.Store *storeStats - pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader - slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it - slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it - leaderCount int - regionCount int - learnerCount int - witnessCount int - leaderSize int64 - regionSize int64 - pendingPeerCount int - lastPersistTime time.Time - leaderWeight float64 - regionWeight float64 - limiter storelimit.StoreLimit - minResolvedTS uint64 - lastAwakenTime time.Time - recentlySplitRegionsTime time.Time + pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader + slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it + slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it + leaderCount int + regionCount int + learnerCount int + witnessCount int + leaderSize int64 + regionSize int64 + pendingPeerCount int + lastPersistTime time.Time + leaderWeight float64 + regionWeight float64 + limiter storelimit.StoreLimit + minResolvedTS uint64 + lastAwakenTime time.Time } // NewStoreInfo creates StoreInfo with meta data. @@ -541,11 +539,6 @@ func (s *StoreInfo) NeedAwakenStore() bool { return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval } -// HasRecentlySplitRegions checks if there are some region are splitted in this store. -func (s *StoreInfo) HasRecentlySplitRegions() bool { - return time.Since(s.recentlySplitRegionsTime) < splitStoreWait -} - var ( // If a store's last heartbeat is storeDisconnectDuration ago, the store will // be marked as disconnected state. The value should be greater than tikv's diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go index 4d8864ea4788..8a2aa1ef089f 100644 --- a/pkg/core/store_option.go +++ b/pkg/core/store_option.go @@ -274,10 +274,3 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption { store.lastAwakenTime = lastAwaken } } - -// SetRecentlySplitRegionsTime sets last split time for the store. -func SetRecentlySplitRegionsTime(recentlySplitRegionsTime time.Time) StoreCreateOption { - return func(store *StoreInfo) { - store.recentlySplitRegionsTime = recentlySplitRegionsTime - } -} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index b2986f722df7..20af077d241f 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -433,8 +433,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. - changed := core.GenerateRegionGuideFunc(true)(region, origin) - if !changed.SaveCache && !changed.IsNew { + isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin) + if !saveCache && !isNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -444,7 +444,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { } var overlaps []*core.RegionInfo - if changed.SaveCache { + if saveCache { // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // @@ -456,7 +456,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { cluster.HandleOverlaps(c, overlaps) } - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared()) + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) return nil } diff --git a/pkg/schedule/filter/counter.go b/pkg/schedule/filter/counter.go index 0619bbdde29c..0120ef5b6663 100644 --- a/pkg/schedule/filter/counter.go +++ b/pkg/schedule/filter/counter.go @@ -127,7 +127,6 @@ const ( storeStateTooManyPendingPeer storeStateRejectLeader storeStateSlowTrend - storeStateRecentlySplitRegions filtersLen ) @@ -157,7 +156,6 @@ var filters = [filtersLen]string{ "store-state-too-many-pending-peers-filter", "store-state-reject-leader-filter", "store-state-slow-trend-filter", - "store-state-recently-split-regions-filter", } // String implements fmt.Stringer interface. diff --git a/pkg/schedule/filter/counter_test.go b/pkg/schedule/filter/counter_test.go index f8b6c0bcb8dd..067a07f138b4 100644 --- a/pkg/schedule/filter/counter_test.go +++ b/pkg/schedule/filter/counter_test.go @@ -27,7 +27,7 @@ func TestString(t *testing.T) { expected string }{ {int(storeStateTombstone), "store-state-tombstone-filter"}, - {int(filtersLen - 1), "store-state-recently-split-regions-filter"}, + {int(filtersLen - 1), "store-state-slow-trend-filter"}, {int(filtersLen), "unknown"}, } diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go index e76969127d1c..0d188e69180a 100644 --- a/pkg/schedule/filter/filters.go +++ b/pkg/schedule/filter/filters.go @@ -332,8 +332,6 @@ type StoreStateFilter struct { // If it checks failed, the operator will be put back to the waiting queue util the limit is available. // But the scheduler should keep the same with the operator level. OperatorLevel constant.PriorityLevel - // check the store not split recently in it if set true. - ForbidRecentlySplitRegions bool // Reason is used to distinguish the reason of store state filter Reason filterType } @@ -473,15 +471,6 @@ func (f *StoreStateFilter) hasRejectLeaderProperty(conf config.SharedConfigProvi return statusOK } -func (f *StoreStateFilter) hasRecentlySplitRegions(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { - if f.ForbidRecentlySplitRegions && store.HasRecentlySplitRegions() { - f.Reason = storeStateRecentlySplitRegions - return statusStoreRecentlySplitRegions - } - f.Reason = storeStateOK - return statusOK -} - // The condition table. // Y: the condition is temporary (expected to become false soon). // N: the condition is expected to be true for a long time. @@ -510,7 +499,7 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr var funcs []conditionFunc switch typ { case leaderSource: - funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected, f.hasRecentlySplitRegions} + funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected} case regionSource: funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots} case witnessSource: diff --git a/pkg/schedule/filter/region_filters.go b/pkg/schedule/filter/region_filters.go index 70cdb8500b0f..799cee7d90c8 100644 --- a/pkg/schedule/filter/region_filters.go +++ b/pkg/schedule/filter/region_filters.go @@ -24,6 +24,24 @@ import ( "github.com/tikv/pd/pkg/slice" ) +// SelectRegions selects regions that be selected from the list. +func SelectRegions(regions []*core.RegionInfo, filters ...RegionFilter) []*core.RegionInfo { + return filterRegionsBy(regions, func(r *core.RegionInfo) bool { + return slice.AllOf(filters, func(i int) bool { + return filters[i].Select(r).IsOK() + }) + }) +} + +func filterRegionsBy(regions []*core.RegionInfo, keepPred func(*core.RegionInfo) bool) (selected []*core.RegionInfo) { + for _, s := range regions { + if keepPred(s) { + selected = append(selected, s) + } + } + return +} + // SelectOneRegion selects one region that be selected from the list. func SelectOneRegion(regions []*core.RegionInfo, collector *plan.Collector, filters ...RegionFilter) *core.RegionInfo { for _, r := range regions { @@ -155,7 +173,7 @@ type SnapshotSenderFilter struct { senders map[uint64]struct{} } -// NewSnapshotSendFilter returns creates a RegionFilter that filters regions whose leader has sender limit on the specific store. +// NewSnapshotSendFilter returns creates a RegionFilter that filters regions with witness peer on the specific store. // level should be set as same with the operator priority level. func NewSnapshotSendFilter(stores []*core.StoreInfo, level constant.PriorityLevel) RegionFilter { senders := make(map[uint64]struct{}) @@ -175,28 +193,3 @@ func (f *SnapshotSenderFilter) Select(region *core.RegionInfo) *plan.Status { } return statusRegionLeaderSendSnapshotThrottled } - -// StoreRecentlySplitFilter filer the region whose leader store not recently split regions. -type StoreRecentlySplitFilter struct { - recentlySplitStores map[uint64]struct{} -} - -// NewStoreRecentlySplitFilter returns creates a StoreRecentlySplitFilter. -func NewStoreRecentlySplitFilter(stores []*core.StoreInfo) RegionFilter { - recentlySplitStores := make(map[uint64]struct{}) - for _, store := range stores { - if store.HasRecentlySplitRegions() { - recentlySplitStores[store.GetID()] = struct{}{} - } - } - return &StoreRecentlySplitFilter{recentlySplitStores: recentlySplitStores} -} - -// Select returns ok if the region leader not in the recentlySplitStores. -func (f *StoreRecentlySplitFilter) Select(region *core.RegionInfo) *plan.Status { - leaderStoreID := region.GetLeader().GetStoreId() - if _, ok := f.recentlySplitStores[leaderStoreID]; ok { - return statusStoreRecentlySplitRegions - } - return statusOK -} diff --git a/pkg/schedule/filter/status.go b/pkg/schedule/filter/status.go index 9b6665a2fa72..930c59e3ba87 100644 --- a/pkg/schedule/filter/status.go +++ b/pkg/schedule/filter/status.go @@ -39,9 +39,8 @@ var ( // store config limitation statusStoreRejectLeader = plan.NewStatus(plan.StatusStoreRejectLeader) - statusStoreNotMatchRule = plan.NewStatus(plan.StatusStoreNotMatchRule) - statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation) - statusStoreRecentlySplitRegions = plan.NewStatus(plan.StatusStoreRecentlySplitRegions) + statusStoreNotMatchRule = plan.NewStatus(plan.StatusStoreNotMatchRule) + statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation) // region filter status statusRegionPendingPeer = plan.NewStatus(plan.StatusRegionUnhealthy) diff --git a/pkg/schedule/plan/status.go b/pkg/schedule/plan/status.go index 847d03a17ff3..4242b6314939 100644 --- a/pkg/schedule/plan/status.go +++ b/pkg/schedule/plan/status.go @@ -72,8 +72,6 @@ const ( StatusStoreLowSpace = iota + 500 // StatusStoreNotExisted represents the store cannot be found in PD. StatusStoreNotExisted - // StatusStoreRecentlySplitRegions represents the store cannot be selected due to the region is splitting. - StatusStoreRecentlySplitRegions ) // TODO: define region status priority @@ -129,8 +127,7 @@ var statusText = map[StatusCode]string{ StatusStoreDown: "StoreDown", StatusStoreBusy: "StoreBusy", - StatusStoreNotExisted: "StoreNotExisted", - StatusStoreRecentlySplitRegions: "StoreRecentlySplitRegions", + StatusStoreNotExisted: "StoreNotExisted", // region StatusRegionHot: "RegionHot", diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index 46f7fdc29cdd..e5516317f461 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -48,6 +48,8 @@ const ( // Default value is 4 which is subjected by scheduler-max-waiting-operator and leader-schedule-limit // If you want to increase balance speed more, please increase above-mentioned param. BalanceLeaderBatchSize = 4 + // MaxBalanceLeaderBatchSize is maximum of balance leader batch size + MaxBalanceLeaderBatchSize = 10 transferIn = "transfer-in" transferOut = "transfer-out" @@ -148,7 +150,7 @@ func (handler *balanceLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http handler.rd.JSON(w, httpCode, v) } -func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { +func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -160,7 +162,6 @@ type balanceLeaderScheduler struct { conf *balanceLeaderSchedulerConfig handler http.Handler filters []filter.Filter - regionFilters filter.RegionFilter filterCounter *filter.Counter } @@ -180,7 +181,7 @@ func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceL option(s) } s.filters = []filter.Filter{ - &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, ForbidRecentlySplitRegions: true, OperatorLevel: constant.High}, + &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.High}, filter.NewSpecialUseFilter(s.GetName()), } return s @@ -276,7 +277,7 @@ func (cs *candidateStores) less(iID uint64, scorei float64, jID uint64, scorej f return scorei > scorej } -// hasStore returns true when there are leftover stores. +// hasStore returns returns true when there are leftover stores. func (cs *candidateStores) hasStore() bool { return cs.index < len(cs.stores) } @@ -348,7 +349,6 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun opInfluence := l.OpController.GetOpInfluence(cluster.GetBasicCluster()) kind := constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy) solver := newSolver(basePlan, kind, cluster, opInfluence) - l.regionFilters = filter.NewStoreRecentlySplitFilter(cluster.GetStores()) stores := cluster.GetStores() scoreFunc := func(store *core.StoreInfo) float64 { @@ -486,7 +486,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl // the worst follower peer and transfers the leader. func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.TargetStoreID(), l.conf.Ranges), - nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter(), l.regionFilters) + nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) if solver.Region == nil { log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.TargetStoreID())) balanceLeaderNoFollowerRegionCounter.Inc() @@ -508,7 +508,6 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla balanceLeaderNoLeaderRegionCounter.Inc() return nil } - finalFilters := l.filters conf := solver.GetSchedulerConfig() if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go index 3231716c6810..54fe8ff489bc 100644 --- a/pkg/schedule/schedulers/balance_test.go +++ b/pkg/schedule/schedulers/balance_test.go @@ -20,7 +20,6 @@ import ( "math/rand" "sort" "testing" - "time" "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" @@ -295,13 +294,6 @@ func (suite *balanceLeaderSchedulerTestSuite) TestBalanceLimit() { // Region1: F F F L suite.tc.UpdateLeaderCount(4, 16) suite.NotEmpty(suite.schedule()) - - // can't balance leader from 4 to 1 when store 1 has split in it. - store := suite.tc.GetStore(4) - store = store.Clone(core.SetRecentlySplitRegionsTime(time.Now())) - suite.tc.PutStore(store) - op := suite.schedule() - suite.Empty(op) } func (suite *balanceLeaderSchedulerTestSuite) TestBalanceLeaderSchedulePolicy() { diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index b0892a6736aa..ac409f901157 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -194,7 +194,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) continue } - changed := regionGuide(region, origin) + _, saveKV, _, _ := regionGuide(region, origin) overlaps := bc.PutRegion(region) if hasBuckets { @@ -202,7 +202,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { region.UpdateBuckets(buckets[i], old) } } - if changed.SaveKV { + if saveKV { err = regionStorage.SaveRegion(r) } if err == nil { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index d42dbb21ed13..22e1b16d822b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1113,16 +1113,12 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { cluster.HandleStatsAsync(c, region) } + hasRegionStats := c.regionStats != nil + // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. - changed := regionGuide(region, origin) - return c.SaveRegion(region, changed) -} - -// SaveRegion saves region info into cache and PD storage. -func (c *RaftCluster) SaveRegion(region *core.RegionInfo, changed *core.RegionChanged) (err error) { - hasRegionStats := c.regionStats != nil - if !c.isAPIServiceMode && !changed.SaveKV && !changed.SaveCache && !changed.IsNew { + isNew, saveKV, saveCache, needSync := regionGuide(region, origin) + if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1136,15 +1132,14 @@ func (c *RaftCluster) SaveRegion(region *core.RegionInfo, changed *core.RegionCh }) var overlaps []*core.RegionInfo - - if changed.SaveCache { + if saveCache { failpoint.Inject("decEpoch", func() { region = region.Clone(core.SetRegionConfVer(2), core.SetRegionVersion(2)) }) // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // - // However, it can't solve the race condition of concurrent heartbeats from the same region. + // However it can't solve the race condition of concurrent heartbeats from the same region. if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } @@ -1155,7 +1150,7 @@ func (c *RaftCluster) SaveRegion(region *core.RegionInfo, changed *core.RegionCh } if !c.isAPIServiceMode { - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared()) + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) } if c.storage != nil { @@ -1171,7 +1166,7 @@ func (c *RaftCluster) SaveRegion(region *core.RegionInfo, changed *core.RegionCh errs.ZapError(err)) } } - if changed.SaveKV { + if saveKV { if err := c.storage.SaveRegion(region.GetMeta()); err != nil { log.Error("failed to save region to storage", zap.Uint64("region-id", region.GetID()), @@ -1182,12 +1177,13 @@ func (c *RaftCluster) SaveRegion(region *core.RegionInfo, changed *core.RegionCh } } - if changed.SaveKV || changed.NeedSync { + if saveKV || needSync { select { case c.changedRegions <- region: default: } } + return nil } diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 3036fe95b3ea..c1da97363b53 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -16,8 +16,6 @@ package cluster import ( "bytes" - "fmt" - "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" @@ -28,13 +26,11 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" ) -// store doesn't pick balance leader source if the split region is bigger than maxSplitThreshold. -const maxSplitThreshold = 10 - // HandleRegionHeartbeat processes RegionInfo reports from client. func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if err := c.processRegionHeartbeat(region); err != nil { @@ -45,58 +41,6 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { return nil } -// ProcessRegionSplit to process split region into region cache. -// it's different with the region heartbeat, it's only fill some new region into the region cache. -// so it doesn't consider the leader and hot statistics. -func (c *RaftCluster) ProcessRegionSplit(regions []*metapb.Region) []error { - if err := c.checkSplitRegions(regions); err != nil { - return []error{err} - } - total := len(regions) - 1 - regions[0], regions[total] = regions[total], regions[0] - leaderStoreID := uint64(0) - if r := c.core.GetRegion(regions[0].GetId()); r != nil { - leaderStoreID = r.GetLeader().GetStoreId() - } - if leaderStoreID == 0 { - return []error{errors.New("origin region no leader")} - } - leaderStore := c.GetStore(leaderStoreID) - if leaderStore == nil { - return []error{errors.New("leader store not found")} - } - errList := make([]error, 0, total) - for _, region := range regions { - if len(region.GetPeers()) == 0 { - errList = append(errList, errors.New(fmt.Sprintf("region:%d has no peer", region.GetId()))) - continue - } - // region split initiator store will be leader with a high probability - leader := region.Peers[0] - if leaderStoreID > 0 { - for _, peer := range region.GetPeers() { - if peer.GetStoreId() == leaderStoreID { - leader = peer - break - } - } - } - region := core.NewRegionInfo(region, leader) - changed := &core.RegionChanged{ - IsNew: true, SaveKV: true, SaveCache: true, NeedSync: true, - } - if err := c.SaveRegion(region, changed); err != nil { - errList = append(errList, err) - } - } - // If the number of regions exceeds the threshold, update the last split time. - if len(regions) >= maxSplitThreshold { - newStore := leaderStore.Clone(core.SetRecentlySplitRegionsTime(time.Now())) - c.core.PutStore(newStore) - } - return errList -} - // HandleAskSplit handles the split request. func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) { if c.isSchedulingHalted() { @@ -221,6 +165,22 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (* return resp, nil } +func (c *RaftCluster) checkSplitRegion(left *metapb.Region, right *metapb.Region) error { + if left == nil || right == nil { + return errors.New("invalid split region") + } + + if !bytes.Equal(left.GetEndKey(), right.GetStartKey()) { + return errors.New("invalid split region") + } + + if len(right.GetEndKey()) == 0 || bytes.Compare(left.GetStartKey(), right.GetEndKey()) < 0 { + return nil + } + + return errors.New("invalid split region") +} + func (c *RaftCluster) checkSplitRegions(regions []*metapb.Region) error { if len(regions) <= 1 { return errors.New("invalid split region") @@ -244,18 +204,21 @@ func (c *RaftCluster) HandleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb left := request.GetLeft() right := request.GetRight() - if errs := c.ProcessRegionSplit([]*metapb.Region{left, right}); len(errs) > 0 { + err := c.checkSplitRegion(left, right) + if err != nil { log.Warn("report split region is invalid", logutil.ZapRedactStringer("left-region", core.RegionToHexMeta(left)), logutil.ZapRedactStringer("right-region", core.RegionToHexMeta(right)), - zap.Errors("errs", errs), - ) - // error[0] may be checker error, others are ignored. - return nil, errs[0] + errs.ZapError(err)) + return nil, err } + // Build origin region by using left and right. + originRegion := typeutil.DeepClone(right, core.RegionFactory) + originRegion.RegionEpoch = nil + originRegion.StartKey = left.GetStartKey() log.Info("region split, generate new region", - zap.Uint64("region-id", right.GetId()), + zap.Uint64("region-id", originRegion.GetId()), logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(left))) return &pdpb.ReportSplitResponse{}, nil } @@ -263,19 +226,21 @@ func (c *RaftCluster) HandleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb // HandleBatchReportSplit handles the batch report split request. func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitRequest) (*pdpb.ReportBatchSplitResponse, error) { regions := request.GetRegions() + hrm := core.RegionsToHexMeta(regions) - if errs := c.ProcessRegionSplit(regions); len(errs) > 0 { + err := c.checkSplitRegions(regions) + if err != nil { log.Warn("report batch split region is invalid", zap.Stringer("region-meta", hrm), - zap.Errors("errs", errs)) - // error[0] may be checker error, others are ignored. - return nil, errs[0] + errs.ZapError(err)) + return nil, err } last := len(regions) - 1 - originRegionID := regions[last].GetId() + originRegion := typeutil.DeepClone(regions[last], core.RegionFactory) + hrm = core.RegionsToHexMeta(regions[:last]) log.Info("region batch split, generate new regions", - zap.Uint64("region-id", originRegionID), - zap.Stringer("new-peer", hrm[:last]), + zap.Uint64("region-id", originRegion.GetId()), + zap.Stringer("origin", hrm), zap.Int("total", last)) return &pdpb.ReportBatchSplitResponse{}, nil } diff --git a/server/cluster/cluster_worker_test.go b/server/cluster/cluster_worker_test.go index 98b9b8380f12..b376b38edc3a 100644 --- a/server/cluster/cluster_worker_test.go +++ b/server/cluster/cluster_worker_test.go @@ -23,23 +23,9 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockid" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/storage" ) -func mockRegionPeer(cluster *RaftCluster, voters []uint64) []*metapb.Peer { - rst := make([]*metapb.Peer, len(voters)) - for i, v := range voters { - id, _ := cluster.AllocID() - rst[i] = &metapb.Peer{ - Id: id, - StoreId: v, - Role: metapb.PeerRole_Voter, - } - } - return rst -} - func TestReportSplit(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -48,56 +34,12 @@ func TestReportSplit(t *testing.T) { _, opt, err := newTestScheduleConfig() re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) - cluster.coordinator = schedule.NewCoordinator(cluster.ctx, cluster, nil) - right := &metapb.Region{Id: 1, StartKey: []byte("a"), EndKey: []byte("c"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}} - region := core.NewRegionInfo(right, right.Peers[0]) - cluster.putRegion(region) - store := newTestStores(1, "2.0.0") - cluster.core.PutStore(store[0]) - - // split failed, split region keys must be continuous. - left := &metapb.Region{Id: 2, StartKey: []byte("a"), EndKey: []byte("b"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}} - _, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: right, Right: left}) - re.Error(err) - - // split success with continuous region keys. - right = &metapb.Region{Id: 1, StartKey: []byte("b"), EndKey: []byte("c"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}} + left := &metapb.Region{Id: 1, StartKey: []byte("a"), EndKey: []byte("b")} + right := &metapb.Region{Id: 2, StartKey: []byte("b"), EndKey: []byte("c")} _, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: left, Right: right}) re.NoError(err) - // no range hole - storeID := region.GetLeader().GetStoreId() - re.Equal(storeID, cluster.GetRegionByKey([]byte("b")).GetLeader().GetStoreId()) - re.Equal(storeID, cluster.GetRegionByKey([]byte("a")).GetLeader().GetStoreId()) - re.Equal(uint64(1), cluster.GetRegionByKey([]byte("b")).GetID()) - re.Equal(uint64(2), cluster.GetRegionByKey([]byte("a")).GetID()) - - testdata := []struct { - regionID uint64 - startKey []byte - endKey []byte - }{ - { - regionID: 1, - startKey: []byte("b"), - endKey: []byte("c"), - }, { - regionID: 2, - startKey: []byte("a"), - endKey: []byte("b"), - }, - } - - for _, data := range testdata { - r := metapb.Region{} - ok, err := cluster.storage.LoadRegion(data.regionID, &r) - re.NoError(err) - re.True(ok) - re.Equal(data.startKey, r.GetStartKey()) - re.Equal(data.endKey, r.GetEndKey()) - } + _, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: right, Right: left}) + re.Error(err) } func TestReportBatchSplit(t *testing.T) { @@ -108,39 +50,12 @@ func TestReportBatchSplit(t *testing.T) { _, opt, err := newTestScheduleConfig() re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) - cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - store := newTestStores(1, "2.0.0") - cluster.core.PutStore(store[0]) - re.False(cluster.GetStore(1).HasRecentlySplitRegions()) regions := []*metapb.Region{ - {Id: 1, StartKey: []byte(""), EndKey: []byte("a"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3})}, - {Id: 2, StartKey: []byte("a"), EndKey: []byte("b"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3})}, - {Id: 3, StartKey: []byte("b"), EndKey: []byte("c"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3})}, - {Id: 4, StartKey: []byte("c"), EndKey: []byte(""), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3})}, - } - _, err = cluster.HandleBatchReportSplit(&pdpb.ReportBatchSplitRequest{Regions: regions}) - re.Error(err) - - meta := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte(""), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}} - region := core.NewRegionInfo(meta, meta.Peers[0]) - cluster.putRegion(region) - - regions = []*metapb.Region{ - {Id: 2, StartKey: []byte(""), EndKey: []byte("a"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - {Id: 3, StartKey: []byte("a"), EndKey: []byte("b"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - {Id: 4, StartKey: []byte("b"), EndKey: []byte("c"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - {Id: 5, StartKey: []byte("c"), EndKey: []byte("d"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - {Id: 6, StartKey: []byte("d"), EndKey: []byte("e"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - {Id: 7, StartKey: []byte("e"), EndKey: []byte("f"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - {Id: 8, StartKey: []byte("f"), EndKey: []byte("g"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - {Id: 9, StartKey: []byte("g"), EndKey: []byte("h"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - {Id: 10, StartKey: []byte("h"), EndKey: []byte("i"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, - - {Id: 1, StartKey: []byte("i"), EndKey: []byte(""), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 1, StartKey: []byte(""), EndKey: []byte("a")}, + {Id: 2, StartKey: []byte("a"), EndKey: []byte("b")}, + {Id: 3, StartKey: []byte("b"), EndKey: []byte("c")}, + {Id: 3, StartKey: []byte("c"), EndKey: []byte("")}, } _, err = cluster.HandleBatchReportSplit(&pdpb.ReportBatchSplitRequest{Regions: regions}) re.NoError(err) - - re.True(cluster.GetStore(1).HasRecentlySplitRegions()) } diff --git a/server/grpc_service.go b/server/grpc_service.go index d218c2bb0b60..5e40bc1c732c 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1428,24 +1428,10 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque if rc == nil { return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil } - var region *core.RegionInfo - // allow region miss temporarily if this key can't be found in the region tree. -retryLoop: - for retry := 0; retry <= 10; retry++ { - region = rc.GetRegionByKey(request.GetRegionKey()) - if region != nil { - break retryLoop - } - select { - case <-ctx.Done(): - break retryLoop - case <-time.After(10 * time.Millisecond): - } - } + region := rc.GetRegionByKey(request.GetRegionKey()) if region == nil { return &pdpb.GetRegionResponse{Header: s.header()}, nil } - var buckets *metapb.Buckets if rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() { buckets = region.GetBuckets() @@ -1487,21 +1473,7 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil } - var region *core.RegionInfo - // allow region miss temporarily if this key can't be found in the region tree. -retryLoop: - for retry := 0; retry <= 10; retry++ { - region = rc.GetPrevRegionByKey(request.GetRegionKey()) - if region != nil { - break retryLoop - } - select { - case <-ctx.Done(): - break retryLoop - case <-time.After(10 * time.Millisecond): - } - } - + region := rc.GetPrevRegionByKey(request.GetRegionKey()) if region == nil { return &pdpb.GetRegionResponse{Header: s.header()}, nil }