diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 755a7d6d331..5d0c5655a33 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -64,7 +64,10 @@ type Leadership struct { leaderKey string leaderValue string - LeaderWatch bool + leaderWatch struct { + syncutil.RWMutex + val bool + } keepAliveCtx context.Context keepAliveCancelFunc context.CancelFunc @@ -74,10 +77,6 @@ type Leadership struct { campaignTimes []time.Time } -func (ls *Leadership) SetLeaderWatch(val bool) { - ls.LeaderWatch = val -} - func (ls *Leadership) GetLeaderValue() string { return ls.leaderValue } @@ -123,6 +122,20 @@ func (ls *Leadership) GetLeaderKey() string { return ls.leaderKey } +// SetLeaderWatch sets the leader watch flag. +func (ls *Leadership) SetLeaderWatch(val bool) { + ls.leaderWatch.Lock() + ls.leaderWatch.val = val + ls.leaderWatch.Unlock() +} + +// GetLeaderWatch gets the leader watch flag. +func (ls *Leadership) GetLeaderWatch() bool { + ls.leaderWatch.RLock() + defer ls.leaderWatch.RUnlock() + return ls.leaderWatch.val +} + // GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`. func (ls *Leadership) GetCampaignTimesNum() int { if ls == nil { @@ -386,8 +399,8 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { return } // only API update the leader key to transfer the leader will meet - if ev.Type == mvccpb.PUT && ls.LeaderWatch { - log.Info("[LeaderWatch] current leadership is updated", zap.Int64("watchRevision", revision), + if ev.Type == mvccpb.PUT && ls.GetLeaderWatch() { + log.Info("[LeaderWatch] current leadership is updated", zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return } @@ -409,5 +422,5 @@ func (ls *Leadership) Reset() { } ls.keepAliveCancelFuncLock.Unlock() ls.getLease().Close() - ls.LeaderWatch = false + ls.SetLeaderWatch(false) } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index adb72fd52c8..0593058e3cd 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -17,6 +17,7 @@ package server import ( "context" "fmt" + "github.com/tikv/pd/pkg/utils/etcdutil" "net/http" "os" "os/signal" @@ -306,7 +307,7 @@ func (s *Server) campaignLeader() { log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) exitPrimary := make(chan struct{}) - go s.primaryWatch(exitPrimary) + go s.primaryWatch(ctx, exitPrimary) leaderTicker := time.NewTicker(utils.LeaderTickInterval) defer leaderTicker.Stop() @@ -329,7 +330,7 @@ func (s *Server) campaignLeader() { } } -func (s *Server) primaryWatch(exitPrimary chan struct{}) { +func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) { _, revision, err := s.participant.GetPersistentLeader() if err != nil { log.Error("[primary] getting the leader meets error", errs.ZapError(err)) @@ -341,10 +342,30 @@ func (s *Server) primaryWatch(exitPrimary chan struct{}) { s.participant.GetLeadership().Watch(s.serverLoopCtx, revision+1) s.participant.GetLeadership().SetLeaderWatch(false) + // only API update primary will set the expected leader + // check leader key whether deleted + leaderRaw, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath()) + if err != nil { + log.Error("[primary] get primary key error", zap.Error(err)) + return + } + if leaderRaw == nil { + log.Info("[primary] leader key is deleted, the primary will step down") + return + } + utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath()) s.participant.UnsetLeader() - exitPrimary <- struct{}{} + for { + select { + case <-ctx.Done(): + log.Info("[primary] exit the primary watch loop") + return + case exitPrimary <- struct{}{}: + return + } + } } // Close closes the server. diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index 7bbebef5693..b4d737c99e3 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -16,7 +16,6 @@ package utils import ( "context" - "github.com/tikv/pd/pkg/storage/kv" "net" "net/http" "os" @@ -33,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/soheilhy/cmux" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -91,7 +91,7 @@ func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) { resp, err := kv.NewSlowLogTxn(client). Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))). Commit() - if err != nil && !resp.Succeeded { + if err != nil || !resp.Succeeded { log.Error("change primary error", errs.ZapError(err)) return } @@ -99,13 +99,11 @@ func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) { // SetExpectedPrimary sets the expected primary key when the current primary has exited. func SetExpectedPrimary(client *clientv3.Client, leaderPath string) { - // write a flag to indicate the current primary has exited leaderRaw, err := etcdutil.GetValue(client, leaderPath) if err != nil { log.Error("[primary] get primary key error", zap.Error(err)) return } - // write a flag to indicate the current primary has exited resp, err := kv.NewSlowLogTxn(client). Then( @@ -113,7 +111,7 @@ func SetExpectedPrimary(client *clientv3.Client, leaderPath string) { // indicate the current primary has exited clientv3.OpDelete(leaderPath)). Commit() - if err != nil && !resp.Succeeded { + if err != nil || !resp.Succeeded { log.Error("change primary error", errs.ZapError(err)) return } diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 1ca854dfed9..b2e02ef09cb 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/tikv/pd/pkg/utils/etcdutil" "runtime/trace" "sync" "sync/atomic" @@ -647,7 +648,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { zap.String("tso-primary-name", gta.member.Name())) exitPrimary := make(chan struct{}) - go gta.primaryWatch(exitPrimary) + go gta.primaryWatch(ctx, exitPrimary) leaderTicker := time.NewTicker(mcsutils.LeaderTickInterval) defer leaderTicker.Stop() @@ -672,7 +673,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { } } -func (gta *GlobalTSOAllocator) primaryWatch(exitPrimary chan struct{}) { +func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary chan struct{}) { _, revision, err := gta.member.GetPersistentLeader() if err != nil { log.Error("[primary] getting the leader meets error", errs.ZapError(err)) @@ -686,10 +687,30 @@ func (gta *GlobalTSOAllocator) primaryWatch(exitPrimary chan struct{}) { gta.member.GetLeadership().Watch(gta.ctx, revision+1) gta.member.GetLeadership().SetLeaderWatch(false) + // only API update primary will set the expected leader + // check leader key whether deleted + leaderRaw, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath()) + if err != nil { + log.Error("[primary] get primary key error", zap.Error(err)) + return + } + if leaderRaw == nil { + log.Info("[primary] leader key is deleted, the primary will step down") + return + } + mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) gta.member.UnsetLeader() - exitPrimary <- struct{}{} + for { + select { + case <-ctx.Done(): + log.Info("[primary] exit the primary watch loop") + return + case exitPrimary <- struct{}{}: + return + } + } } func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics {