diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 81804e38c2a..fa26b4cd0cb 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -1151,8 +1151,10 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) { return "", ErrKeyspaceGroupNotExists(id) } - rootPath := keypath.TSOSvcRootPath() - primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id) + primaryPath := keypath.LeaderPath(&keypath.MsParam{ + ServiceName: constant.TSOServiceName, + GroupID: id, + }) leader := &tsopb.Participant{} ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader) if err != nil { diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index f615a78fe05..31c8d307adb 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -254,7 +254,7 @@ func (s *Server) primaryElectionLoop() { } // To make sure the expected primary(if existed) and new primary are on the same server. - expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath()) + expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), &s.participant.MsParam) // skip campaign the primary if the expected primary is not empty and not equal to the current memberValue. // expected primary ONLY SET BY `{service}/primary/transfer` API. if len(expectedPrimary) > 0 && !strings.Contains(s.participant.MemberValue(), expectedPrimary) { @@ -313,7 +313,9 @@ func (s *Server) campaignLeader() { // check expected primary and watch the primary. exitPrimary := make(chan struct{}) lease, err := utils.KeepExpectedPrimaryAlive(ctx, s.GetClient(), exitPrimary, - s.cfg.LeaderLease, s.participant.GetLeaderPath(), s.participant.MemberValue(), constant.SchedulingServiceName) + s.cfg.LeaderLease, &keypath.MsParam{ + ServiceName: constant.SchedulingServiceName, + }, s.participant.MemberValue()) if err != nil { log.Error("prepare scheduling primary watch error", errs.ZapError(err)) return @@ -460,7 +462,7 @@ func (s *Server) startServer() (err error) { Id: uniqueID, // id is unique among all participants ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()}, } - s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), "primary election") + s.participant.InitInfo(p, "primary election") s.service = &Service{Server: s} s.AddServiceReadyCallback(s.startCluster) diff --git a/pkg/mcs/utils/expected_primary.go b/pkg/mcs/utils/expected_primary.go index 448344cf08d..c39345b004e 100644 --- a/pkg/mcs/utils/expected_primary.go +++ b/pkg/mcs/utils/expected_primary.go @@ -33,20 +33,9 @@ import ( "go.uber.org/zap" ) -// expectedPrimaryFlag is the flag to indicate the expected primary. -// 1. When the primary was campaigned successfully, it will set the `expected_primary` flag. -// 2. Using `{service}/primary/transfer` API will revoke the previous lease and set a new `expected_primary` flag. -// This flag used to help new primary to campaign successfully while other secondaries can skip the campaign. -const expectedPrimaryFlag = "expected_primary" - -// expectedPrimaryPath formats the primary path with the expected primary flag. -func expectedPrimaryPath(primaryPath string) string { - return fmt.Sprintf("%s/%s", primaryPath, expectedPrimaryFlag) -} - // GetExpectedPrimaryFlag gets the expected primary flag. -func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string { - path := expectedPrimaryPath(primaryPath) +func GetExpectedPrimaryFlag(client *clientv3.Client, msParam *keypath.MsParam) string { + path := keypath.ExpectedPrimaryPath(msParam) primary, err := etcdutil.GetValue(client, path) if err != nil { log.Error("get expected primary flag error", errs.ZapError(err), zap.String("primary-path", path)) @@ -57,12 +46,12 @@ func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string } // markExpectedPrimaryFlag marks the expected primary flag when the primary is specified. -func markExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) { - path := expectedPrimaryPath(primaryPath) +func markExpectedPrimaryFlag(client *clientv3.Client, msParam *keypath.MsParam, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) { + path := keypath.ExpectedPrimaryPath(msParam) log.Info("set expected primary flag", zap.String("primary-path", path), zap.String("leader-raw", leaderRaw)) // write a flag to indicate the expected primary. resp, err := kv.NewSlowLogTxn(client). - Then(clientv3.OpPut(expectedPrimaryPath(primaryPath), leaderRaw, clientv3.WithLease(leaseID))). + Then(clientv3.OpPut(path, leaderRaw, clientv3.WithLease(leaseID))). Commit() if err != nil || !resp.Succeeded { log.Error("mark expected primary error", errs.ZapError(err), zap.String("primary-path", path)) @@ -77,23 +66,29 @@ func markExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leader // - changed by `{service}/primary/transfer` API. // - leader lease expired. // ONLY primary called this function. -func KeepExpectedPrimaryAlive(ctx context.Context, cli *clientv3.Client, exitPrimary chan<- struct{}, - leaseTimeout int64, leaderPath, memberValue, service string) (*election.Lease, error) { - log.Info("primary start to watch the expected primary", zap.String("service", service), zap.String("primary-value", memberValue)) - service = fmt.Sprintf("%s expected primary", service) +func KeepExpectedPrimaryAlive( + ctx context.Context, + cli *clientv3.Client, + exitPrimary chan<- struct{}, + leaseTimeout int64, + msParam *keypath.MsParam, + memberValue string) (*election.Lease, error) { + log.Info("primary start to watch the expected primary", + zap.String("service", msParam.ServiceName), zap.String("primary-value", memberValue)) + service := fmt.Sprintf("%s expected primary", msParam.ServiceName) lease := election.NewLease(cli, service) if err := lease.Grant(leaseTimeout); err != nil { return nil, err } - revision, err := markExpectedPrimaryFlag(cli, leaderPath, memberValue, lease.ID.Load().(clientv3.LeaseID)) + revision, err := markExpectedPrimaryFlag(cli, msParam, memberValue, lease.ID.Load().(clientv3.LeaseID)) if err != nil { log.Error("mark expected primary error", errs.ZapError(err)) return nil, err } // Keep alive the current expected primary leadership to indicate that the server is still alive. // Watch the expected primary path to check whether the expected primary has changed by `{service}/primary/transfer` API. - expectedPrimary := election.NewLeadership(cli, expectedPrimaryPath(leaderPath), service) + expectedPrimary := election.NewLeadership(cli, keypath.ExpectedPrimaryPath(msParam), service) expectedPrimary.SetLease(lease) expectedPrimary.Keep(ctx) @@ -165,15 +160,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName return errors.Errorf("failed to revoke current primary's lease: %v", err) } - var primaryPath string - switch serviceName { - case constant.SchedulingServiceName: - primaryPath = keypath.SchedulingPrimaryPath() - case constant.TSOServiceName: - tsoRootPath := keypath.TSOSvcRootPath() - primaryPath = keypath.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID) - } - _, err = markExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID) + _, err = markExpectedPrimaryFlag(client, &keypath.MsParam{ + ServiceName: serviceName, + GroupID: keyspaceGroupID, + }, primaryIDs[nextPrimaryID], grantResp.ID) if err != nil { return errors.Errorf("failed to mark expected primary flag for %s, err: %v", serviceName, err) } diff --git a/pkg/member/member.go b/pkg/member/member.go index b1924835988..04e55c1a647 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -16,10 +16,8 @@ package member import ( "context" - "fmt" "math/rand" "os" - "path" "path/filepath" "strconv" "strings" @@ -52,11 +50,10 @@ type EmbeddedEtcdMember struct { leadership *election.Leadership leader atomic.Value // stored as *pdpb.Member // etcd and cluster information. - etcd *embed.Etcd - client *clientv3.Client - id uint64 // etcd server id. - member *pdpb.Member // current PD's info. - rootPath string + etcd *embed.Etcd + client *clientv3.Client + id uint64 // etcd server id. + member *pdpb.Member // current PD's info. // memberValue is the serialized string of `member`. It will be saved in // etcd leader key when the PD node is successfully elected as the PD leader // of the cluster. Every write will use it to check PD leadership. @@ -330,7 +327,7 @@ func (m *EmbeddedEtcdMember) IsSameLeader(leader any) bool { } // InitMemberInfo initializes the member info. -func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUrls, name string, rootPath string) { +func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUrls, name string) { leader := &pdpb.Member{ Name: name, MemberId: m.ID(), @@ -345,9 +342,8 @@ func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUr } m.member = leader m.memberValue = string(data) - m.rootPath = rootPath m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), "leader election") - log.Info("member joining election", zap.Stringer("member-info", m.member), zap.String("root-path", m.rootPath)) + log.Info("member joining election", zap.Stringer("member-info", m.member)) } // ResignEtcdLeader resigns current PD's etcd leadership. If nextLeader is empty, all @@ -379,13 +375,9 @@ func (m *EmbeddedEtcdMember) ResignEtcdLeader(ctx context.Context, from string, return m.MoveEtcdLeader(ctx, m.ID(), nextEtcdLeaderID) } -func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string { - return path.Join(m.rootPath, fmt.Sprintf("member/%d/leader_priority", id)) -} - // SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader. func (m *EmbeddedEtcdMember) SetMemberLeaderPriority(id uint64, priority int) error { - key := m.getMemberLeaderPriorityPath(id) + key := keypath.MemberLeaderPriorityPath(id) res, err := m.leadership.LeaderTxn().Then(clientv3.OpPut(key, strconv.Itoa(priority))).Commit() if err != nil { return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() @@ -399,7 +391,7 @@ func (m *EmbeddedEtcdMember) SetMemberLeaderPriority(id uint64, priority int) er // DeleteMemberLeaderPriority removes a member's etcd leader priority config. func (m *EmbeddedEtcdMember) DeleteMemberLeaderPriority(id uint64) error { - key := m.getMemberLeaderPriorityPath(id) + key := keypath.MemberLeaderPriorityPath(id) res, err := m.leadership.LeaderTxn().Then(clientv3.OpDelete(key)).Commit() if err != nil { return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() @@ -413,7 +405,7 @@ func (m *EmbeddedEtcdMember) DeleteMemberLeaderPriority(id uint64) error { // GetMemberLeaderPriority loads a member's priority to be elected as the etcd leader. func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) { - key := m.getMemberLeaderPriorityPath(id) + key := keypath.MemberLeaderPriorityPath(id) res, err := etcdutil.EtcdKVGet(m.client, key) if err != nil { return 0, err diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 26888f6152d..f3399f5d900 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -50,10 +50,9 @@ type Participant struct { keypath.MsParam leadership *election.Leadership // stored as member type - leader atomic.Value - client *clientv3.Client - rootPath string - member participant + leader atomic.Value + client *clientv3.Client + member participant // memberValue is the serialized string of `member`. It will be saved in the // leader key when this participant is successfully elected as the leader of // the group. Every write will use it to check the leadership. @@ -76,7 +75,7 @@ func NewParticipant(client *clientv3.Client, msParam keypath.MsParam) *Participa } // InitInfo initializes the member info. -func (m *Participant) InitInfo(p participant, rootPath string, purpose string) { +func (m *Participant) InitInfo(p participant, purpose string) { data, err := p.Marshal() if err != nil { // can't fail, so panic here. @@ -84,7 +83,6 @@ func (m *Participant) InitInfo(p participant, rootPath string, purpose string) { } m.member = p m.memberValue = string(data) - m.rootPath = rootPath m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose) m.lastLeaderUpdatedTime.Store(time.Now()) log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.GetLeaderPath())) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 52c30c38f1e..740317c676a 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -215,7 +215,10 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { } // To make sure the expected primary(if existed) and new primary are on the same server. - expectedPrimary := mcsutils.GetExpectedPrimaryFlag(gta.member.Client(), gta.member.GetLeaderPath()) + expectedPrimary := mcsutils.GetExpectedPrimaryFlag(gta.member.Client(), &keypath.MsParam{ + ServiceName: constant.TSOServiceName, + GroupID: gta.getGroupID(), + }) // skip campaign the primary if the expected primary is not empty and not equal to the current memberValue. // expected primary ONLY SET BY `{service}/primary/transfer` API. if len(expectedPrimary) > 0 && !strings.Contains(gta.member.MemberValue(), expectedPrimary) { @@ -284,7 +287,10 @@ func (gta *GlobalTSOAllocator) campaignLeader() { // check expected primary and watch the primary. exitPrimary := make(chan struct{}) lease, err := mcsutils.KeepExpectedPrimaryAlive(ctx, gta.member.Client(), exitPrimary, - gta.am.leaderLease, gta.member.GetLeaderPath(), gta.member.MemberValue(), constant.TSOServiceName) + gta.am.leaderLease, &keypath.MsParam{ + ServiceName: constant.TSOServiceName, + GroupID: gta.getGroupID(), + }, gta.member.MemberValue()) if err != nil { log.Error("prepare tso primary watch error", errs.ZapError(err)) return diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 09f20609920..86b43d0de45 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -755,7 +755,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro Id: uniqueID, // id is unique among all participants ListenUrls: []string{kgm.cfg.GetAdvertiseListenAddr()}, } - participant.InitInfo(p, keypath.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), "keyspace group primary election") + participant.InitInfo(p, "keyspace group primary election") // If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group // is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot // be broken until the entire split process is completed. @@ -1341,7 +1341,10 @@ mergeLoop: // Check if the keyspace group primaries in the merge map are all gone. if len(mergeMap) != 0 { for id := range mergeMap { - leaderPath := keypath.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id) + leaderPath := keypath.LeaderPath(&keypath.MsParam{ + ServiceName: constant.TSOServiceName, + GroupID: id, + }) val, err := kgm.tsoSvcStorage.Load(leaderPath) if err != nil { log.Error("failed to check if the keyspace group primary in the merge list has gone", diff --git a/pkg/utils/keypath/key_path_v2.go b/pkg/utils/keypath/absolute_key_path.go similarity index 50% rename from pkg/utils/keypath/key_path_v2.go rename to pkg/utils/keypath/absolute_key_path.go index 208dd832bcd..40692dc14bd 100644 --- a/pkg/utils/keypath/key_path_v2.go +++ b/pkg/utils/keypath/absolute_key_path.go @@ -17,19 +17,31 @@ package keypath import ( "fmt" "path" + + "github.com/tikv/pd/pkg/mcs/utils/constant" ) +// Leader and primary are the same thing in this context. const ( - leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" - memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path" - memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash" - memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version" - allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id" - keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id" + leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" + memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path" + memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash" + memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version" + allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id" + keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id" + kemberLeaderPriorityPathFormat = "/pd/%d/member/%d/leader_priority" // "/pd/{cluster_id}/member/{member_id}/leader_priority" msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" msTsoDefaultLeaderPathFormat = "/ms/%d/tso/00000/primary" // "/ms/{cluster_id}/tso/00000/primary" msTsoKespaceLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary" + + // `expected_primary` is the flag to indicate the expected primary/leader. + // 1. When the leader was campaigned successfully, it will set the `expected_primary` flag. + // 2. Using `{service}/primary/transfer` API will revoke the previous lease and set a new `expected_primary` flag. + // This flag used to help new primary to campaign successfully while other secondaries can skip the campaign. + msExpectedLeaderPathFormat = "/ms/%d/%s/primary/expected_primary" // "/ms/{cluster_id}/{service_name}/primary/expected_primary" + msTsoDefaultExpectedLeaderPathFormat = "/ms/%d/tso/00000/primary/expected_primary" // "/ms/{cluster_id}/tso/00000/primary" + msTsoKespaceExpectedLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary/expected_primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary" ) // MsParam is the parameter of micro service. @@ -48,7 +60,7 @@ func LeaderPath(p *MsParam) string { if p == nil || p.ServiceName == "" { return fmt.Sprintf(leaderPathFormat, ClusterID()) } - if p.ServiceName == "tso" { + if p.ServiceName == constant.TSOServiceName { if p.GroupID == 0 { return fmt.Sprintf(msTsoDefaultLeaderPathFormat, ClusterID()) } @@ -57,6 +69,17 @@ func LeaderPath(p *MsParam) string { return fmt.Sprintf(msLeaderPathFormat, ClusterID(), p.ServiceName) } +// ExpectedPrimaryPath returns the expected_primary path. +func ExpectedPrimaryPath(p *MsParam) string { + if p.ServiceName == constant.TSOServiceName { + if p.GroupID == 0 { + return fmt.Sprintf(msTsoDefaultExpectedLeaderPathFormat, ClusterID()) + } + return fmt.Sprintf(msTsoKespaceExpectedLeaderPathFormat, ClusterID(), p.GroupID) + } + return fmt.Sprintf(msExpectedLeaderPathFormat, ClusterID(), p.ServiceName) +} + // MemberBinaryDeployPath returns the member binary deploy path. func MemberBinaryDeployPath(id uint64) string { return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id) @@ -81,3 +104,8 @@ func AllocIDPath() string { func KeyspaceAllocIDPath() string { return fmt.Sprintf(keyspaceAllocIDPathFormat, ClusterID()) } + +// MemberLeaderPriorityPath returns the member leader priority path. +func MemberLeaderPriorityPath(id uint64) string { + return fmt.Sprintf(kemberLeaderPriorityPathFormat, ClusterID(), id) +} diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go index 03d497dc408..4d59fafe16f 100644 --- a/pkg/utils/keypath/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -310,12 +310,6 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp { return regexp.MustCompile(pattern) } -// SchedulingSvcRootPath returns the root path of scheduling service. -// Path: /ms/{cluster_id}/scheduling -func SchedulingSvcRootPath() string { - return svcRootPath(constant.SchedulingServiceName) -} - // TSOSvcRootPath returns the root path of tso service. // Path: /ms/{cluster_id}/tso func TSOSvcRootPath() string { @@ -333,30 +327,6 @@ func LegacyRootPath() string { return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10)) } -// KeyspaceGroupPrimaryPath returns the path of keyspace group primary. -// default keyspace group: "/ms/{cluster_id}/tso/00000/primary". -// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". -func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string { - electionPath := KeyspaceGroupsElectionPath(rootPath, keyspaceGroupID) - return path.Join(electionPath, constant.PrimaryKey) -} - -// SchedulingPrimaryPath returns the path of scheduling primary. -// Path: /ms/{cluster_id}/scheduling/primary -func SchedulingPrimaryPath() string { - return path.Join(SchedulingSvcRootPath(), constant.PrimaryKey) -} - -// KeyspaceGroupsElectionPath returns the path of keyspace groups election. -// default keyspace group: "/ms/{cluster_id}/tso/00000". -// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}". -func KeyspaceGroupsElectionPath(rootPath string, keyspaceGroupID uint32) string { - if keyspaceGroupID == constant.DefaultKeyspaceGroupID { - return path.Join(rootPath, "00000") - } - return path.Join(rootPath, constant.KeyspaceGroupsKey, keyspaceGroupsElectionKey, fmt.Sprintf("%05d", keyspaceGroupID)) -} - // GetCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id. func GetCompiledNonDefaultIDRegexp() *regexp.Regexp { rootPath := TSOSvcRootPath() diff --git a/server/server.go b/server/server.go index e49496ff8d8..d7bdd92d96d 100644 --- a/server/server.go +++ b/server/server.go @@ -432,7 +432,7 @@ func (s *Server) startServer(ctx context.Context) error { bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) s.rootPath = keypath.PDRootPath() - s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name(), s.rootPath) + s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name()) if err := s.member.SetMemberDeployPath(s.member.ID()); err != nil { return err } @@ -1970,15 +1970,19 @@ func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { func (s *Server) initTSOPrimaryWatcher() { serviceName := constant.TSOServiceName - tsoRootPath := keypath.TSOSvcRootPath() - tsoServicePrimaryKey := keypath.KeyspaceGroupPrimaryPath(tsoRootPath, constant.DefaultKeyspaceGroupID) + tsoServicePrimaryKey := keypath.LeaderPath(&keypath.MsParam{ + ServiceName: constant.TSOServiceName, + GroupID: constant.DefaultKeyspaceGroupID, + }) s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey) s.tsoPrimaryWatcher.StartWatchLoop() } func (s *Server) initSchedulingPrimaryWatcher() { serviceName := constant.SchedulingServiceName - primaryKey := keypath.SchedulingPrimaryPath() + primaryKey := keypath.LeaderPath(&keypath.MsParam{ + ServiceName: constant.SchedulingServiceName, + }) s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey) s.schedulingPrimaryWatcher.StartWatchLoop() } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 51f3fd37295..2e5bd321f6b 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -219,8 +219,10 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe // Make sure every keyspace group is using the right timestamp path // for loading/saving timestamp from/to etcd and the right primary path // for primary election. - rootPath := keypath.TSOSvcRootPath() - primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, param.keyspaceGroupID) + primaryPath := keypath.LeaderPath(&keypath.MsParam{ + ServiceName: constant.TSOServiceName, + GroupID: param.keyspaceGroupID, + }) timestampPath := keypath.FullTimestampPath(param.keyspaceGroupID) re.Equal(timestampPath, am.GetTimestampPath()) re.Equal(primaryPath, am.GetMember().GetLeaderPath())