From ce3d7e79030552e4fa527c9cbbf7afe83d33c6a2 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 9 Dec 2024 19:40:17 +0800 Subject: [PATCH 1/4] cluster: fix tso fallback due raft cluster did not stop tso service (part2) Signed-off-by: lhy1024 --- server/cluster/cluster.go | 19 ++++++++++ server/server.go | 6 +++ tests/server/cluster/cluster_test.go | 56 ++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 9b4630964b9..0d3c7530573 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -329,6 +329,19 @@ func (c *RaftCluster) Start(s Server) error { return err } c.checkTSOService() + defer func() { + // We need to try to stop tso jobs when the cluster is not running. + // Ref: https://github.com/tikv/pd/issues/8836 + if !c.running { + c.stopTSOJobsIfNeeded() + } + }() + failpoint.Inject("raftClusterReturn", func(val failpoint.Value) { + if val, ok := val.(bool); (ok && val) || !ok { + failpoint.Return(errors.New("raftClusterReturn")) + } + failpoint.Return(nil) + }) cluster, err := c.LoadClusterInfo() if err != nil { return err @@ -2554,3 +2567,9 @@ func (c *RaftCluster) SetServiceIndependent(name string) { func (c *RaftCluster) UnsetServiceIndependent(name string) { c.independentServices.Delete(name) } + +// GetGlobalTSOAllocator return global tso allocator +// It only is used for test. +func (c *RaftCluster) GetGlobalTSOAllocator() (tso.Allocator, error) { + return c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) +} diff --git a/server/server.go b/server/server.go index d7bdd92d96d..86cc3f135d7 100644 --- a/server/server.go +++ b/server/server.go @@ -2097,3 +2097,9 @@ func (s *Server) GetMaxResetTSGap() time.Duration { func (s *Server) SetClient(client *clientv3.Client) { s.client = client } + +// GetGlobalTSOAllocator return global tso allocator +// It only is used for test. +func (s *Server) GetGlobalTSOAllocator() (tso.Allocator, error) { + return s.cluster.GetGlobalTSOAllocator() +} diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index df0cf7d38a3..3988953102c 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -629,6 +629,62 @@ func TestRaftClusterMultipleRestart(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) } +// TestRaftClusterStartTSOJob is used to test whether tso job service is normally closed +// when raft cluster is stopped ahead of time. +// Ref: https://github.com/tikv/pd/issues/8836 +func TestRaftClusterStartTSOJob(t *testing.T) { + re := require.New(t) + name := "pd1" + // case 1: normal start + ctx, cancel := context.WithCancel(context.Background()) + tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + conf.LeaderLease = 300 + }) + re.NoError(err) + re.NoError(tc.RunInitialServers()) + re.NotEmpty(tc.WaitLeader()) + leaderServer := tc.GetLeaderServer() + re.NotNil(leaderServer) + leaderServer.BootstrapCluster() + allocator, err := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() + re.NoError(err) + re.True(allocator.IsInitialize()) + tc.Destroy() + cancel() + // case 2: return ahead of time but no error when start raft cluster + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(false)`)) + ctx, cancel = context.WithCancel(context.Background()) + tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + conf.LeaderLease = 300 + }) + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + allocator, err = tc.GetServer(name).GetServer().GetGlobalTSOAllocator() + re.NoError(err) + re.False(allocator.IsInitialize()) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn")) + tc.Destroy() + cancel() + // case 3: meet error when start raft cluster + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/raftClusterReturn", `return(true)`)) + ctx, cancel = context.WithCancel(context.Background()) + tc, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + conf.LeaderLease = 300 + }) + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + tc.WaitLeader() + allocator, err = tc.GetServer(name).GetServer().GetGlobalTSOAllocator() + re.NoError(err) + re.False(allocator.IsInitialize()) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn")) + tc.Destroy() + cancel() +} + func newMetaStore(storeID uint64, addr, version string, state metapb.StoreState, deployPath string) *metapb.Store { return &metapb.Store{Id: storeID, Address: addr, Version: version, State: state, DeployPath: deployPath} } From 99ed2c35525b28f4a61d650c12aff31b0d8389ed Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 9 Dec 2024 19:55:44 +0800 Subject: [PATCH 2/4] fix conflict Signed-off-by: lhy1024 --- server/cluster/cluster.go | 4 ++-- server/server.go | 2 +- tests/server/cluster/cluster_test.go | 8 +++----- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 0d3c7530573..4371a2f9ed3 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -2570,6 +2570,6 @@ func (c *RaftCluster) UnsetServiceIndependent(name string) { // GetGlobalTSOAllocator return global tso allocator // It only is used for test. -func (c *RaftCluster) GetGlobalTSOAllocator() (tso.Allocator, error) { - return c.tsoAllocator.GetAllocator(tso.GlobalDCLocation) +func (c *RaftCluster) GetGlobalTSOAllocator() tso.Allocator { + return c.tsoAllocator.GetAllocator() } diff --git a/server/server.go b/server/server.go index 86cc3f135d7..f3eb3cf84df 100644 --- a/server/server.go +++ b/server/server.go @@ -2100,6 +2100,6 @@ func (s *Server) SetClient(client *clientv3.Client) { // GetGlobalTSOAllocator return global tso allocator // It only is used for test. -func (s *Server) GetGlobalTSOAllocator() (tso.Allocator, error) { +func (s *Server) GetGlobalTSOAllocator() tso.Allocator { return s.cluster.GetGlobalTSOAllocator() } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 3988953102c..f4ef091982d 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -646,8 +646,7 @@ func TestRaftClusterStartTSOJob(t *testing.T) { leaderServer := tc.GetLeaderServer() re.NotNil(leaderServer) leaderServer.BootstrapCluster() - allocator, err := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() - re.NoError(err) + allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() re.True(allocator.IsInitialize()) tc.Destroy() cancel() @@ -661,8 +660,7 @@ func TestRaftClusterStartTSOJob(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - allocator, err = tc.GetServer(name).GetServer().GetGlobalTSOAllocator() - re.NoError(err) + allocator = tc.GetServer(name).GetServer().GetGlobalTSOAllocator() re.False(allocator.IsInitialize()) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn")) tc.Destroy() @@ -677,7 +675,7 @@ func TestRaftClusterStartTSOJob(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - allocator, err = tc.GetServer(name).GetServer().GetGlobalTSOAllocator() + allocator = tc.GetServer(name).GetServer().GetGlobalTSOAllocator() re.NoError(err) re.False(allocator.IsInitialize()) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn")) From 93edbc111dd09bdf3f2493e1a22bda6bca28b828 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 10 Dec 2024 14:44:22 +0800 Subject: [PATCH 3/4] add more test Signed-off-by: lhy1024 --- tests/server/cluster/cluster_test.go | 51 ++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index f4ef091982d..e49d0833aae 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -646,8 +646,10 @@ func TestRaftClusterStartTSOJob(t *testing.T) { leaderServer := tc.GetLeaderServer() re.NotNil(leaderServer) leaderServer.BootstrapCluster() - allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() - re.True(allocator.IsInitialize()) + testutil.Eventually(re, func() bool { + allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() + return allocator.IsInitialize() + }) tc.Destroy() cancel() // case 2: return ahead of time but no error when start raft cluster @@ -660,8 +662,10 @@ func TestRaftClusterStartTSOJob(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - allocator = tc.GetServer(name).GetServer().GetGlobalTSOAllocator() - re.False(allocator.IsInitialize()) + testutil.Eventually(re, func() bool { + allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() + return !allocator.IsInitialize() + }) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn")) tc.Destroy() cancel() @@ -675,12 +679,45 @@ func TestRaftClusterStartTSOJob(t *testing.T) { err = tc.RunInitialServers() re.NoError(err) tc.WaitLeader() - allocator = tc.GetServer(name).GetServer().GetGlobalTSOAllocator() - re.NoError(err) - re.False(allocator.IsInitialize()) + testutil.Eventually(re, func() bool { + allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() + return !allocator.IsInitialize() + }) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn")) tc.Destroy() cancel() + // case 4: multiple bootstrap in 3 pd cluster + ctx, cancel = context.WithCancel(context.Background()) + tc, err = tests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) { + conf.LeaderLease = 300 + }) + re.NoError(err) + re.NoError(tc.RunInitialServers()) + re.NotEmpty(tc.WaitLeader()) + leaderServer = tc.GetLeaderServer() + re.NotNil(leaderServer) + name = leaderServer.GetLeader().GetName() + wg := sync.WaitGroup{} + for range 3 { + wg.Add(1) + go func() { + leaderServer.BootstrapCluster() + wg.Done() + }() + } + wg.Wait() + testutil.Eventually(re, func() bool { + allocator := leaderServer.GetServer().GetGlobalTSOAllocator() + return allocator.IsInitialize() + }) + re.NoError(tc.ResignLeader()) + re.NotEmpty(tc.WaitLeader()) + testutil.Eventually(re, func() bool { + allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() + return !allocator.IsInitialize() + }) + tc.Destroy() + cancel() } func newMetaStore(storeID uint64, addr, version string, state metapb.StoreState, deployPath string) *metapb.Store { From bb862798de1c634e1478374cd93677b3cb2d1396 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 10 Dec 2024 18:12:08 +0800 Subject: [PATCH 4/4] add bootstrap flag Signed-off-by: lhy1024 --- server/cluster/cluster.go | 21 +++++++++++++-------- server/server.go | 4 ++-- tests/server/api/api_test.go | 2 +- tests/server/cluster/cluster_test.go | 10 +++++----- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 4371a2f9ed3..10e9bf7ff1a 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -315,7 +315,7 @@ func (c *RaftCluster) InitCluster( } // Start starts a cluster. -func (c *RaftCluster) Start(s Server) error { +func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { c.Lock() defer c.Unlock() @@ -324,23 +324,28 @@ func (c *RaftCluster) Start(s Server) error { return nil } c.isAPIServiceMode = s.IsAPIServiceMode() - err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) + err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) if err != nil { return err } - c.checkTSOService() + // We should not manage tso service when bootstrap try to start raft cluster. + // It only is controlled by leader election. + // Ref: https://github.com/tikv/pd/issues/8836 + if !bootstrap { + c.checkTSOService() + } defer func() { - // We need to try to stop tso jobs when the cluster is not running. - // Ref: https://github.com/tikv/pd/issues/8836 - if !c.running { + if !bootstrap && err != nil { c.stopTSOJobsIfNeeded() } }() failpoint.Inject("raftClusterReturn", func(val failpoint.Value) { if val, ok := val.(bool); (ok && val) || !ok { - failpoint.Return(errors.New("raftClusterReturn")) + err = errors.New("raftClusterReturn") + } else { + err = nil } - failpoint.Return(nil) + failpoint.Return(err) }) cluster, err := c.LoadClusterInfo() if err != nil { diff --git a/server/server.go b/server/server.go index f3eb3cf84df..3ed3c9514ff 100644 --- a/server/server.go +++ b/server/server.go @@ -758,7 +758,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe log.Warn("flush the bootstrap region failed", errs.ZapError(err)) } - if err := s.cluster.Start(s); err != nil { + if err := s.cluster.Start(s, true); err != nil { return nil, err } @@ -776,7 +776,7 @@ func (s *Server) createRaftCluster() error { return nil } - return s.cluster.Start(s) + return s.cluster.Start(s, false) } func (s *Server) stopRaftCluster() { diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 14df5ff8eea..e1e4db3a26d 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -925,7 +925,7 @@ func TestSendApiWhenRestartRaftCluster(t *testing.T) { output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError) re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first") - err = rc.Start(leader.GetServer()) + err = rc.Start(leader.GetServer(), false) re.NoError(err) rc = leader.GetRaftCluster() re.NotNil(rc) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index e49d0833aae..dfdb9cb8685 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -578,7 +578,7 @@ func TestRaftClusterRestart(t *testing.T) { re.NotNil(rc) rc.Stop() - err = rc.Start(leaderServer.GetServer()) + err = rc.Start(leaderServer.GetServer(), false) re.NoError(err) rc = leaderServer.GetRaftCluster() @@ -621,7 +621,7 @@ func TestRaftClusterMultipleRestart(t *testing.T) { for range 100 { // See https://github.com/tikv/pd/issues/8543 rc.Wait() - err = rc.Start(leaderServer.GetServer()) + err = rc.Start(leaderServer.GetServer(), false) re.NoError(err) time.Sleep(time.Millisecond) rc.Stop() @@ -664,7 +664,7 @@ func TestRaftClusterStartTSOJob(t *testing.T) { tc.WaitLeader() testutil.Eventually(re, func() bool { allocator := tc.GetServer(name).GetServer().GetGlobalTSOAllocator() - return !allocator.IsInitialize() + return allocator.IsInitialize() }) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/raftClusterReturn")) tc.Destroy() @@ -1528,7 +1528,7 @@ func TestTransferLeaderForScheduler(t *testing.T) { tc.WaitLeader() leaderServer = tc.GetLeaderServer() rc1 := leaderServer.GetServer().GetRaftCluster() - rc1.Start(leaderServer.GetServer()) + rc1.Start(leaderServer.GetServer(), false) re.NoError(err) re.NotNil(rc1) // region heartbeat @@ -1548,7 +1548,7 @@ func TestTransferLeaderForScheduler(t *testing.T) { tc.WaitLeader() leaderServer = tc.GetLeaderServer() rc = leaderServer.GetServer().GetRaftCluster() - rc.Start(leaderServer.GetServer()) + rc.Start(leaderServer.GetServer(), false) re.NotNil(rc) // region heartbeat id = leaderServer.GetAllocator()