From d190c0e9082de46128b756f93b1291768dda645a Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Tue, 10 Dec 2024 19:21:18 +0800
Subject: [PATCH] cluster: fix tso fallback due raft cluster did not stop tso service (part2) (#8885) (#8890)

ref tikv/pd#8477, close tikv/pd#8889

Signed-off-by: lhy1024
Co-authored-by: lhy1024
---
 server/cluster/cluster.go            | 41 ++++++++++--
 server/server.go                     | 10 ++-
 tests/server/api/api_test.go         |  2 +-
 tests/server/cluster/cluster_test.go | 99 ++++++++++++++++++++++++++--
 4 files changed, 140 insertions(+), 12 deletions(-)

diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 673d176dd4e..ba019d4fec8 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -318,7 +318,7 @@ func (c *RaftCluster) InitCluster(
 }
 
 // Start starts a cluster.
-func (c *RaftCluster) Start(s Server) error {
+func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
 	c.Lock()
 	defer c.Unlock()
 
@@ -327,11 +327,32 @@ func (c *RaftCluster) Start(s Server) error {
 		return nil
 	}
 	c.isAPIServiceMode = s.IsAPIServiceMode()
-	err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
+	err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
 	if err != nil {
 		return err
 	}
-	c.checkTSOService()
+	// We should not manage the TSO service when bootstrap tries to start the raft cluster.
+	// It is only controlled by leader election.
+	// Ref: https://github.com/tikv/pd/issues/8836
+	if !bootstrap {
+		c.checkTSOService()
+	}
+	defer func() {
+		if !bootstrap && err != nil {
+			if err := c.stopTSOJobsIfNeeded(); err != nil {
+				log.Error("failed to stop TSO jobs", errs.ZapError(err))
+				return
+			}
+		}
+	}()
+	failpoint.Inject("raftClusterReturn", func(val failpoint.Value) {
+		if val, ok := val.(bool); (ok && val) || !ok {
+			err = errors.New("raftClusterReturn")
+		} else {
+			err = nil
+		}
+		failpoint.Return(err)
+	})
 	cluster, err := c.LoadClusterInfo()
 	if err != nil {
 		return err
@@ -422,12 +443,12 @@ func (c *RaftCluster) checkTSOService() {
 		log.Info("TSO is provided by PD")
 		c.UnsetServiceIndependent(constant.TSOServiceName)
 	} else {
-		if err := c.startTSOJobsIfNeeded(); err != nil {
+		if err := c.stopTSOJobsIfNeeded(); err != nil {
 			log.Error("failed to stop TSO jobs", errs.ZapError(err))
 			return
 		}
-		log.Info("TSO is provided by TSO server")
 		if !c.IsServiceIndependent(constant.TSOServiceName) {
+			log.Info("TSO is provided by TSO server")
 			c.SetServiceIndependent(constant.TSOServiceName)
 		}
 	}
@@ -2579,3 +2600,13 @@ func (c *RaftCluster) SetServiceIndependent(name string) {
 func (c *RaftCluster) UnsetServiceIndependent(name string) {
 	c.independentServices.Delete(name)
 }
+
+// GetGlobalTSOAllocator returns the global TSO allocator.
+// It is only used for tests.
+func (c *RaftCluster) GetGlobalTSOAllocator() tso.Allocator {
+	allocator, err := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
+	if err != nil {
+		return nil
+	}
+	return allocator
+}
diff --git a/server/server.go b/server/server.go
index c88871658dc..6845906b6c1 100644
--- a/server/server.go
+++ b/server/server.go
@@ -777,7 +777,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe
 		log.Warn("flush the bootstrap region failed", errs.ZapError(err))
 	}
 
-	if err := s.cluster.Start(s); err != nil {
+	if err := s.cluster.Start(s, true); err != nil {
 		return nil, err
 	}
 
@@ -795,7 +795,7 @@ func (s *Server) createRaftCluster() error {
 		return nil
 	}
 
-	return s.cluster.Start(s)
+	return s.cluster.Start(s, false)
 }
 
 func (s *Server) stopRaftCluster() {
@@ -2125,3 +2125,9 @@ func (s *Server) GetMaxResetTSGap() time.Duration {
 func (s *Server) SetClient(client *clientv3.Client) {
 	s.client = client
 }
+
+// GetGlobalTSOAllocator returns the global TSO allocator.
+// It is only used for tests.
+func (s *Server) GetGlobalTSOAllocator() tso.Allocator {
+	return s.cluster.GetGlobalTSOAllocator()
+}
diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go
index 828213587d6..891ce40cb57 100644
--- a/tests/server/api/api_test.go
+++ b/tests/server/api/api_test.go
@@ -925,7 +925,7 @@ func TestSendApiWhenRestartRaftCluster(t *testing.T) {
 	output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError)
 	re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first")
 
-	err = rc.Start(leader.GetServer())
+	err = rc.Start(leader.GetServer(), false)
 	re.NoError(err)
 	rc = leader.GetRaftCluster()
 	re.NotNil(rc)
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index c8b8799b50d..0ae11ff488b 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -578,7 +578,7 @@ func TestRaftClusterRestart(t *testing.T) {
 	re.NotNil(rc)
 	rc.Stop()
 
-	err = rc.Start(leaderServer.GetServer())
+	err = rc.Start(leaderServer.GetServer(), false)
 	re.NoError(err)
 
 	rc = leaderServer.GetRaftCluster()
@@ -619,7 +619,7 @@ func TestRaftClusterMultipleRestart(t *testing.T) {
 	for range 100 {
 		// See https://github.com/tikv/pd/issues/8543
 		rc.Wait()
-		err = rc.Start(leaderServer.GetServer())
+		err = rc.Start(leaderServer.GetServer(), false)
 		re.NoError(err)
 		time.Sleep(time.Millisecond)
 		rc.Stop()
@@ -627,6 +627,97 @@ func TestRaftClusterMultipleRestart(t *testing.T) {
 	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
 }
 
+// TestRaftClusterStartTSOJob is used to test whether the TSO job service is closed properly
+// when the raft cluster is stopped ahead of time.
+// Ref: https://github.com/tikv/pd/issues/8836
+func TestRaftClusterStartTSOJob(t *testing.T) {
+	re := require.New(t)
+	name := "pd1"
+	// case 1: normal start
+	ctx, cancel := context.WithCancel(context.Background())
+	tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
+		conf.LeaderLease = 300
+	})
+	re.NoError(err)
+	re.NoError(tc.RunInitialServers())
+	re.NotEmpty(tc.WaitLeader())
+	leaderServer := tc.GetLeaderServer()
+	re.NotNil(leaderServer)
+	leaderServer.BootstrapCluster()
+	testutil.Eventually(re, func() bool {
+		allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
+		return allocator.IsInitialize()
+	})
+	tc.Destroy()
+	cancel()
+	// case 2: return ahead of time without error when starting the raft cluster
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(false)`))
+	ctx, cancel = context.WithCancel(context.Background())
+	tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
+		conf.LeaderLease = 300
+	})
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	tc.WaitLeader()
+	testutil.Eventually(re, func() bool {
+		allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
+		return allocator.IsInitialize()
+	})
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn"))
+	tc.Destroy()
+	cancel()
+	// case 3: meet an error when starting the raft cluster
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(true)`))
+	ctx, cancel = context.WithCancel(context.Background())
+	tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
+		conf.LeaderLease = 300
+	})
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	tc.WaitLeader()
+	testutil.Eventually(re, func() bool {
+		allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
+		return !allocator.IsInitialize()
+	})
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn"))
+	tc.Destroy()
+	cancel()
+	// case 4: multiple bootstrap requests in a 3-node PD cluster
+	ctx, cancel = context.WithCancel(context.Background())
+	tc, err = tests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) {
+		conf.LeaderLease = 300
+	})
+	re.NoError(err)
+	re.NoError(tc.RunInitialServers())
+	re.NotEmpty(tc.WaitLeader())
+	leaderServer = tc.GetLeaderServer()
+	re.NotNil(leaderServer)
+	name = leaderServer.GetLeader().GetName()
+	wg := sync.WaitGroup{}
+	for range 3 {
+		wg.Add(1)
+		go func() {
+			leaderServer.BootstrapCluster()
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	testutil.Eventually(re, func() bool {
+		allocator := leaderServer.GetServer().GetGlobalTSOAllocator()
+		return allocator.IsInitialize()
+	})
+	re.NoError(tc.ResignLeader())
+	re.NotEmpty(tc.WaitLeader())
+	testutil.Eventually(re, func() bool {
+		allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator()
+		return !allocator.IsInitialize()
+	})
+	tc.Destroy()
+	cancel()
+}
+
 func newMetaStore(storeID uint64, addr, version string, state metapb.StoreState, deployPath string) *metapb.Store {
 	return &metapb.Store{Id: storeID, Address: addr, Version: version, State: state, DeployPath: deployPath}
 }
@@ -1435,7 +1526,7 @@ func TestTransferLeaderForScheduler(t *testing.T) {
 	tc.WaitLeader()
 	leaderServer = tc.GetLeaderServer()
 	rc1 := leaderServer.GetServer().GetRaftCluster()
-	rc1.Start(leaderServer.GetServer())
+	rc1.Start(leaderServer.GetServer(), false)
 	re.NoError(err)
 	re.NotNil(rc1)
 	// region heartbeat
@@ -1455,7 +1546,7 @@ func TestTransferLeaderForScheduler(t *testing.T) {
 	tc.WaitLeader()
 	leaderServer = tc.GetLeaderServer()
 	rc = leaderServer.GetServer().GetRaftCluster()
-	rc.Start(leaderServer.GetServer())
+	rc.Start(leaderServer.GetServer(), false)
 	re.NotNil(rc)
 	// region heartbeat
 	id = leaderServer.GetAllocator()