Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keypath: unify leader/primary path #8859

Merged
merged 8 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,8 +1151,10 @@ func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
return "", ErrKeyspaceGroupNotExists(id)
}

rootPath := keypath.TSOSvcRootPath()
primaryPath := keypath.KeyspaceGroupPrimaryPath(rootPath, id)
primaryPath := keypath.LeaderPath(&keypath.MsParam{
ServiceName: constant.TSOServiceName,
GroupID: id,
})
leader := &tsopb.Participant{}
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *Server) primaryElectionLoop() {
}

// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath())
expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), &s.participant.MsParam)
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `{service}/primary/transfer` API.
if len(expectedPrimary) > 0 && !strings.Contains(s.participant.MemberValue(), expectedPrimary) {
Expand Down Expand Up @@ -313,7 +313,9 @@ func (s *Server) campaignLeader() {
// check expected primary and watch the primary.
exitPrimary := make(chan struct{})
lease, err := utils.KeepExpectedPrimaryAlive(ctx, s.GetClient(), exitPrimary,
s.cfg.LeaderLease, s.participant.GetLeaderPath(), s.participant.MemberValue(), constant.SchedulingServiceName)
s.cfg.LeaderLease, &keypath.MsParam{
ServiceName: constant.SchedulingServiceName,
}, s.participant.MemberValue())
if err != nil {
log.Error("prepare scheduling primary watch error", errs.ZapError(err))
return
Expand Down Expand Up @@ -460,7 +462,7 @@ func (s *Server) startServer() (err error) {
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, keypath.SchedulingSvcRootPath(), "primary election")
s.participant.InitInfo(p, "primary election")

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
Expand Down
52 changes: 21 additions & 31 deletions pkg/mcs/utils/expected_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,9 @@ import (
"go.uber.org/zap"
)

// expectedPrimaryFlag is the flag to indicate the expected primary.
// 1. When the primary was campaigned successfully, it will set the `expected_primary` flag.
// 2. Using `{service}/primary/transfer` API will revoke the previous lease and set a new `expected_primary` flag.
// This flag used to help new primary to campaign successfully while other secondaries can skip the campaign.
const expectedPrimaryFlag = "expected_primary"

// expectedPrimaryPath formats the primary path with the expected primary flag.
func expectedPrimaryPath(primaryPath string) string {
return fmt.Sprintf("%s/%s", primaryPath, expectedPrimaryFlag)
}

// GetExpectedPrimaryFlag gets the expected primary flag.
func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string {
path := expectedPrimaryPath(primaryPath)
func GetExpectedPrimaryFlag(client *clientv3.Client, msParam *keypath.MsParam) string {
path := keypath.ExpectedPrimaryPath(msParam)
primary, err := etcdutil.GetValue(client, path)
if err != nil {
log.Error("get expected primary flag error", errs.ZapError(err), zap.String("primary-path", path))
Expand All @@ -57,12 +46,12 @@ func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string
}

// markExpectedPrimaryFlag marks the expected primary flag when the primary is specified.
func markExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) {
path := expectedPrimaryPath(primaryPath)
func markExpectedPrimaryFlag(client *clientv3.Client, msParam *keypath.MsParam, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) {
path := keypath.ExpectedPrimaryPath(msParam)
log.Info("set expected primary flag", zap.String("primary-path", path), zap.String("leader-raw", leaderRaw))
// write a flag to indicate the expected primary.
resp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpPut(expectedPrimaryPath(primaryPath), leaderRaw, clientv3.WithLease(leaseID))).
Then(clientv3.OpPut(path, leaderRaw, clientv3.WithLease(leaseID))).
Commit()
if err != nil || !resp.Succeeded {
log.Error("mark expected primary error", errs.ZapError(err), zap.String("primary-path", path))
Expand All @@ -77,23 +66,29 @@ func markExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leader
// - changed by `{service}/primary/transfer` API.
// - leader lease expired.
// ONLY primary called this function.
func KeepExpectedPrimaryAlive(ctx context.Context, cli *clientv3.Client, exitPrimary chan<- struct{},
leaseTimeout int64, leaderPath, memberValue, service string) (*election.Lease, error) {
log.Info("primary start to watch the expected primary", zap.String("service", service), zap.String("primary-value", memberValue))
service = fmt.Sprintf("%s expected primary", service)
func KeepExpectedPrimaryAlive(
ctx context.Context,
cli *clientv3.Client,
exitPrimary chan<- struct{},
leaseTimeout int64,
msParam *keypath.MsParam,
memberValue string) (*election.Lease, error) {
log.Info("primary start to watch the expected primary",
zap.String("service", msParam.ServiceName), zap.String("primary-value", memberValue))
service := fmt.Sprintf("%s expected primary", msParam.ServiceName)
lease := election.NewLease(cli, service)
if err := lease.Grant(leaseTimeout); err != nil {
return nil, err
}

revision, err := markExpectedPrimaryFlag(cli, leaderPath, memberValue, lease.ID.Load().(clientv3.LeaseID))
revision, err := markExpectedPrimaryFlag(cli, msParam, memberValue, lease.ID.Load().(clientv3.LeaseID))
if err != nil {
log.Error("mark expected primary error", errs.ZapError(err))
return nil, err
}
// Keep alive the current expected primary leadership to indicate that the server is still alive.
// Watch the expected primary path to check whether the expected primary has changed by `{service}/primary/transfer` API.
expectedPrimary := election.NewLeadership(cli, expectedPrimaryPath(leaderPath), service)
expectedPrimary := election.NewLeadership(cli, keypath.ExpectedPrimaryPath(msParam), service)
expectedPrimary.SetLease(lease)
expectedPrimary.Keep(ctx)

Expand Down Expand Up @@ -165,15 +160,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName
return errors.Errorf("failed to revoke current primary's lease: %v", err)
}

var primaryPath string
switch serviceName {
case constant.SchedulingServiceName:
primaryPath = keypath.SchedulingPrimaryPath()
case constant.TSOServiceName:
tsoRootPath := keypath.TSOSvcRootPath()
primaryPath = keypath.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
}
_, err = markExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID)
_, err = markExpectedPrimaryFlag(client, &keypath.MsParam{
ServiceName: serviceName,
GroupID: keyspaceGroupID,
}, primaryIDs[nextPrimaryID], grantResp.ID)
if err != nil {
return errors.Errorf("failed to mark expected primary flag for %s, err: %v", serviceName, err)
}
Expand Down
26 changes: 9 additions & 17 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ package member

import (
"context"
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -52,11 +50,10 @@ type EmbeddedEtcdMember struct {
leadership *election.Leadership
leader atomic.Value // stored as *pdpb.Member
// etcd and cluster information.
etcd *embed.Etcd
client *clientv3.Client
id uint64 // etcd server id.
member *pdpb.Member // current PD's info.
rootPath string
etcd *embed.Etcd
client *clientv3.Client
id uint64 // etcd server id.
member *pdpb.Member // current PD's info.
// memberValue is the serialized string of `member`. It will be saved in
// etcd leader key when the PD node is successfully elected as the PD leader
// of the cluster. Every write will use it to check PD leadership.
Expand Down Expand Up @@ -330,7 +327,7 @@ func (m *EmbeddedEtcdMember) IsSameLeader(leader any) bool {
}

// InitMemberInfo initializes the member info.
func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUrls, name string, rootPath string) {
func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUrls, name string) {
leader := &pdpb.Member{
Name: name,
MemberId: m.ID(),
Expand All @@ -345,9 +342,8 @@ func (m *EmbeddedEtcdMember) InitMemberInfo(advertiseClientUrls, advertisePeerUr
}
m.member = leader
m.memberValue = string(data)
m.rootPath = rootPath
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), "leader election")
log.Info("member joining election", zap.Stringer("member-info", m.member), zap.String("root-path", m.rootPath))
log.Info("member joining election", zap.Stringer("member-info", m.member))
}

// ResignEtcdLeader resigns current PD's etcd leadership. If nextLeader is empty, all
Expand Down Expand Up @@ -379,13 +375,9 @@ func (m *EmbeddedEtcdMember) ResignEtcdLeader(ctx context.Context, from string,
return m.MoveEtcdLeader(ctx, m.ID(), nextEtcdLeaderID)
}

func (m *EmbeddedEtcdMember) getMemberLeaderPriorityPath(id uint64) string {
return path.Join(m.rootPath, fmt.Sprintf("member/%d/leader_priority", id))
}

// SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader.
func (m *EmbeddedEtcdMember) SetMemberLeaderPriority(id uint64, priority int) error {
key := m.getMemberLeaderPriorityPath(id)
key := keypath.MemberLeaderPriorityPath(id)
res, err := m.leadership.LeaderTxn().Then(clientv3.OpPut(key, strconv.Itoa(priority))).Commit()
if err != nil {
return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
Expand All @@ -399,7 +391,7 @@ func (m *EmbeddedEtcdMember) SetMemberLeaderPriority(id uint64, priority int) er

// DeleteMemberLeaderPriority removes a member's etcd leader priority config.
func (m *EmbeddedEtcdMember) DeleteMemberLeaderPriority(id uint64) error {
key := m.getMemberLeaderPriorityPath(id)
key := keypath.MemberLeaderPriorityPath(id)
res, err := m.leadership.LeaderTxn().Then(clientv3.OpDelete(key)).Commit()
if err != nil {
return errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
Expand All @@ -413,7 +405,7 @@ func (m *EmbeddedEtcdMember) DeleteMemberLeaderPriority(id uint64) error {

// GetMemberLeaderPriority loads a member's priority to be elected as the etcd leader.
func (m *EmbeddedEtcdMember) GetMemberLeaderPriority(id uint64) (int, error) {
key := m.getMemberLeaderPriorityPath(id)
key := keypath.MemberLeaderPriorityPath(id)
res, err := etcdutil.EtcdKVGet(m.client, key)
if err != nil {
return 0, err
Expand Down
10 changes: 4 additions & 6 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ type Participant struct {
keypath.MsParam
leadership *election.Leadership
// stored as member type
leader atomic.Value
client *clientv3.Client
rootPath string
member participant
leader atomic.Value
client *clientv3.Client
member participant
// memberValue is the serialized string of `member`. It will be saved in the
// leader key when this participant is successfully elected as the leader of
// the group. Every write will use it to check the leadership.
Expand All @@ -76,15 +75,14 @@ func NewParticipant(client *clientv3.Client, msParam keypath.MsParam) *Participa
}

// InitInfo initializes the member info.
func (m *Participant) InitInfo(p participant, rootPath string, purpose string) {
func (m *Participant) InitInfo(p participant, purpose string) {
data, err := p.Marshal()
if err != nil {
// can't fail, so panic here.
log.Fatal("marshal leader meet error", zap.String("member-name", p.String()), errs.ZapError(errs.ErrMarshalLeader, err))
}
m.member = p
m.memberValue = string(data)
m.rootPath = rootPath
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose)
m.lastLeaderUpdatedTime.Store(time.Now())
log.Info("participant joining election", zap.String("participant-info", p.String()), zap.String("leader-path", m.GetLeaderPath()))
Expand Down
10 changes: 8 additions & 2 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() {
}

// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := mcsutils.GetExpectedPrimaryFlag(gta.member.Client(), gta.member.GetLeaderPath())
expectedPrimary := mcsutils.GetExpectedPrimaryFlag(gta.member.Client(), &keypath.MsParam{
ServiceName: constant.TSOServiceName,
GroupID: gta.getGroupID(),
})
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `{service}/primary/transfer` API.
if len(expectedPrimary) > 0 && !strings.Contains(gta.member.MemberValue(), expectedPrimary) {
Expand Down Expand Up @@ -284,7 +287,10 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
// check expected primary and watch the primary.
exitPrimary := make(chan struct{})
lease, err := mcsutils.KeepExpectedPrimaryAlive(ctx, gta.member.Client(), exitPrimary,
gta.am.leaderLease, gta.member.GetLeaderPath(), gta.member.MemberValue(), constant.TSOServiceName)
gta.am.leaderLease, &keypath.MsParam{
ServiceName: constant.TSOServiceName,
GroupID: gta.getGroupID(),
}, gta.member.MemberValue())
if err != nil {
log.Error("prepare tso primary watch error", errs.ZapError(err))
return
Expand Down
7 changes: 5 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
Id: uniqueID, // id is unique among all participants
ListenUrls: []string{kgm.cfg.GetAdvertiseListenAddr()},
}
participant.InitInfo(p, keypath.KeyspaceGroupsElectionPath(kgm.tsoSvcRootPath, group.ID), "keyspace group primary election")
participant.InitInfo(p, "keyspace group primary election")
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down Expand Up @@ -1341,7 +1341,10 @@ mergeLoop:
// Check if the keyspace group primaries in the merge map are all gone.
if len(mergeMap) != 0 {
for id := range mergeMap {
leaderPath := keypath.KeyspaceGroupPrimaryPath(kgm.tsoSvcRootPath, id)
leaderPath := keypath.LeaderPath(&keypath.MsParam{
ServiceName: constant.TSOServiceName,
GroupID: id,
})
val, err := kgm.tsoSvcStorage.Load(leaderPath)
if err != nil {
log.Error("failed to check if the keyspace group primary in the merge list has gone",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,31 @@ package keypath
import (
"fmt"
"path"

"github.com/tikv/pd/pkg/mcs/utils/constant"
)

// Leader and primary are the same thing in this context.
const (
leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader"
memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path"
memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash"
memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version"
allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id"
keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id"
leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader"
memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path"
memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash"
memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version"
allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id"
keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id"
kemberLeaderPriorityPathFormat = "/pd/%d/member/%d/leader_priority" // "/pd/{cluster_id}/member/{member_id}/leader_priority"

msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary"
msTsoDefaultLeaderPathFormat = "/ms/%d/tso/00000/primary" // "/ms/{cluster_id}/tso/00000/primary"
msTsoKespaceLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary"

// `expected_primary` is the flag to indicate the expected primary/leader.
// 1. When the leader was campaigned successfully, it will set the `expected_primary` flag.
// 2. Using `{service}/primary/transfer` API will revoke the previous lease and set a new `expected_primary` flag.
// This flag used to help new primary to campaign successfully while other secondaries can skip the campaign.
msExpectedLeaderPathFormat = "/ms/%d/%s/primary/expected_primary" // "/ms/{cluster_id}/{service_name}/primary/expected_primary"
msTsoDefaultExpectedLeaderPathFormat = "/ms/%d/tso/00000/primary/expected_primary" // "/ms/{cluster_id}/tso/00000/primary"
msTsoKespaceExpectedLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary/expected_primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary"
)

// MsParam is the parameter of micro service.
Expand All @@ -48,7 +60,7 @@ func LeaderPath(p *MsParam) string {
if p == nil || p.ServiceName == "" {
return fmt.Sprintf(leaderPathFormat, ClusterID())
}
if p.ServiceName == "tso" {
if p.ServiceName == constant.TSOServiceName {
if p.GroupID == 0 {
return fmt.Sprintf(msTsoDefaultLeaderPathFormat, ClusterID())
}
Expand All @@ -57,6 +69,17 @@ func LeaderPath(p *MsParam) string {
return fmt.Sprintf(msLeaderPathFormat, ClusterID(), p.ServiceName)
}

// ExpectedPrimaryPath returns the expected_primary path.
func ExpectedPrimaryPath(p *MsParam) string {
if p.ServiceName == constant.TSOServiceName {
if p.GroupID == 0 {
return fmt.Sprintf(msTsoDefaultExpectedLeaderPathFormat, ClusterID())
}
return fmt.Sprintf(msTsoKespaceExpectedLeaderPathFormat, ClusterID(), p.GroupID)
}
return fmt.Sprintf(msExpectedLeaderPathFormat, ClusterID(), p.ServiceName)
}

// MemberBinaryDeployPath returns the member binary deploy path.
func MemberBinaryDeployPath(id uint64) string {
return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id)
Expand All @@ -81,3 +104,8 @@ func AllocIDPath() string {
func KeyspaceAllocIDPath() string {
return fmt.Sprintf(keyspaceAllocIDPathFormat, ClusterID())
}

// MemberLeaderPriorityPath returns the member leader priority path.
func MemberLeaderPriorityPath(id uint64) string {
return fmt.Sprintf(kemberLeaderPriorityPathFormat, ClusterID(), id)
}
Loading