resolve conflicts
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Oct 18, 2024
1 parent 512ca52 commit c408dd2
Showing 9 changed files with 19 additions and 132 deletions.
41 changes: 5 additions & 36 deletions pkg/schedule/checker/replica_checker.go
@@ -63,31 +63,19 @@ var (
 // Location management, mainly used for cross data center deployment.
 type ReplicaChecker struct {
 PauseController
-<<<<<<< HEAD
 cluster sche.CheckerCluster
 conf config.CheckerConfigProvider
 regionWaitingList cache.Cache
-=======
-cluster sche.CheckerCluster
-conf config.CheckerConfigProvider
-pendingProcessedRegions *cache.TTLUint64
-r *rand.Rand
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+r *rand.Rand
 }

 // NewReplicaChecker creates a replica checker.
 func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, regionWaitingList cache.Cache) *ReplicaChecker {
 return &ReplicaChecker{
-<<<<<<< HEAD
 cluster: cluster,
 conf: conf,
 regionWaitingList: regionWaitingList,
-=======
-cluster: cluster,
-conf: conf,
-pendingProcessedRegions: pendingProcessedRegions,
-r: rand.New(rand.NewSource(time.Now().UnixNano())),
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+r: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
 }

@@ -195,11 +183,7 @@ func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.O
 log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
 replicaCheckerNoTargetStoreCounter.Inc()
 if filterByTempState {
-<<<<<<< HEAD
-r.regionWaitingList.Put(region.GetID(), nil)
-=======
-c.pendingProcessedRegions.Put(region.GetID(), nil)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+c.regionWaitingList.Put(region.GetID(), nil)
 }
 return nil
 }
@@ -226,11 +210,7 @@ func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *opera
 old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
 if old == 0 {
 replicaCheckerNoWorstPeerCounter.Inc()
-<<<<<<< HEAD
-r.regionWaitingList.Put(region.GetID(), nil)
-=======
-c.pendingProcessedRegions.Put(region.GetID(), nil)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+c.regionWaitingList.Put(region.GetID(), nil)
 return nil
 }
 op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
@@ -295,11 +275,7 @@ func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 }
 log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
 if filterByTempState {
-<<<<<<< HEAD
-r.regionWaitingList.Put(region.GetID(), nil)
-=======
-c.pendingProcessedRegions.Put(region.GetID(), nil)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+c.regionWaitingList.Put(region.GetID(), nil)
 }
 return nil
 }
@@ -319,17 +295,10 @@ func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status

 func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
 return &ReplicaStrategy{
-<<<<<<< HEAD
-checkerName: replicaCheckerName,
-cluster: r.cluster,
-locationLabels: r.conf.GetLocationLabels(),
-isolationLevel: r.conf.GetIsolationLevel(),
-=======
 checkerName: c.Name(),
 cluster: c.cluster,
 locationLabels: c.conf.GetLocationLabels(),
 isolationLevel: c.conf.GetIsolationLevel(),
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 region: region,
 r: r,
 }
22 changes: 2 additions & 20 deletions pkg/schedule/checker/rule_checker.go
@@ -85,45 +85,27 @@ var (
 // RuleChecker fix/improve region by placement rules.
 type RuleChecker struct {
 PauseController
-<<<<<<< HEAD
 cluster sche.CheckerCluster
 ruleManager *placement.RuleManager
 name string
 regionWaitingList cache.Cache
 pendingList cache.Cache
 switchWitnessCache *cache.TTLUint64
 record *recorder
-=======
-cluster sche.CheckerCluster
-ruleManager *placement.RuleManager
-pendingProcessedRegions *cache.TTLUint64
-pendingList cache.Cache
-switchWitnessCache *cache.TTLUint64
-record *recorder
-r *rand.Rand
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+r *rand.Rand
 }

 // NewRuleChecker creates a checker instance.
 func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
 return &RuleChecker{
-<<<<<<< HEAD
 cluster: cluster,
 ruleManager: ruleManager,
 name: ruleCheckerName,
 regionWaitingList: regionWaitingList,
 pendingList: cache.NewDefaultCache(maxPendingListLen),
 switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
 record: newRecord(),
-=======
-cluster: cluster,
-ruleManager: ruleManager,
-pendingProcessedRegions: pendingProcessedRegions,
-pendingList: cache.NewDefaultCache(maxPendingListLen),
-switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
-record: newRecord(),
-r: rand.New(rand.NewSource(time.Now().UnixNano())),
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+r: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
 }

20 changes: 2 additions & 18 deletions pkg/schedule/schedulers/base_scheduler.go
@@ -62,28 +62,12 @@ func intervalGrow(x time.Duration, maxInterval time.Duration, typ intervalGrowth
 // BaseScheduler is a basic scheduler for all other complex scheduler
 type BaseScheduler struct {
 OpController *operator.Controller
-<<<<<<< HEAD
-}
-
-// NewBaseScheduler returns a basic scheduler
-func NewBaseScheduler(opController *operator.Controller) *BaseScheduler {
-return &BaseScheduler{OpController: opController}
-=======
 R *rand.Rand
-
-name string
-tp types.CheckerSchedulerType
-conf schedulerConfig
 }

 // NewBaseScheduler returns a basic scheduler
-func NewBaseScheduler(
-opController *operator.Controller,
-tp types.CheckerSchedulerType,
-conf schedulerConfig,
-) *BaseScheduler {
-return &BaseScheduler{OpController: opController, tp: tp, conf: conf, R: rand.New(rand.NewSource(time.Now().UnixNano()))}
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+func NewBaseScheduler(opController *operator.Controller) *BaseScheduler {
+return &BaseScheduler{OpController: opController, R: rand.New(rand.NewSource(time.Now().UnixNano()))}
 }

 func (s *BaseScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
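Note: the resolution above keeps the pattern from #8675 of seeding a single *rand.Rand in the constructor and reusing it, rather than calling rand.NewSource on every scheduling round. Below is a minimal, self-contained sketch of that pattern; the type and method names are illustrative stand-ins, not PD's actual API.

// rand_once_sketch.go — illustrative only; names are not PD's real API.
package main

import (
    "fmt"
    "math/rand"
    "time"
)

// baseScheduler owns one generator for its whole lifetime, mirroring the
// resolved BaseScheduler.R field above.
type baseScheduler struct {
    R *rand.Rand
}

// newBaseScheduler seeds the generator exactly once, in the constructor.
func newBaseScheduler() *baseScheduler {
    return &baseScheduler{R: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

// pickStore reuses the shared generator on every call instead of building a
// fresh rand.Source each time it needs a random index.
func (s *baseScheduler) pickStore(storeIDs []uint64) uint64 {
    if len(storeIDs) == 0 {
        return 0
    }
    return storeIDs[s.R.Intn(len(storeIDs))]
}

func main() {
    s := newBaseScheduler()
    fmt.Println("picked store:", s.pickStore([]uint64{1, 2, 3, 4, 5}))
}

One caveat: *rand.Rand is not safe for concurrent use, so a shared field like this assumes each scheduler's random picks happen on a single goroutine or are otherwise serialized.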
24 changes: 4 additions & 20 deletions pkg/schedule/schedulers/evict_leader.go
@@ -273,11 +273,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)

 func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 evictLeaderCounter.Inc()
-<<<<<<< HEAD
-return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil
-=======
-return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf), nil
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize), nil
 }

 func uniqueAppendOperator(dst []*operator.Operator, src ...*operator.Operator) []*operator.Operator {
@@ -300,18 +296,10 @@ type evictLeaderStoresConf interface {
 getKeyRangesByID(id uint64) []core.KeyRange
 }

-<<<<<<< HEAD
-func scheduleEvictLeaderBatch(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator {
-=======
-func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+func scheduleEvictLeaderBatch(r *rand.Rand, name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf, batchSize int) []*operator.Operator {
 var ops []*operator.Operator
 for i := 0; i < batchSize; i++ {
-<<<<<<< HEAD
-once := scheduleEvictLeaderOnce(name, typ, cluster, conf)
-=======
-once := scheduleEvictLeaderOnce(r, name, cluster, conf)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+once := scheduleEvictLeaderOnce(r, name, typ, cluster, conf)
 // no more regions
 if len(once) == 0 {
 break
@@ -325,11 +313,7 @@ func scheduleEvictLeaderBatch(r *rand.Rand, name string, cluster sche.SchedulerC
 return ops
 }

-<<<<<<< HEAD
-func scheduleEvictLeaderOnce(name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
-=======
-func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+func scheduleEvictLeaderOnce(r *rand.Rand, name, typ string, cluster sche.SchedulerCluster, conf evictLeaderStoresConf) []*operator.Operator {
 stores := conf.getStores()
 ops := make([]*operator.Operator, 0, len(stores))
 for _, storeID := range stores {
6 changes: 1 addition & 5 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -143,11 +143,7 @@ func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster sche.SchedulerClust
 }

 func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster sche.SchedulerCluster) []*operator.Operator {
-<<<<<<< HEAD
-return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
-=======
-return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
 }

 func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
6 changes: 1 addition & 5 deletions pkg/schedule/schedulers/evict_slow_trend.go
@@ -292,11 +292,7 @@ func (s *evictSlowTrendScheduler) scheduleEvictLeader(cluster sche.SchedulerClus
 return nil
 }
 storeSlowTrendEvictedStatusGauge.WithLabelValues(store.GetAddress(), strconv.FormatUint(store.GetID(), 10)).Set(1)
-<<<<<<< HEAD
-return scheduleEvictLeaderBatch(s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
-=======
-return scheduleEvictLeaderBatch(s.R, s.GetName(), cluster, s.conf)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+return scheduleEvictLeaderBatch(s.R, s.GetName(), s.GetType(), cluster, s.conf, EvictLeaderBatchSize)
 }

 func (s *evictSlowTrendScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
7 changes: 1 addition & 6 deletions pkg/schedule/schedulers/label.go
@@ -109,13 +109,8 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([
 }
 f := filter.NewExcludedFilter(s.GetName(), nil, excludeStores)

-<<<<<<< HEAD
-target := filter.NewCandidates(cluster.GetFollowerStores(region)).
-FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f).
-=======
 target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)).
-FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), TransferLeader: true, OperatorLevel: constant.Medium}, f).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: LabelName, TransferLeader: true, OperatorLevel: constant.Medium}, f).
 RandomPick()
 if target == nil {
 log.Debug("label scheduler no target found for region", zap.Uint64("region-id", region.GetID()))
7 changes: 1 addition & 6 deletions pkg/schedule/schedulers/random_merge.go
@@ -88,13 +88,8 @@ func (s *randomMergeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)
 func (s *randomMergeScheduler) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 randomMergeCounter.Inc()

-<<<<<<< HEAD
-store := filter.NewCandidates(cluster.GetStores()).
-FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}).
-=======
 store := filter.NewCandidates(s.R, cluster.GetStores()).
-FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true, OperatorLevel: constant.Low}).
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+FilterSource(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.conf.Name, MoveRegion: true, OperatorLevel: constant.Low}).
 RandomPick()
 if store == nil {
 randomMergeNoSourceStoreCounter.Inc()
18 changes: 2 additions & 16 deletions pkg/schedule/schedulers/transfer_witness_leader.go
@@ -84,11 +84,7 @@ batchLoop:
 for i := 0; i < batchSize; i++ {
 select {
 case region := <-s.regions:
-<<<<<<< HEAD
-op, err := s.scheduleTransferWitnessLeader(name, typ, cluster, region)
-=======
-op, err := scheduleTransferWitnessLeader(s.R, name, cluster, region)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+op, err := s.scheduleTransferWitnessLeader(s.R, name, typ, cluster, region)
 if err != nil {
 log.Debug("fail to create transfer leader operator", errs.ZapError(err))
 continue
@@ -105,11 +101,7 @@ batchLoop:
 return ops
 }

-<<<<<<< HEAD
-func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(name, typ string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) {
-=======
-func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) {
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
+func (s *transferWitnessLeaderScheduler) scheduleTransferWitnessLeader(r *rand.Rand, name, typ string, cluster sche.SchedulerCluster, region *core.RegionInfo) (*operator.Operator, error) {
 var filters []filter.Filter
 unhealthyPeerStores := make(map[uint64]struct{})
 for _, peer := range region.GetDownPeers() {
@@ -118,14 +110,8 @@ func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.Sched
 for _, peer := range region.GetPendingPeers() {
 unhealthyPeerStores[peer.GetStoreId()] = struct{}{}
 }
-<<<<<<< HEAD
 filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores), &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
-candidates := filter.NewCandidates(cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
-=======
-filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores),
-&filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
 candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
->>>>>>> 25dedabf5 (*: reduce rand NewSource (#8675))
 // Compatible with old TiKV transfer leader logic.
 target := candidates.RandomPick()
 targets := candidates.PickAll()
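The scheduler and checker changes above all follow the same thread-the-generator idea: helpers such as scheduleEvictLeaderBatch and filter.NewCandidates now take the caller's *rand.Rand instead of creating their own source. A rough, self-contained sketch of that calling convention follows, using simplified stand-in types rather than PD's filter package.

// thread_rand_sketch.go — illustrative stand-ins, not PD's filter package.
package main

import (
    "fmt"
    "math/rand"
    "time"
)

// candidates carries the caller's generator, mirroring filter.NewCandidates(r, ...).
type candidates struct {
    r      *rand.Rand
    stores []uint64
}

func newCandidates(r *rand.Rand, stores []uint64) *candidates {
    return &candidates{r: r, stores: stores}
}

// randomPick consumes the shared generator instead of seeding a new one.
func (c *candidates) randomPick() uint64 {
    if len(c.stores) == 0 {
        return 0
    }
    return c.stores[c.r.Intn(len(c.stores))]
}

// scheduleBatch receives r from its caller, the way the resolved
// scheduleEvictLeaderBatch receives s.R from each scheduler.
func scheduleBatch(r *rand.Rand, stores []uint64, batchSize int) []uint64 {
    picked := make([]uint64, 0, batchSize)
    for i := 0; i < batchSize; i++ {
        picked = append(picked, newCandidates(r, stores).randomPick())
    }
    return picked
}

func main() {
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    fmt.Println(scheduleBatch(r, []uint64{1, 2, 3}, 4))
}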

