Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: make scheduling server support checker and scheduler http interface #7131

Merged
merged 3 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 98 additions & 73 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"net/http"
"strconv"
"sync"
"time"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
Expand Down Expand Up @@ -120,12 +119,15 @@
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
router.GET("/diagnostic/:name", getDiagnosticResult)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? It can pass tests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature is not widely used, to reduce the problem, I think we can hold it instead of expose it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's best to keep forwarding, one reason is that it will make matching rules simple, and another reason is to avoid us missing an interface or creating incompatibility issues

router.POST("/:name", pauseOrResumeScheduler)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we split it into two APIs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we only support one-to-one forwarding at this time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is since we use the new framework for scheduling service, we should make it more standard. The previous one is not good enough, so we should make it more clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to achieve one-to-two forwarding, i.e. post scheduler/{name} is at the api server and forwards to post scheduler/{name}/pause and post scheduler/{name}/resume, we may need to add a check for body to the match rule.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make the new API more clear.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't want to change it and since it is only used by forwarding now, I'm ok with the current status. But in the future, it's better to separate these actions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will add TODO about it.

}

// RegisterCheckersRouter registers the router of the checkers handler.
func (s *Service) RegisterCheckersRouter() {
router := s.root.Group("checkers")
router.GET("/:name", getCheckerByName)
router.POST("/:name", pauseOrResumeChecker)
}

// RegisterOperatorsRouter registers the router of the operators handler.
Expand Down Expand Up @@ -279,24 +281,54 @@
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checkers/{name} [get]
func getCheckerByName(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
handler := c.MustGet(handlerKey).(*handler.Handler)
name := c.Param("name")
co := svr.GetCoordinator()
isPaused, err := co.IsCheckerPaused(name)
output, err := handler.GetCheckerStatus(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
output := map[string]bool{
"paused": isPaused,
}
c.IndentedJSON(http.StatusOK, output)
}

type schedulerPausedPeriod struct {
Name string `json:"name"`
PausedAt time.Time `json:"paused_at"`
ResumeAt time.Time `json:"resume_at"`
// FIXME: details of input json body params
// @Tags checker
// @Summary Pause or resume region merge.
// @Accept json
// @Param name path string true "The name of the checker."
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Pause or resume the scheduler successfully."
// @Failure 400 {string} string "Bad format request."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checker/{name} [post]
func pauseOrResumeChecker(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
var input map[string]int
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 310 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L309-L310

Added lines #L309 - L310 were not covered by tests
}

name := c.Param("name")
t, ok := input["delay"]
if !ok {
c.String(http.StatusBadRequest, "missing pause time")
return
}
if t < 0 {
c.String(http.StatusBadRequest, "delay cannot be negative")
return
}
if err := handler.PauseOrResumeChecker(name, int64(t)); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if t == 0 {
c.String(http.StatusOK, "Resume the checker successfully.")
} else {
c.String(http.StatusOK, "Pause the checker successfully.")
}
}

// @Tags schedulers
Expand All @@ -306,70 +338,63 @@
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers [get]
func getSchedulers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
co := svr.GetCoordinator()
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()

handler := c.MustGet(handlerKey).(*handler.Handler)
status := c.Query("status")
_, needTS := c.GetQuery("timestamp")
switch status {
case "paused":
var pausedSchedulers []string
pausedPeriods := []schedulerPausedPeriod{}
for _, scheduler := range schedulers {
paused, err := sc.IsSchedulerPaused(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if paused {
if needTS {
s := schedulerPausedPeriod{
Name: scheduler,
PausedAt: time.Time{},
ResumeAt: time.Time{},
}
pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.PausedAt = time.Unix(pausedAt, 0)
resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.ResumeAt = time.Unix(resumeAt, 0)
pausedPeriods = append(pausedPeriods, s)
} else {
pausedSchedulers = append(pausedSchedulers, scheduler)
}
}
}
if needTS {
c.IndentedJSON(http.StatusOK, pausedPeriods)
} else {
c.IndentedJSON(http.StatusOK, pausedSchedulers)
}
output, err := handler.GetSchedulerByStatus(status, needTS)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return

Check warning on line 347 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L346-L347

Added lines #L346 - L347 were not covered by tests
}
c.IndentedJSON(http.StatusOK, output)
}

// @Tags schedulers
// @Summary List schedulers diagnostic result.
// @Produce json
// @Success 200 {array} string
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/diagnostic/{name} [get]
func getDiagnosticResult(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
name := c.Param("name")
result, err := handler.GetDiagnosticResult(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return

Check warning on line 364 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L363-L364

Added lines #L363 - L364 were not covered by tests
}
c.IndentedJSON(http.StatusOK, result)
}

// FIXME: details of input json body params
// @Tags scheduler
// @Summary Pause or resume a scheduler.
// @Accept json
// @Param name path string true "The name of the scheduler."
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Pause or resume the scheduler successfully."
// @Failure 400 {string} string "Bad format request."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/{name} [post]
func pauseOrResumeScheduler(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

var input map[string]int64
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 386 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L385-L386

Added lines #L385 - L386 were not covered by tests
}

name := c.Param("name")
t, ok := input["delay"]
if !ok {
c.String(http.StatusBadRequest, "missing pause time")
return

Check warning on line 393 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L392-L393

Added lines #L392 - L393 were not covered by tests
}
if err := handler.PauseOrResumeScheduler(name, t); err != nil {
c.String(http.StatusInternalServerError, err.Error())

Check warning on line 396 in pkg/mcs/scheduling/server/apis/v1/api.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/apis/v1/api.go#L396

Added line #L396 was not covered by tests
return
case "disabled":
var disabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if disabled {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
c.IndentedJSON(http.StatusOK, disabledSchedulers)
default:
c.IndentedJSON(http.StatusOK, schedulers)
}
c.String(http.StatusOK, "Pause or resume the scheduler successfully.")
}
151 changes: 151 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.uber.org/zap"
)

// Server is the interface for handler about schedule.
Expand Down Expand Up @@ -720,3 +722,152 @@
}
return storeIDToPeerRole, true
}

// GetCheckerStatus returns the status of the checker.
func (h *Handler) GetCheckerStatus(name string) (map[string]bool, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 730 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L730

Added line #L730 was not covered by tests
}
isPaused, err := co.IsCheckerPaused(name)
if err != nil {
return nil, err

Check warning on line 734 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L734

Added line #L734 was not covered by tests
}
return map[string]bool{
"paused": isPaused,
}, nil
}

// GetSchedulerNames returns all names of schedulers.
func (h *Handler) GetSchedulerNames() ([]string, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 745 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L743-L745

Added lines #L743 - L745 were not covered by tests
}
return co.GetSchedulersController().GetSchedulerNames(), nil

Check warning on line 747 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L747

Added line #L747 was not covered by tests
}

type schedulerPausedPeriod struct {
Name string `json:"name"`
PausedAt time.Time `json:"paused_at"`
ResumeAt time.Time `json:"resume_at"`
}

// GetSchedulerByStatus returns all names of schedulers by status.
func (h *Handler) GetSchedulerByStatus(status string, needTS bool) (interface{}, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 760 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L760

Added line #L760 was not covered by tests
}
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()
switch status {
case "paused":
var pausedSchedulers []string
pausedPeriods := []schedulerPausedPeriod{}
for _, scheduler := range schedulers {
paused, err := sc.IsSchedulerPaused(scheduler)
if err != nil {
return nil, err

Check warning on line 771 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L771

Added line #L771 was not covered by tests
}
if paused {
if needTS {
s := schedulerPausedPeriod{
Name: scheduler,
PausedAt: time.Time{},
ResumeAt: time.Time{},

Check warning on line 778 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L775-L778

Added lines #L775 - L778 were not covered by tests
}
pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler)
if err != nil {
return nil, err

Check warning on line 782 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L780-L782

Added lines #L780 - L782 were not covered by tests
}
s.PausedAt = time.Unix(pausedAt, 0)
resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler)
if err != nil {
return nil, err

Check warning on line 787 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L784-L787

Added lines #L784 - L787 were not covered by tests
}
s.ResumeAt = time.Unix(resumeAt, 0)
pausedPeriods = append(pausedPeriods, s)

Check warning on line 790 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L789-L790

Added lines #L789 - L790 were not covered by tests
} else {
pausedSchedulers = append(pausedSchedulers, scheduler)
}
}
}
if needTS {
return pausedPeriods, nil

Check warning on line 797 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L797

Added line #L797 was not covered by tests
}
return pausedSchedulers, nil
case "disabled":
var disabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
return nil, err

Check warning on line 805 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L805

Added line #L805 was not covered by tests
}
if disabled {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
return disabledSchedulers, nil
default:
return schedulers, nil
}
}

// GetDiagnosticResult returns the diagnostic results of the specified scheduler.
func (h *Handler) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) {
if _, ok := schedulers.DiagnosableSummaryFunc[name]; !ok {
return nil, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name)
}
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 824 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L824

Added line #L824 was not covered by tests
}
result, err := co.GetDiagnosticResult(name)
if err != nil {
return nil, err

Check warning on line 828 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L828

Added line #L828 was not covered by tests
}
return result, nil
}

// PauseOrResumeScheduler pauses a scheduler for delay seconds or resume a paused scheduler.
// t == 0 : resume scheduler.
// t > 0 : scheduler delays t seconds.
func (h *Handler) PauseOrResumeScheduler(name string, t int64) (err error) {
co := h.GetCoordinator()
if co == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 839 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L839

Added line #L839 was not covered by tests
}
if err = co.GetSchedulersController().PauseOrResumeScheduler(name, t); err != nil {
if t == 0 {
log.Error("can not resume scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
} else {
log.Error("can not pause scheduler", zap.String("scheduler-name", name), errs.ZapError(err))

Check warning on line 845 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L842-L845

Added lines #L842 - L845 were not covered by tests
}
} else {
if t == 0 {
log.Info("resume scheduler successfully", zap.String("scheduler-name", name))
} else {
log.Info("pause scheduler successfully", zap.String("scheduler-name", name), zap.Int64("pause-seconds", t))
}
}
return err
}

// PauseOrResumeChecker pauses checker for delay seconds or resume checker
// t == 0 : resume checker.
// t > 0 : checker delays t seconds.
func (h *Handler) PauseOrResumeChecker(name string, t int64) (err error) {
co := h.GetCoordinator()
if co == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()

Check warning on line 863 in pkg/schedule/handler/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/handler/handler.go#L863

Added line #L863 was not covered by tests
}
if err = co.PauseOrResumeChecker(name, t); err != nil {
if t == 0 {
log.Error("can not resume checker", zap.String("checker-name", name), errs.ZapError(err))
} else {
log.Error("can not pause checker", zap.String("checker-name", name), errs.ZapError(err))
}
}
return err
}
5 changes: 5 additions & 0 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
if len(h.microserviceRedirectRules) == 0 {
return false, ""
}
// Remove trailing '/' from the URL path
// It will be helpful when matching the redirect rules "schedulers" or "schedulers/{name}"
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
for _, rule := range h.microserviceRedirectRules {
if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) {
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
Expand All @@ -131,6 +134,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
} else {
r.URL.Path = rule.targetPath
}
log.Debug("redirect to micro service", zap.String("path", r.URL.Path), zap.String("target", addr),
zap.String("method", r.Method))
return true, addr
}
}
Expand Down
Loading
Loading