From 2ef354f2f04a3eca2d74c9cde3d543501e16ea28 Mon Sep 17 00:00:00 2001 From: husharp Date: Mon, 11 Dec 2023 13:44:06 +0800 Subject: [PATCH] support ms api Signed-off-by: husharp --- client/http/api.go | 18 +++ client/http/client.go | 37 +++++ pkg/mcs/discovery/discover.go | 146 ++++++++++++++++++ pkg/mcs/tso/server/apis/v1/api.go | 22 +++ pkg/mcs/utils/util.go | 6 +- pkg/utils/apiutil/serverapi/middleware.go | 2 +- server/api/micro_service.go | 101 ++++++++++++ server/api/router.go | 5 + tests/integrations/mcs/members/member_test.go | 140 +++++++++++++++++ 9 files changed, 473 insertions(+), 4 deletions(-) create mode 100644 server/api/micro_service.go create mode 100644 tests/integrations/mcs/members/member_test.go diff --git a/client/http/api.go b/client/http/api.go index f744fd0c3955..024c9f872358 100644 --- a/client/http/api.go +++ b/client/http/api.go @@ -72,6 +72,9 @@ const ( MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts" Status = "/pd/api/v1/status" Version = "/pd/api/v1/version" + + // Micro Service + MicroServicePrefix = "/pd/api/v1/ms" ) // RegionByID returns the path of PD HTTP API to get region by ID. @@ -173,3 +176,18 @@ func PProfProfileAPIWithInterval(interval time.Duration) string { func PProfGoroutineWithDebugLevel(level int) string { return fmt.Sprintf("%s?debug=%d", PProfGoroutine, level) } + +// MSLeader returns the path of PD HTTP API to get the leader of microservice. +func MSLeader(service string) string { + return fmt.Sprintf("%s/leader/%s", MicroServicePrefix, service) +} + +// MSMembers returns the path of PD HTTP API to get the members of microservice. +func MSMembers(service string) string { + return fmt.Sprintf("%s/members/%s", MicroServicePrefix, service) +} + +// MSMembersByIP returns the path of PD HTTP API to get the members of microservice by ip. 
+func MSMembersByIP(service, ip string) string { + return fmt.Sprintf("%s/members/%s?ip=%s", MicroServicePrefix, service, ip) +} diff --git a/client/http/client.go b/client/http/client.go index b79aa9ca0029..6624bf7faa32 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -26,6 +26,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -79,6 +80,11 @@ type Client interface { /* Other interfaces */ GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) + /* Micro Service */ + GetMSMembers(context.Context, string) ([]string, error) + GetMSLeader(context.Context, string) (*pdpb.Member, error) + DeleteMSMember(context.Context, string, string) error + /* Client-related methods */ // WithCallerID sets and returns a new client with the given caller ID. WithCallerID(string) Client @@ -729,3 +735,34 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin } return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil } + +// GetMSMembers gets the members of the microservice. +func (c *client) GetMSMembers(ctx context.Context, service string) ([]string, error) { + var members []string + err := c.requestWithRetry(ctx, + "GetMSMembers", MSMembers(service), + http.MethodGet, http.NoBody, &members) + if err != nil { + return nil, err + } + return members, nil +} + +// GetMSLeader gets the leader of the microservice. +func (c *client) GetMSLeader(ctx context.Context, service string) (*pdpb.Member, error) { + var leader *pdpb.Member + err := c.requestWithRetry(ctx, + "GetMSLeader", MSLeader(service), + http.MethodGet, http.NoBody, &leader) + if err != nil { + return nil, err + } + return leader, nil +} + +// DeleteMSMember deletes the member of the microservice. 
+func (c *client) DeleteMSMember(ctx context.Context, service, ip string) error { + return c.requestWithRetry(ctx, + "DeleteMSMember", MSMembersByIP(service, ip), + http.MethodDelete, http.NoBody, nil) +} diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 00e168114b06..82113249e260 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -15,8 +15,22 @@ package discovery import ( + "path" + "strconv" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" ) // Discover is used to get all the service instances of the specified service name. @@ -35,3 +49,135 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er } return values, nil } + +func isValid(id uint32) bool { + return id >= utils.DefaultKeyspaceGroupID && id <= utils.MaxKeyspaceGroupCountInUse +} + +func getMCSPrimaryPath(name, keyspaceGroupID string, client *clientv3.Client) (string, error) { + switch name { + case utils.TSOServiceName: + id := utils.DefaultKeyspaceGroupID + if len(keyspaceGroupID) > 0 { + keyspaceGroupID, err := strconv.ParseUint(keyspaceGroupID, 10, 32) + if err != nil || !isValid(uint32(keyspaceGroupID)) { + return "", errors.Errorf("invalid keyspace group id %d", keyspaceGroupID) + } + id = uint32(keyspaceGroupID) + } + + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + rootPath := endpoint.TSOSvcRootPath(clusterID) + primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id) + return primaryPath, nil + case utils.SchedulingServiceName: + clusterID, err := 
etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + return path.Join(endpoint.SchedulingSvcRootPath(clusterID), utils.PrimaryKey), nil + case utils.ResourceManagerServiceName: + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + return path.Join(endpoint.ResourceManagerSvcRootPath(clusterID), utils.PrimaryKey), nil + default: + } + return "", errors.Errorf("unknown service name %s", name) +} + +func GetMCSPrimary(name, keyspaceGroupID string, client *clientv3.Client) (*pdpb.Member, int64, error) { + primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client) + if err != nil { + return nil, 0, err + } + + return election.GetLeader(client, primaryPath) +} + +func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) { + switch name { + case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName: + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return nil, err + } + servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name) + resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit() + if err != nil { + return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause() + } + if !resps.Succeeded { + return nil, errs.ErrEtcdTxnConflict.FastGenByArgs() + } + if len(resps.Responses) == 0 { + return nil, errors.Errorf("no member found for service %s", name) + } + return resps, nil + } + return nil, errors.Errorf("unknown service name %s", name) +} + +func DeleteMemberByName(service, ip, keyspaceGroupID string, client *clientv3.Client) error { + resps, err := GetMembers(service, client) + if err != nil { + return err + } + + for _, resp := range resps.Responses { + for _, keyValue := range resp.GetResponseRange().GetKvs() { + var entry ServiceRegistryEntry + if err = entry.Deserialize(keyValue.Value); err != nil { + 
log.Error("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), zap.String("ip", ip), zap.Error(err)) + return err + } + + if ip == entry.ServiceAddr { + // delete if it is leader + primaryPath, err := getMCSPrimaryPath(service, keyspaceGroupID, client) + if err != nil { + return err + } + + primary := member.NewParticipantByService(service) + ok, _, err := etcdutil.GetProtoMsgWithModRev(client, primaryPath, primary) + if err != nil { + return err + } + if !ok { + return errors.Errorf("no primary found for service %s", service) + } + + // The format of leader name is address-groupID. + contents := strings.Split(primary.GetName(), "-") + log.Info("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), + zap.String("ip", ip), zap.String("primaryPath", primaryPath), zap.String("primary", primary.GetName())) + if ip == contents[0] { + resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + if !resp.Succeeded { + return errs.ErrEtcdTxnConflict.FastGenByArgs() + } + if err != nil { + return err + } + } + + // delete member + _, err = kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + + return nil + } + } + } + return errors.Errorf("no ip %s found for service %s", ip, service) +} diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index 33e1e0801aa6..5bede6acac53 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -102,6 +102,7 @@ func NewService(srv *tsoserver.Service) *Service { } s.RegisterAdminRouter() s.RegisterKeyspaceGroupRouter() + s.RegisterHealth() return s } @@ -118,6 +119,12 @@ func (s *Service) RegisterKeyspaceGroupRouter() { router.GET("/members", 
GetKeyspaceGroupMembers) } +// RegisterHealth registers the router of the health handler. +func (s *Service) RegisterHealth() { + router := s.root.Group("health") + router.GET("", GetHealth) +} + func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) var level string @@ -201,6 +208,21 @@ func ResetTS(c *gin.Context) { c.String(http.StatusOK, "Reset ts successfully.") } +func GetHealth(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) + am, err := svr.GetKeyspaceGroupManager().GetAllocatorManager(utils.DefaultKeyspaceGroupID) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + if am.GetMember().IsLeaderElected() { + c.IndentedJSON(http.StatusOK, "ok") + return + } + + c.String(http.StatusInternalServerError, "no leader elected") +} + // KeyspaceGroupMember contains the keyspace group and its member information. type KeyspaceGroupMember struct { Group *endpoint.KeyspaceGroup diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index 682e73f20ae3..a0708f9bf884 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -45,8 +45,8 @@ import ( const ( // maxRetryTimes is the max retry times for initializing the cluster ID. maxRetryTimes = 5 - // clusterIDPath is the path to store cluster id - clusterIDPath = "/pd/cluster_id" + // ClusterIDPath is the path to store cluster id + ClusterIDPath = "/pd/cluster_id" // retryInterval is the interval to retry. 
retryInterval = time.Second ) @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err ticker := time.NewTicker(retryInterval) defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { - if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 { + if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 { return clusterID, nil } select { diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index eb0f8a5f8eb6..da652b300f25 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -113,7 +113,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, } func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) { - if !h.s.IsAPIServiceMode() { + if !h.s.IsServiceIndependent(mcsutils.SchedulingServiceName) { return false, "" } if len(h.microserviceRedirectRules) == 0 { diff --git a/server/api/micro_service.go b/server/api/micro_service.go new file mode 100644 index 000000000000..d30375458c92 --- /dev/null +++ b/server/api/micro_service.go @@ -0,0 +1,101 @@ +package api + +import ( + "fmt" + "net/http" + + "github.com/gorilla/mux" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/mcs/discovery" + "github.com/tikv/pd/server" + "github.com/unrolled/render" + "go.uber.org/zap" +) + +type microServiceHandler struct { + svr *server.Server + rd *render.Render +} + +func newMicroServiceHandlerHandler(svr *server.Server, rd *render.Render) *microServiceHandler { + return &microServiceHandler{ + svr: svr, + rd: rd, + } +} + +// @Tags member +// @Summary List the addresses of all registered members of the given micro service. +// @Produce json +// @Success 200 {array} string +// @Failure 500 {string} string "PD server failed to proceed the request." 
+// @Router /ms/members/{service} [get] +func (h *microServiceHandler) GetMembers(w http.ResponseWriter, r *http.Request) { + if service := mux.Vars(r)["service"]; len(service) > 0 { + resps, err := discovery.GetMembers(service, h.svr.GetClient()) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if resps == nil { + h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no members for %s", service)) + return + } + + var apis []string + for _, resp := range resps.Responses { + for _, keyValue := range resp.GetResponseRange().GetKvs() { + var entry discovery.ServiceRegistryEntry + if err = entry.Deserialize(keyValue.Value); err != nil { + log.Info("deserialize failed", zap.String("key", string(keyValue.Key)), zap.Error(err)) + } + apis = append(apis, entry.ServiceAddr) + } + } + h.rd.JSON(w, http.StatusOK, apis) + return + } +} + +// @Tags leader +// @Summary Get the primary (leader) member of the given micro service. +// @Produce json +// @Success 200 {object} pdpb.Member +// @Router /ms/leader/{service} [get] +func (h *microServiceHandler) GetLeader(w http.ResponseWriter, r *http.Request) { + if service := mux.Vars(r)["service"]; len(service) > 0 { + leader, _, err := discovery.GetMCSPrimary(service, r.URL.Query().Get("keyspace_id"), h.svr.GetClient()) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if leader == nil { + h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no leader for %s", service)) + return + } + h.rd.JSON(w, http.StatusOK, leader) + return + } + h.rd.JSON(w, http.StatusOK, h.svr.GetLeader()) +} + +// @Tags member +// @Summary Delete the member with the given address (query parameter ip) from the given micro service. +// @Produce json +// @Success 200 {string} string "removed, pd: {service}" +// @Failure 500 {string} string "PD server failed to proceed the request." 
+func (h *microServiceHandler) DeleteMember(w http.ResponseWriter, r *http.Request) { + client := h.svr.GetClient() + service := mux.Vars(r)["service"] + if ip := r.URL.Query().Get("ip"); len(service) > 0 && len(ip) > 0 { + err := discovery.DeleteMemberByName(service, ip, r.URL.Query().Get("keyspace_id"), client) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", service)) + return + } + + h.rd.JSON(w, http.StatusBadRequest, "missing service name or ip") +} diff --git a/server/api/router.go b/server/api/router.go index d3c8f10cbf2d..9254f6d1afdd 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -295,6 +295,11 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(apiRouter, "/leader/resign", leaderHandler.ResignLeader, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/leader/transfer/{next_leader}", leaderHandler.TransferLeader, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + msHandler := newMicroServiceHandlerHandler(svr, rd) + registerFunc(apiRouter, "/ms/members/{service}", msHandler.GetMembers, setMethods(http.MethodGet), setAuditBackend(prometheus)) + registerFunc(apiRouter, "/ms/members/{service}", msHandler.DeleteMember, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/ms/leader/{service}", msHandler.GetLeader, setMethods(http.MethodGet), setAuditBackend(prometheus)) + statsHandler := newStatsHandler(svr, rd) registerFunc(clusterRouter, "/stats/region", statsHandler.GetRegionStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go new file mode 100644 index 000000000000..a2eaf0a6d071 --- /dev/null +++ b/tests/integrations/mcs/members/member_test.go @@ -0,0 +1,140 @@ +// Copyright 2023 TiKV Project 
Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package members_test + +import ( + "context" + "github.com/pingcap/log" + "go.uber.org/zap" + testing2 "testing" + + "github.com/stretchr/testify/suite" + pdClient "github.com/tikv/pd/client/http" + bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/tests" +) + +type memberTestSuite struct { + suite.Suite + ctx context.Context + cleanupFunc testutil.CleanupFunc + cluster *tests.TestCluster + server *tests.TestServer + backendEndpoints string + dialClient pdClient.Client +} + +func TestMemberTestSuite(t *testing2.T) { + suite.Run(t, new(memberTestSuite)) +} + +func (suite *memberTestSuite) SetupTest() { + ctx, cancel := context.WithCancel(context.Background()) + suite.ctx = ctx + cluster, err := tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster = cluster + suite.NoError(err) + suite.NoError(cluster.RunInitialServers()) + suite.NotEmpty(cluster.WaitLeader()) + suite.server = cluster.GetLeaderServer() + suite.NoError(suite.server.BootstrapCluster()) + suite.backendEndpoints = suite.server.GetAddr() + suite.dialClient = pdClient.NewClient([]string{suite.server.GetAddr()}) + suite.cleanupFunc = func() { + cancel() + } +} + +func (suite *memberTestSuite) TearDownTest() { + suite.cleanupFunc() + suite.cluster.Destroy() +} + +func (suite *memberTestSuite) TestLeaderCheck() 
{ + // TSO + nodes := make(map[string]bs.Server) + for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ { + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + } + tests.WaitForPrimaryServing(suite.Require(), nodes) + + re := suite.Require() + // Scheduling + nodes = make(map[string]bs.Server) + for i := 0; i < 3; i++ { + s, cleanup := tests.StartSingleSchedulingTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + } + tests.WaitForPrimaryServing(suite.Require(), nodes) + + leader, err := suite.dialClient.GetMSLeader(suite.ctx, "tso") + re.NotNil(leader) + re.NoError(err) + log.Info("check leader!!!!", zap.Any("leader", leader)) + leader, err = suite.dialClient.GetMSLeader(suite.ctx, "scheduling") + re.NotNil(leader) + re.NoError(err) + log.Info("check leader!!!!", zap.Any("leader", leader)) +} + +func (suite *memberTestSuite) TestMembersCheck() { + nodes := make(map[string]bs.Server) + for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ { + s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + } + tests.WaitForPrimaryServing(suite.Require(), nodes) + + // Scheduling + nodes = make(map[string]bs.Server) + for i := 0; i < 3; i++ { + s, cleanup := tests.StartSingleSchedulingTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + } + tests.WaitForPrimaryServing(suite.Require(), nodes) + + re := suite.Require() + + members, err := suite.dialClient.GetMSMembers(suite.ctx, "tso") + re.NoError(err) + for _, member := range members[:len(members)-1] { + // delete member + err = suite.dialClient.DeleteMSMember(suite.ctx, "tso", member) + re.NoError(err) + } + + members, err = 
suite.dialClient.GetMSMembers(suite.ctx, "scheduling") + for _, member := range members[:len(members)-1] { + // delete member + err = suite.dialClient.DeleteMSMember(suite.ctx, "scheduling", member) + re.NoError(err) + } + + // check members + members, err = suite.dialClient.GetMSMembers(suite.ctx, "tso") + re.Equal(1, len(members)) + re.NoError(err) + members, err = suite.dialClient.GetMSMembers(suite.ctx, "scheduling") + re.NoError(err) + re.Equal(1, len(members)) +}