Skip to content

Commit

Permalink
cluster: fix tso fallback due raft cluster did not stop tso service (…
Browse files Browse the repository at this point in the history
…part2)

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Dec 9, 2024
1 parent 90cc61b commit 13b2b32
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
19 changes: 19 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,19 @@ func (c *RaftCluster) Start(s Server) error {
return err
}
c.checkTSOService()
defer func() {
// We need to try to stop tso jobs when the cluster is not running.
// Ref: https://github.com/tikv/pd/issues/8836
if !c.running {
c.stopTSOJobsIfNeeded()
}
}()
failpoint.Inject("raftClusterReturn", func(val failpoint.Value) {
if val, ok := val.(bool); (ok && val) || !ok {
failpoint.Return(errors.New("raftClusterReturn"))
}
failpoint.Return(nil)
})
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
Expand Down Expand Up @@ -2566,3 +2579,9 @@ func (c *RaftCluster) SetServiceIndependent(name string) {
func (c *RaftCluster) UnsetServiceIndependent(name string) {
c.independentServices.Delete(name)
}

// GetGlobalTSOAllocator return global tso allocator
// It only is used for test.
func (c *RaftCluster) GetGlobalTSOAllocator() (tso.Allocator, error) {
return c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
}
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2125,3 +2125,9 @@ func (s *Server) GetMaxResetTSGap() time.Duration {
func (s *Server) SetClient(client *clientv3.Client) {
s.client = client
}

// GetGlobalTSOAllocator return global tso allocator
// It only is used for test.
func (s *Server) GetGlobalTSOAllocator() (tso.Allocator, error) {
return s.cluster.GetGlobalTSOAllocator()
}
56 changes: 56 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,62 @@ func TestRaftClusterMultipleRestart(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
}

// TestRaftClusterStartTSOJob is used to test whether tso job service is normally closed
// when raft cluster is stopped ahead of time.
// Ref: https://github.com/tikv/pd/issues/8836
func TestRaftClusterStartTSOJob(t *testing.T) {
re := require.New(t)
name := "pd1"
// case 1: normal start
ctx, cancel := context.WithCancel(context.Background())
tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
re.NoError(tc.RunInitialServers())
re.NotEmpty(tc.WaitLeader())
leaderServer := tc.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
allocator, err := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
re.NoError(err)
re.True(allocator.IsInitialize())
tc.Destroy()
cancel()
// case 2: return ahead of time but no error when start raft cluster
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(false)`))
ctx, cancel = context.WithCancel(context.Background())
tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
tc.WaitLeader()
allocator, err = tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
re.NoError(err)
re.False(allocator.IsInitialize())
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn"))
tc.Destroy()
cancel()
// case 3: meet error when start raft cluster
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(true)`))
ctx, cancel = context.WithCancel(context.Background())
tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
tc.WaitLeader()
allocator, err = tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
re.NoError(err)
re.False(allocator.IsInitialize())
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn"))
tc.Destroy()
cancel()
}

func newMetaStore(storeID uint64, addr, version string, state metapb.StoreState, deployPath string) *metapb.Store {
return &metapb.Store{Id: storeID, Address: addr, Version: version, State: state, DeployPath: deployPath}
}
Expand Down

0 comments on commit 13b2b32

Please sign in to comment.