Skip to content

Commit

Permalink
test: make test stable after enable ms mode dynamic switch
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Dec 19, 2024
1 parent c5812ee commit 073cc5e
Show file tree
Hide file tree
Showing 17 changed files with 82 additions and 66 deletions.
2 changes: 1 addition & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) {
pdLeaderServer := cluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
backendEndpoints := pdLeaderServer.GetAddr()
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, cluster)
re.NoError(err)
defer tsoCluster.Destroy()
time.Sleep(100 * time.Millisecond)
Expand Down
6 changes: 3 additions & 3 deletions tests/integrations/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {
re.Equal(addr, returnedEntry.ServiceAddr)

// test primary when only one server
expectedPrimary := tests.WaitForPrimaryServing(re, map[string]bs.Server{addr: s})
expectedPrimary := tests.WaitForPrimaryServing(re, map[string]bs.Server{addr: s}, suite.cluster)
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
re.True(exist)
re.Equal(expectedPrimary, primary)
Expand Down Expand Up @@ -131,15 +131,15 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin
serverMap[s.GetAddr()] = s
}

expectedPrimary := tests.WaitForPrimaryServing(re, serverMap)
expectedPrimary := tests.WaitForPrimaryServing(re, serverMap, suite.cluster)
primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
re.True(exist)
re.Equal(expectedPrimary, primary)
// close old primary
serverMap[primary].Close()
delete(serverMap, primary)

expectedPrimary = tests.WaitForPrimaryServing(re, serverMap)
expectedPrimary = tests.WaitForPrimaryServing(re, serverMap, suite.cluster)
// test API server discovery
client := suite.pdLeader.GetEtcdClient()
endpoints, err := discovery.Discover(client, serviceName)
Expand Down
16 changes: 8 additions & 8 deletions tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
cleanups = append(cleanups, cleanup)
nodes[s.GetAddr()] = s
}
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
Expand Down Expand Up @@ -151,7 +151,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() {
cleanups = append(cleanups, cleanup)
nodes[s.GetAddr()] = s
}
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)

// miss replica.
id := 1
Expand Down Expand Up @@ -203,7 +203,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() {
s2, cleanup2 := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
defer cleanup2()
nodes[s2.GetAddr()] = s2
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)
params = &handlers.AllocNodesForKeyspaceGroupParams{
Replica: constant.DefaultKeyspaceGroupReplicaCount + 1,
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() {
nodes[s.GetAddr()] = s
nodesList = append(nodesList, s.GetAddr())
}
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)

// the keyspace group is not exist.
id := 1
Expand Down Expand Up @@ -318,7 +318,7 @@ func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() {
cleanups = append(cleanups, cleanup)
nodes[s.GetAddr()] = s
}
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)

// the default keyspace group is exist.
var kg *endpoint.KeyspaceGroup
Expand Down Expand Up @@ -352,7 +352,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodes() {
cleanups = append(cleanups, cleanup)
nodes[s.GetAddr()] = s
}
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
Expand Down Expand Up @@ -394,7 +394,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocOneNode() {
defer cleanupOldTSOserver()
nodes[oldTSOServer.GetAddr()] = oldTSOServer

tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
Expand All @@ -421,7 +421,7 @@ func (suite *keyspaceGroupTestSuite) TestAllocOneNode() {
defer cleanupNewTSOServer()
nodes[newTSOServer.GetAddr()] = newTSOServer

tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)

// the member list will be updated
testutil.Eventually(re, func() bool {
Expand Down
14 changes: 7 additions & 7 deletions tests/integrations/mcs/members/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (suite *memberTestSuite) SetupTest() {
cleanup()
})
}
primary := tests.WaitForPrimaryServing(re, nodes)
primary := tests.WaitForPrimaryServing(re, nodes, suite.cluster)
members := mustGetKeyspaceGroupMembers(re, nodes[primary].(*tso.Server))
// Get the tso nodes
suite.tsoNodes = nodes
Expand All @@ -104,7 +104,7 @@ func (suite *memberTestSuite) SetupTest() {
cleanup()
})
}
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)
suite.schedulingNodes = nodes

suite.cleanupFunc = append(suite.cleanupFunc, func() {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (suite *memberTestSuite) TestPrimaryWorkWhileOtherServerClose() {
nodes[member.Name()].Close()
}
}
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)

// primary should be same with before.
curPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
Expand Down Expand Up @@ -291,14 +291,14 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() {
re.Equal(http.StatusOK, resp.StatusCode)
resp.Body.Close()

tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)
newPrimary, err = suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)
re.NotEqual(primary, newPrimary)

// Close primary to push other nodes campaign primary
nodes[newPrimary].Close()
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)
// Primary should be different with before
anotherPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)
Expand Down Expand Up @@ -356,7 +356,7 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() {
// TODO: Add campaign times check in mcs to avoid frequent campaign
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/skipGrantLeader"))
// Can still work after lease expired
tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)
}
}

Expand Down Expand Up @@ -413,7 +413,7 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown(
nodes[newPrimary].Close()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/skipGrantLeader"))

tests.WaitForPrimaryServing(re, nodes)
tests.WaitForPrimaryServing(re, nodes, suite.cluster)
// Primary should be different with before
onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)
Expand Down
7 changes: 6 additions & 1 deletion tests/integrations/mcs/scheduling/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/stretchr/testify/suite"

"github.com/pingcap/failpoint"

"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/testutil"
Expand All @@ -46,6 +48,7 @@ func TestRule(t *testing.T) {

func (suite *ruleTestSuite) SetupSuite() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
Expand All @@ -63,12 +66,14 @@ func (suite *ruleTestSuite) SetupSuite() {
func (suite *ruleTestSuite) TearDownSuite() {
suite.cancel()
suite.cluster.Destroy()
re := suite.Require()
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
}

func (suite *ruleTestSuite) TestRuleWatch() {
re := suite.Require()

tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoint)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()

Expand Down
24 changes: 12 additions & 12 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (suite *serverTestSuite) TearDownSuite() {
func (suite *serverTestSuite) TestAllocID() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand All @@ -110,7 +110,7 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
err = pd2.Run()
re.NotEmpty(suite.cluster.WaitLeader())
re.NoError(err)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand Down Expand Up @@ -138,7 +138,7 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {

func (suite *serverTestSuite) TestPrimaryChange() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand All @@ -164,7 +164,7 @@ func (suite *serverTestSuite) TestPrimaryChange() {

func (suite *serverTestSuite) TestForwardStoreHeartbeat() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand Down Expand Up @@ -225,7 +225,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})

tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand All @@ -242,7 +242,7 @@ func (suite *serverTestSuite) TestSchedulingServiceFallback() {
testutil.Eventually(re, func() bool {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
tc1, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc1, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc1.Destroy()
tc1.WaitForPrimaryServing(re)
Expand Down Expand Up @@ -278,7 +278,7 @@ func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})

tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand All @@ -302,7 +302,7 @@ func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() {

func (suite *serverTestSuite) TestSchedulerSync() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand Down Expand Up @@ -422,7 +422,7 @@ func checkEvictLeaderStoreIDs(re *require.Assertions, sc *schedulers.Controller,

func (suite *serverTestSuite) TestForwardRegionHeartbeat() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand Down Expand Up @@ -499,7 +499,7 @@ func (suite *serverTestSuite) TestForwardRegionHeartbeat() {

func (suite *serverTestSuite) TestStoreLimit() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand Down Expand Up @@ -660,7 +660,7 @@ func (suite *multipleServerTestSuite) TestReElectLeader() {
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck"))
}()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand Down Expand Up @@ -692,7 +692,7 @@ func (suite *multipleServerTestSuite) TestReElectLeader() {

func (suite *serverTestSuite) TestOnlineProgress() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.cluster)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
Expand Down
6 changes: 3 additions & 3 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (suite *tsoAPITestSuite) SetupTest() {
pdLeaderServer := suite.pdCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
suite.backendEndpoints = pdLeaderServer.GetAddr()
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, suite.pdCluster)
re.NoError(err)
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func TestTSOServerStartFirst(t *testing.T) {
clusterCh := make(chan *tests.TestTSOCluster)
defer close(clusterCh)
go func() {
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, addr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, apiCluster)
re.NoError(err)
primary := tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) {
err = tc.RunInitialServers()
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()
ttc, err := tests.NewTestTSOCluster(ctx, 2, pdAddr)
ttc, err := tests.NewTestTSOCluster(ctx, 2, tc)
re.NoError(err)
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
Expand Down
7 changes: 3 additions & 4 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
re.NotEmpty(leaderName)
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
re.NoError(suite.pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.cluster)
re.NoError(err)
suite.allocator = mockid.NewIDAllocator()
suite.allocator.SetBase(uint64(time.Now().Second()))
Expand Down Expand Up @@ -544,7 +544,6 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
}
})
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start api server and tso server.
err = tc.RunInitialServers()
Expand All @@ -554,7 +553,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
leaderServer := tc.GetLeaderServer()
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, tc)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)
Expand Down Expand Up @@ -751,7 +750,7 @@ func TestGetTSOImmediately(t *testing.T) {
leaderServer := tc.GetLeaderServer()
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, tc)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *tsoProxyTestSuite) SetupSuite() {
re.NoError(s.apiLeader.BootstrapCluster())

// Create a TSO cluster with 2 servers
s.tsoCluster, err = tests.NewTestTSOCluster(s.ctx, 2, s.backendEndpoints)
s.tsoCluster, err = tests.NewTestTSOCluster(s.ctx, 2, s.apiCluster)
re.NoError(err)
s.tsoCluster.WaitForDefaultPrimaryServing(re)

Expand Down
Loading

0 comments on commit 073cc5e

Please sign in to comment.