Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Nov 17, 2023
1 parent dda748a commit 3f5ae0a
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 4 deletions.
2 changes: 2 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
// Members
MembersPrefix = "/pd/api/v1/members?service=resource_manager"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
Expand Down
13 changes: 13 additions & 0 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type Client interface {
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
AccelerateSchedule(context.Context, []byte, []byte) error

GetMembers(ctx context.Context) ([]string, error)
Close()
}

Expand Down Expand Up @@ -236,6 +238,17 @@ func (c *client) request(
return nil
}

// GetMembers sends a GET request to MembersPrefix through requestWithRetry and
// returns the decoded list of strings.
// On any request error the list is discarded and only the error is returned.
// NOTE(review): MembersPrefix carries a fixed "service=resource_manager" query,
// so this presumably lists resource manager members only — confirm with the server API.
func (c *client) GetMembers(ctx context.Context) ([]string, error) {
	var result []string
	if err := c.requestWithRetry(
		ctx,
		"GetMembers",
		MembersPrefix,
		http.MethodGet,
		nil,
		&result,
	); err != nil {
		return nil, err
	}
	return result, nil
}

// GetRegionByID gets the region info by ID.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) {
var region RegionInfo
Expand Down
166 changes: 166 additions & 0 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,22 @@
package discovery

import (
"path"
"strconv"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// Discover is used to get all the service instances of the specified service name.
Expand All @@ -35,3 +49,155 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}
return values, nil
}

// isValid reports whether id lies within the inclusive range
// [utils.DefaultKeyspaceGroupID, utils.MaxKeyspaceGroupCountInUse].
func isValid(id uint32) bool {
	if id < utils.DefaultKeyspaceGroupID {
		return false
	}
	if id > utils.MaxKeyspaceGroupCountInUse {
		return false
	}
	return true
}

func getMCSPrimaryPath(name, keyspaceGroupID string, client *clientv3.Client) (string, error) {
switch name {
case utils.TSOServiceName:
id := utils.DefaultKeyspaceGroupID
if len(keyspaceGroupID) > 0 {
keyspaceGroupID, err := strconv.ParseUint(keyspaceGroupID, 10, 64)
if err != nil || !isValid(uint32(keyspaceGroupID)) {
return "", errors.Errorf("invalid keyspace group id %s", keyspaceGroupID)

Check failure on line 64 in pkg/mcs/discovery/discover.go

View workflow job for this annotation

GitHub Actions / statics

printf: github.com/pingcap/errors.Errorf format %s has arg keyspaceGroupID of wrong type uint64 (govet)

Check failure on line 64 in pkg/mcs/discovery/discover.go

View workflow job for this annotation

GitHub Actions / chunks (9)

github.com/pingcap/errors.Errorf format %s has arg keyspaceGroupID of wrong type uint64
}
id = uint32(keyspaceGroupID)
}

clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
rootPath := endpoint.TSOSvcRootPath(clusterID)
primaryPath := endpoint.KeyspaceGroupPrimaryPath(rootPath, id)
return primaryPath, nil
case utils.SchedulingServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
return path.Join(endpoint.SchedulingSvcRootPath(clusterID), utils.PrimaryKey), nil
case utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return "", err
}
return path.Join(endpoint.ResourceManagerSvcRootPath(clusterID), utils.PrimaryKey), nil
default:
}
return "", errors.Errorf("unknown service name %s", name)
}

// GetMCSPrimary resolves the primary key of the given microservice with
// getMCSPrimaryPath and returns the result of election.GetLeader for that key.
// If the key cannot be resolved, it returns a nil member, 0 and the error.
// NOTE(review): the int64 is presumably the etcd revision of the primary key — confirm
// against election.GetLeader.
func GetMCSPrimary(name, keyspaceGroupID string, client *clientv3.Client) (*pdpb.Member, int64, error) {
	key, err := getMCSPrimaryPath(name, keyspaceGroupID, client)
	if err == nil {
		return election.GetLeader(client, key)
	}
	return nil, 0, err
}

// GetMembers fetches, in a single etcd transaction, every key stored under the
// service path of the named microservice and returns the raw transaction response.
//
// name must be one of utils.TSOServiceName, utils.SchedulingServiceName or
// utils.ResourceManagerServiceName; any other value yields an
// "unknown service name" error without touching etcd.
//
// An error is also returned when the cluster ID cannot be read, when the
// transaction fails or is reported as not succeeded, or when no key exists under
// the service path.
func GetMembers(name string, client *clientv3.Client) (*clientv3.TxnResponse, error) {
	switch name {
	case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
	default:
		return nil, errors.Errorf("unknown service name %s", name)
	}

	clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
	if err != nil {
		return nil, err
	}
	servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
	resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
	if err != nil {
		return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
	}
	if !resps.Succeeded {
		return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
	}

	// A committed transaction carries one response per operation, so the number
	// of op responses says nothing about how many members exist. Count the keys
	// actually returned by the range read instead.
	memberCount := 0
	for _, resp := range resps.Responses {
		memberCount += len(resp.GetResponseRange().GetKvs())
	}
	if memberCount == 0 {
		return nil, errors.Errorf("no member found for service %s", name)
	}
	return resps, nil
}

// DeleteMemberByName removes the registry entry of the member of service whose
// registered ServiceAddr equals ip. If that member is also the current primary
// (looked up via getMCSPrimaryPath with keyspaceGroupID), the primary key is
// deleted first.
//
// It returns an error when the members cannot be listed, when a registry entry
// cannot be deserialized, when no primary is stored for the service, when an
// etcd delete fails, or when no member with the given address exists.
func DeleteMemberByName(service, ip, keyspaceGroupID string, client *clientv3.Client) error {
	resps, err := GetMembers(service, client)
	if err != nil {
		return err
	}

	for _, resp := range resps.Responses {
		for _, keyValue := range resp.GetResponseRange().GetKvs() {
			var entry ServiceRegistryEntry
			if err = entry.Deserialize(keyValue.Value); err != nil {
				log.Error("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)), zap.String("ip", ip), zap.Error(err))
				return err
			}
			if ip != entry.ServiceAddr {
				continue
			}

			// Drop the primary key first when the member being removed is the primary.
			primaryPath, err := getMCSPrimaryPath(service, keyspaceGroupID, client)
			if err != nil {
				return err
			}

			primary := member.NewParticipantByService(service)
			ok, _, err := etcdutil.GetProtoMsgWithModRev(client, primaryPath, primary)
			if err != nil {
				return err
			}
			if !ok {
				return errors.Errorf("no primary found for service %s", service)
			}

			primaryName := primary.GetName()
			log.Info("DeleteMemberByName", zap.String("key", string(keyValue.Key)), zap.String("value", string(keyValue.Value)),
				zap.String("ip", ip), zap.String("primaryPath", primaryPath), zap.String("primary", primaryName))
			// The primary name is either the bare address or address-groupID.
			if ip == primaryName || ip == primaryAddrFromName(primaryName) {
				delResp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit()
				if err != nil {
					return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
				}
				if !delResp.Succeeded {
					return errs.ErrEtcdTxnConflict.FastGenByArgs()
				}
			}

			// Delete the member's own registry entry.
			_, err = kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(string(keyValue.Key))).Commit()
			if err != nil {
				return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
			}

			return nil
		}
	}
	return errors.Errorf("no ip %s found for service %s", ip, service)
}

// primaryAddrFromName strips a trailing "-<groupID>" suffix from a primary name
// and returns the remaining address. Only a purely numeric suffix after the
// last dash is removed, so dashes inside the address itself (for example in a
// host name) are preserved. A name without such a suffix is returned unchanged.
func primaryAddrFromName(name string) string {
	idx := strings.LastIndex(name, "-")
	if idx < 0 {
		return name
	}
	if _, err := strconv.ParseUint(name[idx+1:], 10, 32); err != nil {
		return name
	}
	return name[:idx]
}

// ResignLeader deletes the primary key of the given microservice from etcd.
// NOTE(review): removing the key presumably triggers a new primary election —
// confirm against the election logic.
//
// keyspaceGroupID is forwarded to getMCSPrimaryPath to resolve the key. An error
// is returned when the key cannot be resolved, when the delete transaction
// fails, or when etcd reports the transaction as not succeeded.
func ResignLeader(service, keyspaceGroupID string, client *clientv3.Client) error {
	primaryPath, err := getMCSPrimaryPath(service, keyspaceGroupID, client)
	if err != nil {
		return err
	}

	resp, err := kv.NewSlowLogTxn(client).Then(clientv3.OpDelete(primaryPath)).Commit()
	if err != nil {
		return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
	}
	if !resp.Succeeded {
		return errs.ErrEtcdTxnConflict.FastGenByArgs()
	}
	return nil
}
2 changes: 1 addition & 1 deletion pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const (
MaxKeyspaceGroupCountInUse = uint32(4096)

// DefaultKeyspaceGroupReplicaCount is the default replica count of keyspace group.
DefaultKeyspaceGroupReplicaCount = 2
DefaultKeyspaceGroupReplicaCount = 3

// DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica.
// It's used in keyspace group primary weighted-election to balance primaries' distribution.
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
const (
// maxRetryTimes is the max retry times for initializing the cluster ID.
maxRetryTimes = 5
// clusterIDPath is the path to store cluster id
clusterIDPath = "/pd/cluster_id"
// ClusterIDPath is the path to store cluster id
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
)
Expand All @@ -56,7 +56,7 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if clusterID, err := etcdutil.GetClusterID(client, clusterIDPath); err == nil && clusterID != 0 {
if clusterID, err := etcdutil.GetClusterID(client, ClusterIDPath); err == nil && clusterID != 0 {
return clusterID, nil
}
select {
Expand Down
2 changes: 2 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
if err := json.Unmarshal(kv.Value, group); err != nil {
return errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()
}
log.Info("[putFn] check update keyspace group", zap.Any("group", group))
kgm.updateKeyspaceGroup(group)
if group.ID == mcsutils.DefaultKeyspaceGroupID {
defaultKGConfigured = true
Expand All @@ -562,6 +563,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error {
// Retry the groups that are not initialized successfully before.
for id, group := range kgm.groupUpdateRetryList {
delete(kgm.groupUpdateRetryList, id)
log.Info("[postEventFn] check update keyspace group", zap.Any("group", group))
kgm.updateKeyspaceGroup(group)
}
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)
return 0, err
}
for i, item := range resp.Kvs {
log.Info("item.Key is ", zap.ByteString("key", item.Key))
if resp.More && i == len(resp.Kvs)-1 {
// The last key is the start key of the next batch.
// To avoid to get the same key in the next load, we need to skip the last key.
Expand Down
Loading

0 comments on commit 3f5ae0a

Please sign in to comment.