diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 02f519dbc75..3ee413818a5 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -34,11 +34,12 @@ import ( ) const ( - defaultCampaignTimesSlot = 10 - watchLoopUnhealthyTimeout = 60 * time.Second - campaignTimesRecordTimeout = 5 * time.Minute + defaultCampaignTimesSlot = 10 + watchLoopUnhealthyTimeout = 60 * time.Second ) +var campaignTimesRecordTimeout = 5 * time.Minute + // GetLeader gets the corresponding leader from etcd by given leaderPath (as the key). func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) { leader := &pdpb.Member{} @@ -114,6 +115,7 @@ func (ls *Leadership) GetLeaderKey() string { } // GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`. +// Need to make sure `AddCampaignTimes` is called before this function. func (ls *Leadership) GetCampaignTimesNum() int { if ls == nil { return 0 @@ -129,8 +131,8 @@ func (ls *Leadership) ResetCampaignTimes() { ls.campaignTimes = make([]time.Time, 0, defaultCampaignTimesSlot) } -// addCampaignTimes is used to add the campaign times of the leader. -func (ls *Leadership) addCampaignTimes() { +// AddCampaignTimes is used to add the campaign times of the leader. +func (ls *Leadership) AddCampaignTimes() { if ls == nil { return } @@ -138,7 +140,7 @@ func (ls *Leadership) addCampaignTimes() { if time.Since(ls.campaignTimes[i]) > campaignTimesRecordTimeout { // remove the time which is more than `campaignTimesRecordTimeout` // array is sorted by time - ls.campaignTimes = ls.campaignTimes[i:] + ls.campaignTimes = ls.campaignTimes[i+1:] break } } @@ -148,7 +150,6 @@ func (ls *Leadership) addCampaignTimes() { // Campaign is used to campaign the leader with given lease and returns a leadership func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error { - ls.addCampaignTimes() ls.leaderValue = leaderData // Create a new lease to campaign newLease := &lease{ diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index b5480200764..cb93fb3a99f 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -262,3 +262,36 @@ func TestRequestProgress(t *testing.T) { checkWatcherRequestProgress(false) checkWatcherRequestProgress(true) } + +func TestCampaignTimes(t *testing.T) { + re := require.New(t) + _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) + defer clean() + leadership := NewLeadership(client, "test_leader", "test_leader") + + // all the campaign times are within the timeout. + campaignTimesRecordTimeout = 10 * time.Second + defer func() { + campaignTimesRecordTimeout = 5 * time.Minute + }() + for i := 0; i < 3; i++ { + leadership.AddCampaignTimes() + time.Sleep(100 * time.Millisecond) + } + re.Equal(3, leadership.GetCampaignTimesNum()) + + // only the last 2 records are valid. + campaignTimesRecordTimeout = 200 * time.Millisecond + for i := 0; i < 3; i++ { + leadership.AddCampaignTimes() + time.Sleep(100 * time.Millisecond) + } + re.Equal(2, leadership.GetCampaignTimesNum()) + + time.Sleep(200 * time.Millisecond) + // need to wait for the next addCampaignTimes to update the campaign time. + re.Equal(2, leadership.GetCampaignTimesNum()) + // check campaign leader frequency. + leadership.AddCampaignTimes() + re.Equal(1, leadership.GetCampaignTimesNum()) +} diff --git a/pkg/member/member.go b/pkg/member/member.go index 0daec8e8df6..afbf4e18464 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -182,10 +182,11 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time { // and make it become a PD leader. // leader should be changed when campaign leader frequently. func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error { + m.leadership.AddCampaignTimes() failpoint.Inject("skipCampaignLeaderCheck", func() { failpoint.Return(m.leadership.Campaign(leaseTimeout, m.MemberValue())) }) - if m.leadership.GetCampaignTimesNum() >= campaignLeaderFrequencyTimes { + if m.leadership.GetCampaignTimesNum() > campaignLeaderFrequencyTimes { log.Warn("campaign times is too frequent, resign and campaign again", zap.String("leader-name", m.Name()), zap.String("leader-key", m.GetLeaderPath())) if err := m.ResignEtcdLeader(ctx, m.Name(), ""); err != nil { diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index f428fb7c198..192b2a2fb8a 100644 --- a/tests/server/member/member_test.go +++ b/tests/server/member/member_test.go @@ -327,20 +327,26 @@ func TestCampaignLeaderFrequently(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 5) + cluster, err := tests.NewTestCluster(ctx, 3) defer cluster.Destroy() re.NoError(err) err = cluster.RunInitialServers() re.NoError(err) + // the 1st time campaign leader. cluster.WaitLeader() leader := cluster.GetLeader() re.NotEmpty(cluster.GetLeader()) - for i := 0; i < 3; i++ { + // need to prevent 3 times(including the above 1st time) campaign leader in 5 min. + for i := 0; i < 2; i++ { cluster.GetServers()[cluster.GetLeader()].ResetPDLeader() cluster.WaitLeader() + re.Equal(leader, cluster.GetLeader()) } + // check for the 4th time. + cluster.GetLeaderServer().ResetPDLeader() + cluster.WaitLeader() // PD leader should be different from before because etcd leader changed. re.NotEmpty(cluster.GetLeader()) re.NotEqual(leader, cluster.GetLeader())