diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 00e168114b06..9befe20760d2 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -15,8 +15,21 @@ package discovery import ( + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + "path" + "strconv" + "strings" ) // Discover is used to get all the service instances of the specified service name. @@ -35,3 +48,179 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er } return values, nil } + +func isValid(id uint32) bool { + return id >= utils.DefaultKeyspaceGroupID && id <= utils.MaxKeyspaceGroupCountInUse +} + +func getMCSPrimaryPath(name, keyspaceGroupID string, client *clientv3.Client) (string, error) { + switch name { + case utils.TSOServiceName: + id := utils.DefaultKeyspaceGroupID + if len(keyspaceGroupID) > 0 { + keyspaceGroupID, err := strconv.ParseUint(keyspaceGroupID, 10, 64) + if err != nil || !isValid(uint32(keyspaceGroupID)) { + return "", errors.Errorf("invalid keyspace group id %s", keyspaceGroupID) + } + id = uint32(keyspaceGroupID) + } + + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + rootPath := endpoint.TSOSvcRootPath(clusterID) + primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id) + return primaryPath, nil + case utils.SchedulingServiceName: + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + return path.Join(endpoint.SchedulingSvcRootPath(clusterID), utils.PrimaryKey), nil + case utils.ResourceManagerServiceName: + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return "", err + } + return path.Join(endpoint.ResourceManagerSvcRootPath(clusterID), utils.PrimaryKey), nil + default: + } + return "", errors.Errorf("unknown service name %s", name) +} + +func GetMCSPrimary(name, keyspaceGroupID string, client *clientv3.Client) (*pdpb.Member, int64, error) { + primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client) + if err != nil { + return nil, 0, err + } + + return election.GetLeader(client, primaryPath) +} + +func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) { + switch name { + case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName: + clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) + if err != nil { + return nil, err + } + servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name) + resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit() + if err != nil { + return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause() + } + if !resps.Succeeded { + return nil, errs.ErrEtcdTxnConflict.FastGenByArgs() + } + if len(resps.Responses) == 0 { + return nil, errors.Errorf("no member found for service %s", name) + } + return resps, nil + } + return nil, errors.Errorf("unknown service name %s", name) +} + +func DeleteMemberByName(service, ip, keyspaceGroupID string, client *clientv3.Client) error { + resps, err := GetMembers(service, client) + if err != nil { + return err + } + + for _, resp := range resps.Responses { + for _, keyValue := range resp.GetResponseRange().GetKvs() { + var entry ServiceRegistryEntry + if err = entry.Deserialize(keyValue.Value); err != nil { + log.Error("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), zap.String("ip", ip), zap.Error(err)) + return err + } + + if ip == entry.ServiceAddr { + // delete if it is leader + primaryPath, err := getMCSPrimaryPath(service, keyspaceGroupID, client) + if err != nil { + return err + } + + primary := member.NewParticipantByService(service) + ok, _, err := etcdutil.GetProtoMsgWithModRev(client, primaryPath, primary) + if err != nil { + return err + } + if !ok { + return errors.Errorf("no primary found for service %s", service) + } + + // The format of leader name is address-groupID. + contents := strings.Split(primary.GetName(), "-") + log.Info("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), + zap.String("ip", ip), zap.String("primaryPath", primaryPath), zap.String("primary", primary.GetName())) + if ip == contents[0] { + err = deletePrimaryLeader(service, keyspaceGroupID, client) + if err != nil { + return err + } + } + + // delete member + _, err = kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + + return nil + } + } + } + return errors.Errorf("no ip %s found for service %s", ip, service) +} + +func deletePrimaryLeader(name, keyspaceGroupID string, client *clientv3.Client) error { + log.Info("deletePrimaryLeader", zap.String("name", name), zap.String("keyspaceGroupID", keyspaceGroupID)) + primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client) + if err != nil { + return err + } + resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + if !resp.Succeeded { + return errs.ErrEtcdTxnConflict.FastGenByArgs() + } + return nil +} + +func TransferPrimaryLeader(name, newLeader, keyspaceGroupID string, client *clientv3.Client) error { + // delete old leader + primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client) + if err != nil { + return err + } + resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + if !resp.Succeeded { + return errs.ErrEtcdTxnConflict.FastGenByArgs() + } + // set new leader + resps, err := GetMembers(name, client) + if err != nil { + return err + } + for _, resp := range resps.Responses { + for _, keyValue := range resp.GetResponseRange().GetKvs() { + if newLeader == string(keyValue.Value) { + // set new leader + _, err := kv.NewSlowLogTxn(client).Then(clientv3.OpPut(primaryPath, string(keyValue.Value))).Commit() + if err != nil { + return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() + } + return nil + } + } + } + + return errors.Errorf("no new leader found for service %s", name) +} diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index 682e73f20ae3..a0708f9bf884 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -45,8 +45,8 @@ import ( const ( // maxRetryTimes is the max retry times for initializing the cluster ID. maxRetryTimes = 5 - // clusterIDPath is the path to store cluster id - clusterIDPath = "/pd/cluster_id" + // ClusterIDPath is the path to store cluster id + ClusterIDPath = "/pd/cluster_id" // retryInterval is the interval to retry. retryInterval = time.Second ) @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err ticker := time.NewTicker(retryInterval) defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { - if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 { + if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 { return clusterID, nil } select { diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index df0ca0affc97..7f21b8659079 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo { type ElectionMember interface { // ID returns the unique ID in the election group. For example, it can be unique // server id of a cluster or the unique keyspace group replica id of the election - // group comprised of the replicas of a keyspace group. + // group composed of the replicas of a keyspace group. ID() uint64 - // ID returns the unique name in the election group. + // Name returns the unique name in the election group. Name() string // MemberValue returns the member value. MemberValue() string - // GetMember() returns the current member + // GetMember returns the current member GetMember() interface{} // Client returns the etcd client. Client() *clientv3.Client diff --git a/server/api/member.go b/server/api/member.go index 3016b76088b6..cc4eb53d8b8e 100644 --- a/server/api/member.go +++ b/server/api/member.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -52,6 +53,28 @@ func newMemberHandler(svr *server.Server, rd *render.Render) *memberHandler { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /members [get] func (h *memberHandler) GetMembers(w http.ResponseWriter, r *http.Request) { + if service := r.URL.Query().Get("service"); len(service) > 0 { + log.Info("get members", zap.String("service", service)) + resps, err := discovery.GetMembers(service, h.svr.GetClient()) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if resps == nil { + h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no members for %s", service)) + return + } + + var apis []string + for _, resp := range resps.Responses { + for _, keyValue := range resp.GetResponseRange().GetKvs() { + apis = append(apis, string(keyValue.Value)) + } + } + h.rd.JSON(w, http.StatusOK, apis) + return + } + members, err := getMembers(h.svr) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) @@ -167,6 +190,24 @@ func (h *memberHandler) DeleteMemberByName(w http.ResponseWriter, r *http.Reques h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", name)) } +func (h *memberHandler) DeleteMemberByMCS(w http.ResponseWriter, r *http.Request) { + client := h.svr.GetClient() + + if service := r.URL.Query().Get("service"); len(service) > 0 { + if ip := r.URL.Query().Get("ip"); len(service) > 0 { + err := discovery.DeleteMemberByName(service, ip, r.URL.Query().Get("keyspace_id"), client) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + } + h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", service)) + return + } + + h.rd.JSON(w, http.StatusInternalServerError, "not support service") +} + // @Tags member // @Summary Remove a PD server from the cluster. // @Param id path integer true "PD server Id" @@ -277,6 +318,19 @@ func newLeaderHandler(svr *server.Server, rd *render.Render) *leaderHandler { // @Success 200 {object} pdpb.Member // @Router /leader [get] func (h *leaderHandler) GetLeader(w http.ResponseWriter, r *http.Request) { + if service := r.URL.Query().Get("service"); len(service) > 0 { + leader, _, err := discovery.GetMCSPrimary(service, r.URL.Query().Get("keyspace_id"), h.svr.GetClient()) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + if leader == nil { + h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no leader for %s", service)) + return + } + h.rd.JSON(w, http.StatusOK, leader) + return + } h.rd.JSON(w, http.StatusOK, h.svr.GetLeader()) } @@ -304,7 +358,19 @@ func (h *leaderHandler) ResignLeader(w http.ResponseWriter, r *http.Request) { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /leader/transfer/{nextLeader} [post] func (h *leaderHandler) TransferLeader(w http.ResponseWriter, r *http.Request) { - err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), mux.Vars(r)["next_leader"]) + newLeader := mux.Vars(r)["next_leader"] + if service := r.URL.Query().Get("service"); len(service) > 0 { + log.Info("transfer leader", zap.String("service", service), zap.String("nextLeader", newLeader)) + err := discovery.TransferPrimaryLeader(service, newLeader, r.URL.Query().Get("group_id"), h.svr.GetClient()) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, fmt.Sprintf("The transfer command is submitted. %s", newLeader)) + return + } + + err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), newLeader) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/api/router.go b/server/api/router.go index d3c8f10cbf2d..d63cd4b0ea4c 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -289,6 +289,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(apiRouter, "/members/name/{name}", memberHandler.DeleteMemberByName, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/members/id/{id}", memberHandler.DeleteMemberByID, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/members/name/{name}", memberHandler.SetMemberPropertyByName, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(apiRouter, "/members/mcs", memberHandler.DeleteMemberByMCS, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus)) leaderHandler := newLeaderHandler(svr, rd) registerFunc(apiRouter, "/leader", leaderHandler.GetLeader, setMethods(http.MethodGet), setAuditBackend(prometheus))