From 21f937823904698eae123f699f9444e850652a6f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 26 Jan 2024 16:58:51 +0800 Subject: [PATCH] This is an automated cherry-pick of #7765 close tikv/pd#4399 Signed-off-by: ti-chi-bot --- pkg/schedule/config/config.go | 499 ++++++ server/cluster/cluster_test.go | 1441 +++++++++++++++++ .../mcs/scheduling/server_test.go | 673 ++++++++ 3 files changed, 2613 insertions(+) create mode 100644 tests/integrations/mcs/scheduling/server_test.go diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 06dd3f31cfa..a890a7f1e7d 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -91,6 +91,7 @@ type Config interface { UseRaftV2() } +<<<<<<< HEAD // StoreConfig is the interface that wraps the StoreConfig related methods. type StoreConfig interface { GetRegionMaxSize() uint64 @@ -100,4 +101,502 @@ type StoreConfig interface { IsRaftKV2() bool // for test purpose SetRegionBucketEnabled(bool) +======= +func adjustSchedulers(v *SchedulerConfigs, defValue SchedulerConfigs) { + if len(*v) == 0 { + // Make a copy to avoid changing DefaultSchedulers unexpectedly. + // When reloading from storage, the config is passed to json.Unmarshal. + // Without clone, the DefaultSchedulers could be overwritten. + *v = append(defValue[:0:0], defValue...) + } +} + +// ScheduleConfig is the schedule configuration. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type ScheduleConfig struct { + // If the snapshot count of one store is greater than this value, + // it will never be used as a source or target store. + MaxSnapshotCount uint64 `toml:"max-snapshot-count" json:"max-snapshot-count"` + MaxPendingPeerCount uint64 `toml:"max-pending-peer-count" json:"max-pending-peer-count"` + // If both the size of region is smaller than MaxMergeRegionSize + // and the number of rows in region is smaller than MaxMergeRegionKeys, + // it will try to merge with adjacent regions. + MaxMergeRegionSize uint64 `toml:"max-merge-region-size" json:"max-merge-region-size"` + MaxMergeRegionKeys uint64 `toml:"max-merge-region-keys" json:"max-merge-region-keys"` + // SplitMergeInterval is the minimum interval time to permit merge after split. + SplitMergeInterval typeutil.Duration `toml:"split-merge-interval" json:"split-merge-interval"` + // SwitchWitnessInterval is the minimum interval that allows a peer to become a witness again after it is promoted to non-witness. + SwitchWitnessInterval typeutil.Duration `toml:"switch-witness-interval" json:"switch-witness-interval"` + // EnableOneWayMerge is the option to enable one way merge. This means a Region can only be merged into the next region of it. + EnableOneWayMerge bool `toml:"enable-one-way-merge" json:"enable-one-way-merge,string"` + // EnableCrossTableMerge is the option to enable cross table merge. This means two Regions can be merged with different table IDs. + // This option only works when key type is "table". + EnableCrossTableMerge bool `toml:"enable-cross-table-merge" json:"enable-cross-table-merge,string"` + // PatrolRegionInterval is the interval for scanning region during patrol. + PatrolRegionInterval typeutil.Duration `toml:"patrol-region-interval" json:"patrol-region-interval"` + // MaxStoreDownTime is the max duration after which + // a store will be considered to be down if it hasn't reported heartbeats. 
+ MaxStoreDownTime typeutil.Duration `toml:"max-store-down-time" json:"max-store-down-time"` + // MaxStorePreparingTime is the max duration after which + // a store will be considered to be preparing. + MaxStorePreparingTime typeutil.Duration `toml:"max-store-preparing-time" json:"max-store-preparing-time"` + // LeaderScheduleLimit is the max coexist leader schedules. + LeaderScheduleLimit uint64 `toml:"leader-schedule-limit" json:"leader-schedule-limit"` + // LeaderSchedulePolicy is the option to balance leader, there are some policies supported: ["count", "size"], default: "count" + LeaderSchedulePolicy string `toml:"leader-schedule-policy" json:"leader-schedule-policy"` + // RegionScheduleLimit is the max coexist region schedules. + RegionScheduleLimit uint64 `toml:"region-schedule-limit" json:"region-schedule-limit"` + // WitnessScheduleLimit is the max coexist witness schedules. + WitnessScheduleLimit uint64 `toml:"witness-schedule-limit" json:"witness-schedule-limit"` + // ReplicaScheduleLimit is the max coexist replica schedules. + ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit" json:"replica-schedule-limit"` + // MergeScheduleLimit is the max coexist merge schedules. + MergeScheduleLimit uint64 `toml:"merge-schedule-limit" json:"merge-schedule-limit"` + // HotRegionScheduleLimit is the max coexist hot region schedules. + HotRegionScheduleLimit uint64 `toml:"hot-region-schedule-limit" json:"hot-region-schedule-limit"` + // HotRegionCacheHitThreshold is the cache hits threshold of the hot region. + // If the number of times a region hits the hot cache is greater than this + // threshold, it is considered a hot region. + HotRegionCacheHitsThreshold uint64 `toml:"hot-region-cache-hits-threshold" json:"hot-region-cache-hits-threshold"` + // StoreBalanceRate is the maximum of balance rate for each store. + // WARN: StoreBalanceRate is deprecated. + StoreBalanceRate float64 `toml:"store-balance-rate" json:"store-balance-rate,omitempty"` + // StoreLimit is the limit of scheduling for stores. + StoreLimit map[uint64]StoreLimitConfig `toml:"store-limit" json:"store-limit"` + // TolerantSizeRatio is the ratio of buffer size for balance scheduler. + TolerantSizeRatio float64 `toml:"tolerant-size-ratio" json:"tolerant-size-ratio"` + // + // high space stage transition stage low space stage + // |--------------------|-----------------------------|-------------------------| + // ^ ^ ^ ^ + // 0 HighSpaceRatio * capacity LowSpaceRatio * capacity capacity + // + // LowSpaceRatio is the lowest usage ratio of store which regraded as low space. + // When in low space, store region score increases to very large and varies inversely with available size. + LowSpaceRatio float64 `toml:"low-space-ratio" json:"low-space-ratio"` + // HighSpaceRatio is the highest usage ratio of store which regraded as high space. + // High space means there is a lot of spare capacity, and store region score varies directly with used size. + HighSpaceRatio float64 `toml:"high-space-ratio" json:"high-space-ratio"` + // RegionScoreFormulaVersion is used to control the formula used to calculate region score. + RegionScoreFormulaVersion string `toml:"region-score-formula-version" json:"region-score-formula-version"` + // SchedulerMaxWaitingOperator is the max coexist operators for each scheduler. + SchedulerMaxWaitingOperator uint64 `toml:"scheduler-max-waiting-operator" json:"scheduler-max-waiting-operator"` + // WARN: DisableLearner is deprecated. 
+ // DisableLearner is the option to disable using AddLearnerNode instead of AddNode. + DisableLearner bool `toml:"disable-raft-learner" json:"disable-raft-learner,string,omitempty"` + // DisableRemoveDownReplica is the option to prevent replica checker from + // removing down replicas. + // WARN: DisableRemoveDownReplica is deprecated. + DisableRemoveDownReplica bool `toml:"disable-remove-down-replica" json:"disable-remove-down-replica,string,omitempty"` + // DisableReplaceOfflineReplica is the option to prevent replica checker from + // replacing offline replicas. + // WARN: DisableReplaceOfflineReplica is deprecated. + DisableReplaceOfflineReplica bool `toml:"disable-replace-offline-replica" json:"disable-replace-offline-replica,string,omitempty"` + // DisableMakeUpReplica is the option to prevent replica checker from making up + // replicas when replica count is less than expected. + // WARN: DisableMakeUpReplica is deprecated. + DisableMakeUpReplica bool `toml:"disable-make-up-replica" json:"disable-make-up-replica,string,omitempty"` + // DisableRemoveExtraReplica is the option to prevent replica checker from + // removing extra replicas. + // WARN: DisableRemoveExtraReplica is deprecated. + DisableRemoveExtraReplica bool `toml:"disable-remove-extra-replica" json:"disable-remove-extra-replica,string,omitempty"` + // DisableLocationReplacement is the option to prevent replica checker from + // moving replica to a better location. + // WARN: DisableLocationReplacement is deprecated. + DisableLocationReplacement bool `toml:"disable-location-replacement" json:"disable-location-replacement,string,omitempty"` + + // EnableRemoveDownReplica is the option to enable replica checker to remove down replica. + EnableRemoveDownReplica bool `toml:"enable-remove-down-replica" json:"enable-remove-down-replica,string"` + // EnableReplaceOfflineReplica is the option to enable replica checker to replace offline replica. + EnableReplaceOfflineReplica bool `toml:"enable-replace-offline-replica" json:"enable-replace-offline-replica,string"` + // EnableMakeUpReplica is the option to enable replica checker to make up replica. + EnableMakeUpReplica bool `toml:"enable-make-up-replica" json:"enable-make-up-replica,string"` + // EnableRemoveExtraReplica is the option to enable replica checker to remove extra replica. + EnableRemoveExtraReplica bool `toml:"enable-remove-extra-replica" json:"enable-remove-extra-replica,string"` + // EnableLocationReplacement is the option to enable replica checker to move replica to a better location. + EnableLocationReplacement bool `toml:"enable-location-replacement" json:"enable-location-replacement,string"` + // EnableDebugMetrics is the option to enable debug metrics. + EnableDebugMetrics bool `toml:"enable-debug-metrics" json:"enable-debug-metrics,string"` + // EnableJointConsensus is the option to enable using joint consensus as an operator step. + EnableJointConsensus bool `toml:"enable-joint-consensus" json:"enable-joint-consensus,string"` + // EnableTiKVSplitRegion is the option to enable tikv split region. 
+ // on ebs-based BR we need to disable it with TTL + EnableTiKVSplitRegion bool `toml:"enable-tikv-split-region" json:"enable-tikv-split-region,string"` + + // Schedulers support for loading customized schedulers + Schedulers SchedulerConfigs `toml:"schedulers" json:"schedulers-v2"` // json v2 is for the sake of compatible upgrade + + // Only used to display + SchedulersPayload map[string]interface{} `toml:"schedulers-payload" json:"schedulers-payload"` + + // Controls the time interval between write hot regions info into leveldb. + HotRegionsWriteInterval typeutil.Duration `toml:"hot-regions-write-interval" json:"hot-regions-write-interval"` + + // The day of hot regions data to be reserved. 0 means close. + HotRegionsReservedDays uint64 `toml:"hot-regions-reserved-days" json:"hot-regions-reserved-days"` + + // MaxMovableHotPeerSize is the threshold of region size for balance hot region and split bucket scheduler. + // Hot region must be split before moved if it's region size is greater than MaxMovableHotPeerSize. + MaxMovableHotPeerSize int64 `toml:"max-movable-hot-peer-size" json:"max-movable-hot-peer-size,omitempty"` + + // EnableDiagnostic is the option to enable using diagnostic + EnableDiagnostic bool `toml:"enable-diagnostic" json:"enable-diagnostic,string"` + + // EnableWitness is the option to enable using witness + EnableWitness bool `toml:"enable-witness" json:"enable-witness,string"` + + // SlowStoreEvictingAffectedStoreRatioThreshold is the affected ratio threshold when judging a store is slow + // A store's slowness must affect more than `store-count * SlowStoreEvictingAffectedStoreRatioThreshold` to trigger evicting. + SlowStoreEvictingAffectedStoreRatioThreshold float64 `toml:"slow-store-evicting-affected-store-ratio-threshold" json:"slow-store-evicting-affected-store-ratio-threshold,omitempty"` + + // StoreLimitVersion is the version of store limit. + // v1: which is based on the region count by rate limit. + // v2: which is based on region size by window size. + StoreLimitVersion string `toml:"store-limit-version" json:"store-limit-version,omitempty"` + + // HaltScheduling is the option to halt the scheduling. Once it's on, PD will halt the scheduling, + // and any other scheduling configs will be ignored. + HaltScheduling bool `toml:"halt-scheduling" json:"halt-scheduling,string,omitempty"` +} + +// Clone returns a cloned scheduling configuration. +func (c *ScheduleConfig) Clone() *ScheduleConfig { + schedulers := append(c.Schedulers[:0:0], c.Schedulers...) + var storeLimit map[uint64]StoreLimitConfig + if c.StoreLimit != nil { + storeLimit = make(map[uint64]StoreLimitConfig, len(c.StoreLimit)) + for k, v := range c.StoreLimit { + storeLimit[k] = v + } + } + cfg := *c + cfg.StoreLimit = storeLimit + cfg.Schedulers = schedulers + cfg.SchedulersPayload = nil + return &cfg +} + +// Adjust adjusts the config. 
+func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) error { + if !meta.IsDefined("max-snapshot-count") { + configutil.AdjustUint64(&c.MaxSnapshotCount, defaultMaxSnapshotCount) + } + if !meta.IsDefined("max-pending-peer-count") { + configutil.AdjustUint64(&c.MaxPendingPeerCount, defaultMaxPendingPeerCount) + } + if !meta.IsDefined("max-merge-region-size") { + configutil.AdjustUint64(&c.MaxMergeRegionSize, defaultMaxMergeRegionSize) + } + configutil.AdjustDuration(&c.SplitMergeInterval, DefaultSplitMergeInterval) + configutil.AdjustDuration(&c.SwitchWitnessInterval, defaultSwitchWitnessInterval) + configutil.AdjustDuration(&c.PatrolRegionInterval, defaultPatrolRegionInterval) + configutil.AdjustDuration(&c.MaxStoreDownTime, defaultMaxStoreDownTime) + configutil.AdjustDuration(&c.HotRegionsWriteInterval, defaultHotRegionsWriteInterval) + configutil.AdjustDuration(&c.MaxStorePreparingTime, defaultMaxStorePreparingTime) + if !meta.IsDefined("leader-schedule-limit") { + configutil.AdjustUint64(&c.LeaderScheduleLimit, defaultLeaderScheduleLimit) + } + if !meta.IsDefined("region-schedule-limit") { + configutil.AdjustUint64(&c.RegionScheduleLimit, defaultRegionScheduleLimit) + } + if !meta.IsDefined("witness-schedule-limit") { + configutil.AdjustUint64(&c.WitnessScheduleLimit, defaultWitnessScheduleLimit) + } + if !meta.IsDefined("replica-schedule-limit") { + configutil.AdjustUint64(&c.ReplicaScheduleLimit, defaultReplicaScheduleLimit) + } + if !meta.IsDefined("merge-schedule-limit") { + configutil.AdjustUint64(&c.MergeScheduleLimit, defaultMergeScheduleLimit) + } + if !meta.IsDefined("hot-region-schedule-limit") { + configutil.AdjustUint64(&c.HotRegionScheduleLimit, defaultHotRegionScheduleLimit) + } + if !meta.IsDefined("hot-region-cache-hits-threshold") { + configutil.AdjustUint64(&c.HotRegionCacheHitsThreshold, defaultHotRegionCacheHitsThreshold) + } + if !meta.IsDefined("tolerant-size-ratio") { + configutil.AdjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio) + } + if !meta.IsDefined("scheduler-max-waiting-operator") { + configutil.AdjustUint64(&c.SchedulerMaxWaitingOperator, defaultSchedulerMaxWaitingOperator) + } + if !meta.IsDefined("leader-schedule-policy") { + configutil.AdjustString(&c.LeaderSchedulePolicy, defaultLeaderSchedulePolicy) + } + if !meta.IsDefined("store-limit-version") { + configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion) + } + + if !meta.IsDefined("enable-joint-consensus") { + c.EnableJointConsensus = defaultEnableJointConsensus + } + if !meta.IsDefined("enable-tikv-split-region") { + c.EnableTiKVSplitRegion = defaultEnableTiKVSplitRegion + } + if !meta.IsDefined("enable-cross-table-merge") { + c.EnableCrossTableMerge = defaultEnableCrossTableMerge + } + configutil.AdjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio) + configutil.AdjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio) + if !meta.IsDefined("enable-diagnostic") { + c.EnableDiagnostic = defaultEnableDiagnostic + } + + if !meta.IsDefined("enable-witness") { + c.EnableWitness = defaultEnableWitness + } + + // new cluster:v2, old cluster:v1 + if !meta.IsDefined("region-score-formula-version") && !reloading { + configutil.AdjustString(&c.RegionScoreFormulaVersion, defaultRegionScoreFormulaVersion) + } + + if !meta.IsDefined("halt-scheduling") { + c.HaltScheduling = defaultHaltScheduling + } + + adjustSchedulers(&c.Schedulers, DefaultSchedulers) + + for k, b := range c.migrateConfigurationMap() { + v, err := c.parseDeprecatedFlag(meta, k, *b[0], *b[1]) 
+ if err != nil { + return err + } + *b[0], *b[1] = false, v // reset old flag false to make it ignored when marshal to JSON + } + + if c.StoreBalanceRate != 0 { + DefaultStoreLimit = StoreLimit{AddPeer: c.StoreBalanceRate, RemovePeer: c.StoreBalanceRate} + c.StoreBalanceRate = 0 + } + + if c.StoreLimit == nil { + c.StoreLimit = make(map[uint64]StoreLimitConfig) + } + + if !meta.IsDefined("hot-regions-reserved-days") { + configutil.AdjustUint64(&c.HotRegionsReservedDays, defaultHotRegionsReservedDays) + } + + if !meta.IsDefined("max-movable-hot-peer-size") { + configutil.AdjustInt64(&c.MaxMovableHotPeerSize, defaultMaxMovableHotPeerSize) + } + + if !meta.IsDefined("slow-store-evicting-affected-store-ratio-threshold") { + configutil.AdjustFloat64(&c.SlowStoreEvictingAffectedStoreRatioThreshold, defaultSlowStoreEvictingAffectedStoreRatioThreshold) + } + return c.Validate() +} + +func (c *ScheduleConfig) migrateConfigurationMap() map[string][2]*bool { + return map[string][2]*bool{ + "remove-down-replica": {&c.DisableRemoveDownReplica, &c.EnableRemoveDownReplica}, + "replace-offline-replica": {&c.DisableReplaceOfflineReplica, &c.EnableReplaceOfflineReplica}, + "make-up-replica": {&c.DisableMakeUpReplica, &c.EnableMakeUpReplica}, + "remove-extra-replica": {&c.DisableRemoveExtraReplica, &c.EnableRemoveExtraReplica}, + "location-replacement": {&c.DisableLocationReplacement, &c.EnableLocationReplacement}, + } +} + +// GetMaxMergeRegionKeys returns the max merge keys. +// it should keep consistent with tikv: https://github.com/tikv/tikv/pull/12484 +func (c *ScheduleConfig) GetMaxMergeRegionKeys() uint64 { + if keys := c.MaxMergeRegionKeys; keys != 0 { + return keys + } + return c.MaxMergeRegionSize * 10000 +} + +func (c *ScheduleConfig) parseDeprecatedFlag(meta *configutil.ConfigMetaData, name string, old, new bool) (bool, error) { + oldName, newName := "disable-"+name, "enable-"+name + defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName) + switch { + case defineNew && defineOld: + if new == old { + return false, errors.Errorf("config item %s and %s(deprecated) are conflict", newName, oldName) + } + return new, nil + case defineNew && !defineOld: + return new, nil + case !defineNew && defineOld: + return !old, nil // use !disable-* + case !defineNew && !defineOld: + return true, nil // use default value true + } + return false, nil // unreachable. +} + +// MigrateDeprecatedFlags updates new flags according to deprecated flags. +func (c *ScheduleConfig) MigrateDeprecatedFlags() { + c.DisableLearner = false + if c.StoreBalanceRate != 0 { + DefaultStoreLimit = StoreLimit{AddPeer: c.StoreBalanceRate, RemovePeer: c.StoreBalanceRate} + c.StoreBalanceRate = 0 + } + for _, b := range c.migrateConfigurationMap() { + // If old=false (previously disabled), set both old and new to false. + if *b[0] { + *b[0], *b[1] = false, false + } + } +} + +// Validate is used to validate if some scheduling configurations are right. 
+func (c *ScheduleConfig) Validate() error { + if c.TolerantSizeRatio < 0 { + return errors.New("tolerant-size-ratio should be non-negative") + } + if c.LowSpaceRatio < 0 || c.LowSpaceRatio > 1 { + return errors.New("low-space-ratio should between 0 and 1") + } + if c.HighSpaceRatio < 0 || c.HighSpaceRatio > 1 { + return errors.New("high-space-ratio should between 0 and 1") + } + if c.LowSpaceRatio <= c.HighSpaceRatio { + return errors.New("low-space-ratio should be larger than high-space-ratio") + } + if c.LeaderSchedulePolicy != "count" && c.LeaderSchedulePolicy != "size" { + return errors.Errorf("leader-schedule-policy %v is invalid", c.LeaderSchedulePolicy) + } + if c.SlowStoreEvictingAffectedStoreRatioThreshold == 0 { + return errors.Errorf("slow-store-evicting-affected-store-ratio-threshold is not set") + } + return nil +} + +// Deprecated is used to find if there is an option has been deprecated. +func (c *ScheduleConfig) Deprecated() error { + if c.DisableLearner { + return errors.New("disable-raft-learner has already been deprecated") + } + if c.DisableRemoveDownReplica { + return errors.New("disable-remove-down-replica has already been deprecated") + } + if c.DisableReplaceOfflineReplica { + return errors.New("disable-replace-offline-replica has already been deprecated") + } + if c.DisableMakeUpReplica { + return errors.New("disable-make-up-replica has already been deprecated") + } + if c.DisableRemoveExtraReplica { + return errors.New("disable-remove-extra-replica has already been deprecated") + } + if c.DisableLocationReplacement { + return errors.New("disable-location-replacement has already been deprecated") + } + if c.StoreBalanceRate != 0 { + return errors.New("store-balance-rate has already been deprecated") + } + return nil +} + +// StoreLimitConfig is a config about scheduling rate limit of different types for a store. +type StoreLimitConfig struct { + AddPeer float64 `toml:"add-peer" json:"add-peer"` + RemovePeer float64 `toml:"remove-peer" json:"remove-peer"` +} + +// SchedulerConfigs is a slice of customized scheduler configuration. +type SchedulerConfigs []SchedulerConfig + +// SchedulerConfig is customized scheduler configuration +type SchedulerConfig struct { + Type string `toml:"type" json:"type"` + Args []string `toml:"args" json:"args"` + Disable bool `toml:"disable" json:"disable"` + ArgsPayload string `toml:"args-payload" json:"args-payload"` +} + +// DefaultSchedulers are the schedulers be created by default. +// If these schedulers are not in the persistent configuration, they +// will be created automatically when reloading. +var DefaultSchedulers = SchedulerConfigs{ + {Type: "balance-region"}, + {Type: "balance-leader"}, + {Type: "hot-region"}, + {Type: "evict-slow-store"}, +} + +// IsDefaultScheduler checks whether the scheduler is enable by default. +func IsDefaultScheduler(typ string) bool { + for _, c := range DefaultSchedulers { + if typ == c.Type { + return true + } + } + return false +} + +// ReplicationConfig is the replication configuration. +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type ReplicationConfig struct { + // MaxReplicas is the number of replicas for each region. + MaxReplicas uint64 `toml:"max-replicas" json:"max-replicas"` + + // The label keys specified the location of a store. + // The placement priorities is implied by the order of label keys. 
+ // For example, ["zone", "rack"] means that we should place replicas to + // different zones first, then to different racks if we don't have enough zones. + LocationLabels typeutil.StringSlice `toml:"location-labels" json:"location-labels"` + // StrictlyMatchLabel strictly checks if the label of TiKV is matched with LocationLabels. + StrictlyMatchLabel bool `toml:"strictly-match-label" json:"strictly-match-label,string"` + + // When PlacementRules feature is enabled. MaxReplicas, LocationLabels and IsolationLabels are not used any more. + EnablePlacementRules bool `toml:"enable-placement-rules" json:"enable-placement-rules,string"` + + // EnablePlacementRuleCache controls whether use cache during rule checker + EnablePlacementRulesCache bool `toml:"enable-placement-rules-cache" json:"enable-placement-rules-cache,string"` + + // IsolationLevel is used to isolate replicas explicitly and forcibly if it's not empty. + // Its value must be empty or one of LocationLabels. + // Example: + // location-labels = ["zone", "rack", "host"] + // isolation-level = "zone" + // With configuration like above, PD ensure that all replicas be placed in different zones. + // Even if a zone is down, PD will not try to make up replicas in other zone + // because other zones already have replicas on it. + IsolationLevel string `toml:"isolation-level" json:"isolation-level"` +} + +// Clone makes a deep copy of the config. +func (c *ReplicationConfig) Clone() *ReplicationConfig { + locationLabels := append(c.LocationLabels[:0:0], c.LocationLabels...) + cfg := *c + cfg.LocationLabels = locationLabels + return &cfg +} + +// Validate is used to validate if some replication configurations are right. +func (c *ReplicationConfig) Validate() error { + foundIsolationLevel := false + for _, label := range c.LocationLabels { + err := ValidateLabels([]*metapb.StoreLabel{{Key: label}}) + if err != nil { + return err + } + // IsolationLevel should be empty or one of LocationLabels + if !foundIsolationLevel && label == c.IsolationLevel { + foundIsolationLevel = true + } + } + if c.IsolationLevel != "" && !foundIsolationLevel { + return errors.New("isolation-level must be one of location-labels or empty") + } + return nil +} + +// Adjust adjusts the config. +func (c *ReplicationConfig) Adjust(meta *configutil.ConfigMetaData) error { + configutil.AdjustUint64(&c.MaxReplicas, DefaultMaxReplicas) + if !meta.IsDefined("enable-placement-rules") { + c.EnablePlacementRules = defaultEnablePlacementRules + } + if !meta.IsDefined("strictly-match-label") { + c.StrictlyMatchLabel = defaultStrictlyMatchLabel + } + if !meta.IsDefined("location-labels") { + c.LocationLabels = defaultLocationLabels + } + return c.Validate() +>>>>>>> 5b939c6fc (config: disable witness related schedulers by default (#7765)) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 1ee338e2182..6ffa7b86baa 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -2222,3 +2222,1444 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { return nil } +<<<<<<< HEAD +======= + +func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind operator.OpKind, steps ...operator.OpStep) *operator.Operator { + return operator.NewTestOperator(regionID, regionEpoch, kind, steps...) 
+} + +func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { + id, err := c.AllocID() + if err != nil { + return nil, err + } + return &metapb.Peer{Id: id, StoreId: storeID}, nil +} + +func (c *testCluster) addRegionStore(storeID uint64, regionCount int, regionSizes ...uint64) error { + var regionSize uint64 + if len(regionSizes) == 0 { + regionSize = uint64(regionCount) * 10 + } else { + regionSize = regionSizes[0] + } + + stats := &pdpb.StoreStats{} + stats.Capacity = 100 * units.GiB + stats.UsedSize = regionSize * units.MiB + stats.Available = stats.Capacity - stats.UsedSize + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetRegionCount(regionCount), + core.SetRegionSize(int64(regionSize)), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) error { + region := newTestRegionMeta(regionID) + leader, _ := c.AllocPeer(leaderStoreID) + region.Peers = []*metapb.Peer{leader} + for _, followerStoreID := range followerStoreIDs { + peer, _ := c.AllocPeer(followerStoreID) + region.Peers = append(region.Peers, peer) + } + regionInfo := core.NewRegionInfo(region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10)) + return c.putRegion(regionInfo) +} + +func (c *testCluster) updateLeaderCount(storeID uint64, leaderCount int) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error { + stats := &pdpb.StoreStats{} + newStore := core.NewStoreInfo(&metapb.Store{Id: storeID}, + core.SetStoreStats(stats), + core.SetLeaderCount(leaderCount), + core.SetLeaderSize(int64(leaderCount)*10), + core.SetLastHeartbeatTS(time.Now()), + ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreDown(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone( + core.SetStoreState(metapb.StoreState_Up), + core.SetLastHeartbeatTS(typeutil.ZeroTime), + ) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) setStoreOffline(storeID uint64) error { + store := c.GetStore(storeID) + newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline, false)) + c.Lock() + defer c.Unlock() + return c.putStoreLocked(newStore) +} + +func (c *testCluster) LoadRegion(regionID uint64, followerStoreIDs ...uint64) error { + // regions load from etcd will have no leader + region := newTestRegionMeta(regionID) + region.Peers = []*metapb.Peer{} + for _, id := range followerStoreIDs { + peer, _ := c.AllocPeer(id) + region.Peers = append(region.Peers, peer) + } + return c.putRegion(core.NewRegionInfo(region, nil, core.SetSource(core.Storage))) +} + +func TestBasic(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + + re.NoError(tc.addLeaderRegion(1, 1)) + + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + 
oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + re.Equal(op1.RegionID(), oc.GetOperator(1).RegionID()) + + // Region 1 already has an operator, cannot add another one. + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op2) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + // Remove the operator manually, then we can add a new operator. + re.True(oc.RemoveOperator(op1)) + op3 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op3) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) + re.Equal(op3.RegionID(), oc.GetOperator(1).RegionID()) +} + +func TestDispatch(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + co.GetPrepareChecker().SetPrepared() + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addRegionStore(3, 30)) + re.NoError(tc.addRegionStore(2, 20)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + // Transfer leader from store 4 to store 2. + re.NoError(tc.updateLeaderCount(4, 50)) + re.NoError(tc.updateLeaderCount(3, 50)) + re.NoError(tc.updateLeaderCount(2, 20)) + re.NoError(tc.updateLeaderCount(1, 10)) + re.NoError(tc.addLeaderRegion(2, 4, 3, 2)) + + go co.RunUntilStop() + + // Wait for schedule and turn off balance. + waitOperator(re, co, 1) + controller := co.GetSchedulersController() + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + waitOperator(re, co, 2) + operatorutil.CheckTransferLeader(re, co.GetOperatorController().GetOperator(2), operator.OpKind(0), 4, 2) + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + + stream := mockhbstream.NewHeartbeatStream() + + // Transfer peer. + region := tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Transfer leader. 
+ region = tc.GetRegion(2).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitTransferLeader(re, stream, region, 2) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func dispatchHeartbeat(co *schedule.Coordinator, region *core.RegionInfo, stream hbstream.HeartbeatStream) error { + co.GetHeartbeatStreams().BindStream(region.GetLeader().GetStoreId(), stream) + if err := co.GetCluster().(*RaftCluster).putRegion(region.Clone(core.SetSource(core.Heartbeat))); err != nil { + return err + } + co.GetOperatorController().Dispatch(region, operator.DispatchFromHeartBeat, nil) + return nil +} + +func TestCollectMetricsConcurrent(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + rc := co.GetCluster().(*RaftCluster) + rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.schedulingController.coordinator = co + controller := co.GetSchedulersController() + // Make sure there are no problem when concurrent write and read + var wg sync.WaitGroup + count := 10 + wg.Add(count + 1) + for i := 0; i <= count; i++ { + go func(i int) { + defer wg.Done() + for j := 0; j < 1000; j++ { + re.NoError(tc.addRegionStore(uint64(i%5), rand.Intn(200))) + } + }(i) + } + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + controller.CollectSchedulerMetrics() + rc.collectSchedulingMetrics() + } + schedule.ResetHotSpotMetrics() + schedulers.ResetSchedulerMetrics() + rc.resetSchedulingMetrics() + wg.Wait() +} + +func TestCollectMetrics(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, func(tc *testCluster) { + tc.regionStats = statistics.NewRegionStatistics( + tc.GetBasicCluster(), + tc.GetOpts(), + nil) + }, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + rc := co.GetCluster().(*RaftCluster) + rc.schedulingController = newSchedulingController(rc.serverCtx, rc.GetBasicCluster(), rc.GetOpts(), rc.GetRuleManager()) + rc.schedulingController.coordinator = co + controller := co.GetSchedulersController() + count := 10 + for i := 0; i <= count; i++ { + for k := 0; k < 200; k++ { + item := &statistics.HotPeerStat{ + StoreID: uint64(i % 5), + RegionID: uint64(i*1000 + k), + Loads: []float64{10, 20, 30}, + HotDegree: 10, + AntiCount: utils.HotRegionAntiCount, // for write + } + tc.hotStat.HotCache.Update(item, utils.Write) + } + } + + for i := 0; i < 1000; i++ { + co.CollectHotSpotMetrics() + controller.CollectSchedulerMetrics() + rc.collectSchedulingMetrics() + } + stores := co.GetCluster().GetStores() + regionStats := co.GetCluster().RegionWriteStats() + status1 := statistics.CollectHotPeerInfos(stores, regionStats) + status2 := statistics.GetHotStatus(stores, co.GetCluster().GetStoresLoads(), regionStats, utils.Write, co.GetCluster().GetSchedulerConfig().IsTraceRegionFlow()) + for _, s := range status2.AsLeader { + s.Stats = nil + } + for _, s := range status2.AsPeer { + s.Stats = nil + } + re.Equal(status1, status2) + schedule.ResetHotSpotMetrics() + schedulers.ResetSchedulerMetrics() + rc.resetSchedulingMetrics() +} + +func prepare(setCfg func(*sc.ScheduleConfig), setTc func(*testCluster), run func(*schedule.Coordinator), re *require.Assertions) (*testCluster, *schedule.Coordinator, func()) { + ctx, cancel := 
context.WithCancel(context.Background()) + cfg, opt, err := newTestScheduleConfig() + re.NoError(err) + if setCfg != nil { + setCfg(cfg) + } + tc := newTestCluster(ctx, opt) + hbStreams := hbstream.NewTestHeartbeatStreams(ctx, tc.meta.GetId(), tc, true /* need to run */) + if setTc != nil { + setTc(tc) + } + co := schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + if run != nil { + run(co) + } + return tc, co, func() { + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + hbStreams.Close() + cancel() + } +} + +func checkRegionAndOperator(re *require.Assertions, tc *testCluster, co *schedule.Coordinator, regionID uint64, expectAddOperator int) { + ops := co.GetCheckerController().CheckRegion(tc.GetRegion(regionID)) + if ops == nil { + re.Equal(0, expectAddOperator) + } else { + re.Equal(expectAddOperator, co.GetOperatorController().AddWaitingOperator(ops...)) + } +} + +func TestCheckRegion(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, nil, re) + hbStreams, opt := co.GetHeartbeatStreams(), tc.opt + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + checkRegionAndOperator(re, tc, co, 1, 1) + operatorutil.CheckAddPeer(re, co.GetOperatorController().GetOperator(1), operator.OpReplica, 1) + checkRegionAndOperator(re, tc, co, 1, 0) + + r := tc.GetRegion(1) + p := &metapb.Peer{Id: 1, StoreId: 1, Role: metapb.PeerRole_Learner} + r = r.Clone( + core.WithAddPeer(p), + core.WithPendingPeers(append(r.GetPendingPeers(), p)), + ) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + + tc = newTestCluster(ctx, opt) + co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 0) + r = r.Clone(core.WithPendingPeers(nil)) + re.NoError(tc.putRegion(r)) + checkRegionAndOperator(re, tc, co, 1, 1) + op := co.GetOperatorController().GetOperator(1) + re.Equal(1, op.Len()) + re.Equal(uint64(1), op.Step(0).(operator.PromoteLearner).ToStore) + checkRegionAndOperator(re, tc, co, 1, 0) +} + +func TestCheckRegionWithScheduleDeny(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(4, 4)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NotNil(region) + // test with label schedule=deny + labelerManager := tc.GetRegionLabeler() + labelerManager.SetLabelRule(&labeler.LabelRule{ + ID: "schedulelabel", + Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: labeler.KeyRange, + Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, + }) + + // should allow to do rule checker + re.True(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 1, 1) + + // should not allow to merge + tc.opt.SetSplitMergeInterval(time.Duration(0)) + re.NoError(tc.addLeaderRegion(2, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(3, 2, 3, 4)) + region = tc.GetRegion(2) + re.True(labelerManager.ScheduleDisabled(region)) + 
checkRegionAndOperator(re, tc, co, 2, 0) + // delete label rule, should allow to do merge + labelerManager.DeleteLabelRule("schedulelabel") + re.False(labelerManager.ScheduleDisabled(region)) + checkRegionAndOperator(re, tc, co, 2, 2) +} + +func TestCheckerIsBusy(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + cfg.ReplicaScheduleLimit = 0 // ensure replica checker is busy + cfg.MergeScheduleLimit = 10 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + num := 1 + typeutil.MaxUint64(tc.opt.GetReplicaScheduleLimit(), tc.opt.GetMergeScheduleLimit()) + var operatorKinds = []operator.OpKind{ + operator.OpReplica, operator.OpRegion | operator.OpMerge, + } + for i, operatorKind := range operatorKinds { + for j := uint64(0); j < num; j++ { + regionID := j + uint64(i+1)*num + re.NoError(tc.addLeaderRegion(regionID, 1)) + switch operatorKind { + case operator.OpReplica: + op := newTestOperator(regionID, tc.GetRegion(regionID).GetRegionEpoch(), operatorKind) + re.Equal(1, co.GetOperatorController().AddWaitingOperator(op)) + case operator.OpRegion | operator.OpMerge: + if regionID%2 == 1 { + ops, err := operator.CreateMergeRegionOperator("merge-region", co.GetCluster(), tc.GetRegion(regionID), tc.GetRegion(regionID-1), operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + } + } + } + } + checkRegionAndOperator(re, tc, co, num, 0) +} + +func TestMergeRegionCancelOneOperator(t *testing.T) { + re := require.New(t) + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + + source := core.NewRegionInfo( + &metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("a"), + }, + nil, + ) + target := core.NewRegionInfo( + &metapb.Region{ + Id: 2, + StartKey: []byte("a"), + EndKey: []byte("t"), + }, + nil, + ) + re.NoError(tc.putRegion(source)) + re.NoError(tc.putRegion(target)) + + // Cancel source region. + ops, err := operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel source operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(source.GetID())) + re.Empty(co.GetOperatorController().GetOperators()) + + // Cancel target region. + ops, err = operator.CreateMergeRegionOperator("merge-region", tc, source, target, operator.OpMerge) + re.NoError(err) + re.Len(ops, co.GetOperatorController().AddWaitingOperator(ops...)) + // Cancel target operator. + co.GetOperatorController().RemoveOperator(co.GetOperatorController().GetOperator(target.GetID())) + re.Empty(co.GetOperatorController().GetOperators()) +} + +func TestReplica(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + // Turn off balance. + cfg.LeaderScheduleLimit = 0 + cfg.RegionScheduleLimit = 0 + }, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 1)) + re.NoError(tc.addRegionStore(2, 2)) + re.NoError(tc.addRegionStore(3, 3)) + re.NoError(tc.addRegionStore(4, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Add peer to store 1. 
+ re.NoError(tc.addLeaderRegion(1, 2, 3)) + region := tc.GetRegion(1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Peer in store 3 is down, remove peer in store 3 and add peer to store 4. + re.NoError(tc.setStoreDown(3)) + downPeer := &pdpb.PeerStats{ + Peer: region.GetStorePeer(3), + DownSeconds: 24 * 60 * 60, + } + region = region.Clone( + core.WithDownPeers(append(region.GetDownPeers(), downPeer)), + ) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 4) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 4) + region = region.Clone(core.WithDownPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove peer from store 3. + re.NoError(tc.addLeaderRegion(2, 1, 2, 3, 4)) + region = tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitRemovePeer(re, stream, region, 3) // store3 is down, we should remove it firstly. + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + + // Remove offline peer directly when it's pending. + re.NoError(tc.addLeaderRegion(3, 1, 2, 3)) + re.NoError(tc.setStoreOffline(3)) + region = tc.GetRegion(3) + region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(3)})) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestCheckCache(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + // Turn off replica scheduling. + cfg.ReplicaScheduleLimit = 0 + }, nil, nil, re) + defer cleanup() + + re.NoError(tc.addRegionStore(1, 0)) + re.NoError(tc.addRegionStore(2, 0)) + re.NoError(tc.addRegionStore(3, 0)) + + // Add a peer with two replicas. 
+ re.NoError(tc.addLeaderRegion(1, 2, 3)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/break-patrol", `return`)) + + // case 1: operator cannot be created due to replica-schedule-limit restriction + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the replica-schedule-limit restriction + cfg := tc.GetScheduleConfig() + cfg.ReplicaScheduleLimit = 10 + tc.SetScheduleConfig(cfg) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + oc := co.GetOperatorController() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + // case 2: operator cannot be created due to store limit restriction + oc.RemoveOperator(oc.GetOperator(1)) + tc.SetStoreLimit(1, storelimit.AddPeer, 0) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(co.GetCheckerController().GetWaitingRegions(), 1) + + // cancel the store limit restriction + tc.SetStoreLimit(1, storelimit.AddPeer, 10) + time.Sleep(time.Second) + co.GetWaitGroup().Add(1) + co.PatrolRegions() + re.Len(oc.GetOperators(), 1) + re.Empty(co.GetCheckerController().GetWaitingRegions()) + + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/break-patrol")) +} + +func TestPeerState(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + + // Transfer peer from store 4 to store 1. + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addRegionStore(2, 10)) + re.NoError(tc.addRegionStore(3, 10)) + re.NoError(tc.addRegionStore(4, 40)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + + stream := mockhbstream.NewHeartbeatStream() + + // Wait for schedule. + waitOperator(re, co, 1) + operatorutil.CheckTransferPeer(re, co.GetOperatorController().GetOperator(1), operator.OpKind(0), 4, 1) + + region := tc.GetRegion(1).Clone() + + // Add new peer. + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 1) + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitPromoteLearner(re, stream, region, 1) + + // If the new peer is pending, the operator will not finish. + region = region.Clone(core.WithPendingPeers(append(region.GetPendingPeers(), region.GetStorePeer(1)))) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) + re.NotNil(co.GetOperatorController().GetOperator(region.GetID())) + + // The new peer is not pending now, the operator will finish. + // And we will proceed to remove peer in store 4. 
+ region = region.Clone(core.WithPendingPeers(nil)) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitRemovePeer(re, stream, region, 4) + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + region = tc.GetRegion(1).Clone() + re.NoError(dispatchHeartbeat(co, region, stream)) + waitNoResponse(re, stream) +} + +func TestShouldRun(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 5)) + re.NoError(tc.addLeaderStore(2, 2)) + re.NoError(tc.addLeaderStore(3, 0)) + re.NoError(tc.addLeaderStore(4, 0)) + re.NoError(tc.LoadRegion(1, 1, 2, 3)) + re.NoError(tc.LoadRegion(2, 1, 2, 3)) + re.NoError(tc.LoadRegion(3, 1, 2, 3)) + re.NoError(tc.LoadRegion(4, 1, 2, 3)) + re.NoError(tc.LoadRegion(5, 1, 2, 3)) + re.NoError(tc.LoadRegion(6, 2, 1, 4)) + re.NoError(tc.LoadRegion(7, 2, 1, 4)) + re.False(co.ShouldRun()) + re.Equal(2, tc.GetStoreRegionCount(4)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + // store4 needs Collect two region + {6, false}, + {7, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt()) +} + +func TestShouldRunWithNonLeaderRegions(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + tc.RaftCluster.coordinator = co + defer cleanup() + + re.NoError(tc.addLeaderStore(1, 10)) + re.NoError(tc.addLeaderStore(2, 0)) + re.NoError(tc.addLeaderStore(3, 0)) + for i := 0; i < 10; i++ { + re.NoError(tc.LoadRegion(uint64(i+1), 1, 2, 3)) + } + re.False(co.ShouldRun()) + re.Equal(10, tc.GetStoreRegionCount(1)) + + testCases := []struct { + regionID uint64 + ShouldRun bool + }{ + {1, false}, + {2, false}, + {3, false}, + {4, false}, + {5, false}, + {6, false}, + {7, false}, + {8, false}, + {9, true}, + } + + for _, testCase := range testCases { + r := tc.GetRegion(testCase.regionID) + nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) + re.NoError(tc.processRegionHeartbeat(nr)) + re.Equal(testCase.ShouldRun, co.ShouldRun()) + } + nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} + newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) + re.Error(tc.processRegionHeartbeat(newRegion)) + re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt()) + + // Now, after server is prepared, there exist some regions with no leader. 
+ re.Equal(uint64(0), tc.GetRegion(10).GetLeader().GetStoreId()) +} + +func TestAddScheduler(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + controller := co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), len(sc.DefaultSchedulers)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.EvictSlowStoreName)) + re.Empty(controller.GetSchedulerNames()) + + stream := mockhbstream.NewHeartbeatStream() + + // Add stores 1,2,3 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + re.NoError(tc.addLeaderStore(3, 1)) + // Add regions 1 with leader in store 1 and followers in stores 2,3 + re.NoError(tc.addLeaderRegion(1, 1, 2, 3)) + // Add regions 2 with leader in store 2 and followers in stores 1,3 + re.NoError(tc.addLeaderRegion(2, 2, 1, 3)) + // Add regions 3 with leader in store 3 and followers in stores 1,2 + re.NoError(tc.addLeaderRegion(3, 3, 1, 2)) + + oc := co.GetOperatorController() + + // test ConfigJSONDecoder create + bl, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err := bl.EncodeConfig() + re.NoError(err) + data := make(map[string]interface{}) + err = json.Unmarshal(conf, &data) + re.NoError(err) + batch := data["batch"].(float64) + re.Equal(4, int(batch)) + gls, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"0"}), controller.RemoveScheduler) + re.NoError(err) + re.Error(controller.AddScheduler(gls)) + re.Error(controller.RemoveScheduler(gls.GetName())) + + gls, err = schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls)) + + hb, err := schedulers.CreateScheduler(schedulers.HotRegionType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigJSONDecoder([]byte("{}"))) + re.NoError(err) + conf, err = hb.EncodeConfig() + re.NoError(err) + data = make(map[string]interface{}) + re.NoError(json.Unmarshal(conf, &data)) + re.Contains(data, "enable-for-tiflash") + re.Equal("true", data["enable-for-tiflash"].(string)) + + // Transfer all leaders to store 1. 
+ waitOperator(re, co, 2) + region2 := tc.GetRegion(2) + re.NoError(dispatchHeartbeat(co, region2, stream)) + region2 = waitTransferLeader(re, stream, region2, 1) + re.NoError(dispatchHeartbeat(co, region2, stream)) + waitNoResponse(re, stream) + + waitOperator(re, co, 3) + region3 := tc.GetRegion(3) + re.NoError(dispatchHeartbeat(co, region3, stream)) + region3 = waitTransferLeader(re, stream, region3, 1) + re.NoError(dispatchHeartbeat(co, region3, stream)) + waitNoResponse(re, stream) +} + +func TestPersistScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tc, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + hbStreams := co.GetHeartbeatStreams() + defer cleanup() + defaultCount := len(sc.DefaultSchedulers) + // Add stores 1,2 + re.NoError(tc.addLeaderStore(1, 1)) + re.NoError(tc.addLeaderStore(2, 1)) + + controller := co.GetSchedulersController() + re.Len(controller.GetSchedulerNames(), defaultCount) + oc := co.GetOperatorController() + storage := tc.RaftCluster.storage + + gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(gls1, "1")) + evict, err := schedulers.CreateScheduler(schedulers.EvictLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.EvictLeaderType, []string{"2"}), controller.RemoveScheduler) + re.NoError(err) + re.NoError(controller.AddScheduler(evict, "2")) + re.Len(controller.GetSchedulerNames(), defaultCount+2) + sches, _, err := storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Len(sches, defaultCount+2) + + // remove all default schedulers + re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName)) + re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.HotRegionName)) + re.NoError(controller.RemoveScheduler(schedulers.EvictSlowStoreName)) + // only remains 2 items with independent config. + re.Len(controller.GetSchedulerNames(), 2) + re.NoError(co.GetCluster().GetSchedulerConfig().Persist(storage)) + co.Stop() + co.GetSchedulersController().Wait() + co.GetWaitGroup().Wait() + // make a new coordinator for testing + // whether the schedulers added or removed in dynamic way are recorded in opt + _, newOpt, err := newTestScheduleConfig() + re.NoError(err) + shuffle, err := schedulers.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedulers.ConfigJSONDecoder([]byte("null"))) + re.NoError(err) + re.NoError(controller.AddScheduler(shuffle)) + // suppose we add a new default enable scheduler + sc.DefaultSchedulers = append(sc.DefaultSchedulers, sc.SchedulerConfig{Type: "shuffle-region"}) + defer func() { + sc.DefaultSchedulers = sc.DefaultSchedulers[:len(sc.DefaultSchedulers)-1] + }() + re.Len(newOpt.GetSchedulers(), defaultCount) + re.NoError(newOpt.Reload(storage)) + // only remains 3 items with independent config. + sches, _, err = storage.LoadAllSchedulerConfigs() + re.NoError(err) + re.Len(sches, 3) + + // option have 9 items because the default scheduler do not remove. 
+ re.Len(newOpt.GetSchedulers(), defaultCount+3)
+ re.NoError(newOpt.Persist(storage))
+ tc.RaftCluster.SetScheduleConfig(newOpt.GetScheduleConfig())
+
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams)
+ co.Run()
+ controller = co.GetSchedulersController()
+ re.Len(controller.GetSchedulerNames(), 3)
+ co.Stop()
+ co.GetSchedulersController().Wait()
+ co.GetWaitGroup().Wait()
+ // Suppose we restart PD again.
+ _, newOpt, err = newTestScheduleConfig()
+ re.NoError(err)
+ re.NoError(newOpt.Reload(storage))
+ tc.RaftCluster.SetScheduleConfig(newOpt.GetScheduleConfig())
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams)
+ co.Run()
+ controller = co.GetSchedulersController()
+ re.Len(controller.GetSchedulerNames(), 3)
+ bls, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+ re.NoError(err)
+ re.NoError(controller.AddScheduler(bls))
+ brs, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""}))
+ re.NoError(err)
+ re.NoError(controller.AddScheduler(brs))
+ re.Len(controller.GetSchedulerNames(), 5)
+
+ // The scheduler option should still contain 9 items,
+ // and the `hot scheduler` is disabled.
+ re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount+3)
+ re.NoError(controller.RemoveScheduler(schedulers.GrantLeaderName))
+ // A scheduler that is not enabled by default will be completely deleted.
+ re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount+2)
+ re.Len(controller.GetSchedulerNames(), 4)
+ re.NoError(co.GetCluster().GetSchedulerConfig().Persist(co.GetCluster().GetStorage()))
+ co.Stop()
+ co.GetSchedulersController().Wait()
+ co.GetWaitGroup().Wait()
+ _, newOpt, err = newTestScheduleConfig()
+ re.NoError(err)
+ re.NoError(newOpt.Reload(co.GetCluster().GetStorage()))
+ tc.RaftCluster.SetScheduleConfig(newOpt.GetScheduleConfig())
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams)
+
+ co.Run()
+ controller = co.GetSchedulersController()
+ re.Len(controller.GetSchedulerNames(), 4)
+ re.NoError(controller.RemoveScheduler(schedulers.EvictLeaderName))
+ re.Len(controller.GetSchedulerNames(), 3)
+}
+
+func TestRemoveScheduler(t *testing.T) {
+ re := require.New(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) {
+ cfg.ReplicaScheduleLimit = 0
+ }, nil, func(co *schedule.Coordinator) { co.Run() }, re)
+ hbStreams := co.GetHeartbeatStreams()
+ defer cleanup()
+
+ // Add stores 1, 2.
+ re.NoError(tc.addLeaderStore(1, 1))
+ re.NoError(tc.addLeaderStore(2, 1))
+ defaultCount := len(sc.DefaultSchedulers)
+ controller := co.GetSchedulersController()
+ re.Len(controller.GetSchedulerNames(), defaultCount)
+ oc := co.GetOperatorController()
+ storage := tc.RaftCluster.storage
+
+ gls1, err := schedulers.CreateScheduler(schedulers.GrantLeaderType, oc, storage, schedulers.ConfigSliceDecoder(schedulers.GrantLeaderType, []string{"1"}), controller.RemoveScheduler)
+ re.NoError(err)
+ re.NoError(controller.AddScheduler(gls1, "1"))
+ re.Len(controller.GetSchedulerNames(), defaultCount+1)
+ sches, _, err := storage.LoadAllSchedulerConfigs()
+ re.NoError(err)
+ re.Len(sches, defaultCount+1)
+
+ // Remove all schedulers.
+ re.NoError(controller.RemoveScheduler(schedulers.BalanceLeaderName))
+ re.NoError(controller.RemoveScheduler(schedulers.BalanceRegionName))
+ re.NoError(controller.RemoveScheduler(schedulers.HotRegionName))
+ re.NoError(controller.RemoveScheduler(schedulers.GrantLeaderName))
+ re.NoError(controller.RemoveScheduler(schedulers.EvictSlowStoreName))
+ // All schedulers have been removed.
+ sches, _, err = storage.LoadAllSchedulerConfigs()
+ re.NoError(err)
+ re.Empty(sches)
+ re.Empty(controller.GetSchedulerNames())
+ re.NoError(co.GetCluster().GetSchedulerConfig().Persist(co.GetCluster().GetStorage()))
+ co.Stop()
+ co.GetSchedulersController().Wait()
+ co.GetWaitGroup().Wait()
+
+ // Suppose we restart PD again.
+ _, newOpt, err := newTestScheduleConfig()
+ re.NoError(err)
+ re.NoError(newOpt.Reload(tc.storage))
+ tc.RaftCluster.SetScheduleConfig(newOpt.GetScheduleConfig())
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams)
+ co.Run()
+ re.Empty(controller.GetSchedulerNames())
+ // The option still keeps the default schedulers.
+ re.Len(co.GetCluster().GetSchedulerConfig().(*config.PersistOptions).GetSchedulers(), defaultCount)
+ co.Stop()
+ co.GetSchedulersController().Wait()
+ co.GetWaitGroup().Wait()
+}
+
+func TestRestart(t *testing.T) {
+ re := require.New(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) {
+ // Turn off balance; we only test adding replicas.
+ cfg.LeaderScheduleLimit = 0
+ cfg.RegionScheduleLimit = 0
+ }, nil, func(co *schedule.Coordinator) { co.Run() }, re)
+ hbStreams := co.GetHeartbeatStreams()
+ defer cleanup()
+
+ // Add 3 stores (1, 2, 3) and a region with 1 replica on store 1.
+ re.NoError(tc.addRegionStore(1, 1))
+ re.NoError(tc.addRegionStore(2, 2))
+ re.NoError(tc.addRegionStore(3, 3))
+ re.NoError(tc.addLeaderRegion(1, 1))
+ region := tc.GetRegion(1)
+
+ // Add 1 replica on store 2.
+ stream := mockhbstream.NewHeartbeatStream()
+ re.NoError(dispatchHeartbeat(co, region, stream))
+ region = waitAddLearner(re, stream, region, 2)
+ re.NoError(dispatchHeartbeat(co, region, stream))
+ region = waitPromoteLearner(re, stream, region, 2)
+ co.Stop()
+ co.GetSchedulersController().Wait()
+ co.GetWaitGroup().Wait()
+
+ // Recreate the coordinator, then add another replica on store 3.
+ co = schedule.NewCoordinator(ctx, tc.RaftCluster, hbStreams) + co.Run() + re.NoError(dispatchHeartbeat(co, region, stream)) + region = waitAddLearner(re, stream, region, 3) + re.NoError(dispatchHeartbeat(co, region, stream)) + waitPromoteLearner(re, stream, region, 3) +} + +func TestPauseScheduler(t *testing.T) { + re := require.New(t) + + _, co, cleanup := prepare(nil, nil, func(co *schedule.Coordinator) { co.Run() }, re) + defer cleanup() + controller := co.GetSchedulersController() + _, err := controller.IsSchedulerAllowed("test") + re.Error(err) + controller.PauseOrResumeScheduler(schedulers.BalanceLeaderName, 60) + paused, _ := controller.IsSchedulerPaused(schedulers.BalanceLeaderName) + re.True(paused) + pausedAt, err := controller.GetPausedSchedulerDelayAt(schedulers.BalanceLeaderName) + re.NoError(err) + resumeAt, err := controller.GetPausedSchedulerDelayUntil(schedulers.BalanceLeaderName) + re.NoError(err) + re.Equal(int64(60), resumeAt-pausedAt) + allowed, _ := controller.IsSchedulerAllowed(schedulers.BalanceLeaderName) + re.False(allowed) +} + +func BenchmarkPatrolRegion(b *testing.B) { + re := require.New(b) + + mergeLimit := uint64(4100) + regionNum := 10000 + + tc, co, cleanup := prepare(func(cfg *sc.ScheduleConfig) { + cfg.MergeScheduleLimit = mergeLimit + }, nil, nil, re) + defer cleanup() + + tc.opt.SetSplitMergeInterval(time.Duration(0)) + for i := 1; i < 4; i++ { + if err := tc.addRegionStore(uint64(i), regionNum, 96); err != nil { + return + } + } + for i := 0; i < regionNum; i++ { + if err := tc.addLeaderRegion(uint64(i), 1, 2, 3); err != nil { + return + } + } + + listen := make(chan int) + go func() { + oc := co.GetOperatorController() + listen <- 0 + for { + if oc.OperatorCount(operator.OpMerge) == mergeLimit { + co.Stop() + return + } + } + }() + <-listen + + co.GetWaitGroup().Add(1) + b.ResetTimer() + co.PatrolRegions() +} + +func waitOperator(re *require.Assertions, co *schedule.Coordinator, regionID uint64) { + testutil.Eventually(re, func() bool { + return co.GetOperatorController().GetOperator(regionID) != nil + }) +} + +func TestOperatorCount(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + re.Equal(uint64(0), oc.OperatorCount(operator.OpRegion)) + + re.NoError(tc.addLeaderRegion(1, 1)) + re.NoError(tc.addLeaderRegion(2, 2)) + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 1:leader + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpLeader)) // 1:leader, 2:leader + re.True(oc.RemoveOperator(op1)) + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) // 2:leader + } + + { + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion) + oc.AddWaitingOperator(op1) + re.Equal(uint64(1), oc.OperatorCount(operator.OpRegion)) // 1:region 2:leader + re.Equal(uint64(1), oc.OperatorCount(operator.OpLeader)) + op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion) + op2.SetPriorityLevel(constant.High) + oc.AddWaitingOperator(op2) + re.Equal(uint64(2), oc.OperatorCount(operator.OpRegion)) // 1:region 2:region + re.Equal(uint64(0), oc.OperatorCount(operator.OpLeader)) + } +} + +func TestStoreOverloaded(t *testing.T) { + re := require.New(t) + 
+ tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + opt := tc.GetOpts() + re.NoError(tc.addRegionStore(4, 100)) + re.NoError(tc.addRegionStore(3, 100)) + re.NoError(tc.addRegionStore(2, 100)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + region := tc.GetRegion(1).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + start := time.Now() + { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Len(ops, 1) + op1 := ops[0] + re.NotNil(op1) + re.True(oc.AddOperator(op1)) + re.True(oc.RemoveOperator(op1)) + } + for { + time.Sleep(time.Millisecond * 10) + ops, _ := lb.Schedule(tc, false /* dryRun */) + if time.Since(start) > time.Second { + break + } + re.Empty(ops) + } + + // reset all stores' limit + // scheduling one time needs 1/10 seconds + opt.SetAllStoresLimit(storelimit.AddPeer, 600) + opt.SetAllStoresLimit(storelimit.RemovePeer, 600) + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Len(ops, 1) + op := ops[0] + re.True(oc.AddOperator(op)) + re.True(oc.RemoveOperator(op)) + } + // sleep 1 seconds to make sure that the token is filled up + time.Sleep(time.Second) + for i := 0; i < 100; i++ { + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.NotEmpty(ops) + } +} + +func TestStoreOverloadedWithReplace(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + lb, err := schedulers.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedulers.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) + re.NoError(err) + + re.NoError(tc.addRegionStore(4, 100)) + re.NoError(tc.addRegionStore(3, 100)) + re.NoError(tc.addRegionStore(2, 100)) + re.NoError(tc.addRegionStore(1, 10)) + re.NoError(tc.addLeaderRegion(1, 2, 3, 4)) + re.NoError(tc.addLeaderRegion(2, 1, 3, 4)) + region := tc.GetRegion(1).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + region = tc.GetRegion(2).Clone(core.SetApproximateSize(60)) + tc.putRegion(region) + op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 1}) + re.True(oc.AddOperator(op1)) + op2 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 2}) + op2.SetPriorityLevel(constant.High) + re.True(oc.AddOperator(op2)) + op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 3}) + re.False(oc.AddOperator(op3)) + ops, _ := lb.Schedule(tc, false /* dryRun */) + re.Empty(ops) + // sleep 2 seconds to make sure that token is filled up + time.Sleep(2 * time.Second) + ops, _ = lb.Schedule(tc, false /* dryRun */) + re.NotEmpty(ops) +} + +func TestDownStoreLimit(t *testing.T) { + re := require.New(t) + + tc, co, cleanup := prepare(nil, nil, nil, re) + defer cleanup() + oc := co.GetOperatorController() + rc := co.GetCheckerController().GetRuleChecker() + + tc.addRegionStore(1, 100) + tc.addRegionStore(2, 100) + tc.addRegionStore(3, 100) + tc.addLeaderRegion(1, 1, 2, 3) + + region := tc.GetRegion(1) + tc.setStoreDown(1) + tc.SetStoreLimit(1, storelimit.RemovePeer, 1) + + region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{ + { + Peer: 
region.GetStorePeer(1),
+ DownSeconds: 24 * 60 * 60,
+ },
+ }), core.SetApproximateSize(1))
+ tc.putRegion(region)
+ for i := uint64(1); i < 20; i++ {
+ tc.addRegionStore(i+3, 100)
+ op := rc.Check(region)
+ re.NotNil(op)
+ re.True(oc.AddOperator(op))
+ oc.RemoveOperator(op)
+ }
+
+ region = region.Clone(core.SetApproximateSize(100))
+ tc.putRegion(region)
+ for i := uint64(20); i < 25; i++ {
+ tc.addRegionStore(i+3, 100)
+ op := rc.Check(region)
+ re.NotNil(op)
+ re.True(oc.AddOperator(op))
+ oc.RemoveOperator(op)
+ }
+}
+
+// FIXME: remove after moving into the schedulers package.
+type mockLimitScheduler struct {
+ schedulers.Scheduler
+ limit uint64
+ counter *operator.Controller
+ kind operator.OpKind
+}
+
+func (s *mockLimitScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
+ return s.counter.OperatorCount(s.kind) < s.limit
+}
+
+func TestController(t *testing.T) {
+ re := require.New(t)
+
+ tc, co, cleanup := prepare(nil, nil, nil, re)
+ defer cleanup()
+ oc := co.GetOperatorController()
+
+ re.NoError(tc.addLeaderRegion(1, 1))
+ re.NoError(tc.addLeaderRegion(2, 2))
+ scheduler, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+ re.NoError(err)
+ lb := &mockLimitScheduler{
+ Scheduler: scheduler,
+ counter: oc,
+ kind: operator.OpLeader,
+ }
+
+ sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb)
+
+ for i := schedulers.MinScheduleInterval; sc.GetInterval() != schedulers.MaxScheduleInterval; i = sc.GetNextInterval(i) {
+ re.Equal(i, sc.GetInterval())
+ re.Empty(sc.Schedule(false))
+ }
+ // limit = 2
+ lb.limit = 2
+ // count = 0
+ {
+ re.True(sc.AllowSchedule(false))
+ op1 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
+ re.Equal(1, oc.AddWaitingOperator(op1))
+ // count = 1
+ re.True(sc.AllowSchedule(false))
+ op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
+ re.Equal(1, oc.AddWaitingOperator(op2))
+ // count = 2
+ re.False(sc.AllowSchedule(false))
+ re.True(oc.RemoveOperator(op1))
+ // count = 1
+ re.True(sc.AllowSchedule(false))
+ }
+
+ op11 := newTestOperator(1, tc.GetRegion(1).GetRegionEpoch(), operator.OpLeader)
+ // Adding a PriorityKind operator will remove the old operator.
+ {
+ op3 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpHotRegion)
+ op3.SetPriorityLevel(constant.High)
+ re.Equal(1, oc.AddWaitingOperator(op11))
+ re.False(sc.AllowSchedule(false))
+ re.Equal(1, oc.AddWaitingOperator(op3))
+ re.True(sc.AllowSchedule(false))
+ re.True(oc.RemoveOperator(op3))
+ }
+
+ // Adding an admin operator will remove the old operator.
+ {
+ op2 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpLeader)
+ re.Equal(1, oc.AddWaitingOperator(op2))
+ re.False(sc.AllowSchedule(false))
+ op4 := newTestOperator(2, tc.GetRegion(2).GetRegionEpoch(), operator.OpAdmin)
+ op4.SetPriorityLevel(constant.High)
+ re.Equal(1, oc.AddWaitingOperator(op4))
+ re.True(sc.AllowSchedule(false))
+ re.True(oc.RemoveOperator(op4))
+ }
+
+ // Test a wrong region ID.
+ {
+ op5 := newTestOperator(3, &metapb.RegionEpoch{}, operator.OpHotRegion)
+ re.Equal(0, oc.AddWaitingOperator(op5))
+ }
+
+ // Test a wrong region epoch.
+ re.True(oc.RemoveOperator(op11))
+ epoch := &metapb.RegionEpoch{
+ Version: tc.GetRegion(1).GetRegionEpoch().GetVersion() + 1,
+ ConfVer: tc.GetRegion(1).GetRegionEpoch().GetConfVer(),
+ }
+ {
+ op6 := newTestOperator(1, epoch, operator.OpLeader)
+ re.Equal(0, oc.AddWaitingOperator(op6))
+ }
+ epoch.Version--
+ {
+ op6 := newTestOperator(1, epoch, operator.OpLeader)
+ re.Equal(1, oc.AddWaitingOperator(op6))
+ re.True(oc.RemoveOperator(op6))
+ }
+}
+
+func TestInterval(t *testing.T) {
+ re := require.New(t)
+
+ tc, co, cleanup := prepare(nil, nil, nil, re)
+ defer cleanup()
+
+ lb, err := schedulers.CreateScheduler(schedulers.BalanceLeaderType, co.GetOperatorController(), storage.NewStorageWithMemoryBackend(), schedulers.ConfigSliceDecoder(schedulers.BalanceLeaderType, []string{"", ""}))
+ re.NoError(err)
+ sc := schedulers.NewScheduleController(tc.ctx, co.GetCluster(), co.GetOperatorController(), lb)
+
+ // If there is no operator for x seconds, the next check should happen within x/2 seconds.
+ idleSeconds := []int{5, 10, 20, 30, 60}
+ for _, n := range idleSeconds {
+ sc.SetInterval(schedulers.MinScheduleInterval)
+ for totalSleep := time.Duration(0); totalSleep <= time.Second*time.Duration(n); totalSleep += sc.GetInterval() {
+ re.Empty(sc.Schedule(false))
+ }
+ re.Less(sc.GetInterval(), time.Second*time.Duration(n/2))
+ }
+}
+
+func waitAddLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+ var res *pdpb.RegionHeartbeatResponse
+ testutil.Eventually(re, func() bool {
+ if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
+ return res.GetRegionId() == region.GetID() &&
+ res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode &&
+ res.GetChangePeer().GetPeer().GetStoreId() == storeID
+ }
+ return false
+ })
+ return region.Clone(
+ core.WithAddPeer(res.GetChangePeer().GetPeer()),
+ core.WithIncConfVer(),
+ )
+}
+
+func waitPromoteLearner(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
+ var res *pdpb.RegionHeartbeatResponse
+ testutil.Eventually(re, func() bool {
+ if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil {
+ return res.GetRegionId() == region.GetID() &&
+ res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode &&
+ res.GetChangePeer().GetPeer().GetStoreId() == storeID
+ }
+ return false
+ })
+ // Remove the learner, then add the voter.
+ return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithAddPeer(res.GetChangePeer().GetPeer()), + ) +} + +func waitRemovePeer(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + return res.GetRegionId() == region.GetID() && + res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode && + res.GetChangePeer().GetPeer().GetStoreId() == storeID + } + return false + }) + return region.Clone( + core.WithRemoveStorePeer(storeID), + core.WithIncConfVer(), + ) +} + +func waitTransferLeader(re *require.Assertions, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo { + var res *pdpb.RegionHeartbeatResponse + testutil.Eventually(re, func() bool { + if res = stream.Recv().(*pdpb.RegionHeartbeatResponse); res != nil { + if res.GetRegionId() == region.GetID() { + for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) { + if peer.GetStoreId() == storeID { + return true + } + } + } + } + return false + }) + return region.Clone( + core.WithLeader(region.GetStorePeer(storeID)), + ) +} + +func waitNoResponse(re *require.Assertions, stream mockhbstream.HeartbeatStream) { + testutil.Eventually(re, func() bool { + res := stream.Recv() + return res == nil + }) +} +>>>>>>> 5b939c6fc (config: disable witness related schedulers by default (#7765)) diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go new file mode 100644 index 00000000000..86b6955e4dd --- /dev/null +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -0,0 +1,673 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduling + +import ( + "context" + "fmt" + "net/http" + "reflect" + "testing" + "time" + + "github.com/docker/go-units" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core/storelimit" + mcs "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/schedule/operator" + "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/server" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/server/api" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, testutil.LeakOptions...) 
+} + +type serverTestSuite struct { + suite.Suite + ctx context.Context + cancel context.CancelFunc + cluster *tests.TestCluster + pdLeader *tests.TestServer + backendEndpoints string +} + +func TestServerTestSuite(t *testing.T) { + suite.Run(t, new(serverTestSuite)) +} + +func (suite *serverTestSuite) SetupSuite() { + var err error + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + re.NoError(err) + + err = suite.cluster.RunInitialServers() + re.NoError(err) + + leaderName := suite.cluster.WaitLeader() + suite.pdLeader = suite.cluster.GetServer(leaderName) + suite.backendEndpoints = suite.pdLeader.GetAddr() + re.NoError(suite.pdLeader.BootstrapCluster()) +} + +func (suite *serverTestSuite) TearDownSuite() { + re := suite.Require() + suite.cluster.Destroy() + suite.cancel() + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) +} + +func (suite *serverTestSuite) TestAllocID() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + time.Sleep(200 * time.Millisecond) + id, err := tc.GetPrimaryServer().GetCluster().AllocID() + re.NoError(err) + re.NotEqual(uint64(0), id) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) +} + +func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) + pd2, err := suite.cluster.Join(suite.ctx) + re.NoError(err) + err = pd2.Run() + re.NoError(err) + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + time.Sleep(200 * time.Millisecond) + cluster := tc.GetPrimaryServer().GetCluster() + id, err := cluster.AllocID() + re.NoError(err) + re.NotEqual(uint64(0), id) + suite.cluster.ResignLeader() + leaderName := suite.cluster.WaitLeader() + suite.pdLeader = suite.cluster.GetServer(leaderName) + suite.backendEndpoints = suite.pdLeader.GetAddr() + time.Sleep(time.Second) + id1, err := cluster.AllocID() + re.NoError(err) + re.Greater(id1, id) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember")) + // Update the pdLeader in test suite. 
+ suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader()) + suite.backendEndpoints = suite.pdLeader.GetAddr() + suite.TearDownSuite() + suite.SetupSuite() +} + +func (suite *serverTestSuite) TestPrimaryChange() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + primary := tc.GetPrimaryServer() + oldPrimaryAddr := primary.GetAddr() + testutil.Eventually(re, func() bool { + watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName) + return ok && oldPrimaryAddr == watchedAddr && + len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames()) == 6 + }) + // change primary + primary.Close() + tc.WaitForPrimaryServing(re) + primary = tc.GetPrimaryServer() + newPrimaryAddr := primary.GetAddr() + re.NotEqual(oldPrimaryAddr, newPrimaryAddr) + testutil.Eventually(re, func() bool { + watchedAddr, ok := suite.pdLeader.GetServicePrimaryAddr(suite.ctx, mcs.SchedulingServiceName) + return ok && newPrimaryAddr == watchedAddr && + len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames()) == 6 + }) +} + +func (suite *serverTestSuite) TestForwardStoreHeartbeat() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + s := &server.GrpcServer{Server: suite.pdLeader.GetServer()} + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: 1, + Address: "tikv1", + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + + testutil.Eventually(re, func() bool { + resp1, err := s.StoreHeartbeat( + context.Background(), &pdpb.StoreHeartbeatRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Stats: &pdpb.StoreStats{ + StoreId: 1, + Capacity: 1798985089024, + Available: 1709868695552, + UsedSize: 85150956358, + KeysWritten: 20000, + BytesWritten: 199, + KeysRead: 10000, + BytesRead: 99, + }, + }, + ) + re.NoError(err) + re.Empty(resp1.GetHeader().GetError()) + store := tc.GetPrimaryServer().GetCluster().GetStore(1) + return store.GetStoreStats().GetCapacity() == uint64(1798985089024) && + store.GetStoreStats().GetAvailable() == uint64(1709868695552) && + store.GetStoreStats().GetUsedSize() == uint64(85150956358) && + store.GetStoreStats().GetKeysWritten() == uint64(20000) && + store.GetStoreStats().GetBytesWritten() == uint64(199) && + store.GetStoreStats().GetKeysRead() == uint64(10000) && + store.GetStoreStats().GetBytesRead() == uint64(99) + }) +} + +func (suite *serverTestSuite) TestSchedulingServiceFallback() { + re := suite.Require() + leaderServer := suite.pdLeader.GetServer() + conf := leaderServer.GetMicroServiceConfig().Clone() + // Change back to the default value. + conf.EnableSchedulingFallback = true + leaderServer.SetMicroServiceConfig(*conf) + // API server will execute scheduling jobs since there is no scheduling server. 
+ testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + // After scheduling server is started, API server will not execute scheduling jobs. + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Scheduling server is responsible for executing scheduling jobs. + testutil.Eventually(re, func() bool { + return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() + }) + tc.GetPrimaryServer().Close() + // Stop scheduling server. API server will execute scheduling jobs again. + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + tc1, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc1.Destroy() + tc1.WaitForPrimaryServing(re) + // After scheduling server is started, API server will not execute scheduling jobs. + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Scheduling server is responsible for executing scheduling jobs again. + testutil.Eventually(re, func() bool { + return tc1.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() + }) +} + +func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() { + re := suite.Require() + + // API server will execute scheduling jobs since there is no scheduling server. + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + leaderServer := suite.pdLeader.GetServer() + // After Disabling scheduling service fallback, the API server will stop scheduling. + conf := leaderServer.GetMicroServiceConfig().Clone() + conf.EnableSchedulingFallback = false + leaderServer.SetMicroServiceConfig(*conf) + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Enable scheduling service fallback again, the API server will restart scheduling. + conf.EnableSchedulingFallback = true + leaderServer.SetMicroServiceConfig(*conf) + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + // After scheduling server is started, API server will not execute scheduling jobs. + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Scheduling server is responsible for executing scheduling jobs. + testutil.Eventually(re, func() bool { + return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() + }) + // Disable scheduling service fallback and stop scheduling server. API server won't execute scheduling jobs again. 
+ conf.EnableSchedulingFallback = false + leaderServer.SetMicroServiceConfig(*conf) + tc.GetPrimaryServer().Close() + time.Sleep(time.Second) + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) +} + +func (suite *serverTestSuite) TestSchedulerSync() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + schedulersController := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetSchedulersController() + checkEvictLeaderSchedulerExist(re, schedulersController, false) + // Add a new evict-leader-scheduler through the API server. + api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + // Check if the evict-leader-scheduler is added. + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) + // Add a store_id to the evict-leader-scheduler through the API server. + err = suite.pdLeader.GetServer().GetRaftCluster().PutStore( + &metapb.Store{ + Id: 2, + Address: "mock://2", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + Version: "7.0.0", + }, + ) + re.NoError(err) + api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 2, + }) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + // Delete a store_id from the evict-leader-scheduler through the API server. + api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1)) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{2}) + // Add a store_id to the evict-leader-scheduler through the API server by the scheduler handler. + api.MustCallSchedulerConfigAPI(re, http.MethodPost, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"config"}, map[string]interface{}{ + "name": schedulers.EvictLeaderName, + "store_id": 1, + }) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1, 2}) + // Delete a store_id from the evict-leader-scheduler through the API server by the scheduler handler. + api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"delete", "2"}, nil) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) + // If the last store is deleted, the scheduler should be removed. + api.MustCallSchedulerConfigAPI(re, http.MethodDelete, suite.backendEndpoints, schedulers.EvictLeaderName, []string{"delete", "1"}, nil) + // Check if the scheduler is removed. + checkEvictLeaderSchedulerExist(re, schedulersController, false) + + // Delete the evict-leader-scheduler through the API server by removing the last store_id. 
+ api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) + api.MustDeleteScheduler(re, suite.backendEndpoints, fmt.Sprintf("%s-%d", schedulers.EvictLeaderName, 1)) + checkEvictLeaderSchedulerExist(re, schedulersController, false) + + // Delete the evict-leader-scheduler through the API server. + api.MustAddScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + checkEvictLeaderSchedulerExist(re, schedulersController, true) + checkEvictLeaderStoreIDs(re, schedulersController, []uint64{1}) + api.MustDeleteScheduler(re, suite.backendEndpoints, schedulers.EvictLeaderName) + checkEvictLeaderSchedulerExist(re, schedulersController, false) + + // The default scheduler could not be deleted, it could only be disabled. + defaultSchedulerNames := []string{ + schedulers.BalanceLeaderName, + schedulers.BalanceRegionName, + schedulers.HotRegionName, + } + checkDisabled := func(name string, shouldDisabled bool) { + re.NotNil(schedulersController.GetScheduler(name), name) + testutil.Eventually(re, func() bool { + disabled, err := schedulersController.IsSchedulerDisabled(name) + re.NoError(err, name) + return disabled == shouldDisabled + }) + } + for _, name := range defaultSchedulerNames { + checkDisabled(name, false) + api.MustDeleteScheduler(re, suite.backendEndpoints, name) + checkDisabled(name, true) + } + for _, name := range defaultSchedulerNames { + checkDisabled(name, true) + api.MustAddScheduler(re, suite.backendEndpoints, name, nil) + checkDisabled(name, false) + } +} + +func checkEvictLeaderSchedulerExist(re *require.Assertions, sc *schedulers.Controller, exist bool) { + testutil.Eventually(re, func() bool { + if !exist { + return sc.GetScheduler(schedulers.EvictLeaderName) == nil + } + return sc.GetScheduler(schedulers.EvictLeaderName) != nil + }) +} + +func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller, expected []uint64) { + handler, ok := sc.GetSchedulerHandlers()[schedulers.EvictLeaderName] + re.True(ok) + h, ok := handler.(interface { + EvictStoreIDs() []uint64 + }) + re.True(ok) + var evictStoreIDs []uint64 + testutil.Eventually(re, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == len(expected) + }) + re.ElementsMatch(evictStoreIDs, expected) +} + +func (suite *serverTestSuite) TestForwardRegionHeartbeat() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + s := &server.GrpcServer{Server: suite.pdLeader.GetServer()} + for i := uint64(1); i <= 3; i++ { + resp, err := s.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + + grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr()) + stream, err := grpcPDClient.RegionHeartbeat(suite.ctx) + re.NoError(err) + peers := []*metapb.Peer{ + {Id: 11, StoreId: 1}, + {Id: 22, StoreId: 2}, + {Id: 33, StoreId: 3}, + } + queryStats := &pdpb.QueryStats{ + Get: 5, + Coprocessor: 6, + Scan: 7, + Put: 8, + Delete: 9, 
+ DeleteRange: 10, + AcquirePessimisticLock: 11, + Rollback: 12, + Prewrite: 13, + Commit: 14, + } + interval := &pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10} + downPeers := []*pdpb.PeerStats{{Peer: peers[2], DownSeconds: 100}} + pendingPeers := []*metapb.Peer{peers[2]} + regionReq := &pdpb.RegionHeartbeatRequest{ + Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()), + Region: &metapb.Region{Id: 10, Peers: peers, StartKey: []byte("a"), EndKey: []byte("b")}, + Leader: peers[0], + DownPeers: downPeers, + PendingPeers: pendingPeers, + BytesWritten: 10, + BytesRead: 20, + KeysWritten: 100, + KeysRead: 200, + ApproximateSize: 30 * units.MiB, + ApproximateKeys: 300, + Interval: interval, + QueryStats: queryStats, + Term: 1, + CpuUsage: 100, + } + err = stream.Send(regionReq) + re.NoError(err) + testutil.Eventually(re, func() bool { + region := tc.GetPrimaryServer().GetCluster().GetRegion(10) + return region.GetBytesRead() == 20 && region.GetBytesWritten() == 10 && + region.GetKeysRead() == 200 && region.GetKeysWritten() == 100 && region.GetTerm() == 1 && + region.GetApproximateKeys() == 300 && region.GetApproximateSize() == 30 && + reflect.DeepEqual(region.GetLeader(), peers[0]) && + reflect.DeepEqual(region.GetInterval(), interval) && region.GetReadQueryNum() == 18 && region.GetWriteQueryNum() == 77 && + reflect.DeepEqual(region.GetDownPeers(), downPeers) && reflect.DeepEqual(region.GetPendingPeers(), pendingPeers) + }) +} + +func (suite *serverTestSuite) TestStoreLimit() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + oc := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetOperatorController() + leaderServer := suite.pdLeader.GetServer() + conf := leaderServer.GetReplicationConfig().Clone() + conf.MaxReplicas = 1 + leaderServer.SetReplicationConfig(*conf) + grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr()) + for i := uint64(1); i <= 2; i++ { + resp, err := grpcPDClient.PutStore( + context.Background(), &pdpb.PutStoreRequest{ + Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()}, + Store: &metapb.Store{ + Id: i, + Address: fmt.Sprintf("mock://%d", i), + State: metapb.StoreState_Up, + Version: "7.0.0", + }, + }, + ) + re.NoError(err) + re.Empty(resp.GetHeader().GetError()) + } + + stream, err := grpcPDClient.RegionHeartbeat(suite.ctx) + re.NoError(err) + for i := uint64(2); i <= 10; i++ { + peers := []*metapb.Peer{{Id: i, StoreId: 1}} + regionReq := &pdpb.RegionHeartbeatRequest{ + Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()), + Region: &metapb.Region{ + Id: i, + Peers: peers, + StartKey: []byte(fmt.Sprintf("t%d", i)), + EndKey: []byte(fmt.Sprintf("t%d", i+1)), + }, + Leader: peers[0], + ApproximateSize: 10 * units.MiB, + } + err = stream.Send(regionReq) + re.NoError(err) + } + + leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.AddPeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.RemovePeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 60) + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60) + // There is a time window between setting store limit in API service side and capturing the change in scheduling service. 
+ waitSyncFinish(re, tc, storelimit.AddPeer, 60) + for i := uint64(1); i <= 5; i++ { + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 120) + waitSyncFinish(re, tc, storelimit.AddPeer, 120) + for i := uint64(1); i <= 10; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + leaderServer.GetRaftCluster().SetAllStoresLimit(storelimit.AddPeer, 60) + waitSyncFinish(re, tc, storelimit.AddPeer, 60) + for i := uint64(1); i <= 5; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60) + waitSyncFinish(re, tc, storelimit.RemovePeer, 60) + for i := uint64(1); i <= 5; i++ { + op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorFail(re, oc, op) + + leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 120) + waitSyncFinish(re, tc, storelimit.RemovePeer, 120) + for i := uint64(1); i <= 10; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + leaderServer.GetRaftCluster().SetAllStoresLimit(storelimit.RemovePeer, 60) + waitSyncFinish(re, tc, storelimit.RemovePeer, 60) + for i := uint64(1); i <= 5; i++ { + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorSuccess(re, oc, op) + } + op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) + checkOperatorFail(re, oc, op) +} + +func checkOperatorSuccess(re *require.Assertions, oc *operator.Controller, op *operator.Operator) { + re.True(oc.AddOperator(op)) + re.True(oc.RemoveOperator(op)) + re.True(op.IsEnd()) + re.Equal(op, oc.GetOperatorStatus(op.RegionID()).Operator) +} + +func checkOperatorFail(re *require.Assertions, oc *operator.Controller, op *operator.Operator) { + re.False(oc.AddOperator(op)) + re.False(oc.RemoveOperator(op)) +} + +func waitSyncFinish(re *require.Assertions, tc *tests.TestSchedulingCluster, typ storelimit.Type, expectedLimit float64) { + testutil.Eventually(re, func() bool { + return tc.GetPrimaryServer().GetCluster().GetSharedConfig().GetStoreLimitByType(2, typ) == expectedLimit + }) +} + +type multipleServerTestSuite struct { + suite.Suite + ctx context.Context + cancel context.CancelFunc + cluster *tests.TestCluster + pdLeader *tests.TestServer + backendEndpoints string +} + +func TestMultipleServerTestSuite(t *testing.T) { + suite.Run(t, new(multipleServerTestSuite)) +} + +func (suite *multipleServerTestSuite) SetupSuite() { + var err error + re := suite.Require() + 
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 2) + re.NoError(err) + + err = suite.cluster.RunInitialServers() + re.NoError(err) + + leaderName := suite.cluster.WaitLeader() + suite.pdLeader = suite.cluster.GetServer(leaderName) + suite.backendEndpoints = suite.pdLeader.GetAddr() + re.NoError(suite.pdLeader.BootstrapCluster()) +} + +func (suite *multipleServerTestSuite) TearDownSuite() { + re := suite.Require() + suite.cluster.Destroy() + suite.cancel() + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) +} + +func (suite *multipleServerTestSuite) TestReElectLeader() { + re := suite.Require() + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + + rc := suite.pdLeader.GetServer().GetRaftCluster() + re.NotNil(rc) + regionLen := 100 + regions := tests.InitRegions(regionLen) + for _, region := range regions { + err = rc.HandleRegionHeartbeat(region) + re.NoError(err) + } + + originLeaderName := suite.pdLeader.GetLeader().GetName() + suite.pdLeader.ResignLeader() + newLeaderName := suite.cluster.WaitLeader() + re.NotEqual(originLeaderName, newLeaderName) + + suite.pdLeader.ResignLeader() + newLeaderName = suite.cluster.WaitLeader() + re.Equal(originLeaderName, newLeaderName) + + rc = suite.pdLeader.GetServer().GetRaftCluster() + rc.IsPrepared() +}