Skip to content

Commit

Permalink
support ms api
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Dec 11, 2023
1 parent 00674d0 commit 2ef354f
Show file tree
Hide file tree
Showing 9 changed files with 473 additions and 4 deletions.
18 changes: 18 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const (
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"

// Micro Service
MicroServicePrefix = "/pd/api/v1/ms"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
Expand Down Expand Up @@ -173,3 +176,18 @@ func PProfProfileAPIWithInterval(interval time.Duration) string {
func PProfGoroutineWithDebugLevel(level int) string {
return fmt.Sprintf("%s?debug=%d", PProfGoroutine, level)
}

// MSLeader returns the path of PD HTTP API to get the leader of microservice.
func MSLeader(service string) string {
return fmt.Sprintf("%s/leader/%s", MicroServicePrefix, service)
}

// MSMembers returns the path of PD HTTP API to get the members of microservice.
func MSMembers(service string) string {
return fmt.Sprintf("%s/members/%s", MicroServicePrefix, service)
}

// MSMembersByIP returns the path of PD HTTP API to get the members of microservice by ip.
func MSMembersByIP(service, ip string) string {
return fmt.Sprintf("%s/members/%s?ip=%s", MicroServicePrefix, service, ip)
}
37 changes: 37 additions & 0 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -79,6 +80,11 @@ type Client interface {
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Micro Service */
GetMSMembers(context.Context, string) ([]string, error)
GetMSLeader(context.Context, string) (*pdpb.Member, error)
DeleteMSMember(context.Context, string, string) error

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
WithCallerID(string) Client
Expand Down Expand Up @@ -729,3 +735,34 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// GetMSMembers gets the members of the microservice.
func (c *client) GetMSMembers(ctx context.Context, service string) ([]string, error) {
var members []string
err := c.requestWithRetry(ctx,
"GetMSMembers", MSMembers(service),
http.MethodGet, http.NoBody, &members)
if err != nil {
return nil, err
}
return members, nil
}

// GetMSLeader gets the leader of the microservice.
func (c *client) GetMSLeader(ctx context.Context, service string) (*pdpb.Member, error) {
var leader *pdpb.Member
err := c.requestWithRetry(ctx,
"GetMSLeader", MSLeader(service),
http.MethodGet, http.NoBody, &leader)
if err != nil {
return nil, err
}
return leader, nil
}

// DeleteMSMember deletes the member of the microservice.
func (c *client) DeleteMSMember(ctx context.Context, service, ip string) error {
return c.requestWithRetry(ctx,
"DeleteMSMember", MSMembersByIP(service, ip),
http.MethodDelete, http.NoBody, nil)
}
146 changes: 146 additions & 0 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,22 @@
package discovery

import (
"path"
"strconv"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// Discover is used to get all the service instances of the specified service name.
Expand All @@ -35,3 +49,135 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}
return values, nil
}

func isValid(id uint32) bool {
return id >= utils.DefaultKeyspaceGroupID && id <= utils.MaxKeyspaceGroupCountInUse
}

func getMCSPrimaryPath(name, keyspaceGroupID string, client *clientv3.Client) (string, error) {
switch name {
case utils.TSOServiceName:
id := utils.DefaultKeyspaceGroupID
if len(keyspaceGroupID) > 0 {
keyspaceGroupID, err := strconv.ParseUint(keyspaceGroupID, 10, 64)
if err != nil || !isValid(uint32(keyspaceGroupID)) {
return "", errors.Errorf("invalid keyspace group id %d", keyspaceGroupID)
}
id = uint32(keyspaceGroupID)
}

clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
rootPath := endpoint.TSOSvcRootPath(clusterID)
primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id)
return primaryPath, nil
case utils.SchedulingServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
return path.Join(endpoint.SchedulingSvcRootPath(clusterID), utils.PrimaryKey), nil
case utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
return path.Join(endpoint.ResourceManagerSvcRootPath(clusterID), utils.PrimaryKey), nil
default:
}
return "", errors.Errorf("unknown service name %s", name)
}

func GetMCSPrimary(name, keyspaceGroupID string, client *clientv3.Client) (*pdpb.Member, int64, error) {
primaryPath, err := getMCSPrimaryPath(name, keyspaceGroupID, client)
if err != nil {
return nil, 0, err
}

return election.GetLeader(client, primaryPath)
}

func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) {
switch name {
case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
}
if !resps.Succeeded {
return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
}
if len(resps.Responses) == 0 {
return nil, errors.Errorf("no member found for service %s", name)
}
return resps, nil
}
return nil, errors.Errorf("unknown service name %s", name)
}

func DeleteMemberByName(service, ip, keyspaceGroupID string, client *clientv3.Client) error {
resps, err := GetMembers(service, client)
if err != nil {
return err
}

for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
var entry ServiceRegistryEntry
if err = entry.Deserialize(keyValue.Value); err != nil {
log.Error("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), zap.String("ip", ip), zap.Error(err))
return err
}

if ip == entry.ServiceAddr {
// delete if it is leader
primaryPath, err := getMCSPrimaryPath(service, keyspaceGroupID, client)
if err != nil {
return err
}

primary := member.NewParticipantByService(service)
ok, _, err := etcdutil.GetProtoMsgWithModRev(client, primaryPath, primary)
if err != nil {
return err
}
if !ok {
return errors.Errorf("no primary found for service %s", service)
}

// The format of leader name is address-groupID.
contents := strings.Split(primary.GetName(), "-")
log.Info("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)),
zap.String("ip", ip), zap.String("primaryPath", primaryPath), zap.String("primary", primary.GetName()))
if ip == contents[0] {
resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}
if !resp.Succeeded {
return errs.ErrEtcdTxnConflict.FastGenByArgs()
}
if err != nil {
return err
}
}

// delete member
_, err = kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit()
if err != nil {
return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
}

return nil
}
}
}
return errors.Errorf("no ip %s found for service %s", ip, service)
}
22 changes: 22 additions & 0 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewService(srv *tsoserver.Service) *Service {
}
s.RegisterAdminRouter()
s.RegisterKeyspaceGroupRouter()
s.RegisterHealth()
return s
}

Expand All @@ -118,6 +119,12 @@ func (s *Service) RegisterKeyspaceGroupRouter() {
router.GET("/members", GetKeyspaceGroupMembers)
}

// RegisterHealth registers the router of the health handler.
func (s *Service) RegisterHealth() {
router := s.root.Group("health")
router.GET("", GetHealth)
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
var level string
Expand Down Expand Up @@ -201,6 +208,21 @@ func ResetTS(c *gin.Context) {
c.String(http.StatusOK, "Reset ts successfully.")
}

func GetHealth(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
am, err := svr.GetKeyspaceGroupManager().GetAllocatorManager(utils.DefaultKeyspaceGroupID)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if am.GetMember().IsLeaderElected() {
c.IndentedJSON(http.StatusOK, "ok")
return
}

c.String(http.StatusInternalServerError, "no leader elected")
}

// KeyspaceGroupMember contains the keyspace group and its member information.
type KeyspaceGroupMember struct {
Group *endpoint.KeyspaceGroup
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
const (
// maxRetryTimes is the max retry times for initializing the cluster ID.
maxRetryTimes = 5
// clusterIDPath is the path to store cluster id
clusterIDPath = "/pd/cluster_id"
// ClusterIDPath is the path to store cluster id
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
)
Expand All @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 {
return clusterID, nil
}
select {
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsAPIServiceMode() {
if !h.s.IsServiceIndependent(mcsutils.SchedulingServiceName) {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down
Loading

0 comments on commit 2ef354f

Please sign in to comment.