diff --git a/pkg/schedule/operator_controller.go b/pkg/schedule/operator_controller.go index 692584cc14b..fa7a3b2db71 100644 --- a/pkg/schedule/operator_controller.go +++ b/pkg/schedule/operator_controller.go @@ -213,10 +213,16 @@ func (oc *OperatorController) pollNeedDispatchRegion() (r *core.RegionInfo, next if !ok || op == nil { return nil, true } - r = oc.cluster.GetRegion(regionID) - if r == nil { + // Check the operator lightly. It cant't dispatch the op for some scenario. + var reason CancelReasonType + r, reason = oc.checkOperatorLightly(op) + if len(reason) != 0 { _ = oc.removeOperatorLocked(op) +<<<<<<< HEAD:pkg/schedule/operator_controller.go if op.Cancel() { +======= + if op.Cancel(reason) { +>>>>>>> a2b0e3c6f (schedule: add check action when poll the opeators from opNotifierQueue (#8010)):pkg/schedule/operator/operator_controller.go log.Warn("remove operator because region disappeared", zap.Uint64("region-id", op.RegionID()), zap.Stringer("operator", op)) @@ -297,6 +303,7 @@ func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int if isMerge { // count two merge operators as one, so wopStatus.ops[desc] should // not be updated here + // TODO: call checkAddOperator ... i++ added++ oc.wop.PutOperator(ops[i]) @@ -434,7 +441,32 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato return !expired } +<<<<<<< HEAD:pkg/schedule/operator_controller.go func isHigherPriorityOperator(new, old *operator.Operator) bool { +======= +// checkOperatorLightly checks whether the ops can be dispatched in Controller::pollNeedDispatchRegion. +// The operators can't be dispatched for some scenarios, such as region disappeared, region changed ... +// `region` is the target region of `op`. +func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, CancelReasonType) { + region := oc.cluster.GetRegion(op.RegionID()) + if region == nil { + operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc() + return nil, RegionNotFound + } + + // It may be suitable for all kinds of operator but not merge-region. + // But to be cautions, it only takes effect on merge-region currently. + // If the version of epoch is changed, the region has been splitted or merged, and the key range has been changed. + // The changing for conf_version of epoch doesn't modify the region key range, skip it. + if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() > op.RegionEpoch().GetVersion() { + operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc() + return nil, EpochNotMatch + } + return region, "" +} + +func isHigherPriorityOperator(new, old *Operator) bool { +>>>>>>> a2b0e3c6f (schedule: add check action when poll the opeators from opNotifierQueue (#8010)):pkg/schedule/operator/operator_controller.go return new.GetPriorityLevel() > old.GetPriorityLevel() } diff --git a/pkg/schedule/operator_controller_test.go b/pkg/schedule/operator_controller_test.go index aa65d74d284..24c52853c39 100644 --- a/pkg/schedule/operator_controller_test.go +++ b/pkg/schedule/operator_controller_test.go @@ -398,6 +398,131 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() { suite.False(next) } +// issue #7992 +func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() { + re := suite.Require() + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(suite.ctx, opts) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream) + cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + + source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(source) + target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(target) + + ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.NoError(err) + re.Len(ops, 2) + re.Equal(2, controller.AddWaitingOperator(ops...)) + // Change next push time to now, it's used to make test case faster. + controller.opNotifierQueue[0].time = time.Now() + + // first poll gets source region op. + r, next := controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + + // second poll gets target region op. + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, target) + + // third poll removes the two merge-region ops. + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Len(controller.opNotifierQueue, 1) + re.Empty(controller.operators) + re.Empty(controller.wop.ListOperator()) + re.NotNil(controller.records.Get(101)) + re.NotNil(controller.records.Get(102)) + + // fourth poll removes target region op from opNotifierQueue + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Empty(controller.opNotifierQueue) + + // Add the two ops to waiting operators again. + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0} + controller.records.ttl.Remove(101) + controller.records.ttl.Remove(102) + ops, err = CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.NoError(err) + re.Equal(2, controller.AddWaitingOperator(ops...)) + // change the target RegionEpoch + // first poll gets source region from opNotifierQueue + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Equal(r, source) + + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Len(controller.opNotifierQueue, 1) + re.Empty(controller.operators) + re.Empty(controller.wop.ListOperator()) + re.NotNil(controller.records.Get(101)) + re.NotNil(controller.records.Get(102)) + + controller.opNotifierQueue[0].time = time.Now() + r, next = controller.pollNeedDispatchRegion() + re.True(next) + re.Nil(r) + re.Empty(controller.opNotifierQueue) +} + +func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() { + re := suite.Require() + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(suite.ctx, opts) + stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */) + controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream) + cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + + source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(source) + target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1}) + target.GetMeta().RegionEpoch = &metapb.RegionEpoch{} + cluster.PutRegion(target) + + ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge) + re.NoError(err) + re.Len(ops, 2) + + // check successfully + r, reason := controller.checkOperatorLightly(ops[0]) + re.Empty(reason) + re.Equal(r, source) + + // check failed because of region disappeared + cluster.RemoveRegion(target) + r, reason = controller.checkOperatorLightly(ops[1]) + re.Nil(r) + re.Equal(reason, RegionNotFound) + + // check failed because of verions of region epoch changed + cluster.PutRegion(target) + source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1} + r, reason = controller.checkOperatorLightly(ops[0]) + re.Nil(r) + re.Equal(reason, EpochNotMatch) +} + func (suite *operatorControllerTestSuite) TestStoreLimit() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt)