Skip to content

Commit

Permalink
*: reset config if the input is invalid (#8632)
Browse files Browse the repository at this point in the history
close #8619

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Sep 24, 2024
1 parent edb43c0 commit 6b927e1
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 6 deletions.
10 changes: 10 additions & 0 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ func (conf *evictLeaderSchedulerConfig) removeStoreLocked(id uint64) (bool, erro
return false, errs.ErrScheduleConfigNotExist.FastGenByArgs()
}

func (conf *evictLeaderSchedulerConfig) removeStore(id uint64) {
conf.Lock()
defer conf.Unlock()
// if the store is not existed, no need to resume leader transfer
_, _ = conf.removeStoreLocked(id)
}

func (conf *evictLeaderSchedulerConfig) resetStoreLocked(id uint64, keyRange []core.KeyRange) {
if err := conf.cluster.PauseLeaderTransfer(id); err != nil {
log.Error("pause leader transfer failed", zap.Uint64("store-id", id), errs.ZapError(err))
Expand Down Expand Up @@ -408,6 +415,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
batchFloat, ok := input["batch"].(float64)
if ok {
if batchFloat < 1 || batchFloat > 10 {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]")
return
}
Expand All @@ -417,6 +425,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R
ranges, ok := (input["ranges"]).([]string)
if ok {
if !inputHasStoreID {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, errs.ErrSchedulerConfig.FastGenByArgs("id"))
return
}
Expand All @@ -426,6 +435,7 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R

newRanges, err = getKeyRanges(ranges)
if err != nil {
handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,13 @@ func (handler *grantLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R

err := handler.config.buildWithArgs(args)
if err != nil {
_, _ = handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err = handler.config.persist()
if err != nil {
handler.config.removeStore(id)
_, _ = handler.config.removeStore(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down
15 changes: 10 additions & 5 deletions plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,18 @@ func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.R

err := handler.config.BuildWithArgs(args)
if err != nil {
handler.config.mu.Lock()
handler.config.cluster.ResumeLeaderTransfer(id)
handler.config.mu.Unlock()
handler.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
err = handler.config.Persist()
if err != nil {
handler.config.mu.Lock()
delete(handler.config.StoreIDWitRanges, id)
handler.config.cluster.ResumeLeaderTransfer(id)
handler.config.mu.Unlock()
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -301,22 +308,20 @@ func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.R

handler.config.mu.Lock()
defer handler.config.mu.Unlock()
_, exists := handler.config.StoreIDWitRanges[id]
ranges, exists := handler.config.StoreIDWitRanges[id]
if !exists {
handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist"))
return
}
delete(handler.config.StoreIDWitRanges, id)
handler.config.cluster.ResumeLeaderTransfer(id)

handler.config.mu.Unlock()
if err := handler.config.Persist(); err != nil {
handler.config.mu.Lock()
handler.config.StoreIDWitRanges[id] = ranges
_ = handler.config.cluster.PauseLeaderTransfer(id)
handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
handler.config.mu.Lock()

var resp any
if len(handler.config.StoreIDWitRanges) == 0 {
resp = noStoreInSchedulerInfo
Expand Down
54 changes: 54 additions & 0 deletions tools/pd-ctl/tests/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,60 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *pdTests.TestC
checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "")
}

func (suite *schedulerTestSuite) TestEvictLeaderScheduler() {
// FIXME: API mode may have the problem
suite.env.RunTestInPDMode(suite.checkEvictLeaderScheduler)
}

func (suite *schedulerTestSuite) checkEvictLeaderScheduler(cluster *pdTests.TestCluster) {
re := suite.Require()
pdAddr := cluster.GetConfig().GetClientURL()
cmd := ctl.GetRootCmd()

stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 3,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 4,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
}
for _, store := range stores {
pdTests.MustPutStore(re, cluster, store)
}

pdTests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"))
output, err := tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
output, err = tests.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}...)
re.NoError(err)
re.Contains(string(output), "Success!")
}

func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v any) string {
output, err := tests.ExecuteCommand(cmd, args...)
re.NoError(err)
Expand Down

0 comments on commit 6b927e1

Please sign in to comment.