diff --git a/pkg/core/region.go b/pkg/core/region.go index 4540f7aafb3..2fec30de132 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -682,9 +682,14 @@ func (r *RegionInfo) isRegionRecreated() bool { return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0) } +// RegionChanged is a struct that records the changes of the region. +type RegionChanged struct { + IsNew, SaveKV, SaveCache, NeedSync bool +} + // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) +type RegionGuideFunc func(region, origin *RegionInfo) *RegionChanged // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -697,18 +702,19 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - // Mark isNew if the region in cache does not have leader. - return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) { + // Mark IsNew if the region in cache does not have leader. + return func(region, origin *RegionInfo) (changed *RegionChanged) { + changed = &RegionChanged{} if origin == nil { if log.GetLevel() <= zap.DebugLevel { debug("insert new region", zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache, isNew = true, true, true + changed.SaveKV, changed.SaveCache, changed.IsNew = true, true, true } else { if !origin.IsFromHeartbeat() { - isNew = true + changed.IsNew = true } r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -721,7 +727,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache = true, true + changed.SaveKV, changed.SaveCache = true, true } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -732,11 +738,11 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, saveCache = true, true + changed.SaveCache, changed.SaveKV = true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() == 0 { - isNew = true + changed.IsNew = true } else if log.GetLevel() <= zap.InfoLevel { info("leader changed", zap.Uint64("region-id", region.GetID()), @@ -745,17 +751,17 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { ) } // We check it first and do not return because the log is important for us to investigate, - saveCache, needSync = true, true + changed.SaveCache, changed.NeedSync = true, true } if len(region.GetPeers()) != len(origin.GetPeers()) { - saveKV, saveCache = true, true + changed.SaveCache, changed.SaveKV = true, true return } if len(region.GetBuckets().GetKeys()) != len(origin.GetBuckets().GetKeys()) { if log.GetLevel() <= zap.DebugLevel { debug("bucket key changed", zap.Uint64("region-id", region.GetID())) } - saveKV, saveCache = true, true + changed.SaveCache, changed.SaveKV = true, true return } // Once flow has changed, will update the cache. @@ -763,39 +769,39 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || region.GetRoundBytesRead() != origin.GetRoundBytesRead() || region.flowRoundDivisor < origin.flowRoundDivisor { - saveCache, needSync = true, true + changed.SaveCache, changed.NeedSync = true, true return } if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) { if log.GetLevel() <= zap.DebugLevel { debug("down-peers changed", zap.Uint64("region-id", region.GetID())) } - saveCache, needSync = true, true + changed.SaveCache, changed.NeedSync = true, true return } if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) { if log.GetLevel() <= zap.DebugLevel { debug("pending-peers changed", zap.Uint64("region-id", region.GetID())) } - saveCache, needSync = true, true + changed.SaveCache, changed.NeedSync = true, true return } if region.GetApproximateSize() != origin.GetApproximateSize() || region.GetApproximateKeys() != origin.GetApproximateKeys() { - saveCache = true + changed.SaveCache = true return } if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN && (region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() || region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) { - saveCache = true + changed.SaveCache = true return } // Do not save to kv, because 1) flashback will be eventually set to // false, 2) flashback changes almost all regions in a cluster. // Saving kv may downgrade PD performance when there are many regions. if region.IsFlashbackChanged(origin) { - saveCache = true + changed.SaveCache = true return } } diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 1e6b43fbf96..3b58f5ee15a 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -333,8 +333,8 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...) - _, _, _, needSync := RegionGuide(regionA, regionB) - re.Equal(testCase.needSync, needSync) + changed := RegionGuide(regionA, regionB) + re.Equal(testCase.needSync, changed.NeedSync) } } diff --git a/pkg/core/store.go b/pkg/core/store.go index 1d3362cac0e..cafb443bb7d 100644 --- a/pkg/core/store.go +++ b/pkg/core/store.go @@ -36,6 +36,7 @@ const ( initialMinSpace = 8 * units.GiB // 2^33=8GB slowStoreThreshold = 80 awakenStoreInterval = 10 * time.Minute // 2 * slowScoreRecoveryTime + splitStoreWait = time.Minute // EngineKey is the label key used to indicate engine. EngineKey = "engine" @@ -50,22 +51,23 @@ const ( type StoreInfo struct { meta *metapb.Store *storeStats - pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader - slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it - slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it - leaderCount int - regionCount int - learnerCount int - witnessCount int - leaderSize int64 - regionSize int64 - pendingPeerCount int - lastPersistTime time.Time - leaderWeight float64 - regionWeight float64 - limiter storelimit.StoreLimit - minResolvedTS uint64 - lastAwakenTime time.Time + pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader + slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it + slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it + leaderCount int + regionCount int + learnerCount int + witnessCount int + leaderSize int64 + regionSize int64 + pendingPeerCount int + lastPersistTime time.Time + leaderWeight float64 + regionWeight float64 + limiter storelimit.StoreLimit + minResolvedTS uint64 + lastAwakenTime time.Time + recentlySplitRegionsTime time.Time } // NewStoreInfo creates StoreInfo with meta data. @@ -539,6 +541,11 @@ func (s *StoreInfo) NeedAwakenStore() bool { return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval } +// HasRecentlySplitRegions checks if there are some region are splitted in this store. +func (s *StoreInfo) HasRecentlySplitRegions() bool { + return time.Since(s.recentlySplitRegionsTime) < splitStoreWait +} + var ( // If a store's last heartbeat is storeDisconnectDuration ago, the store will // be marked as disconnected state. The value should be greater than tikv's diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go index 8a2aa1ef089..4d8864ea478 100644 --- a/pkg/core/store_option.go +++ b/pkg/core/store_option.go @@ -274,3 +274,10 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption { store.lastAwakenTime = lastAwaken } } + +// SetRecentlySplitRegionsTime sets last split time for the store. +func SetRecentlySplitRegionsTime(recentlySplitRegionsTime time.Time) StoreCreateOption { + return func(store *StoreInfo) { + store.recentlySplitRegionsTime = recentlySplitRegionsTime + } +} diff --git a/pkg/mcs/resourcemanager/server/grpc_service.go b/pkg/mcs/resourcemanager/server/grpc_service.go index 5c1b5f0e458..d0fac920f2f 100644 --- a/pkg/mcs/resourcemanager/server/grpc_service.go +++ b/pkg/mcs/resourcemanager/server/grpc_service.go @@ -191,11 +191,16 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu } // Send the consumption to update the metrics. isBackground := req.GetIsBackground() + isTiFlash := req.GetIsTiflash() + if isBackground && isTiFlash { + return errors.New("background and tiflash cannot be true at the same time") + } s.manager.consumptionDispatcher <- struct { resourceGroupName string *rmpb.Consumption isBackground bool - }{resourceGroupName, req.GetConsumptionSinceLastRequest(), isBackground} + isTiFlash bool + }{resourceGroupName, req.GetConsumptionSinceLastRequest(), isBackground, isTiFlash} if isBackground { continue } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 21866ee1156..df237bd0feb 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -60,6 +60,7 @@ type Manager struct { resourceGroupName string *rmpb.Consumption isBackground bool + isTiFlash bool } // record update time of each resource group consumptionRecord map[string]time.Time @@ -81,6 +82,7 @@ func NewManager[T ConfigProvider](srv bs.Server) *Manager { resourceGroupName string *rmpb.Consumption isBackground bool + isTiFlash bool }, defaultConsumptionChanSize), consumptionRecord: make(map[string]time.Time), } @@ -361,20 +363,23 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { if consumption == nil { continue } - backgroundType := "" + ruLabelType := tidbTypeLabel if consumptionInfo.isBackground { - backgroundType = backgroundTypeLabel + ruLabelType = backgroundTypeLabel + } + if consumptionInfo.isTiFlash { + ruLabelType = tiflashTypeLabel } var ( name = consumptionInfo.resourceGroupName - rruMetrics = readRequestUnitCost.WithLabelValues(name, backgroundType) - wruMetrics = writeRequestUnitCost.WithLabelValues(name, backgroundType) + rruMetrics = readRequestUnitCost.WithLabelValues(name, ruLabelType) + wruMetrics = writeRequestUnitCost.WithLabelValues(name, ruLabelType) sqlLayerRuMetrics = sqlLayerRequestUnitCost.WithLabelValues(name) - readByteMetrics = readByteCost.WithLabelValues(name, backgroundType) - writeByteMetrics = writeByteCost.WithLabelValues(name, backgroundType) - kvCPUMetrics = kvCPUCost.WithLabelValues(name, backgroundType) - sqlCPUMetrics = sqlCPUCost.WithLabelValues(name, backgroundType) + readByteMetrics = readByteCost.WithLabelValues(name, ruLabelType) + writeByteMetrics = writeByteCost.WithLabelValues(name, ruLabelType) + kvCPUMetrics = kvCPUCost.WithLabelValues(name, ruLabelType) + sqlCPUMetrics = sqlCPUCost.WithLabelValues(name, ruLabelType) readRequestCountMetrics = requestCount.WithLabelValues(name, readTypeLabel) writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel) ) diff --git a/pkg/mcs/resourcemanager/server/metrics.go b/pkg/mcs/resourcemanager/server/metrics.go index 083c44894ef..184eddc8ef9 100644 --- a/pkg/mcs/resourcemanager/server/metrics.go +++ b/pkg/mcs/resourcemanager/server/metrics.go @@ -26,6 +26,8 @@ const ( readTypeLabel = "read" writeTypeLabel = "write" backgroundTypeLabel = "background" + tiflashTypeLabel = "tiflash" + tidbTypeLabel = "tidb" ) var ( diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 0b9924f230b..b2986f722df 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -75,7 +75,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, checkMembershipCh: checkMembershipCh, } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) - err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels()) + err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) if err != nil { cancel() return nil, err @@ -433,8 +433,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. - isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin) - if !saveCache && !isNew { + changed := core.GenerateRegionGuideFunc(true)(region, origin) + if !changed.SaveCache && !changed.IsNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -444,7 +444,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { } var overlaps []*core.RegionInfo - if saveCache { + if changed.SaveCache { // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // @@ -456,7 +456,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { cluster.HandleOverlaps(c, overlaps) } - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared()) return nil } diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 1ed7ab4eb9f..01282b40534 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -213,7 +213,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { func (mc *Cluster) initRuleManager() { if mc.RuleManager == nil { mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig()) - mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels) + mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel) } } diff --git a/pkg/schedule/checker/merge_checker.go b/pkg/schedule/checker/merge_checker.go index 0243bfbe165..1ce7bddd1dc 100644 --- a/pkg/schedule/checker/merge_checker.go +++ b/pkg/schedule/checker/merge_checker.go @@ -53,9 +53,9 @@ var ( mergeCheckerPausedCounter = checkerCounter.WithLabelValues(mergeCheckerName, "paused") mergeCheckerRecentlySplitCounter = checkerCounter.WithLabelValues(mergeCheckerName, "recently-split") mergeCheckerRecentlyStartCounter = checkerCounter.WithLabelValues(mergeCheckerName, "recently-start") - mergeCheckerSkipUninitRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "skip-uninit-region") + mergeCheckerNoLeaderCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-leader") mergeCheckerNoNeedCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-need") - mergeCheckerSpecialPeerCounter = checkerCounter.WithLabelValues(mergeCheckerName, "special-peer") + mergeCheckerUnhealthyRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "unhealthy-region") mergeCheckerAbnormalReplicaCounter = checkerCounter.WithLabelValues(mergeCheckerName, "abnormal-replica") mergeCheckerHotRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "hot-region") mergeCheckerNoTargetCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-target") @@ -129,7 +129,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { // when pd just started, it will load region meta from region storage, if region.GetLeader() == nil { - mergeCheckerSkipUninitRegionCounter.Inc() + mergeCheckerNoLeaderCounter.Inc() return nil } @@ -141,7 +141,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator { // skip region has down peers or pending peers if !filter.IsRegionHealthy(region) { - mergeCheckerSpecialPeerCounter.Inc() + mergeCheckerUnhealthyRegionCounter.Inc() return nil } diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index cbd7624f3b1..ad140e91606 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -112,6 +112,41 @@ func (suite *ruleCheckerTestSuite) TestAddRulePeerWithIsolationLevel() { suite.Equal(uint64(4), op.Step(0).(operator.AddLearner).ToStore) } +func (suite *ruleCheckerTestSuite) TestReplaceDownPeerWithIsolationLevel() { + suite.cluster.SetMaxStoreDownTime(100 * time.Millisecond) + suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2", "host": "h3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z2", "host": "h4"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3", "host": "h5"}) + suite.cluster.AddLabelsStore(6, 1, map[string]string{"zone": "z3", "host": "h6"}) + suite.cluster.AddLeaderRegionWithRange(1, "", "", 1, 3, 5) + suite.ruleManager.DeleteRule("pd", "default") + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "test", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 3, + LocationLabels: []string{"zone", "host"}, + IsolationLevel: "zone", + }) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + region := suite.cluster.GetRegion(1) + downPeer := []*pdpb.PeerStats{ + {Peer: region.GetStorePeer(5), DownSeconds: 6000}, + } + region = region.Clone(core.WithDownPeers(downPeer)) + suite.cluster.PutRegion(region) + suite.cluster.SetStoreDown(5) + suite.cluster.SetStoreDown(6) + time.Sleep(200 * time.Millisecond) + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) +} + func (suite *ruleCheckerTestSuite) TestFixPeer() { suite.cluster.AddLeaderStore(1, 1) suite.cluster.AddLeaderStore(2, 1) diff --git a/pkg/schedule/filter/counter.go b/pkg/schedule/filter/counter.go index 0120ef5b666..0619bbdde29 100644 --- a/pkg/schedule/filter/counter.go +++ b/pkg/schedule/filter/counter.go @@ -127,6 +127,7 @@ const ( storeStateTooManyPendingPeer storeStateRejectLeader storeStateSlowTrend + storeStateRecentlySplitRegions filtersLen ) @@ -156,6 +157,7 @@ var filters = [filtersLen]string{ "store-state-too-many-pending-peers-filter", "store-state-reject-leader-filter", "store-state-slow-trend-filter", + "store-state-recently-split-regions-filter", } // String implements fmt.Stringer interface. diff --git a/pkg/schedule/filter/counter_test.go b/pkg/schedule/filter/counter_test.go index 067a07f138b..f8b6c0bcb8d 100644 --- a/pkg/schedule/filter/counter_test.go +++ b/pkg/schedule/filter/counter_test.go @@ -27,7 +27,7 @@ func TestString(t *testing.T) { expected string }{ {int(storeStateTombstone), "store-state-tombstone-filter"}, - {int(filtersLen - 1), "store-state-slow-trend-filter"}, + {int(filtersLen - 1), "store-state-recently-split-regions-filter"}, {int(filtersLen), "unknown"}, } diff --git a/pkg/schedule/filter/filters.go b/pkg/schedule/filter/filters.go index 0d188e69180..e76969127d1 100644 --- a/pkg/schedule/filter/filters.go +++ b/pkg/schedule/filter/filters.go @@ -332,6 +332,8 @@ type StoreStateFilter struct { // If it checks failed, the operator will be put back to the waiting queue util the limit is available. // But the scheduler should keep the same with the operator level. OperatorLevel constant.PriorityLevel + // check the store not split recently in it if set true. + ForbidRecentlySplitRegions bool // Reason is used to distinguish the reason of store state filter Reason filterType } @@ -471,6 +473,15 @@ func (f *StoreStateFilter) hasRejectLeaderProperty(conf config.SharedConfigProvi return statusOK } +func (f *StoreStateFilter) hasRecentlySplitRegions(_ config.SharedConfigProvider, store *core.StoreInfo) *plan.Status { + if f.ForbidRecentlySplitRegions && store.HasRecentlySplitRegions() { + f.Reason = storeStateRecentlySplitRegions + return statusStoreRecentlySplitRegions + } + f.Reason = storeStateOK + return statusOK +} + // The condition table. // Y: the condition is temporary (expected to become false soon). // N: the condition is expected to be true for a long time. @@ -499,7 +510,7 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, conf config.SharedConfigPr var funcs []conditionFunc switch typ { case leaderSource: - funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected} + funcs = []conditionFunc{f.isRemoved, f.isDown, f.pauseLeaderTransfer, f.isDisconnected, f.hasRecentlySplitRegions} case regionSource: funcs = []conditionFunc{f.isBusy, f.exceedRemoveLimit, f.tooManySnapshots} case witnessSource: diff --git a/pkg/schedule/filter/region_filters.go b/pkg/schedule/filter/region_filters.go index 799cee7d90c..70cdb8500b0 100644 --- a/pkg/schedule/filter/region_filters.go +++ b/pkg/schedule/filter/region_filters.go @@ -24,24 +24,6 @@ import ( "github.com/tikv/pd/pkg/slice" ) -// SelectRegions selects regions that be selected from the list. -func SelectRegions(regions []*core.RegionInfo, filters ...RegionFilter) []*core.RegionInfo { - return filterRegionsBy(regions, func(r *core.RegionInfo) bool { - return slice.AllOf(filters, func(i int) bool { - return filters[i].Select(r).IsOK() - }) - }) -} - -func filterRegionsBy(regions []*core.RegionInfo, keepPred func(*core.RegionInfo) bool) (selected []*core.RegionInfo) { - for _, s := range regions { - if keepPred(s) { - selected = append(selected, s) - } - } - return -} - // SelectOneRegion selects one region that be selected from the list. func SelectOneRegion(regions []*core.RegionInfo, collector *plan.Collector, filters ...RegionFilter) *core.RegionInfo { for _, r := range regions { @@ -173,7 +155,7 @@ type SnapshotSenderFilter struct { senders map[uint64]struct{} } -// NewSnapshotSendFilter returns creates a RegionFilter that filters regions with witness peer on the specific store. +// NewSnapshotSendFilter returns creates a RegionFilter that filters regions whose leader has sender limit on the specific store. // level should be set as same with the operator priority level. func NewSnapshotSendFilter(stores []*core.StoreInfo, level constant.PriorityLevel) RegionFilter { senders := make(map[uint64]struct{}) @@ -193,3 +175,28 @@ func (f *SnapshotSenderFilter) Select(region *core.RegionInfo) *plan.Status { } return statusRegionLeaderSendSnapshotThrottled } + +// StoreRecentlySplitFilter filer the region whose leader store not recently split regions. +type StoreRecentlySplitFilter struct { + recentlySplitStores map[uint64]struct{} +} + +// NewStoreRecentlySplitFilter returns creates a StoreRecentlySplitFilter. +func NewStoreRecentlySplitFilter(stores []*core.StoreInfo) RegionFilter { + recentlySplitStores := make(map[uint64]struct{}) + for _, store := range stores { + if store.HasRecentlySplitRegions() { + recentlySplitStores[store.GetID()] = struct{}{} + } + } + return &StoreRecentlySplitFilter{recentlySplitStores: recentlySplitStores} +} + +// Select returns ok if the region leader not in the recentlySplitStores. +func (f *StoreRecentlySplitFilter) Select(region *core.RegionInfo) *plan.Status { + leaderStoreID := region.GetLeader().GetStoreId() + if _, ok := f.recentlySplitStores[leaderStoreID]; ok { + return statusStoreRecentlySplitRegions + } + return statusOK +} diff --git a/pkg/schedule/filter/status.go b/pkg/schedule/filter/status.go index 930c59e3ba8..9b6665a2fa7 100644 --- a/pkg/schedule/filter/status.go +++ b/pkg/schedule/filter/status.go @@ -39,8 +39,9 @@ var ( // store config limitation statusStoreRejectLeader = plan.NewStatus(plan.StatusStoreRejectLeader) - statusStoreNotMatchRule = plan.NewStatus(plan.StatusStoreNotMatchRule) - statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation) + statusStoreNotMatchRule = plan.NewStatus(plan.StatusStoreNotMatchRule) + statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation) + statusStoreRecentlySplitRegions = plan.NewStatus(plan.StatusStoreRecentlySplitRegions) // region filter status statusRegionPendingPeer = plan.NewStatus(plan.StatusRegionUnhealthy) diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index 3bd272a00ac..909c0fa1078 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -66,7 +66,7 @@ func NewRuleManager(storage endpoint.RuleStorage, storeSetInformer core.StoreSet // Initialize loads rules from storage. If Placement Rules feature is never enabled, it creates default rule that is // compatible with previous configuration. -func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error { +func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error { m.Lock() defer m.Unlock() if m.initialized { @@ -93,6 +93,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error Role: Voter, Count: maxReplica - witnessCount, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, }, { GroupID: "pd", @@ -101,6 +102,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error Count: witnessCount, IsWitness: true, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, }, }..., ) @@ -111,6 +113,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error Role: Voter, Count: maxReplica, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, }) } for _, defaultRule := range defaultRules { diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index e5be8d74cd2..a6454337aa8 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -34,7 +34,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru var err error manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) manager.conf.SetEnableWitness(enableWitness) - err = manager.Initialize(3, []string{"zone", "rack", "host"}) + err = manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) return store, manager } @@ -157,7 +157,7 @@ func TestSaveLoad(t *testing.T) { } m2 := NewRuleManager(store, nil, nil) - err := m2.Initialize(3, []string{"no", "labels"}) + err := m2.Initialize(3, []string{"no", "labels"}, "") re.NoError(err) re.Len(m2.GetAllRules(), 3) re.Equal(rules[0].String(), m2.GetRule("pd", "default").String()) @@ -173,7 +173,7 @@ func TestSetAfterGet(t *testing.T) { manager.SetRule(rule) m2 := NewRuleManager(store, nil, nil) - err := m2.Initialize(100, []string{}) + err := m2.Initialize(100, []string{}, "") re.NoError(err) rule = m2.GetRule("pd", "default") re.Equal(1, rule.Count) diff --git a/pkg/schedule/plan/status.go b/pkg/schedule/plan/status.go index 4242b631493..847d03a17ff 100644 --- a/pkg/schedule/plan/status.go +++ b/pkg/schedule/plan/status.go @@ -72,6 +72,8 @@ const ( StatusStoreLowSpace = iota + 500 // StatusStoreNotExisted represents the store cannot be found in PD. StatusStoreNotExisted + // StatusStoreRecentlySplitRegions represents the store cannot be selected due to the region is splitting. + StatusStoreRecentlySplitRegions ) // TODO: define region status priority @@ -127,7 +129,8 @@ var statusText = map[StatusCode]string{ StatusStoreDown: "StoreDown", StatusStoreBusy: "StoreBusy", - StatusStoreNotExisted: "StoreNotExisted", + StatusStoreNotExisted: "StoreNotExisted", + StatusStoreRecentlySplitRegions: "StoreRecentlySplitRegions", // region StatusRegionHot: "RegionHot", diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go index e5516317f46..46f7fdc29cd 100644 --- a/pkg/schedule/schedulers/balance_leader.go +++ b/pkg/schedule/schedulers/balance_leader.go @@ -48,8 +48,6 @@ const ( // Default value is 4 which is subjected by scheduler-max-waiting-operator and leader-schedule-limit // If you want to increase balance speed more, please increase above-mentioned param. BalanceLeaderBatchSize = 4 - // MaxBalanceLeaderBatchSize is maximum of balance leader batch size - MaxBalanceLeaderBatchSize = 10 transferIn = "transfer-in" transferOut = "transfer-out" @@ -150,7 +148,7 @@ func (handler *balanceLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http handler.rd.JSON(w, httpCode, v) } -func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, r *http.Request) { +func (handler *balanceLeaderHandler) ListConfig(w http.ResponseWriter, _ *http.Request) { conf := handler.config.Clone() handler.rd.JSON(w, http.StatusOK, conf) } @@ -162,6 +160,7 @@ type balanceLeaderScheduler struct { conf *balanceLeaderSchedulerConfig handler http.Handler filters []filter.Filter + regionFilters filter.RegionFilter filterCounter *filter.Counter } @@ -181,7 +180,7 @@ func newBalanceLeaderScheduler(opController *operator.Controller, conf *balanceL option(s) } s.filters = []filter.Filter{ - &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.High}, + &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, ForbidRecentlySplitRegions: true, OperatorLevel: constant.High}, filter.NewSpecialUseFilter(s.GetName()), } return s @@ -277,7 +276,7 @@ func (cs *candidateStores) less(iID uint64, scorei float64, jID uint64, scorej f return scorei > scorej } -// hasStore returns returns true when there are leftover stores. +// hasStore returns true when there are leftover stores. func (cs *candidateStores) hasStore() bool { return cs.index < len(cs.stores) } @@ -349,6 +348,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun opInfluence := l.OpController.GetOpInfluence(cluster.GetBasicCluster()) kind := constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy) solver := newSolver(basePlan, kind, cluster, opInfluence) + l.regionFilters = filter.NewStoreRecentlySplitFilter(cluster.GetStores()) stores := cluster.GetStores() scoreFunc := func(store *core.StoreInfo) float64 { @@ -486,7 +486,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl // the worst follower peer and transfers the leader. func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.TargetStoreID(), l.conf.Ranges), - nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) + nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter(), l.regionFilters) if solver.Region == nil { log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.TargetStoreID())) balanceLeaderNoFollowerRegionCounter.Inc() @@ -508,6 +508,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla balanceLeaderNoLeaderRegionCounter.Inc() return nil } + finalFilters := l.filters conf := solver.GetSchedulerConfig() if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go index 54fe8ff489b..3231716c681 100644 --- a/pkg/schedule/schedulers/balance_test.go +++ b/pkg/schedule/schedulers/balance_test.go @@ -20,6 +20,7 @@ import ( "math/rand" "sort" "testing" + "time" "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" @@ -294,6 +295,13 @@ func (suite *balanceLeaderSchedulerTestSuite) TestBalanceLimit() { // Region1: F F F L suite.tc.UpdateLeaderCount(4, 16) suite.NotEmpty(suite.schedule()) + + // can't balance leader from 4 to 1 when store 1 has split in it. + store := suite.tc.GetStore(4) + store = store.Clone(core.SetRecentlySplitRegionsTime(time.Now())) + suite.tc.PutStore(store) + op := suite.schedule() + suite.Empty(op) } func (suite *balanceLeaderSchedulerTestSuite) TestBalanceLeaderSchedulePolicy() { diff --git a/pkg/statistics/region_collection_test.go b/pkg/statistics/region_collection_test.go index 232fb8b73d8..2706ffeb043 100644 --- a/pkg/statistics/region_collection_test.go +++ b/pkg/statistics/region_collection_test.go @@ -30,7 +30,7 @@ func TestRegionStatistics(t *testing.T) { re := require.New(t) store := storage.NewStorageWithMemoryBackend() manager := placement.NewRuleManager(store, nil, nil) - err := manager.Initialize(3, []string{"zone", "rack", "host"}) + err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) opt := mockconfig.NewTestOptions() opt.SetPlacementRuleEnabled(false) @@ -120,7 +120,7 @@ func TestRegionStatisticsWithPlacementRule(t *testing.T) { re := require.New(t) store := storage.NewStorageWithMemoryBackend() manager := placement.NewRuleManager(store, nil, nil) - err := manager.Initialize(3, []string{"zone", "rack", "host"}) + err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) opt := mockconfig.NewTestOptions() opt.SetPlacementRuleEnabled(true) diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index ac409f90115..b0892a6736a 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -194,7 +194,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) continue } - _, saveKV, _, _ := regionGuide(region, origin) + changed := regionGuide(region, origin) overlaps := bc.PutRegion(region) if hasBuckets { @@ -202,7 +202,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { region.UpdateBuckets(buckets[i], old) } } - if saveKV { + if changed.SaveKV { err = regionStorage.SaveRegion(r) } if err == nil { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index dbd640d6e8c..d42dbb21ed1 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -301,7 +301,7 @@ func (c *RaftCluster) Start(s Server) error { c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { - err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels()) + err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) if err != nil { return err } @@ -1113,12 +1113,16 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { cluster.HandleStatsAsync(c, region) } - hasRegionStats := c.regionStats != nil - // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. - isNew, saveKV, saveCache, needSync := regionGuide(region, origin) - if !c.isAPIServiceMode && !saveKV && !saveCache && !isNew { + changed := regionGuide(region, origin) + return c.SaveRegion(region, changed) +} + +// SaveRegion saves region info into cache and PD storage. +func (c *RaftCluster) SaveRegion(region *core.RegionInfo, changed *core.RegionChanged) (err error) { + hasRegionStats := c.regionStats != nil + if !c.isAPIServiceMode && !changed.SaveKV && !changed.SaveCache && !changed.IsNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1132,14 +1136,15 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { }) var overlaps []*core.RegionInfo - if saveCache { + + if changed.SaveCache { failpoint.Inject("decEpoch", func() { region = region.Clone(core.SetRegionConfVer(2), core.SetRegionVersion(2)) }) // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // - // However it can't solve the race condition of concurrent heartbeats from the same region. + // However, it can't solve the race condition of concurrent heartbeats from the same region. if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } @@ -1150,7 +1155,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } if !c.isAPIServiceMode { - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared()) } if c.storage != nil { @@ -1166,7 +1171,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { errs.ZapError(err)) } } - if saveKV { + if changed.SaveKV { if err := c.storage.SaveRegion(region.GetMeta()); err != nil { log.Error("failed to save region to storage", zap.Uint64("region-id", region.GetID()), @@ -1177,13 +1182,12 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } } - if saveKV || needSync { + if changed.SaveKV || changed.NeedSync { select { case c.changedRegions <- region: default: } } - return nil } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index c9d4d0f8f61..aa826e34406 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -243,7 +243,7 @@ func TestSetOfflineStore(t *testing.T) { cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -440,7 +440,7 @@ func TestUpStore(t *testing.T) { cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -543,7 +543,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) { cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -1270,7 +1270,7 @@ func TestOfflineAndMerge(t *testing.T) { cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -2129,7 +2129,7 @@ func newTestRaftCluster( rc.InitCluster(id, opt, s, basicCluster, nil) rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt) if opt.IsPlacementRulesEnabled() { - err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index c1da97363b5..3036fe95b3e 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -16,6 +16,8 @@ package cluster import ( "bytes" + "fmt" + "time" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" @@ -26,11 +28,13 @@ import ( "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/utils/logutil" - "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/pkg/versioninfo" "go.uber.org/zap" ) +// store doesn't pick balance leader source if the split region is bigger than maxSplitThreshold. +const maxSplitThreshold = 10 + // HandleRegionHeartbeat processes RegionInfo reports from client. func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if err := c.processRegionHeartbeat(region); err != nil { @@ -41,6 +45,58 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { return nil } +// ProcessRegionSplit to process split region into region cache. +// it's different with the region heartbeat, it's only fill some new region into the region cache. +// so it doesn't consider the leader and hot statistics. +func (c *RaftCluster) ProcessRegionSplit(regions []*metapb.Region) []error { + if err := c.checkSplitRegions(regions); err != nil { + return []error{err} + } + total := len(regions) - 1 + regions[0], regions[total] = regions[total], regions[0] + leaderStoreID := uint64(0) + if r := c.core.GetRegion(regions[0].GetId()); r != nil { + leaderStoreID = r.GetLeader().GetStoreId() + } + if leaderStoreID == 0 { + return []error{errors.New("origin region no leader")} + } + leaderStore := c.GetStore(leaderStoreID) + if leaderStore == nil { + return []error{errors.New("leader store not found")} + } + errList := make([]error, 0, total) + for _, region := range regions { + if len(region.GetPeers()) == 0 { + errList = append(errList, errors.New(fmt.Sprintf("region:%d has no peer", region.GetId()))) + continue + } + // region split initiator store will be leader with a high probability + leader := region.Peers[0] + if leaderStoreID > 0 { + for _, peer := range region.GetPeers() { + if peer.GetStoreId() == leaderStoreID { + leader = peer + break + } + } + } + region := core.NewRegionInfo(region, leader) + changed := &core.RegionChanged{ + IsNew: true, SaveKV: true, SaveCache: true, NeedSync: true, + } + if err := c.SaveRegion(region, changed); err != nil { + errList = append(errList, err) + } + } + // If the number of regions exceeds the threshold, update the last split time. + if len(regions) >= maxSplitThreshold { + newStore := leaderStore.Clone(core.SetRecentlySplitRegionsTime(time.Now())) + c.core.PutStore(newStore) + } + return errList +} + // HandleAskSplit handles the split request. func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) { if c.isSchedulingHalted() { @@ -165,22 +221,6 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (* return resp, nil } -func (c *RaftCluster) checkSplitRegion(left *metapb.Region, right *metapb.Region) error { - if left == nil || right == nil { - return errors.New("invalid split region") - } - - if !bytes.Equal(left.GetEndKey(), right.GetStartKey()) { - return errors.New("invalid split region") - } - - if len(right.GetEndKey()) == 0 || bytes.Compare(left.GetStartKey(), right.GetEndKey()) < 0 { - return nil - } - - return errors.New("invalid split region") -} - func (c *RaftCluster) checkSplitRegions(regions []*metapb.Region) error { if len(regions) <= 1 { return errors.New("invalid split region") @@ -204,21 +244,18 @@ func (c *RaftCluster) HandleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb left := request.GetLeft() right := request.GetRight() - err := c.checkSplitRegion(left, right) - if err != nil { + if errs := c.ProcessRegionSplit([]*metapb.Region{left, right}); len(errs) > 0 { log.Warn("report split region is invalid", logutil.ZapRedactStringer("left-region", core.RegionToHexMeta(left)), logutil.ZapRedactStringer("right-region", core.RegionToHexMeta(right)), - errs.ZapError(err)) - return nil, err + zap.Errors("errs", errs), + ) + // error[0] may be checker error, others are ignored. + return nil, errs[0] } - // Build origin region by using left and right. - originRegion := typeutil.DeepClone(right, core.RegionFactory) - originRegion.RegionEpoch = nil - originRegion.StartKey = left.GetStartKey() log.Info("region split, generate new region", - zap.Uint64("region-id", originRegion.GetId()), + zap.Uint64("region-id", right.GetId()), logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(left))) return &pdpb.ReportSplitResponse{}, nil } @@ -226,21 +263,19 @@ func (c *RaftCluster) HandleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb // HandleBatchReportSplit handles the batch report split request. func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitRequest) (*pdpb.ReportBatchSplitResponse, error) { regions := request.GetRegions() - hrm := core.RegionsToHexMeta(regions) - err := c.checkSplitRegions(regions) - if err != nil { + if errs := c.ProcessRegionSplit(regions); len(errs) > 0 { log.Warn("report batch split region is invalid", zap.Stringer("region-meta", hrm), - errs.ZapError(err)) - return nil, err + zap.Errors("errs", errs)) + // error[0] may be checker error, others are ignored. + return nil, errs[0] } last := len(regions) - 1 - originRegion := typeutil.DeepClone(regions[last], core.RegionFactory) - hrm = core.RegionsToHexMeta(regions[:last]) + originRegionID := regions[last].GetId() log.Info("region batch split, generate new regions", - zap.Uint64("region-id", originRegion.GetId()), - zap.Stringer("origin", hrm), + zap.Uint64("region-id", originRegionID), + zap.Stringer("new-peer", hrm[:last]), zap.Int("total", last)) return &pdpb.ReportBatchSplitResponse{}, nil } diff --git a/server/cluster/cluster_worker_test.go b/server/cluster/cluster_worker_test.go index b376b38edc3..98b9b8380f1 100644 --- a/server/cluster/cluster_worker_test.go +++ b/server/cluster/cluster_worker_test.go @@ -23,9 +23,23 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockid" + "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/storage" ) +func mockRegionPeer(cluster *RaftCluster, voters []uint64) []*metapb.Peer { + rst := make([]*metapb.Peer, len(voters)) + for i, v := range voters { + id, _ := cluster.AllocID() + rst[i] = &metapb.Peer{ + Id: id, + StoreId: v, + Role: metapb.PeerRole_Voter, + } + } + return rst +} + func TestReportSplit(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -34,12 +48,56 @@ func TestReportSplit(t *testing.T) { _, opt, err := newTestScheduleConfig() re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) - left := &metapb.Region{Id: 1, StartKey: []byte("a"), EndKey: []byte("b")} - right := &metapb.Region{Id: 2, StartKey: []byte("b"), EndKey: []byte("c")} - _, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: left, Right: right}) - re.NoError(err) + cluster.coordinator = schedule.NewCoordinator(cluster.ctx, cluster, nil) + right := &metapb.Region{Id: 1, StartKey: []byte("a"), EndKey: []byte("c"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}} + region := core.NewRegionInfo(right, right.Peers[0]) + cluster.putRegion(region) + store := newTestStores(1, "2.0.0") + cluster.core.PutStore(store[0]) + + // split failed, split region keys must be continuous. + left := &metapb.Region{Id: 2, StartKey: []byte("a"), EndKey: []byte("b"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}} _, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: right, Right: left}) re.Error(err) + + // split success with continuous region keys. + right = &metapb.Region{Id: 1, StartKey: []byte("b"), EndKey: []byte("c"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}} + _, err = cluster.HandleReportSplit(&pdpb.ReportSplitRequest{Left: left, Right: right}) + re.NoError(err) + // no range hole + storeID := region.GetLeader().GetStoreId() + re.Equal(storeID, cluster.GetRegionByKey([]byte("b")).GetLeader().GetStoreId()) + re.Equal(storeID, cluster.GetRegionByKey([]byte("a")).GetLeader().GetStoreId()) + re.Equal(uint64(1), cluster.GetRegionByKey([]byte("b")).GetID()) + re.Equal(uint64(2), cluster.GetRegionByKey([]byte("a")).GetID()) + + testdata := []struct { + regionID uint64 + startKey []byte + endKey []byte + }{ + { + regionID: 1, + startKey: []byte("b"), + endKey: []byte("c"), + }, { + regionID: 2, + startKey: []byte("a"), + endKey: []byte("b"), + }, + } + + for _, data := range testdata { + r := metapb.Region{} + ok, err := cluster.storage.LoadRegion(data.regionID, &r) + re.NoError(err) + re.True(ok) + re.Equal(data.startKey, r.GetStartKey()) + re.Equal(data.endKey, r.GetEndKey()) + } } func TestReportBatchSplit(t *testing.T) { @@ -50,12 +108,39 @@ func TestReportBatchSplit(t *testing.T) { _, opt, err := newTestScheduleConfig() re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) + store := newTestStores(1, "2.0.0") + cluster.core.PutStore(store[0]) + re.False(cluster.GetStore(1).HasRecentlySplitRegions()) regions := []*metapb.Region{ - {Id: 1, StartKey: []byte(""), EndKey: []byte("a")}, - {Id: 2, StartKey: []byte("a"), EndKey: []byte("b")}, - {Id: 3, StartKey: []byte("b"), EndKey: []byte("c")}, - {Id: 3, StartKey: []byte("c"), EndKey: []byte("")}, + {Id: 1, StartKey: []byte(""), EndKey: []byte("a"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3})}, + {Id: 2, StartKey: []byte("a"), EndKey: []byte("b"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3})}, + {Id: 3, StartKey: []byte("b"), EndKey: []byte("c"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3})}, + {Id: 4, StartKey: []byte("c"), EndKey: []byte(""), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3})}, + } + _, err = cluster.HandleBatchReportSplit(&pdpb.ReportBatchSplitRequest{Regions: regions}) + re.Error(err) + + meta := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte(""), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1}} + region := core.NewRegionInfo(meta, meta.Peers[0]) + cluster.putRegion(region) + + regions = []*metapb.Region{ + {Id: 2, StartKey: []byte(""), EndKey: []byte("a"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 3, StartKey: []byte("a"), EndKey: []byte("b"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 4, StartKey: []byte("b"), EndKey: []byte("c"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 5, StartKey: []byte("c"), EndKey: []byte("d"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 6, StartKey: []byte("d"), EndKey: []byte("e"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 7, StartKey: []byte("e"), EndKey: []byte("f"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 8, StartKey: []byte("f"), EndKey: []byte("g"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 9, StartKey: []byte("g"), EndKey: []byte("h"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + {Id: 10, StartKey: []byte("h"), EndKey: []byte("i"), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, + + {Id: 1, StartKey: []byte("i"), EndKey: []byte(""), Peers: mockRegionPeer(cluster, []uint64{1, 2, 3}), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}, } _, err = cluster.HandleBatchReportSplit(&pdpb.ReportBatchSplitRequest{Regions: regions}) re.NoError(err) + + re.True(cluster.GetStore(1).HasRecentlySplitRegions()) } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 3f1c4d4a24e..14fdbf653aa 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -330,6 +330,13 @@ func (o *PersistOptions) SetEnableWitness(enable bool) { o.SetScheduleConfig(v) } +// SetMaxStoreDownTime to set the max store down time. It's only used to test. +func (o *PersistOptions) SetMaxStoreDownTime(time time.Duration) { + v := o.GetScheduleConfig().Clone() + v.MaxStoreDownTime = typeutil.NewDuration(time) + o.SetScheduleConfig(v) +} + // SetMaxMergeRegionSize sets the max merge region size. func (o *PersistOptions) SetMaxMergeRegionSize(maxMergeRegionSize uint64) { v := o.GetScheduleConfig().Clone() diff --git a/server/grpc_service.go b/server/grpc_service.go index 5e40bc1c732..d218c2bb0b6 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1428,10 +1428,24 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque if rc == nil { return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil } - region := rc.GetRegionByKey(request.GetRegionKey()) + var region *core.RegionInfo + // allow region miss temporarily if this key can't be found in the region tree. +retryLoop: + for retry := 0; retry <= 10; retry++ { + region = rc.GetRegionByKey(request.GetRegionKey()) + if region != nil { + break retryLoop + } + select { + case <-ctx.Done(): + break retryLoop + case <-time.After(10 * time.Millisecond): + } + } if region == nil { return &pdpb.GetRegionResponse{Header: s.header()}, nil } + var buckets *metapb.Buckets if rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() { buckets = region.GetBuckets() @@ -1473,7 +1487,21 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR return &pdpb.GetRegionResponse{Header: s.notBootstrappedHeader()}, nil } - region := rc.GetPrevRegionByKey(request.GetRegionKey()) + var region *core.RegionInfo + // allow region miss temporarily if this key can't be found in the region tree. +retryLoop: + for retry := 0; retry <= 10; retry++ { + region = rc.GetPrevRegionByKey(request.GetRegionKey()) + if region != nil { + break retryLoop + } + select { + case <-ctx.Done(): + break retryLoop + case <-time.After(10 * time.Millisecond): + } + } + if region == nil { return &pdpb.GetRegionResponse{Header: s.header()}, nil } diff --git a/server/server.go b/server/server.go index 03e036a968e..2fb66387d7a 100644 --- a/server/server.go +++ b/server/server.go @@ -1030,7 +1030,7 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { } if cfg.EnablePlacementRules { // initialize rule manager. - if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil { return err } } else { @@ -1053,19 +1053,19 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { defaultRule := rc.GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { - // replication config won't work when placement rule is enabled and exceeds one default rule + // replication config won't work when placement rule is enabled and exceeds one default rule if !(defaultRule != nil && len(defaultRule.StartKey) == 0 && len(defaultRule.EndKey) == 0) { - return errors.New("cannot update MaxReplicas or LocationLabels when placement rules feature is enabled and not only default rule exists, please update rule instead") + return errors.New("cannot update MaxReplicas, LocationLabels or IsolationLevel when placement rules feature is enabled and not only default rule exists, please update rule instead") } - if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.AreStringSlicesEqual(defaultRule.LocationLabels, []string(old.LocationLabels))) { + if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.AreStringSlicesEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) && defaultRule.IsolationLevel == old.IsolationLevel) { return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead") } return nil } - if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.AreStringSlicesEqual(cfg.LocationLabels, old.LocationLabels)) { + if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.AreStringSlicesEqual(cfg.LocationLabels, old.LocationLabels) && cfg.IsolationLevel == old.IsolationLevel) { if err := CheckInDefaultRule(); err != nil { return err } @@ -1076,6 +1076,7 @@ func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels + rule.IsolationLevel = cfg.IsolationLevel rc := s.GetRaftCluster() if rc == nil { return errs.ErrNotBootstrapped.GenWithStackByArgs() diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index d63c950ef9a..6ed0841bf74 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -678,7 +678,7 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.Equal(expect, replicationCfg.MaxReplicas) } - checkLocaltionLabels := func(expect int) { + checkLocationLabels := func(expect int) { args := []string{"-u", pdAddr, "config", "show", "replication"} output, err := pdctl.ExecuteCommand(cmd, args...) re.NoError(err) @@ -687,6 +687,15 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.Len(replicationCfg.LocationLabels, expect) } + checkIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "show", "replication"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + replicationCfg := sc.ReplicationConfig{} + re.NoError(json.Unmarshal(output, &replicationCfg)) + re.Equal(replicationCfg.IsolationLevel, expect) + } + checkRuleCount := func(expect int) { args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} output, err := pdctl.ExecuteCommand(cmd, args...) @@ -705,6 +714,15 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.Len(rule.LocationLabels, expect) } + checkRuleIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + rule := placement.Rule{} + re.NoError(json.Unmarshal(output, &rule)) + re.Equal(rule.IsolationLevel, expect) + } + // update successfully when placement rules is not enabled. output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "max-replicas", "2") re.NoError(err) @@ -713,8 +731,13 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "zone,host") re.NoError(err) re.Contains(string(output), "Success!") - checkLocaltionLabels(2) + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "zone") + re.NoError(err) + re.Contains(string(output), "Success!") + checkLocationLabels(2) checkRuleLocationLabels(2) + checkIsolationLevel("zone") + checkRuleIsolationLevel("zone") // update successfully when only one default rule exists. output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "enable") @@ -727,11 +750,18 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { checkMaxReplicas(3) checkRuleCount(3) + // We need to change isolation first because we will validate + // if the location label contains the isolation level when setting location labels. + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "host") + re.NoError(err) + re.Contains(string(output), "Success!") output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "host") re.NoError(err) re.Contains(string(output), "Success!") - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") // update unsuccessfully when many rule exists. fname := t.TempDir() @@ -755,8 +785,10 @@ func TestUpdateDefaultReplicaConfig(t *testing.T) { re.NoError(err) checkMaxReplicas(4) checkRuleCount(4) - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") } func TestPDServerConfig(t *testing.T) { diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index c464c2c83d8..dcf7ef3ae6c 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -418,8 +418,10 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te } if testCase.placementRuleEnable { err := svr.GetRaftCluster().GetRuleManager().Initialize( - svr.GetRaftCluster().GetOpts().GetMaxReplicas(), - svr.GetRaftCluster().GetOpts().GetLocationLabels()) + suite.svr.GetRaftCluster().GetOpts().GetMaxReplicas(), + suite.svr.GetRaftCluster().GetOpts().GetLocationLabels(), + suite.svr.GetRaftCluster().GetOpts().GetIsolationLevel(), + ) suite.NoError(err) } if len(testCase.rules) > 0 {