Skip to content

Commit

Permalink
api: support mcs api for members (#7372)
Browse files Browse the repository at this point in the history
ref #7519

Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp authored Dec 18, 2023
1 parent a4ab7d3 commit a16f99e
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 3 deletions.
7 changes: 7 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ const (
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
Expand Down Expand Up @@ -186,3 +188,8 @@ func PProfProfileAPIWithInterval(interval time.Duration) string {
func PProfGoroutineWithDebugLevel(level int) string {
return fmt.Sprintf("%s?debug=%d", PProfGoroutine, level)
}

// MicroServiceMembers returns the path of PD HTTP API to get the members of microservice.
func MicroServiceMembers(service string) string {
return fmt.Sprintf("%s/members/%s", microServicePrefix, service)
}
14 changes: 14 additions & 0 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Client interface {
AccelerateScheduleInBatch(context.Context, []*KeyRange) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]string, error)

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
Expand Down Expand Up @@ -844,3 +846,15 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// GetMicroServiceMembers gets the members of the microservice.
func (c *client) GetMicroServiceMembers(ctx context.Context, service string) ([]string, error) {
var members []string
err := c.requestWithRetry(ctx,
"GetMicroServiceMembers", MicroServiceMembers(service),
http.MethodGet, nil, &members)
if err != nil {
return nil, err
}
return members, nil
}
42 changes: 42 additions & 0 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@
package discovery

import (
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// Discover is used to get all the service instances of the specified service name.
Expand All @@ -35,3 +43,37 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}
return values, nil
}

// GetMSMembers returns all the members of the specified service name.
func GetMSMembers(name string, client *clientv3.Client) ([]string, error) {
switch name {
case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
}
if !resps.Succeeded {
return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
}

var addrs []string
for _, resp := range resps.Responses {
for _, keyValue := range resp.GetResponseRange().GetKvs() {
var entry ServiceRegistryEntry
if err = entry.Deserialize(keyValue.Value); err != nil {
log.Error("try to deserialize service registry entry failed", zap.String("key", string(keyValue.Key)), zap.Error(err))
continue
}
addrs = append(addrs, entry.ServiceAddr)
}
}
return addrs, nil
}

return nil, errors.Errorf("unknown service name %s", name)
}
23 changes: 23 additions & 0 deletions pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewService(srv *tsoserver.Service) *Service {
}
s.RegisterAdminRouter()
s.RegisterKeyspaceGroupRouter()
s.RegisterHealth()
return s
}

Expand All @@ -118,6 +119,12 @@ func (s *Service) RegisterKeyspaceGroupRouter() {
router.GET("/members", GetKeyspaceGroupMembers)
}

// RegisterHealth registers the router of the health handler.
func (s *Service) RegisterHealth() {
router := s.root.Group("health")
router.GET("", GetHealth)
}

func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
var level string
Expand Down Expand Up @@ -201,6 +208,22 @@ func ResetTS(c *gin.Context) {
c.String(http.StatusOK, "Reset ts successfully.")
}

// GetHealth returns the health status of the TSO service.
func GetHealth(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service)
am, err := svr.GetKeyspaceGroupManager().GetAllocatorManager(utils.DefaultKeyspaceGroupID)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if am.GetMember().IsLeaderElected() {
c.IndentedJSON(http.StatusOK, "ok")
return
}

c.String(http.StatusInternalServerError, "no leader elected")
}

// KeyspaceGroupMember contains the keyspace group and its member information.
type KeyspaceGroupMember struct {
Group *endpoint.KeyspaceGroup
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
const (
// maxRetryTimes is the max retry times for initializing the cluster ID.
maxRetryTimes = 5
// clusterIDPath is the path to store cluster id
clusterIDPath = "/pd/cluster_id"
// ClusterIDPath is the path to store cluster id
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
)
Expand All @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 {
return clusterID, nil
}
select {
Expand Down
57 changes: 57 additions & 0 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handlers

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/apiv2/middlewares"
)

// RegisterMicroService registers microservice handler to the router.
func RegisterMicroService(r *gin.RouterGroup) {
router := r.Group("ms")
router.Use(middlewares.BootstrapChecker())
router.GET("members/:service", GetMembers)
}

// GetMembers gets all members of the cluster for the specified service.
// @Tags members
// @Summary Get all members of the cluster for the specified service.
// @Produce json
// @Success 200 {object} []string
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsAPIServiceMode() {
c.AbortWithStatusJSON(http.StatusServiceUnavailable, "not support micro service")
return
}

if service := c.Param("service"); len(service) > 0 {
addrs, err := discovery.GetMSMembers(service, svr.GetClient())
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, addrs)
return
}

c.AbortWithStatusJSON(http.StatusInternalServerError, "please specify service")
}
1 change: 1 addition & 0 deletions server/apiv2/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,6 @@ func NewV2Handler(_ context.Context, svr *server.Server) (http.Handler, apiutil.
root := router.Group(apiV2Prefix)
handlers.RegisterKeyspace(root)
handlers.RegisterTSOKeyspaceGroup(root)
handlers.RegisterMicroService(root)
return router, group, nil
}
100 changes: 100 additions & 0 deletions tests/integrations/mcs/members/member_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package members_test

import (
"context"
"testing"

"github.com/stretchr/testify/suite"
pdClient "github.com/tikv/pd/client/http"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
)

type memberTestSuite struct {
suite.Suite
ctx context.Context
cleanupFunc []testutil.CleanupFunc
cluster *tests.TestCluster
server *tests.TestServer
backendEndpoints string
dialClient pdClient.Client
}

func TestMemberTestSuite(t *testing.T) {
suite.Run(t, new(memberTestSuite))
}

func (suite *memberTestSuite) SetupTest() {
ctx, cancel := context.WithCancel(context.Background())
suite.ctx = ctx
cluster, err := tests.NewTestAPICluster(suite.ctx, 1)
suite.cluster = cluster
suite.NoError(err)
suite.NoError(cluster.RunInitialServers())
suite.NotEmpty(cluster.WaitLeader())
suite.server = cluster.GetLeaderServer()
suite.NoError(suite.server.BootstrapCluster())
suite.backendEndpoints = suite.server.GetAddr()
suite.dialClient = pdClient.NewClient([]string{suite.server.GetAddr()})

// TSO
nodes := make(map[string]bs.Server)
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ {
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
nodes[s.GetAddr()] = s
suite.cleanupFunc = append(suite.cleanupFunc, func() {
cleanup()
})
}
tests.WaitForPrimaryServing(suite.Require(), nodes)

// Scheduling
nodes = make(map[string]bs.Server)
for i := 0; i < 3; i++ {
s, cleanup := tests.StartSingleSchedulingTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
nodes[s.GetAddr()] = s
suite.cleanupFunc = append(suite.cleanupFunc, func() {
cleanup()
})
}
tests.WaitForPrimaryServing(suite.Require(), nodes)

suite.cleanupFunc = append(suite.cleanupFunc, func() {
cancel()
})
}

func (suite *memberTestSuite) TearDownTest() {
for _, cleanup := range suite.cleanupFunc {
cleanup()
}
suite.cluster.Destroy()
}

func (suite *memberTestSuite) TestMembers() {
re := suite.Require()
members, err := suite.dialClient.GetMicroServiceMembers(suite.ctx, "tso")
re.NoError(err)
re.Len(members, utils.DefaultKeyspaceGroupReplicaCount)

members, err = suite.dialClient.GetMicroServiceMembers(suite.ctx, "scheduling")
re.NoError(err)
re.Len(members, 3)
}

0 comments on commit a16f99e

Please sign in to comment.