diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go
index 38a6141cd26..3983e9c345d 100644
--- a/pkg/schedule/schedulers/evict_slow_trend.go
+++ b/pkg/schedule/schedulers/evict_slow_trend.go
@@ -15,9 +15,12 @@ package schedulers
 
 import (
+	"net/http"
 	"strconv"
+	"sync/atomic"
 	"time"
 
+	"github.com/gorilla/mux"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
@@ -26,6 +29,8 @@ import (
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/plan"
 	"github.com/tikv/pd/pkg/storage/endpoint"
+	"github.com/tikv/pd/pkg/utils/apiutil"
+	"github.com/unrolled/render"
 	"go.uber.org/zap"
 )
 
@@ -54,11 +59,28 @@ type evictSlowTrendSchedulerConfig struct {
 	evictCandidate slowCandidate
 	// Last chosen candidate for eviction.
 	lastEvictCandidate slowCandidate
-
+	// Duration gap for recovering the candidate, unit: s.
+	RecoveryDurationGap uint64 `json:"recovery-duration"`
 	// Only evict one store for now
 	EvictedStores []uint64 `json:"evict-by-trend-stores"`
 }
 
+func initEvictSlowTrendSchedulerConfig(storage endpoint.ConfigStorage) *evictSlowTrendSchedulerConfig {
+	return &evictSlowTrendSchedulerConfig{
+		storage:             storage,
+		evictCandidate:      slowCandidate{},
+		lastEvictCandidate:  slowCandidate{},
+		RecoveryDurationGap: defaultRecoveryDurationGap,
+		EvictedStores:       make([]uint64, 0),
+	}
+}
+
+func (conf *evictSlowTrendSchedulerConfig) Clone() *evictSlowTrendSchedulerConfig {
+	return &evictSlowTrendSchedulerConfig{
+		RecoveryDurationGap: atomic.LoadUint64(&conf.RecoveryDurationGap),
+	}
+}
+
 func (conf *evictSlowTrendSchedulerConfig) Persist() error {
 	name := conf.getSchedulerName()
 	data, err := EncodeConfig(conf)
@@ -116,6 +138,15 @@ func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 {
 	return DurationSinceAsSecs(conf.lastEvictCandidate.captureTS)
 }
 
+// readyForRecovery checks whether the last captured candidate is ready for recovery.
+func (conf *evictSlowTrendSchedulerConfig) readyForRecovery() bool {
+	recoveryDurationGap := atomic.LoadUint64(&conf.RecoveryDurationGap)
+	failpoint.Inject("transientRecoveryGap", func() {
+		recoveryDurationGap = 0
+	})
+	return conf.lastCandidateCapturedSecs() >= recoveryDurationGap
+}
+
 func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) {
 	conf.evictCandidate = slowCandidate{
 		storeID: id,
@@ -162,9 +193,52 @@ func (conf *evictSlowTrendSchedulerConfig) clearAndPersist(cluster sche.Schedule
 	return oldID, conf.Persist()
 }
 
+type evictSlowTrendHandler struct {
+	rd     *render.Render
+	config *evictSlowTrendSchedulerConfig
+}
+
+func newEvictSlowTrendHandler(config *evictSlowTrendSchedulerConfig) http.Handler {
+	h := &evictSlowTrendHandler{
+		config: config,
+		rd:     render.New(render.Options{IndentJSON: true}),
+	}
+	router := mux.NewRouter()
+	router.HandleFunc("/config", h.UpdateConfig).Methods(http.MethodPost)
+	router.HandleFunc("/list", h.ListConfig).Methods(http.MethodGet)
+	return router
+}
+
+func (handler *evictSlowTrendHandler) UpdateConfig(w http.ResponseWriter, r *http.Request) {
+	var input map[string]interface{}
+	if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
+		return
+	}
+	recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
+	if !ok {
+		handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
+		return
+	}
+	recoveryDurationGap := (uint64)(recoveryDurationGapFloat)
+	prevRecoveryDurationGap := atomic.LoadUint64(&handler.config.RecoveryDurationGap)
+	atomic.StoreUint64(&handler.config.RecoveryDurationGap, recoveryDurationGap)
+	log.Info("evict-slow-trend-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
+	handler.rd.JSON(w, http.StatusOK, nil)
+}
+
+func (handler *evictSlowTrendHandler) ListConfig(w http.ResponseWriter, r *http.Request) {
+	conf := handler.config.Clone()
+	handler.rd.JSON(w, http.StatusOK, conf)
+}
+
 type evictSlowTrendScheduler struct {
 	*BaseScheduler
-	conf *evictSlowTrendSchedulerConfig
+	conf    *evictSlowTrendSchedulerConfig
+	handler http.Handler
+}
+
+func (s *evictSlowTrendScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	s.handler.ServeHTTP(w, r)
 }
 
 func (s *evictSlowTrendScheduler) GetName() string {
@@ -244,7 +318,7 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
 		// slow node next time.
 		log.Info("store evicted by slow trend has been removed", zap.Uint64("store-id", store.GetID()))
 		storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_removed").Inc()
-	} else if checkStoreCanRecover(cluster, store, s.conf.lastCandidateCapturedSecs()) {
+	} else if checkStoreCanRecover(cluster, store) && s.conf.readyForRecovery() {
 		log.Info("store evicted by slow trend has been recovered", zap.Uint64("store-id", store.GetID()))
 		storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_recovered").Inc()
 	} else {
@@ -301,9 +375,11 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
 }
 
 func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSlowTrendSchedulerConfig) Scheduler {
+	handler := newEvictSlowTrendHandler(conf)
 	return &evictSlowTrendScheduler{
 		BaseScheduler: NewBaseScheduler(opController),
 		conf:          conf,
+		handler:       handler,
 	}
 }
 
@@ -453,7 +529,7 @@ func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.Stor
 	return slowerThanStoresNum >= expected
 }
 
-func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo, recoveryGap uint64) bool {
+func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
 	/*
 	//
 	// This might not be necessary,
@@ -473,7 +549,7 @@ func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo,
 		storeSlowTrendActionStatusGauge.WithLabelValues("recover.judging:got-event").Inc()
 	}
	*/
-	return checkStoreFasterThanOthers(cluster, target) && checkStoreReadyForRecover(target, recoveryGap)
+	return checkStoreFasterThanOthers(cluster, target)
 }
 
 func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
@@ -507,19 +583,6 @@ func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.Stor
 	return fasterThanStores >= expected
 }
 
-// checkStoreReadyForRecover checks whether the given target store is ready for recover.
-func checkStoreReadyForRecover(target *core.StoreInfo, recoveryGap uint64) bool {
-	durationGap := uint64(defaultRecoveryDurationGap)
-	failpoint.Inject("transientRecoveryGap", func() {
-		durationGap = 0
-	})
-	if targetSlowTrend := target.GetSlowTrend(); targetSlowTrend != nil {
-		// TODO: setting the recovery time in SlowTrend
-		return recoveryGap >= durationGap
-	}
-	return true
-}
-
 // DurationSinceAsSecs returns the duration gap since the given startTS, unit: s.
 func DurationSinceAsSecs(startTS time.Time) uint64 {
 	return uint64(time.Since(startTS).Seconds())
diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go
index 2ff86524bdc..c6ad058455f 100644
--- a/pkg/schedule/schedulers/evict_slow_trend_test.go
+++ b/pkg/schedule/schedulers/evict_slow_trend_test.go
@@ -93,7 +93,7 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendBasicFuncs() {
 	suite.Equal(*lastCapturedCandidate, es2.conf.evictCandidate)
 	suite.Equal(es2.conf.candidateCapturedSecs(), uint64(0))
 	suite.Equal(es2.conf.lastCandidateCapturedSecs(), uint64(0))
-	suite.False(checkStoreReadyForRecover(store, es2.conf.lastCandidateCapturedSecs()))
+	suite.False(es2.conf.readyForRecovery())
 	recoverTS := lastCapturedCandidate.recoverTS
 	suite.True(recoverTS.After(lastCapturedCandidate.captureTS))
 	// Pop captured store 1 and mark it has recovered.
diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go
index e6603778796..dde10610643 100644
--- a/pkg/schedule/schedulers/init.go
+++ b/pkg/schedule/schedulers/init.go
@@ -466,7 +466,7 @@ func schedulersRegister() {
 	})
 
 	RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
-		conf := &evictSlowTrendSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0), evictCandidate: slowCandidate{}, lastEvictCandidate: slowCandidate{}}
+		conf := initEvictSlowTrendSchedulerConfig(storage)
 		if err := decoder(conf); err != nil {
 			return nil, err
 		}
diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go
index b7cce7722c5..c5b118a9f5e 100644
--- a/tests/pdctl/scheduler/scheduler_test.go
+++ b/tests/pdctl/scheduler/scheduler_test.go
@@ -407,6 +407,21 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
 	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil)
 	re.Contains(echo, "Success!")
 
+	// test evict-slow-trend scheduler config
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-trend-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+	re.Contains(echo, "evict-slow-trend-scheduler")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "set", "recovery-duration", "100"}, nil)
+	re.Contains(echo, "Success!")
+	conf = make(map[string]interface{})
+	mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-trend-scheduler", "show"}, &conf)
+	re.Equal(100., conf["recovery-duration"])
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-slow-trend-scheduler"}, nil)
+	re.Contains(echo, "Success!")
+	echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
+	re.NotContains(echo, "evict-slow-trend-scheduler")
+
 	// test show scheduler with paused and disabled status.
 	checkSchedulerWithStatusCommand := func(status string, expected []string) {
 		var schedulers []string
diff --git a/tools/pd-ctl/pdctl/command/scheduler.go b/tools/pd-ctl/pdctl/command/scheduler.go
index da37b459258..40bf9a07a4d 100644
--- a/tools/pd-ctl/pdctl/command/scheduler.go
+++ b/tools/pd-ctl/pdctl/command/scheduler.go
@@ -499,6 +499,7 @@ func NewConfigSchedulerCommand() *cobra.Command {
 		newConfigGrantHotRegionCommand(),
 		newConfigBalanceLeaderCommand(),
 		newSplitBucketCommand(),
+		newConfigEvictSlowTrendCommand(),
 	)
 	return c
 }
@@ -775,6 +776,25 @@ func setShuffleRegionSchedulerRolesCommandFunc(cmd *cobra.Command, args []string
 	cmd.Println("Success!")
 }
 
+func newConfigEvictSlowTrendCommand() *cobra.Command {
+	c := &cobra.Command{
+		Use:   "evict-slow-trend-scheduler",
+		Short: "evict-slow-trend-scheduler config",
+		Run:   listSchedulerConfigCommandFunc,
+	}
+
+	c.AddCommand(&cobra.Command{
+		Use:   "show",
+		Short: "list the config item",
+		Run:   listSchedulerConfigCommandFunc,
+	}, &cobra.Command{
+		Use:   "set <key> <value>",
+		Short: "set the config item",
+		Run:   func(cmd *cobra.Command, args []string) { postSchedulerConfigCommandFunc(cmd, c.Name(), args) },
+	})
+	return c
+}
+
 // NewDescribeSchedulerCommand returns command to describe the scheduler.
 func NewDescribeSchedulerCommand() *cobra.Command {
 	c := &cobra.Command{