Skip to content

Commit

Permalink
mcs: make scheduling server support checker and scheduler http interf…
Browse files Browse the repository at this point in the history
…ace (tikv#7131)

ref tikv#5839

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 authored and rleungx committed Dec 1, 2023
1 parent 3af8f30 commit c547876
Show file tree
Hide file tree
Showing 18 changed files with 1,013 additions and 687 deletions.
173 changes: 100 additions & 73 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
Expand Down Expand Up @@ -129,12 +128,17 @@ func (s *Service) RegisterAdminRouter() {
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
router.GET("/diagnostic/:name", getDiagnosticResult)
// TODO: in the future, we should split pauseOrResumeScheduler to two different APIs.
// And we need to do one-to-two forwarding in the API middleware.
router.POST("/:name", pauseOrResumeScheduler)
}

// RegisterCheckersRouter registers the router of the checkers handler.
func (s *Service) RegisterCheckersRouter() {
router := s.root.Group("checkers")
router.GET("/:name", getCheckerByName)
router.POST("/:name", pauseOrResumeChecker)
}

// RegisterOperatorsRouter registers the router of the operators handler.
Expand Down Expand Up @@ -304,24 +308,54 @@ func createOperator(c *gin.Context) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checkers/{name} [get]
func getCheckerByName(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
handler := c.MustGet(handlerKey).(*handler.Handler)
name := c.Param("name")
co := svr.GetCoordinator()
isPaused, err := co.IsCheckerPaused(name)
output, err := handler.GetCheckerStatus(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
output := map[string]bool{
"paused": isPaused,
}
c.IndentedJSON(http.StatusOK, output)
}

type schedulerPausedPeriod struct {
Name string `json:"name"`
PausedAt time.Time `json:"paused_at"`
ResumeAt time.Time `json:"resume_at"`
// FIXME: details of input json body params
// @Tags checker
// @Summary Pause or resume region merge.
// @Accept json
// @Param name path string true "The name of the checker."
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Pause or resume the scheduler successfully."
// @Failure 400 {string} string "Bad format request."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checker/{name} [post]
func pauseOrResumeChecker(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
var input map[string]int
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

name := c.Param("name")
t, ok := input["delay"]
if !ok {
c.String(http.StatusBadRequest, "missing pause time")
return
}
if t < 0 {
c.String(http.StatusBadRequest, "delay cannot be negative")
return
}
if err := handler.PauseOrResumeChecker(name, int64(t)); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if t == 0 {
c.String(http.StatusOK, "Resume the checker successfully.")
} else {
c.String(http.StatusOK, "Pause the checker successfully.")
}
}

// @Tags schedulers
Expand All @@ -331,70 +365,63 @@ type schedulerPausedPeriod struct {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers [get]
func getSchedulers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
co := svr.GetCoordinator()
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()

handler := c.MustGet(handlerKey).(*handler.Handler)
status := c.Query("status")
_, needTS := c.GetQuery("timestamp")
switch status {
case "paused":
var pausedSchedulers []string
pausedPeriods := []schedulerPausedPeriod{}
for _, scheduler := range schedulers {
paused, err := sc.IsSchedulerPaused(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if paused {
if needTS {
s := schedulerPausedPeriod{
Name: scheduler,
PausedAt: time.Time{},
ResumeAt: time.Time{},
}
pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.PausedAt = time.Unix(pausedAt, 0)
resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.ResumeAt = time.Unix(resumeAt, 0)
pausedPeriods = append(pausedPeriods, s)
} else {
pausedSchedulers = append(pausedSchedulers, scheduler)
}
}
}
if needTS {
c.IndentedJSON(http.StatusOK, pausedPeriods)
} else {
c.IndentedJSON(http.StatusOK, pausedSchedulers)
}
output, err := handler.GetSchedulerByStatus(status, needTS)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, output)
}

// @Tags schedulers
// @Summary List schedulers diagnostic result.
// @Produce json
// @Success 200 {array} string
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/diagnostic/{name} [get]
func getDiagnosticResult(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
name := c.Param("name")
result, err := handler.GetDiagnosticResult(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, result)
}

// FIXME: details of input json body params
// @Tags scheduler
// @Summary Pause or resume a scheduler.
// @Accept json
// @Param name path string true "The name of the scheduler."
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Pause or resume the scheduler successfully."
// @Failure 400 {string} string "Bad format request."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/{name} [post]
func pauseOrResumeScheduler(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

var input map[string]int64
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

name := c.Param("name")
t, ok := input["delay"]
if !ok {
c.String(http.StatusBadRequest, "missing pause time")
return
}
if err := handler.PauseOrResumeScheduler(name, t); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
case "disabled":
var disabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if disabled {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
c.IndentedJSON(http.StatusOK, disabledSchedulers)
default:
c.IndentedJSON(http.StatusOK, schedulers)
}
c.String(http.StatusOK, "Pause or resume the scheduler successfully.")
}
151 changes: 151 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.uber.org/zap"
)

// Server is the interface for handler about schedule.
Expand Down Expand Up @@ -720,3 +722,152 @@ func parseStoreIDsAndPeerRole(ids interface{}, roles interface{}) (map[uint64]pl
}
return storeIDToPeerRole, true
}

// GetCheckerStatus returns the status of the checker.
func (h *Handler) GetCheckerStatus(name string) (map[string]bool, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
isPaused, err := co.IsCheckerPaused(name)
if err != nil {
return nil, err
}
return map[string]bool{
"paused": isPaused,
}, nil
}

// GetSchedulerNames returns all names of schedulers.
func (h *Handler) GetSchedulerNames() ([]string, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
return co.GetSchedulersController().GetSchedulerNames(), nil
}

type schedulerPausedPeriod struct {
Name string `json:"name"`
PausedAt time.Time `json:"paused_at"`
ResumeAt time.Time `json:"resume_at"`
}

// GetSchedulerByStatus returns all names of schedulers by status.
func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()
switch status {
case "paused":
var pausedSchedulers []string
pausedPeriods := []schedulerPausedPeriod{}
for _, scheduler := range schedulers {
paused, err := sc.IsSchedulerPaused(scheduler)
if err != nil {
return nil, err
}
if paused {
if needTS {
s := schedulerPausedPeriod{
Name: scheduler,
PausedAt: time.Time{},
ResumeAt: time.Time{},
}
pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler)
if err != nil {
return nil, err
}
s.PausedAt = time.Unix(pausedAt, 0)
resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler)
if err != nil {
return nil, err
}
s.ResumeAt = time.Unix(resumeAt, 0)
pausedPeriods = append(pausedPeriods, s)
} else {
pausedSchedulers = append(pausedSchedulers, scheduler)
}
}
}
if needTS {
return pausedPeriods, nil
}
return pausedSchedulers, nil
case "disabled":
var disabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err
}
if disabled {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
return disabledSchedulers, nil
default:
return schedulers, nil
}
}

// GetDiagnosticResult returns the diagnostic results of the specified scheduler.
func (h *Handler) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) {
if _, ok := schedulers.DiagnosableSummaryFunc[name]; !ok {
return nil, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name)
}
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
result, err := co.GetDiagnosticResult(name)
if err != nil {
return nil, err
}
return result, nil
}

// PauseOrResumeScheduler pauses a scheduler for delay seconds or resume a paused scheduler.
// t == 0 : resume scheduler.
// t > 0 : scheduler delays t seconds.
func (h *Handler) PauseOrResumeScheduler(name string, t int64) (err error) {
co := h.GetCoordinator()
if co == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if err = co.GetSchedulersController().PauseOrResumeScheduler(name, t); err != nil {
if t == 0 {
log.Error("can not resume scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
} else {
log.Error("can not pause scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
}
} else {
if t == 0 {
log.Info("resume scheduler successfully", zap.String("scheduler-name", name))
} else {
log.Info("pause scheduler successfully", zap.String("scheduler-name", name), zap.Int64("pause-seconds", t))
}
}
return err
}

// PauseOrResumeChecker pauses checker for delay seconds or resume checker
// t == 0 : resume checker.
// t > 0 : checker delays t seconds.
func (h *Handler) PauseOrResumeChecker(name string, t int64) (err error) {
co := h.GetCoordinator()
if co == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if err = co.PauseOrResumeChecker(name, t); err != nil {
if t == 0 {
log.Error("can not resume checker", zap.String("checker-name", name), errs.ZapError(err))
} else {
log.Error("can not pause checker", zap.String("checker-name", name), errs.ZapError(err))
}
}
return err
}
5 changes: 5 additions & 0 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
if len(h.microserviceRedirectRules) == 0 {
return false, ""
}
// Remove trailing '/' from the URL path
// It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}"
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
for _, rule := range h.microserviceRedirectRules {
if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) {
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
Expand All @@ -131,6 +134,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
} else {
r.URL.Path = rule.targetPath
}
log.Debug("redirect to micro service", zap.String("path", r.URL.Path), zap.String("target", addr),
zap.String("method", r.Method))
return true, addr
}
}
Expand Down
Loading

0 comments on commit c547876

Please sign in to comment.