Skip to content

Commit

Permalink
schedule: add check action when poll the opeators from opNotifierQueu…
Browse files Browse the repository at this point in the history
…e(pick from 8010) (tikv#8086)

ref tikv#7992

Signed-off-by: TonsnakeLin <[email protected]>
  • Loading branch information
TonsnakeLin committed May 15, 2024
1 parent 0ecafcb commit acecac4
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 2 deletions.
10 changes: 10 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ const (
OperatorExpireTime = 3 * time.Second
)

// CancelReasonType is the type of cancel reason.
type CancelReasonType string

var (
// RegionNotFound is the cancel reason when the region is not found.
RegionNotFound CancelReasonType = "region not found"
// EpochNotMatch is the cancel reason when the region epoch is not match.
EpochNotMatch CancelReasonType = "epoch not match"
)

// Operator contains execution steps generated by scheduler.
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type Operator struct {
Expand Down
28 changes: 26 additions & 2 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next
if !ok || op == nil {
return nil, true
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
// Check the operator lightly. It cant't dispatch the op for some scenario.
var reason operator.CancelReasonType
r, reason = oc.checkOperatorLightly(op)
if len(reason) != 0 {
_ = oc.removeOperatorLocked(op)
if op.Cancel() {
log.Warn("remove operator because region disappeared",
Expand Down Expand Up @@ -296,6 +298,7 @@ func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int
if isMerge {
// count two merge operators as one, so wopStatus.ops[desc] should
// not be updated here
// TODO: call checkAddOperator ...
i++
added++
oc.wop.PutOperator(ops[i])
Expand Down Expand Up @@ -433,6 +436,27 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato
return !expired
}

// checkOperatorLightly checks whether the ops can be dispatched in Controller::pollNeedDispatchRegion.
// The operators can't be dispatched for some scenarios, such as region disappeared, region changed ...
// `region` is the target region of `op`.
func (oc *OperatorController) checkOperatorLightly(op *operator.Operator) (*core.RegionInfo, operator.CancelReasonType) {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc()
return nil, operator.RegionNotFound
}

// It may be suitable for all kinds of operator but not merge-region.
// But to be cautions, it only takes effect on merge-region currently.
// If the version of epoch is changed, the region has been splitted or merged, and the key range has been changed.
// The changing for conf_version of epoch doesn't modify the region key range, skip it.
if (op.Kind()&operator.OpMerge != 0) && region.GetRegionEpoch().GetVersion() > op.RegionEpoch().GetVersion() {
operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc()
return nil, operator.EpochNotMatch
}
return region, ""
}

func isHigherPriorityOperator(new, old *operator.Operator) bool {
return new.GetPriorityLevel() > old.GetPriorityLevel()
}
Expand Down
128 changes: 128 additions & 0 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,134 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() {
suite.False(next)
}

// issue #7992
func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() {
re := suite.Require()
opts := mockconfig.NewTestOptions()

Check failure on line 373 in server/schedule/operator_controller_test.go

View workflow job for this annotation

GitHub Actions / statics

undefined: mockconfig
cluster := mockcluster.NewCluster(suite.ctx, opts)
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewOperatorController(suite.ctx, cluster, stream)
cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(source)
target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(target)

ops, err := operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
re.NoError(err)
re.Len(ops, 2)
re.Equal(2, controller.AddWaitingOperator(ops...))
// Change next push time to now, it's used to make test case faster.
controller.opNotifierQueue[0].time = time.Now()

// first poll gets source region op.
r, next := controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, source)

// second poll gets target region op.
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, target)

// third poll removes the two merge-region ops.
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Len(controller.opNotifierQueue, 1)
re.Len(controller.operators, 1)
re.Empty(controller.wop.ListOperator())
re.NotNil(controller.opRecords.Get(101))

// fourth poll removes target region op from opNotifierQueue
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, target)
re.Len(controller.opNotifierQueue, 1)
delete(controller.operators, 101)
delete(controller.operators, 102)
_ = heap.Pop(&controller.opNotifierQueue).(*operatorWithTime)
re.Len(controller.opNotifierQueue, 0)

// Add the two ops to waiting operators again.
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0}
controller.opRecords.ttl.Remove(101)
controller.opRecords.ttl.Remove(102)
ops, err = operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
re.NoError(err)
re.Equal(2, controller.AddWaitingOperator(ops...))
re.Len(controller.opNotifierQueue, 2)
// change the target RegionEpoch
// first poll gets source region from opNotifierQueue
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, source)

r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Len(controller.opNotifierQueue, 1)
re.Len(controller.operators, 1)
re.Empty(controller.wop.ListOperator())
re.NotNil(controller.opRecords.Get(102))

controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, source)
re.Len(controller.opNotifierQueue, 1)
}

func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() {
re := suite.Require()
opts := mockconfig.NewTestOptions()

Check failure on line 460 in server/schedule/operator_controller_test.go

View workflow job for this annotation

GitHub Actions / statics

undefined: mockconfig (typecheck)
cluster := mockcluster.NewCluster(suite.ctx, opts)
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewOperatorController(suite.ctx, cluster, stream)
cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(source)
target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(target)

ops, err := operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge)
re.NoError(err)
re.Len(ops, 2)

// check successfully
r, reason := controller.checkOperatorLightly(ops[0])
re.Empty(reason)
re.Equal(r, source)

// check failed because of region disappeared
cluster.RemoveRegion(target)
r, reason = controller.checkOperatorLightly(ops[1])
re.Nil(r)
re.Equal(reason, operator.RegionNotFound)

// check failed because of verions of region epoch changed
cluster.PutRegion(target)
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
r, reason = controller.checkOperatorLightly(ops[0])
re.Nil(r)
re.Equal(reason, operator.EpochNotMatch)
}

func (suite *operatorControllerTestSuite) TestStoreLimit() {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(suite.ctx, opt)
Expand Down

0 comments on commit acecac4

Please sign in to comment.