Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests/tso: add IDAllocator to make keyspace test stable #8202

Merged
merged 5 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,7 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() {
defer kgm.wg.Done()
patrolInterval := groupPatrolInterval
failpoint.Inject("fastGroupSplitPatroller", func() {
patrolInterval = time.Second
patrolInterval = 3 * time.Second
})
ticker := time.NewTicker(patrolInterval)
defer ticker.Stop()
Expand Down
158 changes: 89 additions & 69 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
tsopkg "github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand All @@ -56,6 +57,13 @@ type tsoKeyspaceGroupManagerTestSuite struct {
pdLeaderServer *tests.TestServer
// tsoCluster is the TSO service cluster.
tsoCluster *tests.TestTSOCluster

allocator *mockid.IDAllocator
}

func (suite *tsoKeyspaceGroupManagerTestSuite) allocID() uint32 {
id, _ := suite.allocator.Alloc()
return uint32(id)
}

func TestTSOKeyspaceGroupManager(t *testing.T) {
Expand All @@ -77,6 +85,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
re.NoError(suite.pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr())
re.NoError(err)
suite.allocator = mockid.NewIDAllocator()
suite.allocator.SetBase(uint64(time.Now().Second()))
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() {
Expand Down Expand Up @@ -166,9 +176,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
keyspaceGroupID uint32
keyspaceIDs []uint32
}{
{0, []uint32{0, 10}},
{1, []uint32{1, 11}},
{2, []uint32{2, 12}},
{suite.allocID(), []uint32{0, 10}},
{suite.allocID(), []uint32{1, 11}},
{suite.allocID(), []uint32{2, 12}},
}

for _, param := range params {
Expand Down Expand Up @@ -242,51 +252,53 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
re := suite.Require()
// Create the keyspace group 1 with keyspaces [111, 222, 333].
// Create the keyspace group `oldID` with keyspaces [111, 222, 333].
oldID := suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: oldID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID)
re.Equal(oldID, kg1.ID)
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
// Get a TSO from the keyspace group 1.
// Get a TSO from the keyspace group `oldID`.
var (
ts pdpb.Timestamp
err error
)
testutil.Eventually(re, func() bool {
ts, err = suite.requestTSO(re, 222, 1)
ts, err = suite.requestTSO(re, 222, oldID)
return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err = suite.tsoCluster.GetPrimaryServer(222, 1).ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
// Set the TSO of the keyspace group `oldID` to a large value.
err = suite.tsoCluster.GetPrimaryServer(222, oldID).ResetTS(tsoutil.GenerateTS(&ts), false, true, oldID)
re.NoError(err)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
// Split the keyspace group `oldID` to `newID`.
newID := suite.allocID()
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{
NewID: newID,
Keyspaces: []uint32{222, 333},
})
// Wait for the split to complete automatically even there is no TSO request from the outside.
testutil.Eventually(re, func() bool {
kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, newID)
if code != http.StatusOK {
return false
}
re.Equal(uint32(2), kg2.ID)
re.Equal(newID, kg2.ID)
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
return !kg2.IsSplitting()
})
// Check the split TSO from keyspace group 2 now.
splitTS, err := suite.requestTSO(re, 222, 2)
// Check the split TSO from keyspace group `newID` now.
splitTS, err := suite.requestTSO(re, 222, newID)
re.NoError(err)
re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0)
}
Expand All @@ -304,60 +316,62 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO(

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() {
re := suite.Require()
// Create the keyspace group 1 with keyspaces [111, 222, 333].
// Create the keyspace group `oldID` with keyspaces [111, 222, 333].
oldID := suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: oldID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID)
re.Equal(oldID, kg1.ID)
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
// Split the keyspace group `oldID` to `newID`.
newID := suite.allocID()
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{
NewID: newID,
Keyspaces: []uint32{222, 333},
})
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
re.Equal(uint32(2), kg2.ID)
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, newID)
re.Equal(newID, kg2.ID)
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
re.True(kg2.IsSplitTarget())
// Check the leadership.
member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, 1).GetMember(111, 1)
member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, oldID).GetMember(111, oldID)
re.NoError(err)
re.NotNil(member1)
member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 2).GetMember(222, 2)
member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, newID).GetMember(222, newID)
re.NoError(err)
re.NotNil(member2)
// Wait for the leader of the keyspace group 1 and 2 to be elected.
// Wait for the leader of the keyspace group `oldID` and `newID` to be elected.
testutil.Eventually(re, func() bool {
return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0
})
// Check if the leader of the keyspace group 1 and 2 are the same.
// Check if the leader of the keyspace group `oldID` and `newID` are the same.
re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls())
// Resign and block the leader of the keyspace group 1 from being elected.
// Resign and block the leader of the keyspace group `oldID` from being elected.
member1.(*member.Participant).SetCampaignChecker(func(*election.Leadership) bool {
return false
})
member1.ResetLeader()
// The leader of the keyspace group 2 should be resigned also.
// The leader of the keyspace group `newID` should be resigned also.
testutil.Eventually(re, func() bool {
return member2.IsLeader() == false
})
// Check if the leader of the keyspace group 1 and 2 are the same again.
// Check if the leader of the keyspace group `oldID` and `newID` are the same again.
member1.(*member.Participant).SetCampaignChecker(nil)
testutil.Eventually(re, func() bool {
return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0
})
re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls())
// Wait for the keyspace groups to finish the split.
waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{111}, []uint32{222, 333})
waitFinishSplit(re, suite.pdLeaderServer, oldID, newID, []uint32{111}, []uint32{222, 333})
}

func waitFinishSplit(
Expand Down Expand Up @@ -390,30 +404,32 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
re := suite.Require()
// Enable the failpoint to slow down the system time to test whether the TSO is monotonic.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`))
// Create the keyspace group 1 with keyspaces [444, 555, 666].
// Create the keyspace group `oldID` with keyspaces [444, 555, 666].
oldID := suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: oldID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{444, 555, 666},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID)
re.Equal(oldID, kg1.ID)
re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
// Request the TSO for keyspace 555 concurrently via client.
cancel := suite.dispatchClient(re, 555, 1)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
cancel := suite.dispatchClient(re, 555, oldID)
// Split the keyspace group `oldID` to `newID`.
newID := suite.allocID()
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{
NewID: newID,
Keyspaces: []uint32{555, 666},
})
// Wait for the keyspace groups to finish the split.
waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666})
waitFinishSplit(re, suite.pdLeaderServer, oldID, newID, []uint32{444}, []uint32{555, 666})
// Stop the client.
cancel()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow"))
Expand Down Expand Up @@ -569,48 +585,49 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() {
re := suite.Require()
// Create the keyspace group 1 and 2 with keyspaces [111, 222] and [333].
// Create the keyspace group `firstID` and `secondID` with keyspaces [111, 222] and [333].
firstID, secondID := suite.allocID(), suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: firstID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222},
},
{
ID: 2,
ID: secondID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{333},
},
},
})
// Get a TSO from the keyspace group 1.
// Get a TSO from the keyspace group `firstID`.
var (
ts pdpb.Timestamp
err error
)
testutil.Eventually(re, func() bool {
ts, err = suite.requestTSO(re, 222, 1)
ts, err = suite.requestTSO(re, 222, firstID)
return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err = suite.tsoCluster.GetPrimaryServer(222, 1).ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
// Set the TSO of the keyspace group `firstID` to a large value.
err = suite.tsoCluster.GetPrimaryServer(222, firstID).ResetTS(tsoutil.GenerateTS(&ts), false, true, firstID)
re.NoError(err)
// Merge the keyspace group 1 and 2 to the default keyspace group.
// Merge the keyspace group `firstID` and `secondID` to the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeList: []uint32{1, 2},
MergeList: []uint32{firstID, secondID},
})
// Check the keyspace group 1 and 2 are merged to the default keyspace group.
// Check the keyspace group `firstID` and `secondID` are merged to the default keyspace group.
kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID)
for _, keyspaceID := range []uint32{111, 222, 333} {
re.Contains(kg.Keyspaces, keyspaceID)
}
re.True(kg.IsMergeTarget())
// Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group 1.
// Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group`firstID`.
var mergedTS pdpb.Timestamp
testutil.Eventually(re, func() bool {
mergedTS, err = suite.requestTSO(re, 333, mcsutils.DefaultKeyspaceGroupID)
Expand All @@ -624,26 +641,27 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() {

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() {
re := suite.Require()
// Create the keyspace group 1 with keyspaces [111, 222, 333].
// Create the keyspace group `id` with keyspaces [111, 222, 333].
id := suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: id,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, id)
re.Equal(id, kg1.ID)
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.IsMerging())
// Request the TSO for keyspace 222 concurrently via client.
cancel := suite.dispatchClient(re, 222, 1)
cancel := suite.dispatchClient(re, 222, id)
// Merge the keyspace group 1 to the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeList: []uint32{1},
MergeList: []uint32{id},
})
// Wait for the default keyspace group to finish the merge.
waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333})
Expand Down Expand Up @@ -671,24 +689,25 @@ func waitFinishMerge(

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeInitTSO() {
re := suite.Require()
// Make sure the TSO of keyspace group 1 won't be initialized before it's merged.
// Make sure the TSO of keyspace group `id` won't be initialized before it's merged.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`))
// Request the TSO for the default keyspace concurrently via client.
id := suite.allocID()
cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
// Create the keyspace group 1 with keyspaces [111, 222, 333].
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: id,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
})
// Merge the keyspace group 1 to the default keyspace group.
// Merge the keyspace group `id` to the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeList: []uint32{1},
MergeList: []uint32{id},
})
// Wait for the default keyspace group to finish the merge.
waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333})
Expand Down Expand Up @@ -775,12 +794,13 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault
keyspaces = make([]uint32, 0, keyspaceGroupNum)
)
for i := 1; i <= keyspaceGroupNum; i++ {
id := suite.allocID()
keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{
ID: uint32(i),
ID: id,
UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(),
Keyspaces: []uint32{uint32(i)},
Keyspaces: []uint32{id},
})
keyspaces = append(keyspaces, uint32(i))
keyspaces = append(keyspaces, id)
if i != keyspaceGroupNum {
continue
}
Expand All @@ -797,7 +817,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault
re.NotNil(svr)
for i := 1; i < keyspaceGroupNum; i++ {
// Check if the keyspace group is served.
svr = suite.tsoCluster.WaitForPrimaryServing(re, uint32(i), uint32(i))
svr = suite.tsoCluster.WaitForPrimaryServing(re, keyspaceGroups[i].ID, keyspaceGroups[i].ID)
re.NotNil(svr)
}
// Merge all the keyspace groups into the default keyspace group.
Expand Down