Skip to content

Commit

Permalink
mcs: support scheduler config and enable some tests
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Oct 25, 2023
1 parent 9f19361 commit 14e2985
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 75 deletions.
15 changes: 15 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
Expand Down Expand Up @@ -128,6 +129,7 @@ func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
router.GET("/diagnostic/:name", getDiagnosticResult)
router.GET("/config/:name/:suffix", getSchedulerConfigByName)
// TODO: in the future, we should split pauseOrResumeScheduler to two different APIs.
// And we need to do one-to-two forwarding in the API middleware.
router.POST("/:name", pauseOrResumeScheduler)
Expand Down Expand Up @@ -385,6 +387,19 @@ func getSchedulers(c *gin.Context) {
c.IndentedJSON(http.StatusOK, output)
}

func getSchedulerConfigByName(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
handlers := svr.GetCoordinator().GetSchedulersController().GetSchedulerHandlers()
name := c.Param("name")
if _, ok := handlers[name]; !ok {
c.String(http.StatusNotFound, errs.ErrSchedulerNotFound.Error())
return
}
suffix := c.Param("suffix")
c.Request.URL.Path = "/" + suffix
handlers[name].ServeHTTP(c.Writer, c.Request)
}

// @Tags schedulers
// @Summary List schedulers diagnostic result.
// @Produce json
Expand Down
14 changes: 13 additions & 1 deletion pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,19 @@ func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{},
}
return disabledSchedulers, nil
default:
return schedulers, nil
// The default scheduler could not be deleted, it could only be disabled.
// TODO: Should we distinguish between disabled and removed schedulers?
var enabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if !disabled {
enabledSchedulers = append(enabledSchedulers, scheduler)
}
}
return enabledSchedulers, nil
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,29 @@ func StatusOK(re *require.Assertions) func([]byte, int, http.Header) {

// StatusNotOK is used to check whether http response code is not equal http.StatusOK.
func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) {
return func(_ []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i)
return func(resp []byte, i int, _ http.Header) {
re.NotEqual(http.StatusOK, i, "resp: "+string(resp))
}
}

// ExtractJSON is used to check whether given data can be extracted successfully.
func ExtractJSON(re *require.Assertions, data interface{}) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(res, data))
return func(resp []byte, _ int, _ http.Header) {
re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp))
}
}

// StringContain is used to check whether response context contains given string.
func StringContain(re *require.Assertions, sub string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), sub)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), sub, "resp: "+string(resp))
}
}

// StringEqual is used to check whether response context equal given string.
func StringEqual(re *require.Assertions, str string) func([]byte, int, http.Header) {
return func(res []byte, _ int, _ http.Header) {
re.Contains(string(res), str)
return func(resp []byte, _ int, _ http.Header) {
re.Contains(string(resp), str, "resp: "+string(resp))
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config", http.MethodGet
// "/hotspot/regions/read", http.MethodGet
// "/hotspot/regions/write", http.MethodGet
// "/hotspot/regions/history", http.MethodGet
Expand Down Expand Up @@ -90,6 +91,11 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/schedulers",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/scheduler-config",
scheapi.APIPathPrefix+"/schedulers/config",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers/", // Note: this means "/schedulers/{name}"
scheapi.APIPathPrefix+"/schedulers",
Expand Down
5 changes: 2 additions & 3 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,11 +789,10 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
},
StoreConfig: *o.GetStoreConfig(),
}
err := storage.SaveConfig(cfg)
failpoint.Inject("persistFail", func() {
err = errors.New("fail to persist")
failpoint.Return(errors.New("fail to persist"))
})
return err
return storage.SaveConfig(cfg)
}

// Reload reloads the configuration from the storage.
Expand Down
12 changes: 12 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (suite *apiTestSuite) TestAPIForward() {
// "/schedulers", http.MethodGet
// "/schedulers/{name}", http.MethodPost
// "/schedulers/diagnostic/{name}", http.MethodGet
// "/scheduler-config/", http.MethodGet
// Should not redirect:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
Expand All @@ -189,6 +190,17 @@ func (suite *apiTestSuite) TestAPIForward() {
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)

schedulers := []string{
"balance-leader-scheduler",
"balance-witness-scheduler",
"balance-hot-region-scheduler",
}
for _, schedulerName := range schedulers {
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s/%s/%s", urlPrefix, "scheduler-config", schedulerName, "list"), &resp,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
suite.NoError(err)
}

err = testutil.CheckPostJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "schedulers"), pauseArgs,
testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader))
re.NoError(err)
Expand Down
91 changes: 54 additions & 37 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package scheduler_test
import (
"context"
"encoding/json"
"reflect"
"strings"
"testing"
"time"

Expand All @@ -43,8 +45,7 @@ func TestSchedulerTestSuite(t *testing.T) {

func (suite *schedulerTestSuite) TestScheduler() {
env := tests.NewSchedulingTestEnvironment(suite.T())
// Fixme: use RunTestInTwoModes when sync deleted scheduler is supported.
env.RunTestInPDMode(suite.checkScheduler)
env.RunTestInTwoModes(suite.checkScheduler)
env.RunTestInTwoModes(suite.checkSchedulerDiagnostic)
}

Expand Down Expand Up @@ -83,20 +84,30 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
}

checkSchedulerCommand := func(args []string, expected map[string]bool) {
if args != nil {
mustExec(re, cmd, args, nil)
}
var schedulers []string
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers)
for _, scheduler := range schedulers {
re.True(expected[scheduler])
}
testutil.Eventually(re, func() bool {
if args != nil {
mustExec(re, cmd, args, nil)
}
var schedulers []string
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, &schedulers)
if len(schedulers) != len(expected) {
return false
}
for _, scheduler := range schedulers {
if _, ok := expected[scheduler]; !ok {
return false
}
}
return true
})
}

checkSchedulerConfigCommand := func(expectedConfig map[string]interface{}, schedulerName string) {
configInfo := make(map[string]interface{})
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo)
re.Equal(expectedConfig, configInfo)
testutil.Eventually(re, func() bool {
configInfo := make(map[string]interface{})
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName}, &configInfo)
return reflect.DeepEqual(expectedConfig, configInfo)
})
}

leaderServer := cluster.GetLeaderServer()
Expand All @@ -106,7 +117,6 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {

// note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region.
tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
time.Sleep(3 * time.Second)

// scheduler show command
expected := map[string]bool{
Expand All @@ -120,7 +130,6 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {

// scheduler delete command
args := []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}
time.Sleep(10 * time.Second)
expected = map[string]bool{
"balance-leader-scheduler": true,
"balance-hot-region-scheduler": true,
Expand Down Expand Up @@ -160,8 +169,11 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
checkSchedulerCommand(args, expected)

// check update success
expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}}
checkSchedulerConfigCommand(expectedConfig, schedulers[idx])
// FIXME: remove this check after scheduler config is updated
if cluster.GetSchedulingPrimaryServer() == nil {
expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}}
checkSchedulerConfigCommand(expectedConfig, schedulers[idx])
}

// scheduler delete command
args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]}
Expand Down Expand Up @@ -271,6 +283,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
re.NotContains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil)
Expand Down Expand Up @@ -412,24 +426,30 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) {
for _, schedulerName := range evictSlownessSchedulers {
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", schedulerName}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
re.Contains(echo, schedulerName)
testutil.Eventually(re, func() bool {
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
return strings.Contains(echo, schedulerName)
})
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "set", "recovery-duration", "100"}, nil)
re.Contains(echo, "Success!")
conf = make(map[string]interface{})
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", schedulerName, "show"}, &conf)
re.Equal(100., conf["recovery-duration"])
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", schedulerName}, nil)
re.Contains(echo, "Success!")
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
re.NotContains(echo, schedulerName)
testutil.Eventually(re, func() bool {
echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show"}, nil)
return !strings.Contains(echo, schedulerName)
})
}

// test show scheduler with paused and disabled status.
checkSchedulerWithStatusCommand := func(status string, expected []string) {
var schedulers []string
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers)
re.Equal(expected, schedulers)
testutil.Eventually(re, func() bool {
var schedulers []string
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "show", "--status", status}, &schedulers)
return reflect.DeepEqual(expected, schedulers)
})
}

mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"})
Expand Down Expand Up @@ -469,13 +489,14 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu
cmd := pdctlCmd.GetRootCmd()

checkSchedulerDescribeCommand := func(schedulerName, expectedStatus, expectedSummary string) {
result := make(map[string]interface{})
testutil.Eventually(re, func() bool {
mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result)
return len(result) != 0
}, testutil.WithTickInterval(50*time.Millisecond))
re.Equal(expectedStatus, result["status"])
re.Equal(expectedSummary, result["summary"])
result := make(map[string]interface{})
testutil.Eventually(re, func() bool {
mightExec(re, cmd, []string{"-u", pdAddr, "scheduler", "describe", schedulerName}, &result)
return len(result) != 0
}, testutil.WithTickInterval(50*time.Millisecond))
return result["status"] == expectedStatus && result["summary"] == expectedSummary
})
}

stores := []*metapb.Store{
Expand Down Expand Up @@ -506,18 +527,14 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu

// note: because pdqsort is a unstable sort algorithm, set ApproximateSize for this region.
tests.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"), core.SetApproximateSize(10))
time.Sleep(3 * time.Second)

echo := mustExec(re, cmd, []string{"-u", pdAddr, "config", "set", "enable-diagnostic", "true"}, nil)
re.Contains(echo, "Success!")
checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ")

// scheduler delete command
// Fixme: use RunTestInTwoModes when sync deleted scheduler is supported.
if sche := cluster.GetSchedulingPrimaryServer(); sche == nil {
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "")
}
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil)
checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "")

mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil)
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
Expand All @@ -530,7 +547,7 @@ func mustExec(re *require.Assertions, cmd *cobra.Command, args []string, v inter
if v == nil {
return string(output)
}
re.NoError(json.Unmarshal(output, v))
re.NoError(json.Unmarshal(output, v), string(output))
return ""
}

Expand Down
Loading

0 comments on commit 14e2985

Please sign in to comment.