scheduler: add metrics when there is potential reverse #8741

Open · wants to merge 4 commits into base: master

Changes from all commits
7 changes: 3 additions & 4 deletions pkg/schedule/schedulers/balance_leader.go
@@ -331,10 +331,8 @@ func (s *balanceLeaderScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
batch := s.conf.getBatch()
balanceLeaderScheduleCounter.Inc()

leaderSchedulePolicy := cluster.GetSchedulerConfig().GetLeaderSchedulePolicy()
opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster())
kind := constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy)
solver := newSolver(basePlan, kind, cluster, opInfluence)
solver := newSolver(basePlan, s.tp, cluster, opInfluence)

stores := cluster.GetStores()
scoreFunc := func(store *core.StoreInfo) float64 {
@@ -516,7 +514,8 @@ func (s *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla
func (s *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator {
solver.Step++
defer func() { solver.Step-- }()
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(s.GetName()), solver.targetStoreScore(s.GetName())
solver.calcSourceStoreScore(s.GetName())
solver.calcTargetStoreScore(s.GetName())
if !solver.shouldBalance(s.GetName()) {
balanceLeaderSkipCounter.Inc()
if collector != nil {
14 changes: 8 additions & 6 deletions pkg/schedule/schedulers/balance_region.go
@@ -106,8 +106,7 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
sourceStores := filter.SelectSourceStores(stores, s.filters, conf, collector, s.filterCounter)
opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster())
s.OpController.GetFastOpInfluence(cluster.GetBasicCluster(), opInfluence)
kind := constant.NewScheduleKind(constant.RegionKind, constant.BySize)
solver := newSolver(basePlan, kind, cluster, opInfluence)
solver := newSolver(basePlan, s.tp, cluster, opInfluence)

sort.Slice(sourceStores, func(i, j int) bool {
iOp := solver.getOpInfluence(sourceStores[i].GetID())
@@ -137,7 +136,7 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
// sourceStores is sorted by region score desc, so we pick the first store as the source store.
for sourceIndex, solver.Source = range sourceStores {
retryLimit := s.retryQuota.getLimit(solver.Source)
solver.sourceScore = solver.sourceStoreScore(s.GetName())
solver.calcSourceStoreScore(s.GetName())
if sourceIndex == len(sourceStores)-1 {
break
}
@@ -223,14 +222,19 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
// candidates are sorted by region score desc, so we pick the last store as target store.
for i := range candidates.Stores {
solver.Target = candidates.Stores[len(candidates.Stores)-i-1]
solver.targetScore = solver.targetStoreScore(s.GetName())
solver.calcTargetStoreScore(s.GetName())
regionID := solver.Region.GetID()
sourceID := solver.Source.GetID()
targetID := solver.Target.GetID()
sourceLabel := strconv.FormatUint(sourceID, 10)
targetLabel := strconv.FormatUint(targetID, 10)
log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID))

if !solver.shouldBalance(s.GetName()) {
balanceRegionSkipCounter.Inc()
if solver.isPotentialReverse() {
balancePotentialReverseCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel).Inc()
}
if collector != nil {
collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed)))
}
@@ -252,8 +256,6 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
collector.Collect()
}
solver.Step--
sourceLabel := strconv.FormatUint(sourceID, 10)
targetLabel := strconv.FormatUint(targetID, 10)
op.FinishedCounters = append(op.FinishedCounters,
balanceDirectionCounter.WithLabelValues(s.GetName(), sourceLabel, targetLabel),
)
27 changes: 16 additions & 11 deletions pkg/schedule/schedulers/balance_region_test.go
@@ -45,7 +45,6 @@ func TestInfluenceAmp(t *testing.T) {
re := require.New(t)

R := int64(96)
kind := constant.NewScheduleKind(constant.RegionKind, constant.BySize)

influence := oc.GetOpInfluence(tc.GetBasicCluster())
influence.GetStoreInfluence(1).RegionSize = R
@@ -60,16 +59,18 @@
region := tc.GetRegion(1).Clone(core.SetApproximateSize(R))
tc.PutRegion(region)
basePlan := plan.NewBalanceSchedulerPlan()
solver := newSolver(basePlan, kind, tc, influence)
solver := newSolver(basePlan, types.BalanceRegionScheduler, tc, influence)
solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
solver.calcSourceStoreScore("")
solver.calcTargetStoreScore("")
re.True(solver.shouldBalance(""))

// It will not schedule if the diff region count is greater than the sum
// of TolerantSizeRatio and influenceAmp*2.
tc.AddRegionStore(1, int(100+influenceAmp+2))
solver.Source = tc.GetStore(1)
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
solver.calcSourceStoreScore("")
solver.calcTargetStoreScore("")
re.False(solver.shouldBalance(""))
re.Less(solver.sourceScore-solver.targetScore, float64(1))
}
@@ -143,11 +144,12 @@ func TestShouldBalance(t *testing.T) {
region := tc.GetRegion(1).Clone(core.SetApproximateSize(testCase.regionSize))
tc.PutRegion(region)
tc.SetLeaderSchedulePolicy(testCase.kind.String())
kind := constant.NewScheduleKind(constant.LeaderKind, testCase.kind)
basePlan := plan.NewBalanceSchedulerPlan()
solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
solver := newSolver(basePlan, types.BalanceLeaderScheduler, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
solver.kind = constant.NewScheduleKind(constant.LeaderKind, testCase.kind)
solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
solver.calcSourceStoreScore("")
solver.calcTargetStoreScore("")
re.Equal(testCase.expectedResult, solver.shouldBalance(""))
}

@@ -157,11 +159,12 @@ func TestShouldBalance(t *testing.T) {
tc.AddRegionStore(2, int(testCase.targetCount))
region := tc.GetRegion(1).Clone(core.SetApproximateSize(testCase.regionSize))
tc.PutRegion(region)
kind := constant.NewScheduleKind(constant.RegionKind, testCase.kind)
basePlan := plan.NewBalanceSchedulerPlan()
solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
solver := newSolver(basePlan, types.BalanceRegionScheduler, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
solver.kind = constant.NewScheduleKind(constant.RegionKind, testCase.kind)
solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
solver.calcSourceStoreScore("")
solver.calcTargetStoreScore("")
re.Equal(testCase.expectedResult, solver.shouldBalance(""))
}
}
@@ -209,7 +212,9 @@ func TestTolerantRatio(t *testing.T) {
for _, t := range tbl {
tc.SetTolerantSizeRatio(t.ratio)
basePlan := plan.NewBalanceSchedulerPlan()
solver := newSolver(basePlan, t.kind, tc, operator.OpInfluence{})
solver := newSolver(basePlan, types.BalanceLeaderScheduler, tc, operator.OpInfluence{})
solver.kind = t.kind
solver.tolerantSizeRatio = adjustTolerantRatio(tc, t.kind)
solver.Region = region

sourceScore := t.expectTolerantResource(t.kind)
9 changes: 6 additions & 3 deletions pkg/schedule/schedulers/balance_witness.go
@@ -235,8 +235,7 @@ func (s *balanceWitnessScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()

opInfluence := s.OpController.GetOpInfluence(cluster.GetBasicCluster())
kind := constant.NewScheduleKind(constant.WitnessKind, constant.ByCount)
solver := newSolver(basePlan, kind, cluster, opInfluence)
solver := newSolver(basePlan, s.tp, cluster, opInfluence)

stores := cluster.GetStores()
scoreFunc := func(store *core.StoreInfo) float64 {
@@ -329,9 +328,13 @@ func (s *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *
func (s *balanceWitnessScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator {
solver.Step++
defer func() { solver.Step-- }()
solver.sourceScore, solver.targetScore = solver.sourceStoreScore(s.GetName()), solver.targetStoreScore(s.GetName())
solver.calcSourceStoreScore(s.GetName())
solver.calcTargetStoreScore(s.GetName())
if !solver.shouldBalance(s.GetName()) {
schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc()
if solver.isPotentialReverse() {
schedulerCounter.WithLabelValues(s.GetName(), "potential-reverse").Inc()
}
if collector != nil {
collector.Collect(plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed)))
}
8 changes: 8 additions & 0 deletions pkg/schedule/schedulers/metrics.go
@@ -79,6 +79,14 @@
Help: "Counter of direction of balance related schedulers.",
}, []string{"type", "source", "target"})

balancePotentialReverseCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "potential_reverse",
Help: "Counter of direction which would introduce potential reverse.",
}, []string{"type", "source", "target"})

// TODO: pre-allocate gauge metrics
hotDirectionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
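The hunk above only declares the new counter vector; its registration is not part of this diff. Below is a minimal, self-contained sketch of the usual client_golang pattern assumed here (init-time MustRegister plus per-direction labels); the helper name recordPotentialReverse is illustrative, not from the PR:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// Illustrative stand-in for the counter declared in the hunk above.
var balancePotentialReverseCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "scheduler",
		Name:      "potential_reverse",
		Help:      "Counter of direction which would introduce potential reverse.",
	}, []string{"type", "source", "target"})

func init() {
	// Register once with the default registry, as the other scheduler counters
	// in this file are assumed to be.
	prometheus.MustRegister(balancePotentialReverseCounter)
}

// recordPotentialReverse shows how a scheduler would emit the metric:
// one increment per skipped (scheduler, source store, target store) direction.
func recordPotentialReverse(schedulerName, sourceLabel, targetLabel string) {
	balancePotentialReverseCounter.WithLabelValues(schedulerName, sourceLabel, targetLabel).Inc()
}

func main() {
	recordPotentialReverse("balance-region-scheduler", "1", "2")
}
```

The resulting series is exposed as pd_scheduler_potential_reverse with type, source, and target labels, mirroring the existing balanceDirectionCounter.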
69 changes: 45 additions & 24 deletions pkg/schedule/schedulers/utils.go
@@ -27,6 +27,7 @@
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/statistics"
"go.uber.org/zap"
)
@@ -50,12 +51,25 @@
tolerantSizeRatio float64
tolerantSource int64
fit *placement.RegionFit

sourceScore float64
targetScore float64
}

func newSolver(basePlan *plan.BalanceSchedulerPlan, kind constant.ScheduleKind, cluster sche.SchedulerCluster, opInfluence operator.OpInfluence) *solver {
sourceScore float64
targetScore float64
sourceDelta int64
targetDelta int64
}

func newSolver(basePlan *plan.BalanceSchedulerPlan, tp types.CheckerSchedulerType, cluster sche.SchedulerCluster, opInfluence operator.OpInfluence) *solver {
var kind constant.ScheduleKind
switch tp {
case types.BalanceLeaderScheduler:
leaderSchedulePolicy := cluster.GetSchedulerConfig().GetLeaderSchedulePolicy()
kind = constant.NewScheduleKind(constant.LeaderKind, leaderSchedulePolicy)
case types.BalanceRegionScheduler:
kind = constant.NewScheduleKind(constant.RegionKind, constant.BySize)
case types.BalanceWitnessScheduler:
kind = constant.NewScheduleKind(constant.WitnessKind, constant.ByCount)
default:
log.Fatal("invalid scheduler type")

[Codecov (codecov/patch) check warning: added lines pkg/schedule/schedulers/utils.go#L70-L71 are not covered by tests]
}
return &solver{
BalanceSchedulerPlan: basePlan,
SchedulerCluster: cluster,
@@ -85,7 +99,7 @@
return strconv.FormatUint(p.targetStoreID(), 10)
}

func (p *solver) sourceStoreScore(scheduleName string) float64 {
func (p *solver) calcSourceStoreScore(scheduleName string) {
sourceID := p.Source.GetID()
tolerantResource := p.getTolerantResource()
// to avoid scheduling too much: if A's score is greater than B's and C's by only a little,
@@ -99,22 +113,20 @@
opInfluenceStatus.WithLabelValues(scheduleName, strconv.FormatUint(sourceID, 10), "source").Set(float64(influence))
tolerantResourceStatus.WithLabelValues(scheduleName).Set(float64(tolerantResource))
}
var score float64
switch p.kind.Resource {
case constant.LeaderKind:
sourceDelta := influence - tolerantResource
score = p.Source.LeaderScore(p.kind.Policy, sourceDelta)
p.sourceDelta = influence - tolerantResource
p.sourceScore = p.Source.LeaderScore(p.kind.Policy, p.sourceDelta)
case constant.RegionKind:
sourceDelta := influence*influenceAmp - tolerantResource
score = p.Source.RegionScore(p.GetSchedulerConfig().GetRegionScoreFormulaVersion(), p.GetSchedulerConfig().GetHighSpaceRatio(), p.GetSchedulerConfig().GetLowSpaceRatio(), sourceDelta)
p.sourceDelta = influence*influenceAmp - tolerantResource
p.sourceScore = p.Source.RegionScore(p.GetSchedulerConfig().GetRegionScoreFormulaVersion(), p.GetSchedulerConfig().GetHighSpaceRatio(), p.GetSchedulerConfig().GetLowSpaceRatio(), p.sourceDelta)
case constant.WitnessKind:
sourceDelta := influence - tolerantResource
score = p.Source.WitnessScore(sourceDelta)
p.sourceDelta = influence - tolerantResource
p.sourceScore = p.Source.WitnessScore(p.sourceDelta)
}
return score
}

func (p *solver) targetStoreScore(scheduleName string) float64 {
func (p *solver) calcTargetStoreScore(scheduleName string) {
targetID := p.Target.GetID()
// to avoid scheduling too much: if A's score is less than B's and C's within a small range,
// we want A to be moved in by one region, not two
@@ -129,19 +141,17 @@
if p.GetSchedulerConfig().IsDebugMetricsEnabled() {
opInfluenceStatus.WithLabelValues(scheduleName, strconv.FormatUint(targetID, 10), "target").Set(float64(influence))
}
var score float64
switch p.kind.Resource {
case constant.LeaderKind:
targetDelta := influence + tolerantResource
score = p.Target.LeaderScore(p.kind.Policy, targetDelta)
p.targetDelta = influence + tolerantResource
p.targetScore = p.Target.LeaderScore(p.kind.Policy, p.targetDelta)
case constant.RegionKind:
targetDelta := influence*influenceAmp + tolerantResource
score = p.Target.RegionScore(p.GetSchedulerConfig().GetRegionScoreFormulaVersion(), p.GetSchedulerConfig().GetHighSpaceRatio(), p.GetSchedulerConfig().GetLowSpaceRatio(), targetDelta)
p.targetDelta = influence*influenceAmp + tolerantResource
p.targetScore = p.Target.RegionScore(p.GetSchedulerConfig().GetRegionScoreFormulaVersion(), p.GetSchedulerConfig().GetHighSpaceRatio(), p.GetSchedulerConfig().GetLowSpaceRatio(), p.targetDelta)
case constant.WitnessKind:
targetDelta := influence + tolerantResource
score = p.Target.WitnessScore(targetDelta)
p.targetDelta = influence + tolerantResource
p.targetScore = p.Target.WitnessScore(p.targetDelta)
}
return score
}

// Both of the source store's score and target store's score should be calculated before calling this function.
@@ -166,6 +176,17 @@
return shouldBalance
}

func (p *solver) isPotentialReverse() bool {
// p.sourceScore is treated as the source store's score after the region has been moved out.
// p.targetScore is treated as the target store's score after the region has been moved in.
// So the original source store's score is p.sourceScore+p.sourceDelta, and the original target store's score is p.targetScore-p.targetDelta.
// If p.sourceScore+float64(p.sourceDelta) < p.targetScore-float64(p.targetDelta), then even though
// the source store's score is currently larger than the target store's score, it will fall below
// the target store's score once the region is moved.
// In other words, the balance direction would reverse after the region is moved.
return p.sourceScore+float64(p.sourceDelta) < p.targetScore-float64(p.targetDelta)
}

func (p *solver) getTolerantResource() int64 {
if p.tolerantSource > 0 {
return p.tolerantSource
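To make the inequality in isPotentialReverse concrete, here is a minimal, self-contained sketch that reproduces just that arithmetic on a stripped-down struct; the type name and the field values are made up for illustration and are not taken from the PR:

```go
package main

import "fmt"

// reverseCheck mirrors only the fields isPotentialReverse reads; it is not the real solver.
type reverseCheck struct {
	sourceScore, targetScore float64
	sourceDelta, targetDelta int64
}

// isPotentialReverse applies the same condition as the solver method above: if the
// pre-move ("original") source score, sourceScore+sourceDelta, is already below the
// pre-move target score, targetScore-targetDelta, then moving the region would flip
// which store looks more loaded.
func (r reverseCheck) isPotentialReverse() bool {
	return r.sourceScore+float64(r.sourceDelta) < r.targetScore-float64(r.targetDelta)
}

func main() {
	// Values chosen so the check fires: 100+(-5) = 95 < 101-3 = 98.
	r := reverseCheck{sourceScore: 100, targetScore: 101, sourceDelta: -5, targetDelta: 3}
	fmt.Println(r.isPotentialReverse()) // true
}
```

In the schedulers, this check runs only on the shouldBalance skip path, so the new counters count skipped directions that would have reversed the imbalance rather than operators that were actually created.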