Skip to content

Commit

Permalink
check primary
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed May 9, 2024
1 parent a3c5950 commit 7fa19d3
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 24 deletions.
17 changes: 17 additions & 0 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Leadership struct {
leaderKey string
leaderValue string

LeaderWatch bool

keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock syncutil.Mutex
Expand All @@ -72,6 +74,14 @@ type Leadership struct {
campaignTimes []time.Time
}

func (ls *Leadership) SetLeaderWatch(val bool) {
ls.LeaderWatch = val
}

func (ls *Leadership) GetLeaderValue() string {
return ls.leaderValue
}

// NewLeadership creates a new Leadership.
func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadership {
leadership := &Leadership{
Expand Down Expand Up @@ -375,6 +385,12 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
// only API update the leader key to transfer the leader will meet
if ev.Type == mvccpb.PUT && ls.LeaderWatch {
log.Info("[LeaderWatch] current leadership is updated", zap.Int64("watchRevision", revision),
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
}
revision = wresp.Header.Revision + 1
}
Expand All @@ -393,4 +409,5 @@ func (ls *Leadership) Reset() {
}
ls.keepAliveCancelFuncLock.Unlock()
ls.getLease().Close()
ls.LeaderWatch = false
}
62 changes: 58 additions & 4 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package discovery

import (
"math/rand"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
Expand All @@ -45,14 +48,14 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}

// GetMSMembers returns all the members of the specified service name.
func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch name {
func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch serviceName {
case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
Expand All @@ -75,5 +78,56 @@ func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry,
return entries, nil
}

return nil, errors.Errorf("unknown service name %s", name)
return nil, errors.Errorf("unknown service name %s", serviceName)
}

func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string) error {
log.Info("transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary))
entries, err := GetMSMembers(serviceName, client)
if err != nil {
return err
}

// Do nothing when I am the only member of cluster.
if len(entries) == 1 && newPrimary == "" {
return errors.New("no valid follower to transfer primary")
}

var primaryIDs []string
var memberValues []string
for _, member := range entries {
if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.ServiceAddr == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
memberValues = append(memberValues, string(member.MemberValue))
}
}
if len(primaryIDs) == 0 {
return errors.New("no valid follower to transfer primary")
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))
nextPrimaryID := r.Intn(len(primaryIDs))

clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return errors.Errorf("failed to get cluster ID: %v", err)
}

var primaryKey string
switch serviceName {
case utils.SchedulingServiceName:
primaryKey = endpoint.SchedulingPrimaryPath(clusterID)
case utils.TSOServiceName:
tsoRootPath := endpoint.TSOSvcRootPath(clusterID)
primaryKey = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, utils.DefaultKeyspaceGroupID)
}

// update primary key to notify old primary server.
putResp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpPut(primaryKey, memberValues[nextPrimaryID])).
Commit()
if err != nil || !putResp.Succeeded {
return errors.Errorf("failed to write primary flag for %s", serviceName)
}
return nil
}
2 changes: 2 additions & 0 deletions pkg/mcs/discovery/registry_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (

// ServiceRegistryEntry is the registry entry of a service
type ServiceRegistryEntry struct {
Name string `json:"name"`
ServiceAddr string `json:"service-addr"`
Version string `json:"version"`
GitHash string `json:"git-hash"`
DeployPath string `json:"deploy-path"`
StartTimestamp int64 `json:"start-timestamp"`
MemberValue []byte `json:"member-value"`
}

// Serialize this service registry entry
Expand Down
41 changes: 41 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

func (s *Server) GetParticipant() *member.Participant {
return s.participant
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
Expand Down Expand Up @@ -243,6 +247,17 @@ func (s *Server) primaryElectionLoop() {
log.Info("the scheduling primary has changed, try to re-campaign a primary")
}

// To make sure the expected leader(if exist) and primary are on the same server.
expectedPrimary := utils.GetExpectedPrimary(s.participant.GetLeaderPath(), s.GetClient())
if expectedPrimary != "" && expectedPrimary != s.participant.GetLeadership().GetLeaderValue() {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", s.Name()),
zap.String("target-primary-id", expectedPrimary),
zap.Uint64("member-id", s.participant.ID()))
time.Sleep(200 * time.Millisecond)
continue
}

s.campaignLeader()
}
}
Expand Down Expand Up @@ -290,6 +305,9 @@ func (s *Server) campaignLeader() {
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

exitPrimary := make(chan struct{})
go s.primaryWatch(exitPrimary)

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
defer leaderTicker.Stop()

Expand All @@ -304,10 +322,31 @@ func (s *Server) campaignLeader() {
// Server is closed and it should return nil.
log.Info("server is closed")
return
case <-exitPrimary:
log.Info("no longer a primary/leader because primary have been updated, the scheduling primary/leader will step down")
return
}
}
}

func (s *Server) primaryWatch(exitPrimary chan struct{}) {
_, revision, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
}
log.Info("[primary] start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader()))
// Watch will keep looping and never return unless the primary has changed.
s.participant.GetLeadership().SetLeaderWatch(true)
s.participant.GetLeadership().Watch(s.serverLoopCtx, revision+1)
s.participant.GetLeadership().SetLeaderWatch(false)

utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath())

s.participant.UnsetLeader()
exitPrimary <- struct{}{}
}

// Close closes the server.
func (s *Server) Close() {
if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) {
Expand Down Expand Up @@ -425,6 +464,7 @@ func (s *Server) startServer() (err error) {
GitHash: versioninfo.PDGitHash,
DeployPath: deployPath,
StartTimestamp: s.StartTimestamp(),
Name: s.Name(),
}
uniqueName := s.cfg.GetAdvertiseListenAddr()
uniqueID := memberutil.GenerateUniqueID(uniqueName)
Expand All @@ -436,6 +476,7 @@ func (s *Server) startServer() (err error) {
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
s.serviceID.MemberValue = []byte(s.participant.MemberValue())

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
Expand Down
4 changes: 4 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,17 @@ func (s *Server) startServer() (err error) {
GitHash: versioninfo.PDGitHash,
DeployPath: deployPath,
StartTimestamp: s.StartTimestamp(),
Name: s.Name(),
}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr,
s.clusterID, legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
// Initialize the service ID with the member value of the primary of the default keyspace group.
memberValue, err := s.GetMember(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
s.serviceID.MemberValue = []byte(memberValue.MemberValue())

s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.service = &Service{Server: s}
Expand Down
49 changes: 49 additions & 0 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package utils

import (
"context"
"github.com/tikv/pd/pkg/storage/kv"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -51,6 +52,9 @@ const (
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
// ExpectedPrimary is the path to store the expected primary
// ONLY SET VALUE BY API
ExpectedPrimary = "expected_primary"
)

// InitClusterID initializes the cluster ID.
Expand All @@ -70,6 +74,51 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes)
}

// GetExpectedPrimary indicates API has changed the primary, ONLY SET VALUE BY API.
func GetExpectedPrimary(keyPath string, client *clientv3.Client) string {
leader, err := etcdutil.GetValue(client, strings.Join([]string{keyPath, ExpectedPrimary}, "/"))
if err != nil {
log.Error("get expected primary key error", errs.ZapError(err))
return ""
}

return string(leader)
}

// RemoveExpectedPrimary removes the expected primary key.
func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {
// remove expected leader key
resp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))).
Commit()
if err != nil && !resp.Succeeded {
log.Error("change primary error", errs.ZapError(err))
return
}
}

// SetExpectedPrimary sets the expected primary key when the current primary has exited.
func SetExpectedPrimary(client *clientv3.Client, leaderPath string) {
// write a flag to indicate the current primary has exited
leaderRaw, err := etcdutil.GetValue(client, leaderPath)
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}

// write a flag to indicate the current primary has exited
resp, err := kv.NewSlowLogTxn(client).
Then(
clientv3.OpPut(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"), string(leaderRaw)),
// indicate the current primary has exited
clientv3.OpDelete(leaderPath)).
Commit()
if err != nil && !resp.Succeeded {
log.Error("change primary error", errs.ZapError(err))
return
}
}

// PromHandler is a handler to get prometheus metrics.
func PromHandler() gin.HandlerFunc {
return func(c *gin.Context) {
Expand Down
22 changes: 11 additions & 11 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) {
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's PD leader.
func (m *EmbeddedEtcdMember) unsetLeader() {
// UnsetLeader unsets the member's PD leader.
func (m *EmbeddedEtcdMember) UnsetLeader() {
m.leader.Store(&pdpb.Member{})
m.lastLeaderUpdatedTime.Store(time.Now())
}
Expand Down Expand Up @@ -210,8 +210,8 @@ func (m *EmbeddedEtcdMember) PreCheckLeader() error {
return nil
}

// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *EmbeddedEtcdMember) getPersistentLeader() (*pdpb.Member, int64, error) {
// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func (m *EmbeddedEtcdMember) GetPersistentLeader() (any, int64, error) {
leader := &pdpb.Member{}
ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader)
if err != nil {
Expand All @@ -233,17 +233,17 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) {
return nil, true
}

leader, revision, err := m.getPersistentLeader()
leaderRaw, revision, err := m.GetPersistentLeader()
if err != nil {
log.Error("getting pd leader meets error", errs.ZapError(err))
time.Sleep(200 * time.Millisecond)
return nil, true
}
if leader == nil {
if leaderRaw == nil {
// no leader yet
return nil, false
}

leader := leaderRaw.(*pdpb.Member)
if m.IsSameLeader(leader) {
// oh, we are already a PD leader, which indicates we may meet something wrong
// in previous CampaignLeader. We should delete the leadership and campaign again.
Expand All @@ -269,14 +269,14 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) {
func (m *EmbeddedEtcdMember) WatchLeader(ctx context.Context, leader *pdpb.Member, revision int64) {
m.setLeader(leader)
m.leadership.Watch(ctx, revision)
m.unsetLeader()
m.UnsetLeader()
}

// ResetLeader is used to reset the PD member's current leadership.
// Basically it will reset the leader lease and unset leader info.
func (m *EmbeddedEtcdMember) ResetLeader() {
m.leadership.Reset()
m.unsetLeader()
m.UnsetLeader()
}

// CheckPriority checks whether the etcd leader should be moved according to the priority.
Expand Down Expand Up @@ -324,8 +324,8 @@ func (m *EmbeddedEtcdMember) GetEtcdLeader() uint64 {
}

// IsSameLeader checks whether a server is the leader itself.
func (m *EmbeddedEtcdMember) IsSameLeader(leader *pdpb.Member) bool {
return leader.GetMemberId() == m.ID()
func (m *EmbeddedEtcdMember) IsSameLeader(leader any) bool {
return leader.(*pdpb.Member).GetMemberId() == m.ID()
}

// InitMemberInfo initializes the member info.
Expand Down
Loading

0 comments on commit 7fa19d3

Please sign in to comment.