Skip to content

Commit

Permalink
Merge branch 'master' into case-delete
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Sep 30, 2024
2 parents c959207 + 26ced22 commit 4109a9a
Show file tree
Hide file tree
Showing 27 changed files with 393 additions and 338 deletions.
6 changes: 3 additions & 3 deletions pkg/schedule/checker/learner_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func NewLearnerChecker(cluster sche.CheckerCluster) *LearnerChecker {
}

// Check verifies a region's role, creating an Operator if need.
func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator {
if l.IsPaused() {
func (c *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator {
if c.IsPaused() {
learnerCheckerPausedCounter.Inc()
return nil
}
for _, p := range region.GetLearners() {
op, err := operator.CreatePromoteLearnerOperator("promote-learner", l.cluster, region, p)
op, err := operator.CreatePromoteLearnerOperator("promote-learner", c.cluster, region, p)
if err != nil {
log.Debug("fail to create promote learner operator", errs.ZapError(err))
continue
Expand Down
50 changes: 25 additions & 25 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,32 @@ func (*MergeChecker) GetType() types.CheckerSchedulerType {

// RecordRegionSplit put the recently split region into cache. MergeChecker
// will skip check it for a while.
func (m *MergeChecker) RecordRegionSplit(regionIDs []uint64) {
func (c *MergeChecker) RecordRegionSplit(regionIDs []uint64) {
for _, regionID := range regionIDs {
m.splitCache.PutWithTTL(regionID, nil, m.conf.GetSplitMergeInterval())
c.splitCache.PutWithTTL(regionID, nil, c.conf.GetSplitMergeInterval())
}
}

// Check verifies a region's replicas, creating an Operator if need.
func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
func (c *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
mergeCheckerCounter.Inc()

if m.IsPaused() {
if c.IsPaused() {
mergeCheckerPausedCounter.Inc()
return nil
}

// update the split cache.
// It must be called before the following merge checker logic.
m.splitCache.UpdateTTL(m.conf.GetSplitMergeInterval())
c.splitCache.UpdateTTL(c.conf.GetSplitMergeInterval())

expireTime := m.startTime.Add(m.conf.GetSplitMergeInterval())
expireTime := c.startTime.Add(c.conf.GetSplitMergeInterval())
if time.Now().Before(expireTime) {
mergeCheckerRecentlyStartCounter.Inc()
return nil
}

if m.splitCache.Exists(region.GetID()) {
if c.splitCache.Exists(region.GetID()) {
mergeCheckerRecentlySplitCounter.Inc()
return nil
}
Expand All @@ -113,7 +113,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
}

// region is not small enough
if !region.NeedMerge(int64(m.conf.GetMaxMergeRegionSize()), int64(m.conf.GetMaxMergeRegionKeys())) {
if !region.NeedMerge(int64(c.conf.GetMaxMergeRegionSize()), int64(c.conf.GetMaxMergeRegionKeys())) {
mergeCheckerNoNeedCounter.Inc()
return nil
}
Expand All @@ -124,24 +124,24 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
return nil
}

if !filter.IsRegionReplicated(m.cluster, region) {
if !filter.IsRegionReplicated(c.cluster, region) {
mergeCheckerAbnormalReplicaCounter.Inc()
return nil
}

// skip hot region
if m.cluster.IsRegionHot(region) {
if c.cluster.IsRegionHot(region) {
mergeCheckerHotRegionCounter.Inc()
return nil
}

prev, next := m.cluster.GetAdjacentRegions(region)
prev, next := c.cluster.GetAdjacentRegions(region)

var target *core.RegionInfo
if m.checkTarget(region, next) {
if c.checkTarget(region, next) {
target = next
}
if !m.conf.IsOneWayMergeEnabled() && m.checkTarget(region, prev) { // allow a region can be merged by two ways.
if !c.conf.IsOneWayMergeEnabled() && c.checkTarget(region, prev) { // allow a region can be merged by two ways.
if target == nil || prev.GetApproximateSize() < next.GetApproximateSize() { // pick smaller
target = prev
}
Expand All @@ -152,7 +152,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
return nil
}

regionMaxSize := m.cluster.GetStoreConfig().GetRegionMaxSize()
regionMaxSize := c.cluster.GetStoreConfig().GetRegionMaxSize()
maxTargetRegionSizeThreshold := int64(float64(regionMaxSize) * float64(maxTargetRegionFactor))
if maxTargetRegionSizeThreshold < maxTargetRegionSize {
maxTargetRegionSizeThreshold = maxTargetRegionSize
Expand All @@ -161,22 +161,22 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
mergeCheckerTargetTooLargeCounter.Inc()
return nil
}
if err := m.cluster.GetStoreConfig().CheckRegionSize(uint64(target.GetApproximateSize()+region.GetApproximateSize()),
m.conf.GetMaxMergeRegionSize()); err != nil {
if err := c.cluster.GetStoreConfig().CheckRegionSize(uint64(target.GetApproximateSize()+region.GetApproximateSize()),
c.conf.GetMaxMergeRegionSize()); err != nil {
mergeCheckerSplitSizeAfterMergeCounter.Inc()
return nil
}

if err := m.cluster.GetStoreConfig().CheckRegionKeys(uint64(target.GetApproximateKeys()+region.GetApproximateKeys()),
m.conf.GetMaxMergeRegionKeys()); err != nil {
if err := c.cluster.GetStoreConfig().CheckRegionKeys(uint64(target.GetApproximateKeys()+region.GetApproximateKeys()),
c.conf.GetMaxMergeRegionKeys()); err != nil {
mergeCheckerSplitKeysAfterMergeCounter.Inc()
return nil
}

log.Debug("try to merge region",
logutil.ZapRedactStringer("from", core.RegionToHexMeta(region.GetMeta())),
logutil.ZapRedactStringer("to", core.RegionToHexMeta(target.GetMeta())))
ops, err := operator.CreateMergeRegionOperator("merge-region", m.cluster, region, target, operator.OpMerge)
ops, err := operator.CreateMergeRegionOperator("merge-region", c.cluster, region, target, operator.OpMerge)
if err != nil {
log.Warn("create merge region operator failed", errs.ZapError(err))
return nil
Expand All @@ -189,28 +189,28 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
return ops
}

func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
func (c *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
if adjacent == nil {
mergeCheckerAdjNotExistCounter.Inc()
return false
}

if m.splitCache.Exists(adjacent.GetID()) {
if c.splitCache.Exists(adjacent.GetID()) {
mergeCheckerAdjRecentlySplitCounter.Inc()
return false
}

if m.cluster.IsRegionHot(adjacent) {
if c.cluster.IsRegionHot(adjacent) {
mergeCheckerAdjRegionHotCounter.Inc()
return false
}

if !AllowMerge(m.cluster, region, adjacent) {
if !AllowMerge(c.cluster, region, adjacent) {
mergeCheckerAdjDisallowMergeCounter.Inc()
return false
}

if !checkPeerStore(m.cluster, region, adjacent) {
if !checkPeerStore(c.cluster, region, adjacent) {
mergeCheckerAdjAbnormalPeerStoreCounter.Inc()
return false
}
Expand All @@ -220,7 +220,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
return false
}

if !filter.IsRegionReplicated(m.cluster, adjacent) {
if !filter.IsRegionReplicated(c.cluster, adjacent) {
mergeCheckerAdjAbnormalReplicaCounter.Inc()
return false
}
Expand Down
Loading

0 comments on commit 4109a9a

Please sign in to comment.