cluster: fix tso fallback due to raft cluster not stopping tso service (part2) #8885

Merged 6 commits on Dec 10, 2024
30 changes: 27 additions & 3 deletions server/cluster/cluster.go
@@ -315,7 +315,7 @@ func (c *RaftCluster) InitCluster(
}

// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
c.Lock()
defer c.Unlock()

@@ -324,11 +324,29 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}
c.isAPIServiceMode = s.IsAPIServiceMode()
err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
}
c.checkTSOService()
// We should not manage the TSO service when a bootstrap request tries to start the raft cluster;
// it is controlled only by leader election.
// Ref: https://github.com/tikv/pd/issues/8836
if !bootstrap {
c.checkTSOService()
}
defer func() {
if !bootstrap && err != nil {
c.stopTSOJobsIfNeeded()
}
}()
failpoint.Inject("raftClusterReturn", func(val failpoint.Value) {
if val, ok := val.(bool); (ok && val) || !ok {
err = errors.New("raftClusterReturn")
} else {
err = nil
}
failpoint.Return(err)
})
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
@@ -2554,3 +2572,9 @@ func (c *RaftCluster) SetServiceIndependent(name string) {
func (c *RaftCluster) UnsetServiceIndependent(name string) {
c.independentServices.Delete(name)
}

// GetGlobalTSOAllocator returns the global TSO allocator.
// It is only used for tests.
func (c *RaftCluster) GetGlobalTSOAllocator() tso.Allocator {
return c.tsoAllocator.GetAllocator()
}
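The key pattern in `RaftCluster.Start` above is the named error return combined with a deferred check: if any later step fails after `checkTSOService` has started the TSO jobs, the deferred `stopTSOJobsIfNeeded` undoes it, so a node whose raft cluster failed to start cannot keep allocating timestamps. Below is a minimal, self-contained sketch of that pattern; `startTSO`, `loadCluster`, and `stopTSO` are illustrative placeholders, not the actual pd APIs.

```go
package main

import (
	"errors"
	"fmt"
)

// startWithCleanup mimics the structure of RaftCluster.Start: a named error
// return plus a deferred check guarantees that TSO jobs started at the top of
// the function are stopped again on every failing exit path.
func startWithCleanup(bootstrap bool, startTSO, loadCluster func() error, stopTSO func()) (err error) {
	// Only the leader-election path (bootstrap == false) manages the TSO service.
	if !bootstrap {
		if err = startTSO(); err != nil {
			return err
		}
	}
	defer func() {
		// If any later step failed, undo the TSO start so this node does not
		// keep serving timestamps.
		if !bootstrap && err != nil {
			stopTSO()
		}
	}()
	if err = loadCluster(); err != nil {
		return err
	}
	return nil
}

func main() {
	err := startWithCleanup(false,
		func() error { return nil },                       // startTSO succeeds
		func() error { return errors.New("load failed") }, // loading cluster info fails
		func() { fmt.Println("tso jobs stopped") },        // cleanup fires because err != nil
	)
	fmt.Println("Start returned:", err)
}
```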
10 changes: 8 additions & 2 deletions server/server.go
@@ -758,7 +758,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe
log.Warn("flush the bootstrap region failed", errs.ZapError(err))
}

if err := s.cluster.Start(s); err != nil {
if err := s.cluster.Start(s, true); err != nil {
return nil, err
}

@@ -776,7 +776,7 @@ func (s *Server) createRaftCluster() error {
return nil
}

return s.cluster.Start(s)
return s.cluster.Start(s, false)
}

func (s *Server) stopRaftCluster() {
@@ -2097,3 +2097,9 @@ func (s *Server) GetMaxResetTSGap() time.Duration {
func (s *Server) SetClient(client *clientv3.Client) {
s.client = client
}

// GetGlobalTSOAllocator returns the global TSO allocator.
// It is only used for tests.
func (s *Server) GetGlobalTSOAllocator() tso.Allocator {
return s.cluster.GetGlobalTSOAllocator()
}
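The two call sites above make the division of responsibility explicit: `bootstrapCluster`, handling a client Bootstrap RPC, passes `true` so `Start` never touches the TSO service, while `createRaftCluster`, run after this member wins the PD leader election, passes `false` so TSO jobs follow leadership. A hedged sketch of that wiring, using simplified stand-in types rather than the real pd server:

```go
package main

import "fmt"

// raftCluster is a stand-in for server/cluster.RaftCluster.
type raftCluster struct{}

// Start mirrors the new signature: the bootstrap flag tells the cluster whether
// it is allowed to manage the TSO service at all.
func (*raftCluster) Start(_ any, bootstrap bool) error {
	if bootstrap {
		// Bootstrap request: skip checkTSOService and the deferred
		// stopTSOJobsIfNeeded; TSO is controlled by leader election only.
		return nil
	}
	// Leader-election path: TSO jobs track the outcome of Start.
	return nil
}

type server struct{ cluster raftCluster }

// bootstrapCluster handles a client Bootstrap RPC.
func (s *server) bootstrapCluster() error { return s.cluster.Start(s, true) }

// createRaftCluster runs after this member becomes the PD leader.
func (s *server) createRaftCluster() error { return s.cluster.Start(s, false) }

func main() {
	var s server
	fmt.Println(s.bootstrapCluster(), s.createRaftCluster())
}
```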
2 changes: 1 addition & 1 deletion tests/server/api/api_test.go
@@ -925,7 +925,7 @@ func TestSendApiWhenRestartRaftCluster(t *testing.T) {
output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError)
re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first")

err = rc.Start(leader.GetServer())
err = rc.Start(leader.GetServer(), false)
re.NoError(err)
rc = leader.GetRaftCluster()
re.NotNil(rc)
99 changes: 95 additions & 4 deletions tests/server/cluster/cluster_test.go
@@ -578,7 +578,7 @@ func TestRaftClusterRestart(t *testing.T) {
re.NotNil(rc)
rc.Stop()

err = rc.Start(leaderServer.GetServer())
err = rc.Start(leaderServer.GetServer(), false)
re.NoError(err)

rc = leaderServer.GetRaftCluster()
@@ -621,14 +621,105 @@ func TestRaftClusterMultipleRestart(t *testing.T) {
for range 100 {
// See https://github.com/tikv/pd/issues/8543
rc.Wait()
err = rc.Start(leaderServer.GetServer())
err = rc.Start(leaderServer.GetServer(), false)
re.NoError(err)
time.Sleep(time.Millisecond)
rc.Stop()
}
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
}

// TestRaftClusterStartTSOJob tests whether the TSO job service is properly closed
// when the raft cluster is stopped ahead of time.
// Ref: https://github.com/tikv/pd/issues/8836
func TestRaftClusterStartTSOJob(t *testing.T) {
re := require.New(t)
name := "pd1"
// case 1: normal start
ctx, cancel := context.WithCancel(context.Background())
tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
re.NoError(tc.RunInitialServers())
re.NotEmpty(tc.WaitLeader())
leaderServer := tc.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
testutil.Eventually(re, func() bool {
allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
return allocator.IsInitialize()
})
tc.Destroy()
cancel()
// case 2: return ahead of time but no error when start raft cluster
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(false)`))
ctx, cancel = context.WithCancel(context.Background())
tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
tc.WaitLeader()
testutil.Eventually(re, func() bool {
allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
return allocator.IsInitialize()
})
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn"))
tc.Destroy()
cancel()
// case 3: meet error when start raft cluster
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(true)`))
ctx, cancel = context.WithCancel(context.Background())
tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
tc.WaitLeader()
testutil.Eventually(re, func() bool {
allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
return !allocator.IsInitialize()
})
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn"))
tc.Destroy()
cancel()
// case 4: multiple bootstrap in 3 pd cluster
ctx, cancel = context.WithCancel(context.Background())
tc, err = tests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
re.NoError(tc.RunInitialServers())
re.NotEmpty(tc.WaitLeader())
leaderServer = tc.GetLeaderServer()
re.NotNil(leaderServer)
name = leaderServer.GetLeader().GetName()
wg := sync.WaitGroup{}
for range 3 {
wg.Add(1)
go func() {
leaderServer.BootstrapCluster()
wg.Done()
}()
}
wg.Wait()
testutil.Eventually(re, func() bool {
allocator := leaderServer.GetServer().GetGlobalTSOAllocator()
return allocator.IsInitialize()
})
re.NoError(tc.ResignLeader())
re.NotEmpty(tc.WaitLeader())
testutil.Eventually(re, func() bool {
allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
return !allocator.IsInitialize()
})
tc.Destroy()
cancel()
}
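The three single-node cases in this test are driven by the `raftClusterReturn` failpoint added to `RaftCluster.Start`: enabling it with `return(false)` makes `Start` return early with a nil error, so the global TSO allocator should still initialize, while `return(true)` makes `Start` fail, so the deferred cleanup must stop the TSO jobs and the allocator must never initialize. The sketch below imitates (rather than calls) the injected body and the `testutil.Eventually` polling helper; `injected` and `eventually` are illustrative stand-ins, not pd or pingcap/failpoint APIs.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// injected mirrors the boolean logic of the raftClusterReturn failpoint body:
// with `return(true)` (or a non-bool value) Start fails, while `return(false)`
// returns early but without an error.
func injected(val any) error {
	if b, ok := val.(bool); (ok && b) || !ok {
		return errors.New("raftClusterReturn")
	}
	return nil
}

// eventually is a stand-in for testutil.Eventually: poll a condition until it
// holds or the timeout expires.
func eventually(cond func() bool, timeout, tick time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(tick)
	}
	return false
}

func main() {
	fmt.Println(injected(false)) // case 2: early return, nil error -> allocator still initializes
	fmt.Println(injected(true))  // case 3: Start fails -> TSO jobs must be stopped

	start := time.Now()
	ok := eventually(func() bool { return time.Since(start) > 20*time.Millisecond },
		time.Second, 5*time.Millisecond)
	fmt.Println("condition eventually held:", ok)
}
```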

func newMetaStore(storeID uint64, addr, version string, state metapb.StoreState, deployPath string) *metapb.Store {
return &metapb.Store{Id: storeID, Address: addr, Version: version, State: state, DeployPath: deployPath}
}
@@ -1437,7 +1528,7 @@ func TestTransferLeaderForScheduler(t *testing.T) {
tc.WaitLeader()
leaderServer = tc.GetLeaderServer()
rc1 := leaderServer.GetServer().GetRaftCluster()
rc1.Start(leaderServer.GetServer())
rc1.Start(leaderServer.GetServer(), false)
re.NoError(err)
re.NotNil(rc1)
// region heartbeat
@@ -1457,7 +1548,7 @@ func TestTransferLeaderForScheduler(t *testing.T) {
tc.WaitLeader()
leaderServer = tc.GetLeaderServer()
rc = leaderServer.GetServer().GetRaftCluster()
rc.Start(leaderServer.GetServer())
rc.Start(leaderServer.GetServer(), false)
re.NotNil(rc)
// region heartbeat
id = leaderServer.GetAllocator()