Skip to content

Commit

Permalink
cluster: fix tso fallback due raft cluster did not stop tso service (…
Browse files Browse the repository at this point in the history
…part2) (#8885) (#8890)

ref #8477, close #8889

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
  • Loading branch information
ti-chi-bot and lhy1024 authored Dec 10, 2024
1 parent c0daa90 commit d190c0e
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 12 deletions.
41 changes: 36 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (c *RaftCluster) InitCluster(
}

// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
c.Lock()
defer c.Unlock()

Expand All @@ -327,11 +327,32 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}
c.isAPIServiceMode = s.IsAPIServiceMode()
err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
}
c.checkTSOService()
// We should not manage the TSO service when bootstrap tries to start the raft cluster.
// It is controlled only by leader election.
// Ref: https://github.com/tikv/pd/issues/8836
if !bootstrap {
c.checkTSOService()
}
defer func() {
if !bootstrap && err != nil {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
}
}()
failpoint.Inject("raftClusterReturn", func(val failpoint.Value) {
if val, ok := val.(bool); (ok && val) || !ok {
err = errors.New("raftClusterReturn")
} else {
err = nil
}
failpoint.Return(err)
})
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
Expand Down Expand Up @@ -422,12 +443,12 @@ func (c *RaftCluster) checkTSOService() {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
} else {
if err := c.startTSOJobsIfNeeded(); err != nil {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
log.Info("TSO is provided by TSO server")
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
}
}
Expand Down Expand Up @@ -2579,3 +2600,13 @@ func (c *RaftCluster) SetServiceIndependent(name string) {
func (c *RaftCluster) UnsetServiceIndependent(name string) {
// Drop the entry for `name` from c.independentServices. checkTSOService
// calls this with constant.TSOServiceName when TSO is provided by PD.
c.independentServices.Delete(name)
}

// GetGlobalTSOAllocator returns the allocator that c.tsoAllocator reports for
// tso.GlobalDCLocation. When that lookup fails the error is dropped and nil is
// returned, so callers must be prepared for a nil result.
// It is only used for tests.
func (c *RaftCluster) GetGlobalTSOAllocator() tso.Allocator {
	if globalAllocator, lookupErr := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation); lookupErr == nil {
		return globalAllocator
	}
	return nil
}
10 changes: 8 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe
log.Warn("flush the bootstrap region failed", errs.ZapError(err))
}

if err := s.cluster.Start(s); err != nil {
if err := s.cluster.Start(s, true); err != nil {
return nil, err
}

Expand All @@ -795,7 +795,7 @@ func (s *Server) createRaftCluster() error {
return nil
}

return s.cluster.Start(s)
return s.cluster.Start(s, false)
}

func (s *Server) stopRaftCluster() {
Expand Down Expand Up @@ -2125,3 +2125,9 @@ func (s *Server) GetMaxResetTSGap() time.Duration {
func (s *Server) SetClient(client *clientv3.Client) {
// Overwrite the clientv3 client held by the server with the given one.
s.client = client
}

// GetGlobalTSOAllocator returns the global TSO allocator by delegating to
// the server's raft cluster (s.cluster).
// It is only used for tests.
func (s *Server) GetGlobalTSOAllocator() tso.Allocator {
return s.cluster.GetGlobalTSOAllocator()
}
2 changes: 1 addition & 1 deletion tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ func TestSendApiWhenRestartRaftCluster(t *testing.T) {
output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError)
re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first")

err = rc.Start(leader.GetServer())
err = rc.Start(leader.GetServer(), false)
re.NoError(err)
rc = leader.GetRaftCluster()
re.NotNil(rc)
Expand Down
99 changes: 95 additions & 4 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func TestRaftClusterRestart(t *testing.T) {
re.NotNil(rc)
rc.Stop()

err = rc.Start(leaderServer.GetServer())
err = rc.Start(leaderServer.GetServer(), false)
re.NoError(err)

rc = leaderServer.GetRaftCluster()
Expand Down Expand Up @@ -619,14 +619,105 @@ func TestRaftClusterMultipleRestart(t *testing.T) {
for range 100 {
// See https://github.com/tikv/pd/issues/8543
rc.Wait()
err = rc.Start(leaderServer.GetServer())
err = rc.Start(leaderServer.GetServer(), false)
re.NoError(err)
time.Sleep(time.Millisecond)
rc.Stop()
}
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
}

// TestRaftClusterStartTSOJob tests whether the TSO job service is properly
// closed when the raft cluster start returns ahead of time. It does so by
// observing the global TSO allocator's IsInitialize() state in four
// scenarios, each run against a freshly created test cluster.
// Ref: https://github.com/tikv/pd/issues/8836
//
// NOTE(review): the failpoint is disabled and the cluster destroyed only on
// the success path of each case; if an assertion fails first, the failpoint
// presumably stays enabled for later tests in the same process — confirm
// whether a deferred cleanup is wanted.
// NOTE(review): GetGlobalTSOAllocator appears to return nil when the lookup
// fails, in which case allocator.IsInitialize() would panic — confirm this
// cannot happen here.
func TestRaftClusterStartTSOJob(t *testing.T) {
re := require.New(t)
// Name used to look the server up via tc.GetServer in the single-server cases.
name := "pd1"
// case 1: normal start
// A single-server cluster is bootstrapped explicitly; the global allocator
// must eventually report that it is initialized.
ctx, cancel := context.WithCancel(context.Background())
tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
re.NoError(tc.RunInitialServers())
re.NotEmpty(tc.WaitLeader())
leaderServer := tc.GetLeaderServer()
re.NotNil(leaderServer)
leaderServer.BootstrapCluster()
testutil.Eventually(re, func() bool {
allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
return allocator.IsInitialize()
})
tc.Destroy()
cancel()
// case 2: return ahead of time but no error when start raft cluster
// The raftClusterReturn failpoint is enabled with `return(false)`, which is
// expected to make the raft cluster start return early with a nil error.
// The allocator must still become initialized.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(false)`))
ctx, cancel = context.WithCancel(context.Background())
tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
// The WaitLeader result is intentionally not asserted in this case.
tc.WaitLeader()
testutil.Eventually(re, func() bool {
allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
return allocator.IsInitialize()
})
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn"))
tc.Destroy()
cancel()
// case 3: meet error when start raft cluster
// The same failpoint is enabled with `return(true)`, which is expected to
// make the raft cluster start return early with an error. The allocator
// must eventually report that it is NOT initialized.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(true)`))
ctx, cancel = context.WithCancel(context.Background())
tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
// The WaitLeader result is not asserted here either.
tc.WaitLeader()
testutil.Eventually(re, func() bool {
allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
return !allocator.IsInitialize()
})
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn"))
tc.Destroy()
cancel()
// case 4: multiple bootstrap in 3 pd cluster
// BootstrapCluster is invoked concurrently from three goroutines on the
// leader of a three-server cluster; the leader's allocator must become
// initialized.
ctx, cancel = context.WithCancel(context.Background())
tc, err = tests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) {
conf.LeaderLease = 300
})
re.NoError(err)
re.NoError(tc.RunInitialServers())
re.NotEmpty(tc.WaitLeader())
leaderServer = tc.GetLeaderServer()
re.NotNil(leaderServer)
// Remember the current leader's name so the same server can be inspected
// after it resigns.
name = leaderServer.GetLeader().GetName()
wg := sync.WaitGroup{}
for range 3 {
wg.Add(1)
go func() {
leaderServer.BootstrapCluster()
wg.Done()
}()
}
wg.Wait()
testutil.Eventually(re, func() bool {
allocator := leaderServer.GetServer().GetGlobalTSOAllocator()
return allocator.IsInitialize()
})
// After the leader resigns and a leader is available again, the allocator
// of the server remembered in `name` must eventually be uninitialized.
re.NoError(tc.ResignLeader())
re.NotEmpty(tc.WaitLeader())
testutil.Eventually(re, func() bool {
allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
return !allocator.IsInitialize()
})
tc.Destroy()
cancel()
}

// newMetaStore builds a *metapb.Store whose Id, Address, Version, State and
// DeployPath are set from the arguments; every other field keeps its zero
// value.
func newMetaStore(storeID uint64, addr, version string, state metapb.StoreState, deployPath string) *metapb.Store {
	store := new(metapb.Store)
	store.Id = storeID
	store.Address = addr
	store.Version = version
	store.State = state
	store.DeployPath = deployPath
	return store
}
Expand Down Expand Up @@ -1435,7 +1526,7 @@ func TestTransferLeaderForScheduler(t *testing.T) {
tc.WaitLeader()
leaderServer = tc.GetLeaderServer()
rc1 := leaderServer.GetServer().GetRaftCluster()
rc1.Start(leaderServer.GetServer())
rc1.Start(leaderServer.GetServer(), false)
re.NoError(err)
re.NotNil(rc1)
// region heartbeat
Expand All @@ -1455,7 +1546,7 @@ func TestTransferLeaderForScheduler(t *testing.T) {
tc.WaitLeader()
leaderServer = tc.GetLeaderServer()
rc = leaderServer.GetServer().GetRaftCluster()
rc.Start(leaderServer.GetServer())
rc.Start(leaderServer.GetServer(), false)
re.NotNil(rc)
// region heartbeat
id = leaderServer.GetAllocator()
Expand Down

0 comments on commit d190c0e

Please sign in to comment.