Skip to content

Commit

Permalink
mcs: support hotspot http interface in scheduling server (#7184)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Oct 16, 2023
1 parent d0193d0 commit 55c67ac
Show file tree
Hide file tree
Showing 19 changed files with 768 additions and 524 deletions.
145 changes: 134 additions & 11 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package apis

import (
"fmt"
"net/http"
"strconv"
"sync"
Expand All @@ -26,11 +27,12 @@ import (
"github.com/joho/godotenv"
"github.com/pingcap/log"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -68,15 +70,11 @@ type Service struct {
}

type server struct {
server *scheserver.Server
*scheserver.Server
}

func (s *server) GetCoordinator() *schedule.Coordinator {
return s.server.GetCoordinator()
}

func (s *server) GetCluster() sche.SharedCluster {
return s.server.GetCluster()
func (s *server) GetCluster() sche.SchedulerCluster {
return s.Server.GetCluster()
}

func createIndentRender() *render.Render {
Expand All @@ -98,11 +96,11 @@ func NewService(srv *scheserver.Service) *Service {
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv.Server)
c.Set(handlerKey, handler.NewHandler(&server{server: srv.Server}))
c.Set(handlerKey, handler.NewHandler(&server{srv.Server}))
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
apiHandlerEngine.GET("metrics", utils.PromHandler())
apiHandlerEngine.GET("metrics", mcsutils.PromHandler())
pprof.Register(apiHandlerEngine)
root := apiHandlerEngine.Group(APIPathPrefix)
s := &Service{
Expand All @@ -115,6 +113,7 @@ func NewService(srv *scheserver.Service) *Service {
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
s.RegisterHotspotRouter()
return s
}

Expand All @@ -141,6 +140,16 @@ func (s *Service) RegisterCheckersRouter() {
router.POST("/:name", pauseOrResumeChecker)
}

// RegisterHotspotRouter registers the router of the hotspot handler.
func (s *Service) RegisterHotspotRouter() {
router := s.root.Group("hotspot")
router.GET("/regions/write", getHotWriteRegions)
router.GET("/regions/read", getHotReadRegions)
router.GET("/regions/history", getHistoryHotRegions)
router.GET("/stores", getHotStores)
router.GET("/buckets", getHotBuckets)
}

// RegisterOperatorsRouter registers the router of the operators handler.
func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
Expand Down Expand Up @@ -425,3 +434,117 @@ func pauseOrResumeScheduler(c *gin.Context) {
}
c.String(http.StatusOK, "Pause or resume the scheduler successfully.")
}

// @Tags hotspot
// @Summary List the hot write regions.
// @Produce json
// @Success 200 {object} statistics.StoreHotPeersInfos
// @Failure 400 {string} string "The request is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/regions/write [get]
func getHotWriteRegions(c *gin.Context) {
getHotRegions(utils.Write, c)
}

// @Tags hotspot
// @Summary List the hot read regions.
// @Produce json
// @Success 200 {object} statistics.StoreHotPeersInfos
// @Failure 400 {string} string "The request is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/regions/read [get]
func getHotReadRegions(c *gin.Context) {
getHotRegions(utils.Read, c)
}

func getHotRegions(typ utils.RWType, c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

storeIDs := c.QueryArray("store_id")
if len(storeIDs) < 1 {
hotRegions, err := handler.GetHotRegions(typ)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, hotRegions)
return
}

var ids []uint64
for _, storeID := range storeIDs {
id, err := strconv.ParseUint(storeID, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID))
return
}
_, err = handler.GetStore(id)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
ids = append(ids, id)
}

hotRegions, err := handler.GetHotRegions(typ, ids...)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, hotRegions)
}

// @Tags hotspot
// @Summary List the hot stores.
// @Produce json
// @Success 200 {object} handler.HotStoreStats
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/stores [get]
func getHotStores(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
stores, err := handler.GetHotStores()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, stores)
}

// @Tags hotspot
// @Summary List the hot buckets.
// @Produce json
// @Success 200 {object} handler.HotBucketsResponse
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/buckets [get]
func getHotBuckets(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

regionIDs := c.QueryArray("region_id")
ids := make([]uint64, len(regionIDs))
for i, regionID := range regionIDs {
if id, err := strconv.ParseUint(regionID, 10, 64); err == nil {
ids[i] = id
}
}
ret, err := handler.GetHotBuckets(ids...)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, ret)
}

// @Tags hotspot
// @Summary List the history hot regions.
// @Accept json
// @Produce json
// @Success 200 {object} storage.HistoryHotRegions
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/regions/history [get]
func getHistoryHotRegions(c *gin.Context) {
// TODO: support history hotspot in scheduling server with stateless in the future.
// Ref: https://github.com/tikv/pd/pull/7183
var res storage.HistoryHotRegions
c.IndentedJSON(http.StatusOK, res)
}
6 changes: 6 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (c *Cluster) GetHotStat() *statistics.HotStat {
return c.hotStat
}

// GetStoresStats returns stores' statistics from cluster.
// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat
func (c *Cluster) GetStoresStats() *statistics.StoresStats {
return c.hotStat.StoresStats
}

// GetRegionStats gets region statistics.
func (c *Cluster) GetRegionStats() *statistics.RegionStatistics {
return c.regionStats
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,11 @@ func (s *Server) GetBasicCluster() *core.BasicCluster {

// GetCoordinator returns the coordinator.
func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.GetCluster().GetCoordinator()
c := s.GetCluster()
if c == nil {
return nil
}
return c.GetCoordinator()
}

// ServerLoopWgDone decreases the server loop wait group.
Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ type SharedConfigProvider interface {
IsWitnessAllowed() bool
IsPlacementRulesCacheEnabled() bool
SetHaltScheduling(bool, string)
GetHotRegionCacheHitsThreshold() int

// for test purpose
SetPlacementRuleEnabled(bool)
SetPlacementRulesCacheEnabled(bool)
SetEnableWitness(bool)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,25 @@ func (c *Coordinator) GetHotRegionsByType(typ utils.RWType) *statistics.StoreHot
return infos
}

// GetHotRegions gets hot regions' statistics by RWType and storeIDs.
// If storeIDs is empty, it returns all hot regions' statistics by RWType.
func (c *Coordinator) GetHotRegions(typ utils.RWType, storeIDs ...uint64) *statistics.StoreHotPeersInfos {
hotRegions := c.GetHotRegionsByType(typ)
if len(storeIDs) > 0 && hotRegions != nil {
asLeader := statistics.StoreHotPeersStat{}
asPeer := statistics.StoreHotPeersStat{}
for _, storeID := range storeIDs {
asLeader[storeID] = hotRegions.AsLeader[storeID]
asPeer[storeID] = hotRegions.AsPeer[storeID]
}
return &statistics.StoreHotPeersInfos{
AsLeader: asLeader,
AsPeer: asPeer,
}
}
return hotRegions
}

// GetWaitGroup returns the wait group. Only for test purpose.
func (c *Coordinator) GetWaitGroup() *sync.WaitGroup {
return &c.wg
Expand Down
Loading

0 comments on commit 55c67ac

Please sign in to comment.