diff --git a/client/http/api.go b/client/http/api.go
index 2fae562dd207..e417db171151 100644
--- a/client/http/api.go
+++ b/client/http/api.go
@@ -64,6 +64,8 @@ const (
 	MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
 	Status              = "/pd/api/v1/status"
 	Version             = "/pd/api/v1/version"
+	// MembersPrefix is the members API path, fixed to the resource_manager service.
+	MembersPrefix = "/pd/api/v1/members?service=resource_manager"
 )
 
 // RegionByID returns the path of PD HTTP API to get region by ID.
diff --git a/client/http/client.go b/client/http/client.go
index 6fa2dd8cdfd8..3de13552b878 100644
--- a/client/http/client.go
+++ b/client/http/client.go
@@ -56,6 +56,8 @@ type Client interface {
 	DeletePlacementRule(context.Context, string, string) error
 	GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
 	AccelerateSchedule(context.Context, []byte, []byte) error
+	// GetMembers returns the list of strings served by the MembersPrefix API.
+	GetMembers(context.Context) ([]string, error)
 	Close()
 }
 
@@ -236,6 +238,18 @@ func (c *client) request(
 	return nil
 }
 
+// GetMembers sends a GET request to MembersPrefix and returns the decoded list
+// of strings; on a request error it returns nil and that error.
+func (c *client) GetMembers(ctx context.Context) ([]string, error) {
+	var members []string
+	if err := c.requestWithRetry(ctx,
+		"GetMembers", MembersPrefix,
+		http.MethodGet, nil, &members); err != nil {
+		return nil, err
+	}
+	return members, nil
+}
+
 // GetRegionByID gets the region info by ID.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { var region RegionInfo diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 00e168114b06..b7f3d3a274cc 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -15,8 +15,22 @@ package discovery import ( + "path" + "strconv" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" ) // Discover is used to get all the service instances of the specified service name. @@ -35,3 +49,155 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er } return values, nil } + +func isValid(id uint32) bool { + return id >= utils.DefaultKeyspaceGroupID && id <= utils.MaxKeyspaceGroupCountInUse +} + +func getMCSPrimaryPath(name, keyspaceGroupID string, client *clientv3.Client) (string, error) { + switch name { + case utils.TSOServiceName: + id := utils.DefaultKeyspaceGroupID + if len(keyspaceGroupID) > 0 { + keyspaceGroupID, err := strconv.ParseUint(keyspaceGroupID, 10, 64) + if err != nil || !isValid(uint32(keyspaceGroupID)) { + return "", errors.Errorf("invalid keyspace group id %s", keyspaceGroupID) + } + id = uint32(keyspaceGroupID) + } + + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + rootPath := endpoint.TSOSvcRootPath(clusterID) + primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id) + return primaryPath, nil + case utils.SchedulingServiceName: + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + return 
path.Join(endpoint.SchedulingSvcRootPath(clusterID), utils.PrimaryKey), nil + case utils.ResourceManagerServiceName: + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + return path.Join(endpoint.ResourceManagerSvcRootPath(clusterID), utils.PrimaryKey), nil + default: + } + return "", errors.Errorf("unknown service name %s", name) +} + +func GetMCSPrimary(name, keyspaceGroupID string, client *clientv3.Client) (*pdpb.Member, int64, error) { + primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client) + if err != nil { + return nil, 0, err + } + + return election.GetLeader(client, primaryPath) +} + +func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) { + switch name { + case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName: + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return nil, err + } + servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name) + resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit() + if err != nil { + return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause() + } + if !resps.Succeeded { + return nil, errs.ErrEtcdTxnConflict.FastGenByArgs() + } + if len(resps.Responses) == 0 { + return nil, errors.Errorf("no member found for service %s", name) + } + return resps, nil + } + return nil, errors.Errorf("unknown service name %s", name) +} + +func DeleteMemberByName(service, ip, keyspaceGroupID string, client *clientv3.Client) error { + resps, err := GetMembers(service, client) + if err != nil { + return err + } + + for _, resp := range resps.Responses { + for _, keyValue := range resp.GetResponseRange().GetKvs() { + var entry ServiceRegistryEntry + if err = entry.Deserialize(keyValue.Value); err != nil { + log.Error("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", 
string(keyValue.Value)), zap.String("ip", ip), zap.Error(err)) + return err + } + + if ip == entry.ServiceAddr { + // delete if it is leader + primaryPath, err := getMCSPrimaryPath(service, keyspaceGroupID, client) + if err != nil { + return err + } + + primary := member.NewParticipantByService(service) + ok, _, err := etcdutil.GetProtoMsgWithModRev(client, primaryPath, primary) + if err != nil { + return err + } + if !ok { + return errors.Errorf("no primary found for service %s", service) + } + + // The format of leader name is address-groupID. + contents := strings.Split(primary.GetName(), "-") + log.Info("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), + zap.String("ip", ip), zap.String("primaryPath", primaryPath), zap.String("primary", primary.GetName())) + if ip == contents[0] { + resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + if !resp.Succeeded { + return errs.ErrEtcdTxnConflict.FastGenByArgs() + } + if err != nil { + return err + } + } + + // delete member + _, err = kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + + return nil + } + } + } + return errors.Errorf("no ip %s found for service %s", ip, service) +} + +func ResignLeader(service, keyspaceGroupID string, client *clientv3.Client) error { + primaryPath, err := getMCSPrimaryPath(service, keyspaceGroupID, client) + if err != nil { + return err + } + + resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + if !resp.Succeeded { + return errs.ErrEtcdTxnConflict.FastGenByArgs() + } + if err != nil { + return err + } + + return nil +} diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go 
index 6174852d89f0..58f78f725b09 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -83,7 +83,7 @@ const ( MaxKeyspaceGroupCountInUse = uint32(4096) // DefaultKeyspaceGroupReplicaCount is the default replica count of keyspace group. - DefaultKeyspaceGroupReplicaCount = 2 + DefaultKeyspaceGroupReplicaCount = 3 // DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica. // It's used in keyspace group primary weighted-election to balance primaries' distribution. diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index 682e73f20ae3..a0708f9bf884 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -45,8 +45,8 @@ import ( const ( // maxRetryTimes is the max retry times for initializing the cluster ID. maxRetryTimes = 5 - // clusterIDPath is the path to store cluster id - clusterIDPath = "/pd/cluster_id" + // ClusterIDPath is the path to store cluster id + ClusterIDPath = "/pd/cluster_id" // retryInterval is the interval to retry. 
retryInterval = time.Second ) @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err ticker := time.NewTicker(retryInterval) defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { - if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 { + if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 { return clusterID, nil } select { diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index badcb18d5d8d..3274ac819727 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -544,6 +544,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { if err := json.Unmarshal(kv.Value, group); err != nil { return errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause() } + log.Info("[putFn] check update keyspace group", zap.Any("group", group)) kgm.updateKeyspaceGroup(group) if group.ID == mcsutils.DefaultKeyspaceGroupID { defaultKGConfigured = true @@ -562,6 +563,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { // Retry the groups that are not initialized successfully before. for id, group := range kgm.groupUpdateRetryList { delete(kgm.groupUpdateRetryList, id) + log.Info("[postEventFn] check update keyspace group", zap.Any("group", group)) kgm.updateKeyspaceGroup(group) } return nil diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 03c2374efc63..48103ced3206 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -865,6 +865,7 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) return 0, err } for i, item := range resp.Kvs { + log.Info("item.Key is ", zap.ByteString("key", item.Key)) if resp.More && i == len(resp.Kvs)-1 { // The last key is the start key of the next batch. // To avoid to get the same key in the next load, we need to skip the last key. 
diff --git a/server/api/member.go b/server/api/member.go
index 3016b76088b6..b436abb3ae5e 100644
--- a/server/api/member.go
+++ b/server/api/member.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/mcs/discovery"
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/utils/apiutil"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
@@ -52,6 +53,12 @@ func newMemberHandler(svr *server.Server, rd *render.Render) *memberHandler {
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /members [get]
 func (h *memberHandler) GetMembers(w http.ResponseWriter, r *http.Request) {
+	if service := r.URL.Query().Get("service"); len(service) > 0 {
+		log.Info("get members", zap.String("service", service))
+		h.renderMCSMembers(w, service)
+		return
+	}
+
 	members, err := getMembers(h.svr)
 	if err != nil {
 		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
@@ -167,6 +174,70 @@ func (h *memberHandler) DeleteMemberByName(w http.ResponseWriter, r *http.Reques
 	h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", name))
 }
 
+// renderMCSMembers writes the service addresses registered for the given
+// microservice to w as a JSON array.
+func (h *memberHandler) renderMCSMembers(w http.ResponseWriter, service string) {
+	resps, err := discovery.GetMembers(service, h.svr.GetClient())
+	if err != nil {
+		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+		return
+	}
+	if resps == nil {
+		h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no members for %s", service))
+		return
+	}
+
+	// Start from an empty, non-nil slice so that no members renders as [] and not null.
+	apis := make([]string, 0)
+	for _, resp := range resps.Responses {
+		for _, keyValue := range resp.GetResponseRange().GetKvs() {
+			var entry discovery.ServiceRegistryEntry
+			if err = entry.Deserialize(keyValue.Value); err != nil {
+				// Skip undecodable entries rather than reporting an empty address.
+				log.Warn("deserialize failed", zap.String("key", string(keyValue.Key)), zap.Error(err))
+				continue
+			}
+			apis = append(apis, entry.ServiceAddr)
+		}
+	}
+	h.rd.JSON(w, http.StatusOK, apis)
+}
+
+// DeleteMemberByMCS removes a microservice instance from the service registry.
+// The "service" and "ip" query parameters are required; "keyspace_id" is passed
+// to discovery.DeleteMemberByName as the keyspace group ID.
+func (h *memberHandler) DeleteMemberByMCS(w http.ResponseWriter, r *http.Request) {
+	query := r.URL.Query()
+	service := query.Get("service")
+	if len(service) == 0 {
+		h.rd.JSON(w, http.StatusBadRequest, "not support service")
+		return
+	}
+	ip := query.Get("ip")
+	if len(ip) == 0 {
+		h.rd.JSON(w, http.StatusBadRequest, "not support ip")
+		return
+	}
+
+	err := discovery.DeleteMemberByName(service, ip, query.Get("keyspace_id"), h.svr.GetClient())
+	if err != nil {
+		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+		return
+	}
+	h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, %s: %s", service, ip))
+}
+
+// GetMembersByMCS lists the service addresses registered for the microservice
+// named by the "service" query parameter.
+func (h *memberHandler) GetMembersByMCS(w http.ResponseWriter, r *http.Request) {
+	service := r.URL.Query().Get("service")
+	if len(service) == 0 {
+		h.rd.JSON(w, http.StatusBadRequest, "not get service")
+		return
+	}
+	h.renderMCSMembers(w, service)
+}
+
 // @Tags member
 // @Summary Remove a PD server from the cluster.
// @Param id path integer true "PD server Id" @@ -277,6 +353,19 @@ func newLeaderHandler(svr *server.Server, rd *render.Render) *leaderHandler { // @Success 200 {object} pdpb.Member // @Router /leader [get] func (h *leaderHandler) GetLeader(w http.ResponseWriter, r *http.Request) { + if service := r.URL.Query().Get("service"); len(service) > 0 { + leader, _, err := discovery.GetMCSPrimary(service, r.URL.Query().Get("keyspace_id"), h.svr.GetClient()) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if leader == nil { + h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no leader for %s", service)) + return + } + h.rd.JSON(w, http.StatusOK, leader) + return + } h.rd.JSON(w, http.StatusOK, h.svr.GetLeader()) } @@ -287,6 +376,17 @@ func (h *leaderHandler) GetLeader(w http.ResponseWriter, r *http.Request) { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /leader/resign [post] func (h *leaderHandler) ResignLeader(w http.ResponseWriter, r *http.Request) { + if service := r.URL.Query().Get("service"); len(service) > 0 { + log.Info("ResignLeader", zap.String("service", service)) + err := discovery.ResignLeader(service, r.URL.Query().Get("group_id"), h.svr.GetClient()) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, "The resign command is submitted.") + return + } + err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), "") if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) diff --git a/server/api/router.go b/server/api/router.go index d3c8f10cbf2d..a2deb55623fc 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -289,6 +289,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(apiRouter, "/members/name/{name}", memberHandler.DeleteMemberByName, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, 
"/members/id/{id}", memberHandler.DeleteMemberByID, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/members/name/{name}", memberHandler.SetMemberPropertyByName, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/members/mcs", memberHandler.GetMembersByMCS, setMethods(http.MethodGet), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/members/mcs", memberHandler.DeleteMemberByMCS, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) leaderHandler := newLeaderHandler(svr, rd) registerFunc(apiRouter, "/leader", leaderHandler.GetLeader, setMethods(http.MethodGet), setAuditBackend(prometheus)) diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index d2c88d01f098..a9c0f3e5fc4c 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -16,6 +16,7 @@ package client_test import ( "context" + "fmt" "math" "testing" @@ -128,3 +129,11 @@ func (suite *httpClientTestSuite) TestAccelerateSchedule() { suspectRegions = suite.cluster.GetLeaderServer().GetRaftCluster().GetSuspectRegions() re.Len(suspectRegions, 1) } + +func (suite *httpClientTestSuite) TestGetMembers() { + re := suite.Require() + members, err := suite.client.GetMembers(suite.ctx) + re.NoError(err) + //re.Len(members, 1) + fmt.Printf("%+v\n", members) +}