diff --git a/errors.toml b/errors.toml index 6caee5de3d8..70697da0a9a 100644 --- a/errors.toml +++ b/errors.toml @@ -491,6 +491,16 @@ error = ''' init file log error, %s ''' +["PD:mcs:ErrNotFoundSchedulingAddr"] +error = ''' +cannot find scheduling address +''' + +["PD:mcs:ErrSchedulingServer"] +error = ''' +scheduling server meets %v +''' + ["PD:member:ErrCheckCampaign"] error = ''' check campaign failed diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 728e473ccfe..cbfcf97c7ec 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -402,3 +402,9 @@ var ( ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup")) ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup")) ) + +// Micro service errors +var ( + ErrNotFoundSchedulingAddr = errors.Normalize("cannot find scheduling address", errors.RFCCodeText("PD:mcs:ErrNotFoundSchedulingAddr")) + ErrSchedulingServer = errors.Normalize("scheduling server meets %v", errors.RFCCodeText("PD:mcs:ErrSchedulingServer")) +) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 39be00ef9a0..d0acdf39a09 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -15,7 +15,6 @@ package apis import ( - "fmt" "net/http" "strconv" "sync" @@ -26,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "github.com/joho/godotenv" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" sche "github.com/tikv/pd/pkg/schedule/core" @@ -121,6 +121,8 @@ func NewService(srv *scheserver.Service) *Service { func (s *Service) RegisterAdminRouter() { router := s.root.Group("admin") router.PUT("/log", changeLogLevel) + router.DELETE("cache/regions", deleteAllRegionCache) + router.DELETE("cache/regions/:id", deleteRegionCacheByID) } // RegisterSchedulersRouter registers the router of the schedulers handler. @@ -160,6 +162,11 @@ func (s *Service) RegisterOperatorsRouter() { router.GET("/records", getOperatorRecords) } +// @Tags admin +// @Summary Change the log level. +// @Produce json +// @Success 200 {string} string "The log level is updated." +// @Router /admin/log [put] func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) var level string @@ -176,6 +183,46 @@ func changeLogLevel(c *gin.Context) { c.String(http.StatusOK, "The log level is updated.") } +// @Tags admin +// @Summary Drop all regions from cache. +// @Produce json +// @Success 200 {string} string "All regions are removed from server cache." +// @Router /admin/cache/regions [delete] +func deleteAllRegionCache(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + cluster := svr.GetCluster() + if cluster == nil { + c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error()) + return + } + cluster.DropCacheAllRegion() + c.String(http.StatusOK, "All regions are removed from server cache.") +} + +// @Tags admin +// @Summary Drop a specific region from cache. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {string} string "The region is removed from server cache." +// @Failure 400 {string} string "The input is invalid." +// @Router /admin/cache/regions/{id} [delete] +func deleteRegionCacheByID(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + cluster := svr.GetCluster() + if cluster == nil { + c.String(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error()) + return + } + regionIDStr := c.Param("id") + regionID, err := strconv.ParseUint(regionIDStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + cluster.DropCacheRegion(regionID) + c.String(http.StatusOK, "The region is removed from server cache.") +} + // @Tags operators // @Summary Get an operator by ID. // @Param region_id path int true "A Region's Id" @@ -475,7 +522,7 @@ func getHotRegions(typ utils.RWType, c *gin.Context) { for _, storeID := range storeIDs { id, err := strconv.ParseUint(storeID, 10, 64) if err != nil { - c.String(http.StatusBadRequest, fmt.Sprintf("invalid store id: %s", storeID)) + c.String(http.StatusBadRequest, errs.ErrInvalidStoreID.FastGenByArgs(storeID).Error()) return } _, err = handler.GetStore(id) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 769f7bf67f7..c88f18bb765 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -592,3 +592,13 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error { func (c *Cluster) IsPrepared() bool { return c.coordinator.GetPrepareChecker().IsPrepared() } + +// DropCacheAllRegion removes all cached regions. +func (c *Cluster) DropCacheAllRegion() { + c.ResetRegionCache() +} + +// DropCacheRegion removes a region from the cache. +func (c *Cluster) DropCacheRegion(id uint64) { + c.RemoveRegionIfExist(id) +} diff --git a/server/api/admin.go b/server/api/admin.go index c81193f1468..fdaf960b6c4 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -16,6 +16,7 @@ package api import ( "encoding/json" + "fmt" "io" "net/http" "strconv" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" "github.com/unrolled/render" @@ -59,7 +61,11 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) return } rc.DropCacheRegion(regionID) - h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer(regionID) + } + msg := "The region is removed from server cache." + h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // @Tags admin @@ -95,8 +101,11 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques } // Remove region from cache. rc.DropCacheRegion(regionID) - - h.rd.JSON(w, http.StatusOK, "The region is removed from server cache and region meta storage.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer(regionID) + } + msg := "The region is removed from server cache and region meta storage." + h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // @Tags admin @@ -105,9 +114,14 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques // @Success 200 {string} string "All regions are removed from server cache." // @Router /admin/cache/regions [delete] func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) { + var err error rc := getCluster(r) rc.DropCacheAllRegion() - h.rd.JSON(w, http.StatusOK, "All regions are removed from server cache.") + if h.svr.IsAPIServiceMode() { + err = h.DeleteRegionCacheInSchedulingServer() + } + msg := "All regions are removed from server cache." + h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) } // Intentionally no swagger mark as it is supposed to be only used in @@ -197,3 +211,35 @@ func (h *adminHandler) RecoverAllocID(w http.ResponseWriter, r *http.Request) { _ = h.rd.Text(w, http.StatusOK, "") } + +func (h *adminHandler) DeleteRegionCacheInSchedulingServer(id ...uint64) error { + addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName) + if !ok { + return errs.ErrNotFoundSchedulingAddr.FastGenByArgs() + } + var idStr string + if len(id) > 0 { + idStr = strconv.FormatUint(id[0], 10) + } + url := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions/%s", addr, idStr) + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + return err + } + resp, err := h.svr.GetHTTPClient().Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode) + } + return nil +} + +func (h *adminHandler) buildMsg(msg string, err error) string { + if h.svr.IsAPIServiceMode() && err != nil { + return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + } + return msg +} diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 5284913813c..d6028204325 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -9,7 +9,9 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/statistics" @@ -218,3 +220,61 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) } + +func TestAdminRegionCache(t *testing.T) { + re := require.New(t) + checkAdminRegionCache := func(cluster *tests.TestCluster) { + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r3) + + schedulingServer := cluster.GetSchedulingPrimaryServer() + re.Equal(3, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + + addr := schedulingServer.GetAddr() + urlPrefix := fmt.Sprintf("%s/scheduling/api/v1/admin/cache/regions", addr) + err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) + re.NoError(err) + re.Equal(2, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + + err = testutil.CheckDelete(testDialClient, urlPrefix, testutil.StatusOK(re)) + re.NoError(err) + re.Equal(0, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + } + env := tests.NewSchedulingTestEnvironment(t) + env.RunTestInAPIMode(checkAdminRegionCache) +} + +func TestAdminRegionCacheForward(t *testing.T) { + re := require.New(t) + checkAdminRegionCache := func(cluster *tests.TestCluster) { + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetRegionConfVer(100), core.SetRegionVersion(100)) + tests.MustPutRegionInfo(re, cluster, r3) + + apiServer := cluster.GetLeaderServer().GetServer() + schedulingServer := cluster.GetSchedulingPrimaryServer() + re.Equal(3, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(3, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) + + addr := cluster.GetLeaderServer().GetAddr() + urlPrefix := fmt.Sprintf("%s/pd/api/v1/admin/cache/region", addr) + err := testutil.CheckDelete(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "30"), testutil.StatusOK(re)) + re.NoError(err) + re.Equal(2, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(2, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) + + err = testutil.CheckDelete(testDialClient, urlPrefix+"s", testutil.StatusOK(re)) + re.NoError(err) + re.Equal(0, schedulingServer.GetCluster().GetRegionCount([]byte{}, []byte{})) + re.Equal(0, apiServer.GetRaftCluster().GetRegionCount([]byte{}, []byte{}).Count) + } + env := tests.NewSchedulingTestEnvironment(t) + env.RunTestInAPIMode(checkAdminRegionCache) +}