Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Nov 16, 2023
1 parent 7dbe607 commit d57a96a
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 7 deletions.
189 changes: 189 additions & 0 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,21 @@
package discovery

import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"path"
"strconv"
"strings"
)

// Discover is used to get all the service instances of the specified service name.
Expand All @@ -35,3 +48,179 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}
return values, nil
}

func isValid(id uint32) bool {
return id >= utils.DefaultKeyspaceGroupID && id <= utils.MaxKeyspaceGroupCountInUse
}

func getMCSPrimaryPath(name, keyspaceGroupID string, client *clientv3.Client) (string, error) {
switch name {
case utils.TSOServiceName:
id := utils.DefaultKeyspaceGroupID
if len(keyspaceGroupID) > 0 {
keyspaceGroupID, err := strconv.ParseUint(keyspaceGroupID, 10, 64)
if err != nil || !isValid(uint32(keyspaceGroupID)) {
return "", errors.Errorf("invalid keyspace group id %s", keyspaceGroupID)

Check failure on line 63 in pkg/mcs/discovery/discover.go

View workflow job for this annotation

GitHub Actions / statics

printf: github.com/pingcap/errors.Errorf format %s has arg keyspaceGroupID of wrong type uint64 (govet)

Check failure on line 63 in pkg/mcs/discovery/discover.go

View workflow job for this annotation

GitHub Actions / chunks (9)

github.com/pingcap/errors.Errorf format %s has arg keyspaceGroupID of wrong type uint64
}
id = uint32(keyspaceGroupID)
}

clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
rootPath := endpoint.TSOSvcRootPath(clusterID)
primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id)
return primaryPath, nil
case utils.SchedulingServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
return path.Join(endpoint.SchedulingSvcRootPath(clusterID), utils.PrimaryKey), nil
case utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
return path.Join(endpoint.ResourceManagerSvcRootPath(clusterID), utils.PrimaryKey), nil
default:
}
return "", errors.Errorf("unknown service name %s", name)
}

func GetMCSPrimary(name, keyspaceGroupID string, client *clientv3.Client) (*pdpb.Member, int64, error) {
primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client)
if err != nil {
return nil, 0, err
}

return election.GetLeader(client, primaryPath)
}

func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) {
switch name {
case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
}
if !resps.Succeeded {
return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
}
if len(resps.Responses) == 0 {
return nil, errors.Errorf("no member found for service %s", name)
}
return resps, nil
}
return nil, errors.Errorf("unknown service name %s", name)
}

func DeleteMemberByName(service, ip, keyspaceGroupID string, client *clientv3.Client) error {
resps, err := GetMembers(service, client)
if err != nil {
return err
}

for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
var entry ServiceRegistryEntry
if err = entry.Deserialize(keyValue.Value); err != nil {
log.Error("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), zap.String("ip", ip), zap.Error(err))
return err
}

if ip == entry.ServiceAddr {
// delete if it is leader
primaryPath, err := getMCSPrimaryPath(service, keyspaceGroupID, client)
if err != nil {
return err
}

primary := member.NewParticipantByService(service)
ok, _, err := etcdutil.GetProtoMsgWithModRev(client, primaryPath, primary)
if err != nil {
return err
}
if !ok {
return errors.Errorf("no primary found for service %s", service)
}

// The format of leader name is address-groupID.
contents := strings.Split(primary.GetName(), "-")
log.Info("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)),
zap.String("ip", ip), zap.String("primaryPath", primaryPath), zap.String("primary", primary.GetName()))
if ip == contents[0] {
err = deletePrimaryLeader(service, keyspaceGroupID, client)
if err != nil {
return err
}
}

// delete member
_, err = kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}

return nil
}
}
}
return errors.Errorf("no ip %s found for service %s", ip, service)
}

func deletePrimaryLeader(name, keyspaceGroupID string, client *clientv3.Client) error {
log.Info("deletePrimaryLeader", zap.String("name", name), zap.String("keyspaceGroupID", keyspaceGroupID))
primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client)
if err != nil {
return err
}
resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}
if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
return nil
}

func TransferPrimaryLeader(name, newLeader, keyspaceGroupID string, client *clientv3.Client) error {
// delete old leader
primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client)
if err != nil {
return err
}
resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}
if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
// set new leader
resps, err := GetMembers(name, client)
if err != nil {
return err
}
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
if newLeader == string(keyValue.Value) {
// set new leader
_, err := kv.NewSlowLogTxn(client).Then(clientv3.OpPut(primaryPath, string(keyValue.Value))).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}
return nil
}
}
}

return errors.Errorf("no new leader found for service %s", name)
}
6 changes: 3 additions & 3 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
const (
// maxRetryTimes is the max retry times for initializing the cluster ID.
maxRetryTimes = 5
// clusterIDPath is the path to store cluster id
clusterIDPath = "/pd/cluster_id"
// ClusterIDPath is the path to store cluster id
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
)
Expand All @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 {
return clusterID, nil
}
select {
Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
type ElectionMember interface {
// ID returns the unique ID in the election group. For example, it can be unique
// server id of a cluster or the unique keyspace group replica id of the election
// group comprised of the replicas of a keyspace group.
// group composed of the replicas of a keyspace group.
ID() uint64
// ID returns the unique name in the election group.
// Name returns the unique name in the election group.
Name() string
// MemberValue returns the member value.
MemberValue() string
// GetMember() returns the current member
// GetMember returns the current member
GetMember() interface{}
// Client returns the etcd client.
Client() *clientv3.Client
Expand Down
68 changes: 67 additions & 1 deletion server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand Down Expand Up @@ -52,6 +53,28 @@ func newMemberHandler(svr *server.Server, rd *render.Render) *memberHandler {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /members [get]
func (h *memberHandler) GetMembers(w http.ResponseWriter, r *http.Request) {
if service := r.URL.Query().Get("service"); len(service) > 0 {
log.Info("get members", zap.String("service", service))
resps, err := discovery.GetMembers(service, h.svr.GetClient())
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if resps == nil {
h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no members for %s", service))
return
}

var apis []string
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
apis = append(apis, string(keyValue.Value))
}
}
h.rd.JSON(w, http.StatusOK, apis)
return
}

members, err := getMembers(h.svr)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -167,6 +190,24 @@ func (h *memberHandler) DeleteMemberByName(w http.ResponseWriter, r *http.Reques
h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", name))
}

func (h *memberHandler) DeleteMemberByMCS(w http.ResponseWriter, r *http.Request) {
client := h.svr.GetClient()

if service := r.URL.Query().Get("service"); len(service) > 0 {
if ip := r.URL.Query().Get("ip"); len(service) > 0 {
err := discovery.DeleteMemberByName(service, ip, r.URL.Query().Get("keyspace_id"), client)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}
h.rd.JSON(w, http.StatusOK, fmt.Sprintf("removed, pd: %s", service))
return
}

h.rd.JSON(w, http.StatusInternalServerError, "not support service")
}

// @Tags member
// @Summary Remove a PD server from the cluster.
// @Param id path integer true "PD server Id"
Expand Down Expand Up @@ -277,6 +318,19 @@ func newLeaderHandler(svr *server.Server, rd *render.Render) *leaderHandler {
// @Success 200 {object} pdpb.Member
// @Router /leader [get]
func (h *leaderHandler) GetLeader(w http.ResponseWriter, r *http.Request) {
if service := r.URL.Query().Get("service"); len(service) > 0 {
leader, _, err := discovery.GetMCSPrimary(service, r.URL.Query().Get("keyspace_id"), h.svr.GetClient())
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if leader == nil {
h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("no leader for %s", service))
return
}
h.rd.JSON(w, http.StatusOK, leader)
return
}
h.rd.JSON(w, http.StatusOK, h.svr.GetLeader())
}

Expand Down Expand Up @@ -304,7 +358,19 @@ func (h *leaderHandler) ResignLeader(w http.ResponseWriter, r *http.Request) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /leader/transfer/{nextLeader} [post]
func (h *leaderHandler) TransferLeader(w http.ResponseWriter, r *http.Request) {
err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), mux.Vars(r)["next_leader"])
newLeader := mux.Vars(r)["next_leader"]
if service := r.URL.Query().Get("service"); len(service) > 0 {
log.Info("transfer leader", zap.String("service", service), zap.String("nextLeader", newLeader))
err := discovery.TransferPrimaryLeader(service, newLeader, r.URL.Query().Get("group_id"), h.svr.GetClient())
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, fmt.Sprintf("The transfer command is submitted. %s", newLeader))
return
}

err := h.svr.GetMember().ResignEtcdLeader(h.svr.Context(), h.svr.Name(), newLeader)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(apiRouter, "/members/name/{name}", memberHandler.DeleteMemberByName, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/members/id/{id}", memberHandler.DeleteMemberByID, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/members/name/{name}", memberHandler.SetMemberPropertyByName, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/members/mcs", memberHandler.DeleteMemberByMCS, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))

leaderHandler := newLeaderHandler(svr, rd)
registerFunc(apiRouter, "/leader", leaderHandler.GetLeader, setMethods(http.MethodGet), setAuditBackend(prometheus))
Expand Down

0 comments on commit d57a96a

Please sign in to comment.