Skip to content

Commit

Permalink
mcs: support admin/cache http interface in scheduling server (tikv#7279)
Browse files Browse the repository at this point in the history
ref tikv#5839

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 authored and rleungx committed Dec 1, 2023
1 parent cc78b67 commit 8e9cf1f
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 6 deletions.
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,16 @@ error = '''
init file log error, %s
'''

["PD:mcs:ErrNotFoundSchedulingAddr"]
error = '''
cannot find scheduling address
'''

["PD:mcs:ErrSchedulingServer"]
error = '''
scheduling server meets %v
'''

["PD:member:ErrCheckCampaign"]
error = '''
check campaign failed
Expand Down
6 changes: 6 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,9 @@ var (
ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup"))
ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup"))
)

// Micro service errors
var (
// ErrNotFoundSchedulingAddr reports that no address could be found for the
// scheduling service. Its message takes no arguments.
ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr"))
// ErrSchedulingServer reports a failure met by the scheduling server. The
// single %v placeholder carries the detail supplied by the caller.
ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer"))
)
51 changes: 49 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apis

import (
"fmt"
"net/http"
"strconv"
"sync"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
sche "github.com/tikv/pd/pkg/schedule/core"
Expand Down Expand Up @@ -121,6 +121,8 @@ func NewService(srv *scheserver.Service) *Service {
func (s *Service) RegisterAdminRouter() {
	// Every admin endpoint is mounted under the "admin" group of the root.
	adminGroup := s.root.Group("admin")
	// Log level management.
	adminGroup.PUT("/log", changeLogLevel)
	// Region cache eviction: the whole cache, or one region selected by ID.
	adminGroup.DELETE("cache/regions", deleteAllRegionCache)
	adminGroup.DELETE("cache/regions/:id", deleteRegionCacheByID)
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
Expand Down Expand Up @@ -160,6 +162,11 @@ func (s *Service) RegisterOperatorsRouter() {
router.GET("/records", getOperatorRecords)
}

// @Tags admin
// @Summary Change the log level.
// @Produce json
// @Success 200 {string} string "The log level is updated."
// @Router /admin/log [put]
func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var level string
Expand All @@ -176,6 +183,46 @@ func changeLogLevel(c *gin.Context) {
c.String(http.StatusOK, "The log level is updated.")
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
// @Success 200 {string} string "All regions are removed from server cache."
// @Router /admin/cache/regions [delete]
func deleteAllRegionCache(c *gin.Context) {
	server := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
	// Only a server that already has a cluster owns a region cache to drop.
	if rc := server.GetCluster(); rc != nil {
		rc.DropCacheAllRegion()
		c.String(http.StatusOK, "All regions are removed from server cache.")
		return
	}
	// No cluster: answer with the "not bootstrapped" error as a 500.
	c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error())
}

// @Tags admin
// @Summary Drop a specific region from cache.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {string} string "The region is removed from server cache."
// @Failure 400 {string} string "The input is invalid."
// @Router /admin/cache/regions/{id} [delete]
func deleteRegionCacheByID(c *gin.Context) {
	server := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
	rc := server.GetCluster()
	if rc == nil {
		// The cluster check comes first, so a missing cluster wins over a
		// malformed ID and is reported as a 500.
		c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error())
		return
	}
	// The region ID comes from the ":id" path segment as a base-10 uint64.
	id, parseErr := strconv.ParseUint(c.Param("id"), 10, 64)
	switch {
	case parseErr != nil:
		c.String(http.StatusBadRequest, parseErr.Error())
	default:
		rc.DropCacheRegion(id)
		c.String(http.StatusOK, "The region is removed from server cache.")
	}
}

// @Tags operators
// @Summary Get an operator by ID.
// @Param region_id path int true "A Region's Id"
Expand Down Expand Up @@ -475,7 +522,7 @@ func getHotRegions(typ utils.RWType, c *gin.Context) {
for _, storeID := range storeIDs {
id, err := strconv.ParseUint(storeID, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID))
c.String(http.StatusBadRequest, errs.ErrInvalidStoreID.FastGenByArgs(storeID).Error())
return
}
_, err = handler.GetStore(id)
Expand Down
10 changes: 10 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,13 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
func (c *Cluster) IsPrepared() bool {
	// Delegate to the prepare checker owned by the coordinator.
	checker := c.coordinator.GetPrepareChecker()
	return checker.IsPrepared()
}

// DropCacheAllRegion removes all cached regions.
// It is a thin wrapper that delegates to ResetRegionCache and returns nothing.
func (c *Cluster) DropCacheAllRegion() {
c.ResetRegionCache()
}

// DropCacheRegion removes a region from the cache.
// id is the ID of the region to drop. The call delegates to
// RemoveRegionIfExist and returns nothing.
// NOTE(review): presumably a no-op when the region is not cached, judging by
// the callee's name; confirm against RemoveRegionIfExist.
func (c *Cluster) DropCacheRegion(id uint64) {
c.RemoveRegionIfExist(id)
}
54 changes: 50 additions & 4 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package api

import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
Expand Down Expand Up @@ -59,7 +61,11 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request)
return
}
rc.DropCacheRegion(regionID)
h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.")
if h.svr.IsAPIServiceMode() {
err = h.DeleteRegionCacheInSchedulingServer(regionID)
}
msg := "The region is removed from server cache."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))
}

// @Tags admin
Expand Down Expand Up @@ -95,8 +101,11 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques
}
// Remove region from cache.
rc.DropCacheRegion(regionID)

h.rd.JSON(w, http.StatusOK, "The region is removed from server cache and region meta storage.")
if h.svr.IsAPIServiceMode() {
err = h.DeleteRegionCacheInSchedulingServer(regionID)
}
msg := "The region is removed from server cache and region meta storage."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))
}

// @Tags admin
Expand All @@ -105,9 +114,14 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques
// @Success 200 {string} string "All regions are removed from server cache."
// @Router /admin/cache/regions [delete]
func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) {
	// Clear the local cache first; this step does not report an error.
	getCluster(r).DropCacheAllRegion()

	// In API service mode the scheduling server keeps its own region cache,
	// so ask it to drop everything as well and remember any failure.
	var forwardErr error
	if h.svr.IsAPIServiceMode() {
		forwardErr = h.DeleteRegionCacheInSchedulingServer()
	}

	// The status is always 200; a forwarding failure only changes the message.
	h.rd.JSON(w, http.StatusOK, h.buildMsg("All regions are removed from server cache.", forwardErr))
}

// Intentionally no swagger mark as it is supposed to be only used in
Expand Down Expand Up @@ -197,3 +211,35 @@ func (h *adminHandler) RecoverAllocID(w http.ResponseWriter, r *http.Request) {

_ = h.rd.Text(w, http.StatusOK, "")
}

// DeleteRegionCacheInSchedulingServer asks the primary scheduling server to
// drop regions from its cache through its admin/cache HTTP interface.
//
// With no argument every cached region is dropped; otherwise only the region
// identified by id[0] is dropped and any further ids are ignored.
//
// It returns ErrNotFoundSchedulingAddr when no primary scheduling address is
// available, the underlying error when the request cannot be built or sent,
// and ErrSchedulingServer carrying the HTTP status code when the scheduling
// server does not answer 200 OK.
func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error {
	ctx := h.svr.Context()
	addr, ok := h.svr.GetServicePrimaryAddr(ctx, utils.SchedulingServiceName)
	if !ok {
		return errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
	}
	// Build the exact route path. The "/<id>" suffix is appended only when an
	// ID is given, so the "drop all" request does not end with a trailing
	// slash and does not rely on the router redirecting it.
	url := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions", addr)
	if len(id) > 0 {
		url += "/" + strconv.FormatUint(id[0], 10)
	}
	// Bind the request to the server context so it is cancelled with it.
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
	if err != nil {
		return err
	}
	resp, err := h.svr.GetHTTPClient().Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// Drain before closing so the underlying connection can be reused;
		// a failure to drain is not actionable here.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	if resp.StatusCode != http.StatusOK {
		return errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode)
	}
	return nil
}

func (h *adminHandler) buildMsg(msg string, err error) string {
	// The success message is returned unchanged unless the server runs in API
	// service mode and the call to the scheduling server failed.
	if !h.svr.IsAPIServiceMode() || err == nil {
		return msg
	}
	// On failure the reply tells the operator to redo the step on the
	// scheduling server; msg itself is not included.
	return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
}
60 changes: 60 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/core"
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/statistics"
Expand Down Expand Up @@ -218,3 +220,61 @@ func (suite *apiTestSuite) TestAPIForward() {
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)
}

func TestAdminRegionCache(t *testing.T) {
	re := require.New(t)
	check := func(cluster *tests.TestCluster) {
		// Seed three adjacent regions (IDs 10, 20, 30) covering the key space.
		tests.MustPutRegionInfo(re, cluster,
			core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100)))
		tests.MustPutRegionInfo(re, cluster,
			core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100)))
		tests.MustPutRegionInfo(re, cluster,
			core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)))

		primary := cluster.GetSchedulingPrimaryServer()
		re.Equal(3, primary.GetCluster().GetRegionCount([]byte{}, []byte{}))

		// Talk to the scheduling server's own admin interface directly.
		urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions", primary.GetAddr())

		// Dropping a single region leaves the other two cached.
		singleURL := fmt.Sprintf("%s/%s", urlPrefix, "30")
		re.NoError(testutil.CheckDelete(testDialClient, singleURL, testutil.StatusOK(re)))
		re.Equal(2, primary.GetCluster().GetRegionCount([]byte{}, []byte{}))

		// Dropping without an ID empties the cache.
		re.NoError(testutil.CheckDelete(testDialClient, urlPrefix, testutil.StatusOK(re)))
		re.Equal(0, primary.GetCluster().GetRegionCount([]byte{}, []byte{}))
	}
	tests.NewSchedulingTestEnvironment(t).RunTestInAPIMode(check)
}

func TestAdminRegionCacheForward(t *testing.T) {
	re := require.New(t)
	check := func(cluster *tests.TestCluster) {
		// Seed three adjacent regions (IDs 10, 20, 30) covering the key space.
		tests.MustPutRegionInfo(re, cluster,
			core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100)))
		tests.MustPutRegionInfo(re, cluster,
			core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100)))
		tests.MustPutRegionInfo(re, cluster,
			core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)))

		leader := cluster.GetLeaderServer().GetServer()
		primary := cluster.GetSchedulingPrimaryServer()
		// Both caches must hold the same number of regions at every step.
		assertCounts := func(want int) {
			re.Equal(want, primary.GetCluster().GetRegionCount([]byte{}, []byte{}))
			re.Equal(want, leader.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count)
		}
		assertCounts(3)

		// Requests go to the API server, which forwards them to scheduling.
		urlPrefix := fmt.Sprintf("%s/pd/api/v1/admin/cache/region", cluster.GetLeaderServer().GetAddr())

		// ".../cache/region/30" drops one region on both sides.
		singleURL := fmt.Sprintf("%s/%s", urlPrefix, "30")
		re.NoError(testutil.CheckDelete(testDialClient, singleURL, testutil.StatusOK(re)))
		assertCounts(2)

		// ".../cache/regions" drops everything on both sides.
		re.NoError(testutil.CheckDelete(testDialClient, urlPrefix+"s", testutil.StatusOK(re)))
		assertCounts(0)
	}
	tests.NewSchedulingTestEnvironment(t).RunTestInAPIMode(check)
}

0 comments on commit 8e9cf1f

Please sign in to comment.