
*: refine code, decrease indent, and rename #8276

Merged
12 commits merged on Jun 14, 2024
122 changes: 63 additions & 59 deletions pkg/progress/progress.go
@@ -130,47 +130,50 @@ func (m *Manager) UpdateProgress(progress string, current, remaining float64, is
m.Lock()
defer m.Unlock()

if p, exist := m.progresses[progress]; exist {
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}
p, exist := m.progresses[progress]
if !exist {
return
}

p.history.PushBack(current)
p.currentWindowLength++
for _, op := range opts {
op(p)
}
p.remaining = remaining
if p.total < remaining {
p.total = remaining
}

// try to move `front` into the correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}
p.history.PushBack(current)
p.currentWindowLength++

for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}
// try to move `front` into the correct place.
for p.currentWindowLength > p.windowLength {
p.front = p.front.Next()
p.currentWindowLength--
}
for p.currentWindowLength < p.windowLength && p.front.Prev() != nil {
p.front = p.front.Prev()
p.currentWindowLength++
}

// It means the progress was just initialized and hasn't been updated yet.
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
}
for p.history.Len() > p.windowCapacity {
p.history.Remove(p.history.Front())
}

// It means the progress was just initialized and hasn't been updated yet.
if p.history.Len() <= 1 {
p.lastSpeed = 0
} else if isInc {
// the value increases, e.g., [1, 2, 3]
p.lastSpeed = (current - p.front.Value.(float64)) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
} else {
// the value decreases, e.g., [3, 2, 1]
p.lastSpeed = (p.front.Value.(float64) - current) /
(float64(p.currentWindowLength-1) * p.updateInterval.Seconds())
}
if p.lastSpeed < 0 {
p.lastSpeed = 0
}
}

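The main change in this hunk is replacing the `if p, exist := ...; exist { ... }` wrapper with a guard clause (`if !exist { return }`), which removes one indentation level from the rest of UpdateProgress without changing behavior. A minimal standalone sketch of the same pattern, with illustrative types rather than PD's actual ones:

package main

import "fmt"

type entry struct{ value float64 }

type manager struct{ entries map[string]*entry }

// updateNested mirrors the old shape: the whole happy path is indented
// inside the existence check.
func (m *manager) updateNested(name string, v float64) {
    if e, ok := m.entries[name]; ok {
        e.value = v
        // ...every following statement sits one level deeper...
    }
}

// updateFlat mirrors the new shape: handle the missing case first and
// return, so the rest of the body stays at the top indentation level.
func (m *manager) updateFlat(name string, v float64) {
    e, ok := m.entries[name]
    if !ok {
        return
    }
    e.value = v
}

func main() {
    m := &manager{entries: map[string]*entry{"a": {}}}
    m.updateNested("a", 1)
    m.updateFlat("a", 2)
    fmt.Println(m.entries["a"].value) // 2
}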
@@ -201,39 +204,40 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string {
m.RLock()
defer m.RUnlock()

processes := []string{}
progresses := make([]string, 0, len(m.progresses))
for p := range m.progresses {
if filter(p) {
processes = append(processes, p)
progresses = append(progresses, p)
}
}
return processes
return progresses
}

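Besides fixing the processes/progresses mix-up, GetProgresses now preallocates the result slice with make([]string, 0, len(m.progresses)), so append never has to grow it while filtering. A hedged sketch of the same idiom; the function and variable names here are illustrative, not part of PD:

package main

import (
    "fmt"
    "strings"
)

// filterKeys collects the map keys accepted by keep. Preallocating to the
// map's size covers the worst case (every key matches) with one allocation.
func filterKeys(m map[string]struct{}, keep func(string) bool) []string {
    out := make([]string, 0, len(m))
    for k := range m {
        if keep(k) {
            out = append(out, k)
        }
    }
    return out
}

func main() {
    m := map[string]struct{}{"removing-1": {}, "preparing-2": {}}
    fmt.Println(filterKeys(m, func(k string) bool { return strings.HasPrefix(k, "removing") }))
    // [removing-1]
}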
// Status returns the current progress status of a given name.
func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed float64, err error) {
func (m *Manager) Status(progressName string) (progress, leftSeconds, currentSpeed float64, err error) {
m.RLock()
defer m.RUnlock()

if p, exist := m.progresses[progress]; exist {
process = 1 - p.remaining/p.total
if process < 0 {
process = 0
err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total))
return
}
currentSpeed = p.lastSpeed
// When the progress is newly added, there is no last speed.
if p.lastSpeed == 0 && p.history.Len() <= 1 {
currentSpeed = 0
}

leftSeconds = p.remaining / currentSpeed
if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) {
leftSeconds = math.MaxFloat64
}
p, exist := m.progresses[progressName]
if !exist {
err = errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", progressName))
return
}
progress = 1 - p.remaining/p.total
if progress < 0 {
progress = 0
err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total))
return
}
err = errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", progress))
currentSpeed = p.lastSpeed
// When the progress is newly added, there is no last speed.
if p.lastSpeed == 0 && p.history.Len() <= 1 {
currentSpeed = 0
}

leftSeconds = p.remaining / currentSpeed
if math.IsNaN(leftSeconds) || math.IsInf(leftSeconds, 0) {
leftSeconds = math.MaxFloat64
}
return
}
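The Status refactor keeps one subtle behaviour worth calling out: when currentSpeed is 0, remaining / currentSpeed is +Inf (or NaN when remaining is also 0), and the method clamps that to math.MaxFloat64 instead of returning a non-finite ETA. A small sketch of just that clamping, under the assumption that the semantics match the code above:

package main

import (
    "fmt"
    "math"
)

// etaSeconds mirrors the clamping in Status: division by a zero speed yields
// +Inf (remaining > 0) or NaN (remaining == 0), both replaced by MaxFloat64.
func etaSeconds(remaining, speed float64) float64 {
    eta := remaining / speed
    if math.IsNaN(eta) || math.IsInf(eta, 0) {
        return math.MaxFloat64
    }
    return eta
}

func main() {
    fmt.Println(etaSeconds(10, 2)) // 5
    fmt.Println(etaSeconds(10, 0)) // 1.7976931348623157e+308
    fmt.Println(etaSeconds(0, 0))  // 1.7976931348623157e+308
}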
9 changes: 4 additions & 5 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -282,7 +282,6 @@ func (s *evictSlowStoreScheduler) IsScheduleAllowed(cluster sche.SchedulerCluste

func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
evictSlowStoreCounter.Inc()
var ops []*operator.Operator

if s.conf.evictStore() != 0 {
store := cluster.GetStore(s.conf.evictStore())
@@ -298,7 +297,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
return s.schedulerEvictLeader(cluster), nil
}
s.cleanupEvictLeader(cluster)
return ops, nil
return nil, nil
}

var slowStore *core.StoreInfo
@@ -311,14 +310,14 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
if (store.IsPreparing() || store.IsServing()) && store.IsSlow() {
// Do nothing if there is more than one slow store.
if slowStore != nil {
return ops, nil
return nil, nil
}
slowStore = store
}
}

if slowStore == nil || slowStore.GetSlowScore() < slowStoreEvictThreshold {
return ops, nil
return nil, nil
}

// If there is only one slow store, evict leaders from that store.
@@ -327,7 +326,7 @@ func (s *evictSlowStoreScheduler) Schedule(cluster sche.SchedulerCluster, _ bool
err := s.prepareEvictLeader(cluster, slowStore.GetID())
if err != nil {
log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", slowStore.GetID()))
return ops, nil
return nil, nil
}
return s.schedulerEvictLeader(cluster), nil
}
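Dropping var ops []*operator.Operator is safe here because nothing was ever appended to it: each return ops, nil already returned a nil slice, so return nil, nil is equivalent and makes the intent explicit. A tiny sketch of why a nil slice behaves like an empty one (placeholder element type):

package main

import "fmt"

func main() {
    var ops []int                     // declared but never appended to
    fmt.Println(ops == nil, len(ops)) // true 0

    // A nil slice ranges as empty, so callers iterating the result see no
    // difference between `return ops, nil` and `return nil, nil`.
    for range ops {
        fmt.Println("never reached")
    }
}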
35 changes: 17 additions & 18 deletions pkg/schedule/schedulers/scheduler_controller.go
@@ -456,6 +456,7 @@ func (s *ScheduleController) Stop() {

// Schedule tries to create some operators.
func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator {
retry:
for i := 0; i < maxScheduleRetries; i++ {
// no need to retry if the scheduler should stop, to speed up exit
select {
@@ -470,29 +471,27 @@ func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator {
if diagnosable {
s.diagnosticRecorder.SetResultFromPlans(ops, plans)
}
foundDisabled := false
if len(ops) == 0 {
continue
}

// If we have a schedule, reset the interval to the minimal interval.
s.nextInterval = s.Scheduler.GetMinInterval()
for _, op := range ops {
if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil {
region := s.cluster.GetRegion(op.RegionID())
if region == nil {
continue
}
if labelMgr.ScheduleDisabled(region) {
denySchedulersByLabelerCounter.Inc()
foundDisabled = true
break
}
region := s.cluster.GetRegion(op.RegionID())
if region == nil {
continue retry
}
}
if len(ops) > 0 {
// If we have a schedule, reset the interval to the minimal interval.
s.nextInterval = s.Scheduler.GetMinInterval()
// try regenerating operators
if foundDisabled {
labelMgr := s.cluster.GetRegionLabeler()
if labelMgr == nil {
continue

Reviewer (Member):
If the region == nil, do we still need to count it in ops?

okJiang (Member, Author) replied on Jun 12, 2024:
    if i, data := l.rangeList.GetData(region.GetStartKey(), region.GetEndKey()); i != -1 {
It seems this would panic.

okJiang (Member, Author):
267dc63
region == nil means the ops are not normal right now, so we can just retry.

}
return ops
if labelMgr.ScheduleDisabled(region) {
denySchedulersByLabelerCounter.Inc()
continue retry
}
}
return ops
}
s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval)
return nil
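The rewrite replaces the foundDisabled flag with a labeled continue: the retry: label names the outer attempt loop, so continue retry inside the inner range over ops throws away the whole batch of operators and moves on to the next scheduling attempt. A standalone sketch of that control flow, with the scheduling details stubbed out and all names illustrative:

package main

import "fmt"

// schedule retries up to maxRetries times; a single bad operator causes the
// whole batch to be discarded via the labeled continue.
func schedule(batches [][]int) []int {
    const maxRetries = 3
retry:
    for i := 0; i < maxRetries; i++ {
        ops := batches[i%len(batches)]
        if len(ops) == 0 {
            continue
        }
        for _, op := range ops {
            // An invalid operator invalidates the batch: jump straight to
            // the next attempt of the *outer* loop.
            if op < 0 {
                fmt.Println("discarding batch on attempt", i)
                continue retry
            }
        }
        return ops
    }
    return nil
}

func main() {
    fmt.Println(schedule([][]int{{1, -2, 3}, {4, 5}})) // [4 5]
}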
37 changes: 19 additions & 18 deletions server/cluster/cluster.go
@@ -1345,21 +1345,22 @@ func (c *RaftCluster) RemoveStore(storeID uint64, physicallyDestroyed bool) erro
zap.Uint64("store-id", storeID),
zap.String("store-address", newStore.GetAddress()),
zap.Bool("physically-destroyed", newStore.IsPhysicallyDestroyed()))
err := c.setStore(newStore)
if err == nil {
regionSize := float64(c.GetStoreRegionSize(storeID))
c.resetProgress(storeID, store.GetAddress())
c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration()))
// record the current store limit in memory
c.prevStoreLimit[storeID] = map[storelimit.Type]float64{
storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer),
storelimit.RemovePeer: c.GetStoreLimitByType(storeID, storelimit.RemovePeer),
}
// TODO: if the persist operation encounters an error, the "Unlimited" limit will be rolled back.
// And considering the store state has changed, RemoveStore is actually successful.
_ = c.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited)

if err := c.setStore(newStore); err != nil {
return err
}
return err
regionSize := float64(c.GetStoreRegionSize(storeID))
c.resetProgress(storeID, store.GetAddress())
c.progressManager.AddProgress(encodeRemovingProgressKey(storeID), regionSize, regionSize, nodeStateCheckJobInterval, progress.WindowDurationOption(c.GetCoordinator().GetPatrolRegionsDuration()))
// record the current store limit in memory
c.prevStoreLimit[storeID] = map[storelimit.Type]float64{
storelimit.AddPeer: c.GetStoreLimitByType(storeID, storelimit.AddPeer),
storelimit.RemovePeer: c.GetStoreLimitByType(storeID, storelimit.RemovePeer),
}
// TODO: if the persist operation encounters an error, the "Unlimited" limit will be rolled back.
// And considering the store state has changed, RemoveStore is actually successful.
_ = c.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited)
return nil
}

func (c *RaftCluster) checkReplicaBeforeOfflineStore(storeID uint64) error {
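In RemoveStore the err == nil nesting is inverted into the idiomatic fail-fast form: return the error immediately, then let the success path run unindented and finish with an explicit return nil (the old code's trailing return err did the same thing, just less obviously). A compact before/after sketch with stand-in callbacks, not PD's real functions:

package main

import (
    "errors"
    "fmt"
)

// removeBefore mirrors the old shape: the success path is nested under
// err == nil, and the trailing return relies on err still being nil.
func removeBefore(persist func() error, cleanup func()) error {
    err := persist()
    if err == nil {
        cleanup()
    }
    return err
}

// removeAfter mirrors the new shape: fail fast, then the success path
// reads top to bottom and ends with an explicit return nil.
func removeAfter(persist func() error, cleanup func()) error {
    if err := persist(); err != nil {
        return err
    }
    cleanup()
    return nil
}

func main() {
    ok := func() error { return nil }
    fail := func() error { return errors.New("persist failed") }
    noop := func() {}
    fmt.Println(removeBefore(ok, noop), removeAfter(fail, noop)) // <nil> persist failed
}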
@@ -1846,14 +1847,14 @@ func (c *RaftCluster) updateProgress(storeID uint64, storeAddress, action string
return
}
c.progressManager.UpdateProgress(progressName, current, remaining, isInc, opts...)
process, ls, cs, err := c.progressManager.Status(progressName)
progress, leftSeconds, currentSpeed, err := c.progressManager.Status(progressName)
if err != nil {
log.Error("get progress status failed", zap.String("progress", progressName), zap.Float64("remaining", remaining), errs.ZapError(err))
return
}
storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process)
storesSpeedGauge.WithLabelValues(storeAddress, storeLabel, action).Set(cs)
storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(ls)
storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(progress)
storesSpeedGauge.WithLabelValues(storeAddress, storeLabel, action).Set(currentSpeed)
storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(leftSeconds)
}

func (c *RaftCluster) resetProgress(storeID uint64, storeAddress string) {
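The last hunk only renames the Status results so the gauge updates read naturally (progress, currentSpeed, leftSeconds instead of process, cs, ls). For context, a hedged sketch of the same export pattern with prometheus/client_golang; the metric and label names below are made up, not PD's real ones:

package main

import "github.com/prometheus/client_golang/prometheus"

var storeProgressGauge = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "store_progress", Help: "progress of a store action"},
    []string{"address", "store", "action"},
)

// exportProgress sets the gauge for one store/action pair; descriptive
// result names make the Set call self-explanatory at the call site.
func exportProgress(address, store, action string, progress float64) {
    storeProgressGauge.WithLabelValues(address, store, action).Set(progress)
}

func main() {
    prometheus.MustRegister(storeProgressGauge)
    exportProgress("127.0.0.1:20160", "store-1", "removing", 0.42)
}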