diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 02f519dbc75d..87482c208895 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -93,7 +93,7 @@ func (ls *Leadership) getLease() *lease { return l.(*lease) } -func (ls *Leadership) setLease(lease *lease) { +func (ls *Leadership) SetLease(lease *lease) { ls.lease.Store(lease) } @@ -156,7 +156,7 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl client: ls.client, lease: clientv3.NewLease(ls.client), } - ls.setLease(newLease) + ls.SetLease(newLease) failpoint.Inject("skipGrantLeader", func(val failpoint.Value) { var member pdpb.Member diff --git a/pkg/election/lease.go b/pkg/election/lease.go index a6b49fb99f84..5e5edb3e8073 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -48,6 +48,14 @@ type lease struct { expireTime atomic.Value } +func NewLease(client *clientv3.Client, purpose string) *lease { + return &lease{ + Purpose: purpose, + client: client, + lease: clientv3.NewLease(client), + } +} + // Grant uses `lease.Grant` to initialize the lease and expireTime. func (l *lease) Grant(leaseTimeout int64) error { if l == nil { diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 1ce5ecda51dd..3e1d678cffbb 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -45,14 +45,14 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er } // GetMSMembers returns all the members of the specified service name. -func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, error) { - switch name { +func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) { + switch serviceName { case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName: clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath) if err != nil { return nil, err } - servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name) + servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName) resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit() if err != nil { return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause() @@ -75,5 +75,5 @@ func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, return entries, nil } - return nil, errors.Errorf("unknown service name %s", name) + return nil, errors.Errorf("unknown service name %s", serviceName) } diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index be3277f3fc63..4f327146d4b0 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -29,6 +29,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" @@ -39,11 +40,14 @@ import ( "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/unrolled/render" + "go.etcd.io/etcd/clientv3" ) // APIPathPrefix is the prefix of the API path. @@ -112,6 +116,7 @@ func NewService(srv *scheserver.Service) *Service { rd: createIndentRender(), } s.RegisterAdminRouter() + s.RegisterMemberRouter() s.RegisterConfigRouter() s.RegisterOperatorsRouter() s.RegisterSchedulersRouter() @@ -130,7 +135,13 @@ func (s *Service) RegisterAdminRouter() { router.DELETE("cache/regions/:id", deleteRegionCacheByID) } -// RegisterSchedulersRouter registers the router of the schedulers handler. +// RegisterMemberRouter registers the router of the member handler. +func (s *Service) RegisterMemberRouter() { + router := s.root.Group("member") + router.POST("/primary/transfer", transferPrimary) +} + +// RegisterSchedulersRouter registers the router of the schedulers' handler. func (s *Service) RegisterSchedulersRouter() { router := s.root.Group("schedulers") router.GET("", getSchedulers) @@ -259,6 +270,42 @@ func getConfig(c *gin.Context) { c.IndentedJSON(http.StatusOK, cfg) } +func transferPrimary(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + if svr.IsServing() { + c.AbortWithStatusJSON(http.StatusInternalServerError, "now is primary") + return + } + + newLease := election.NewLease(svr.GetClient(), "primary election") + if err := newLease.Grant(mcsutils.DefaultLeaderLease); err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, "newLease grant error") + } + + // remove primary lease + primaryKey := endpoint.SchedulingPrimaryPath(svr.GetClusterID()) + deleteResp, err := kv.NewSlowLogTxn(svr.GetClient()). + Then( + clientv3.OpDelete(primaryKey), + ).Commit() + if err != nil || !deleteResp.Succeeded { + c.AbortWithStatusJSON(http.StatusInternalServerError, "delete resp error") + } + + memberValue := svr.GetParticipant() + memberValue.GetLeadership().SetLease(newLease) + putResp, err := kv.NewSlowLogTxn(svr.GetClient()). + Then( + clientv3.OpPut(primaryKey, memberValue.MemberValue(), clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID))), + ). + Commit() + if err != nil || !putResp.Succeeded { + c.AbortWithStatusJSON(http.StatusInternalServerError, "put resp error") + } + + c.IndentedJSON(http.StatusOK, "transfer submitted!") +} + // @Tags admin // @Summary Drop all regions from cache. // @Produce json diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 47a7cf9962b1..ec5a11bb2baf 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -128,6 +128,14 @@ func (s *Server) GetBackendEndpoints() string { return s.cfg.BackendEndpoints } +func (s *Server) GetClusterID() uint64 { + return s.clusterID +} + +func (s *Server) GetParticipant() *member.Participant { + return s.participant +} + // SetLogLevel sets log level. func (s *Server) SetLogLevel(level string) error { if !logutil.IsLevelLegal(level) { @@ -249,18 +257,35 @@ func (s *Server) primaryElectionLoop() { func (s *Server) campaignLeader() { log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) - if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil { - if err.Error() == errs.ErrEtcdTxnConflict.Error() { - log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", - zap.String("campaign-scheduling-primary-name", s.participant.Name())) - } else { - log.Error("campaign scheduling primary meets error due to etcd error", - zap.String("campaign-scheduling-primary-name", s.participant.Name()), - errs.ZapError(err)) + leader, _, err := s.participant.GetPersistentLeader() + if err != nil { + log.Error("getting the leader meets error", errs.ZapError(err)) + return + } + if leader != nil && !s.participant.IsSameLeader(leader) { + leader, ok := leader.(*schedulingpb.Participant) + if !ok { + log.Error("failed to get the leader", zap.Any("leader", leader)) + return } + log.Info("the scheduling primary/leader is already elected", zap.Stringer("scheduling-primary", leader)) return } + if leader == nil { + if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil { + if err.Error() == errs.ErrEtcdTxnConflict.Error() { + log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", + zap.String("campaign-scheduling-primary-name", s.participant.Name())) + } else { + log.Error("campaign scheduling primary meets error due to etcd error", + zap.String("campaign-scheduling-primary-name", s.participant.Name()), + errs.ZapError(err)) + } + return + } + } + // Start keepalive the leadership and enable Scheduling service. ctx, cancel := context.WithCancel(s.serverLoopCtx) var resetLeaderOnce sync.Once @@ -290,6 +315,19 @@ func (s *Server) campaignLeader() { member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1) log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) + go func() { + log.Info("[primary] start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader())) + _, revision, err := s.participant.GetPersistentLeader() + if err != nil { + log.Error("[primary] getting the leader meets error", errs.ZapError(err)) + return + } + // Watch will keep looping and never return unless the primary/leader has changed. + s.participant.GetLeadership().Watch(s.serverLoopCtx, revision) + s.participant.UnsetLeader() + log.Info("[primary] the scheduling primary has changed, try to re-campaign a primary") + }() + leaderTicker := time.NewTicker(utils.LeaderTickInterval) defer leaderTicker.Stop() diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index 44f4b353d58e..5dc3d615ca39 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -25,14 +25,17 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" tsoserver "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/logutil" "github.com/unrolled/render" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -94,6 +97,7 @@ func NewService(srv *tsoserver.Service) *Service { rd: createIndentRender(), } s.RegisterAdminRouter() + s.RegisterMemberRouter() s.RegisterKeyspaceGroupRouter() s.RegisterHealthRouter() s.RegisterConfigRouter() @@ -107,6 +111,12 @@ func (s *Service) RegisterAdminRouter() { router.PUT("/log", changeLogLevel) } +// RegisterMemberRouter registers the router of the member handler. +func (s *Service) RegisterMemberRouter() { + router := s.root.Group("member") + router.POST("/primary/transfer", transferPrimary) +} + // RegisterKeyspaceGroupRouter registers the router of the TSO keyspace group handler. func (s *Service) RegisterKeyspaceGroupRouter() { router := s.root.Group("keyspace-groups") @@ -141,6 +151,46 @@ func changeLogLevel(c *gin.Context) { c.String(http.StatusOK, "The log level is updated.") } +func transferPrimary(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) + if svr.IsServing() { + c.AbortWithStatusJSON(http.StatusInternalServerError, "now is primary") + return + } + + newLease := election.NewLease(svr.GetClient(), "transfer-primary") + if err := newLease.Grant(utils.DefaultLeaderLease); err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, "newLease grant error") + } + + tsoRootPath := endpoint.TSOSvcRootPath(svr.GetClusterID()) + primaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, utils.DefaultKeyspaceGroupID) + // delete previous primary firstly + deleteResp, err := kv.NewSlowLogTxn(svr.GetClient()). + Then( + clientv3.OpDelete(primaryKey), + ).Commit() + if err != nil || !deleteResp.Succeeded { + c.AbortWithStatusJSON(http.StatusInternalServerError, "delete resp error") + } + + memberValue, err := svr.GetMember(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) + memberValue.GetLeadership().SetLease(newLease) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, "get tso member") + } + putResp, err := kv.NewSlowLogTxn(svr.GetClient()). + Then( + clientv3.OpPut(primaryKey, memberValue.MemberValue(), clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID))), + ). + Commit() + if err != nil || !putResp.Succeeded { + c.AbortWithStatusJSON(http.StatusInternalServerError, "put resp error") + } + + c.IndentedJSON(http.StatusOK, "transfer submitted!") +} + // ResetTSParams is the input json body params of ResetTS type ResetTSParams struct { TSO string `json:"tso"` diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index c38c7142730b..d2dde6eeff24 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -121,6 +121,10 @@ func (s *Server) ServerLoopWgAdd(n int) { s.serverLoopWg.Add(n) } +func (s *Server) GetClusterID() uint64 { + return s.clusterID +} + // SetUpRestHandler sets up the REST handler. func (s *Server) SetUpRestHandler() (http.Handler, apiutil.APIServiceGroup) { return SetUpRestHandler(s.service) diff --git a/pkg/member/member.go b/pkg/member/member.go index af504d839638..4522eb7ae331 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -148,8 +148,8 @@ func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) { m.lastLeaderUpdatedTime.Store(time.Now()) } -// unsetLeader unsets the member's PD leader. -func (m *EmbeddedEtcdMember) unsetLeader() { +// UnsetLeader unsets the member's PD leader. +func (m *EmbeddedEtcdMember) UnsetLeader() { m.leader.Store(&pdpb.Member{}) m.lastLeaderUpdatedTime.Store(time.Now()) } @@ -210,8 +210,8 @@ func (m *EmbeddedEtcdMember) PreCheckLeader() error { return nil } -// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). -func (m *EmbeddedEtcdMember) getPersistentLeader() (*pdpb.Member, int64, error) { +// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). +func (m *EmbeddedEtcdMember) GetPersistentLeader() (any, int64, error) { leader := &pdpb.Member{} ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { @@ -233,17 +233,17 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) { return nil, true } - leader, revision, err := m.getPersistentLeader() + leaderRaw, revision, err := m.GetPersistentLeader() if err != nil { log.Error("getting pd leader meets error", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, true } - if leader == nil { + if leaderRaw == nil { // no leader yet return nil, false } - + leader := leaderRaw.(*pdpb.Member) if m.IsSameLeader(leader) { // oh, we are already a PD leader, which indicates we may meet something wrong // in previous CampaignLeader. We should delete the leadership and campaign again. @@ -269,14 +269,14 @@ func (m *EmbeddedEtcdMember) CheckLeader() (ElectionLeader, bool) { func (m *EmbeddedEtcdMember) WatchLeader(ctx context.Context, leader *pdpb.Member, revision int64) { m.setLeader(leader) m.leadership.Watch(ctx, revision) - m.unsetLeader() + m.UnsetLeader() } // ResetLeader is used to reset the PD member's current leadership. // Basically it will reset the leader lease and unset leader info. func (m *EmbeddedEtcdMember) ResetLeader() { m.leadership.Reset() - m.unsetLeader() + m.UnsetLeader() } // CheckPriority checks whether the etcd leader should be moved according to the priority. @@ -324,8 +324,8 @@ func (m *EmbeddedEtcdMember) GetEtcdLeader() uint64 { } // IsSameLeader checks whether a server is the leader itself. -func (m *EmbeddedEtcdMember) IsSameLeader(leader *pdpb.Member) bool { - return leader.GetMemberId() == m.ID() +func (m *EmbeddedEtcdMember) IsSameLeader(leader any) bool { + return leader.(*pdpb.Member).GetMemberId() == m.ID() } // InitMemberInfo initializes the member info. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 8a0ffadd31e2..7a962cad120a 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -154,8 +154,8 @@ func (m *Participant) setLeader(member participant) { m.lastLeaderUpdatedTime.Store(time.Now()) } -// unsetLeader unsets the member's leader. -func (m *Participant) unsetLeader() { +// UnsetLeader unsets the member's leader. +func (m *Participant) UnsetLeader() { leader := NewParticipantByService(m.serviceName) m.leader.Store(leader) m.lastLeaderUpdatedTime.Store(time.Now()) @@ -205,8 +205,8 @@ func (*Participant) PreCheckLeader() error { return nil } -// getPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). -func (m *Participant) getPersistentLeader() (participant, int64, error) { +// GetPersistentLeader gets the corresponding leader from etcd by given leaderPath (as the key). +func (m *Participant) GetPersistentLeader() (any, int64, error) { leader := NewParticipantByService(m.serviceName) ok, rev, err := etcdutil.GetProtoMsgWithModRev(m.client, m.GetLeaderPath(), leader) if err != nil { @@ -228,17 +228,18 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) { return nil, true } - leader, revision, err := m.getPersistentLeader() + leaderRaw, revision, err := m.GetPersistentLeader() if err != nil { log.Error("getting the leader meets error", errs.ZapError(err)) time.Sleep(200 * time.Millisecond) return nil, true } - if leader == nil { + if leaderRaw == nil { // no leader yet return nil, false } + leader := leaderRaw.(participant) if m.IsSameLeader(leader) { // oh, we are already the leader, which indicates we may meet something wrong // in previous CampaignLeader. We should delete the leadership and campaign again. @@ -264,19 +265,19 @@ func (m *Participant) CheckLeader() (ElectionLeader, bool) { func (m *Participant) WatchLeader(ctx context.Context, leader participant, revision int64) { m.setLeader(leader) m.leadership.Watch(ctx, revision) - m.unsetLeader() + m.UnsetLeader() } // ResetLeader is used to reset the member's current leadership. // Basically it will reset the leader lease and unset leader info. func (m *Participant) ResetLeader() { m.leadership.Reset() - m.unsetLeader() + m.UnsetLeader() } // IsSameLeader checks whether a server is the leader itself. -func (m *Participant) IsSameLeader(leader participant) bool { - return leader.GetId() == m.ID() +func (m *Participant) IsSameLeader(leader any) bool { + return leader.(participant).GetId() == m.ID() } // CheckPriority checks whether there is another participant has higher priority and resign it as the leader if so. diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index f1683de1352d..8353bd0cc775 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -145,6 +145,12 @@ type ElectionMember interface { GetDCLocationPath(id uint64) string // PreCheckLeader does some pre-check before checking whether it's the leader. PreCheckLeader() error + // GetPersistentLeader returns the persistent leader. + GetPersistentLeader() (any, int64, error) + // IsSameLeader checks whether the leader is the same as the given leader. + IsSameLeader(leader any) bool + // UnsetLeader unsets the member's leader. + UnsetLeader() } // AllocatorManager is used to manage the TSO Allocators a PD server holds. diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index f90dc5f26fec..cda9a2ac74b7 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -568,23 +568,42 @@ func (gta *GlobalTSOAllocator) campaignLeader() { log.Info("start to campaign the primary", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("campaign-tso-primary-name", gta.member.Name())) - if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil { - if errors.Is(err, errs.ErrEtcdTxnConflict) { - log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.String("campaign-tso-primary-name", gta.member.Name())) - } else if errors.Is(err, errs.ErrCheckCampaign) { - log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.String("campaign-tso-primary-name", gta.member.Name())) - } else { - log.Error("campaign tso primary meets error due to etcd error", - logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), - zap.String("campaign-tso-primary-name", gta.member.Name()), errs.ZapError(err)) + + leader, _, err := gta.member.GetPersistentLeader() + if err != nil { + log.Error("getting TSO leader meets error", errs.ZapError(err)) + time.Sleep(200 * time.Millisecond) + return + } + if leader != nil && !gta.member.IsSameLeader(leader) { + leader, ok := leader.(member.ElectionLeader) + if !ok { + log.Error("failed to get the leader", zap.Any("leader", leader)) + return } + log.Info("the TSO primary/leader is already elected", zap.Stringer("tso-primary", leader)) return } + if leader == nil { + if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil { + if errors.Is(err, errs.ErrEtcdTxnConflict) { + log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.String("campaign-tso-primary-name", gta.member.Name())) + } else if errors.Is(err, errs.ErrCheckCampaign) { + log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.String("campaign-tso-primary-name", gta.member.Name())) + } else { + log.Error("campaign tso primary meets error due to etcd error", + logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), + zap.String("campaign-tso-primary-name", gta.member.Name()), errs.ZapError(err)) + } + return + } + } + // Start keepalive the leadership and enable TSO service. // TSO service is strictly enabled/disabled by the leader lease for 2 reasons: // 1. lease based approach is not affected by thread pause, slow runtime schedule, etc. @@ -596,7 +615,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { gta.member.ResetLeader() }) - // maintain the the leadership, after this, TSO can be service. + // maintain the leadership, after this, TSO can be service. gta.member.KeepLeader(ctx) log.Info("campaign tso primary ok", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), @@ -635,6 +654,19 @@ func (gta *GlobalTSOAllocator) campaignLeader() { logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.String("tso-primary-name", gta.member.Name())) + go func() { + log.Info("[primary] start to watch the primary", zap.Uint64("tso-primary", gta.member.GetLeaderID())) + _, revision, err := gta.member.GetPersistentLeader() + if err != nil { + log.Error("[primary] getting the leader meets error", errs.ZapError(err)) + return + } + // Watch will keep looping and never return unless the primary/leader has changed. + gta.member.GetLeadership().Watch(gta.ctx, revision) + gta.member.UnsetLeader() + log.Info("[primary] the TSO primary has changed, try to re-campaign a primary") + }() + leaderTicker := time.NewTicker(mcsutils.LeaderTickInterval) defer leaderTicker.Stop()