Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: simplify WaitUntil usage #4613

Merged
merged 2 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *testClientSuite) TestDynamicOptionChange(c *C) {
expectBool := true
o.setEnableTSOFollowerProxy(expectBool)
// Check the value changing notification.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
<-o.enableTSOFollowerProxyCh
return true
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/mock/mockhbstream/mockhbstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {

// Active stream is stream1.
hbs.BindStream(1, stream1)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() != nil && stream2.Recv() == nil
})
// Rebind to stream2.
hbs.BindStream(1, stream2)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() == nil && stream2.Recv() != nil
})
Expand All @@ -83,7 +83,7 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
c.Assert(res.GetHeader().GetError(), NotNil)
// Switch back to 1 again.
hbs.BindStream(1, stream1)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() != nil && stream2.Recv() == nil
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (

// CheckFunc is a condition checker that passed to WaitUntil. Its implementation
// may call c.Fatal() to abort the test, or c.Log() to add more information.
type CheckFunc func(c *check.C) bool
type CheckFunc func() bool

// WaitOp represents available options when execute WaitUntil
type WaitOp struct {
Expand Down Expand Up @@ -63,7 +63,7 @@ func WaitUntil(c *check.C, f CheckFunc, opts ...WaitOption) {
opt(option)
}
for i := 0; i < option.retryTimes; i++ {
if f(c) {
if f() {
return
}
time.Sleep(option.sleepInterval)
Expand Down
2 changes: 1 addition & 1 deletion server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func mustNewCluster(c *C, num int, opts ...func(cfg *config.Config)) ([]*config.
}

func mustWaitLeader(c *C, svrs []*server.Server) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
var leader *pdpb.Member
for _, svr := range svrs {
l := svr.GetLeader()
Expand Down
2 changes: 1 addition & 1 deletion server/api/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *testTsoSuite) TearDownSuite(c *C) {
}

func (s *testTsoSuite) TestTransferAllocator(c *C) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
s.svr.GetTSOAllocatorManager().ClusterDCLocationChecker()
_, err := s.svr.GetTSOAllocatorManager().GetAllocator("dc-1")
return err == nil
Expand Down
12 changes: 6 additions & 6 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ func BenchmarkPatrolRegion(b *testing.B) {
}

func waitOperator(c *C, co *coordinator, regionID uint64) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return co.opController.GetOperator(regionID) != nil
})
}
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {

func waitAddLearner(c *C, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
if res = stream.Recv(); res != nil {
return res.GetRegionId() == region.GetID() &&
res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode &&
Expand All @@ -1222,7 +1222,7 @@ func waitAddLearner(c *C, stream mockhbstream.HeartbeatStream, region *core.Regi

func waitPromoteLearner(c *C, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
if res = stream.Recv(); res != nil {
return res.GetRegionId() == region.GetID() &&
res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode &&
Expand All @@ -1239,7 +1239,7 @@ func waitPromoteLearner(c *C, stream mockhbstream.HeartbeatStream, region *core.

func waitRemovePeer(c *C, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
if res = stream.Recv(); res != nil {
return res.GetRegionId() == region.GetID() &&
res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode &&
Expand All @@ -1255,7 +1255,7 @@ func waitRemovePeer(c *C, stream mockhbstream.HeartbeatStream, region *core.Regi

func waitTransferLeader(c *C, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
if res = stream.Recv(); res != nil {
if res.GetRegionId() == region.GetID() {
for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) {
Expand All @@ -1273,7 +1273,7 @@ func waitTransferLeader(c *C, stream mockhbstream.HeartbeatStream, region *core.
}

func waitNoResponse(c *C, stream mockhbstream.HeartbeatStream) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
res := stream.Recv()
return res == nil
})
Expand Down
2 changes: 1 addition & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestMain(m *testing.M) {

func mustWaitLeader(c *C, svrs []*Server) *Server {
var leader *Server
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
for _, s := range svrs {
if !s.IsClosed() && s.member.IsLeader() {
leader = s
Expand Down
24 changes: 12 additions & 12 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *clientTestSuite) TestClientLeaderChange(c *C) {
cli := setupCli(c, s.ctx, endpoints)

var ts1, ts2 uint64
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
p1, l1, err := cli.GetTS(context.TODO())
if err == nil {
ts1 = tsoutil.ComposeTS(p1, l1)
Expand All @@ -111,7 +111,7 @@ func (s *clientTestSuite) TestClientLeaderChange(c *C) {
waitLeader(c, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)

// Check TS won't fall back after leader changed.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
p2, l2, err := cli.GetTS(context.TODO())
if err == nil {
ts2 = tsoutil.ComposeTS(p2, l2)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *clientTestSuite) TestLeaderTransfer(c *C) {
cli := setupCli(c, s.ctx, endpoints)

var lastTS uint64
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *clientTestSuite) TestTSOAllocatorLeader(c *C) {
var allocatorLeaderMap = make(map[string]string)
for _, dcLocation := range dcLocationConfig {
var pdName string
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
pdName = cluster.WaitAllocatorLeader(dcLocation)
return len(pdName) > 0
})
Expand Down Expand Up @@ -423,7 +423,7 @@ func (s *clientTestSuite) TestGetTsoFromFollowerClient1(c *C) {

c.Assert(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"), IsNil)
var lastTS uint64
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -451,7 +451,7 @@ func (s *clientTestSuite) TestGetTsoFromFollowerClient2(c *C) {

c.Assert(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"), IsNil)
var lastTS uint64
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -506,7 +506,7 @@ func setupCli(c *C, ctx context.Context, endpoints []string, opts ...pd.ClientOp
}

func waitLeader(c *C, cli client, leader string) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
cli.ScheduleCheckLeader()
return cli.GetLeaderAddr() == leader
})
Expand Down Expand Up @@ -721,7 +721,7 @@ func (s *testClientSuite) TestGetRegion(c *C) {
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)

testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
r, err := s.client.GetRegion(context.Background(), []byte("a"))
c.Assert(err, IsNil)
if r == nil {
Expand Down Expand Up @@ -759,7 +759,7 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
}
time.Sleep(500 * time.Millisecond)
for i := 0; i < 20; i++ {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
r, err := s.client.GetPrevRegion(context.Background(), []byte{byte(i)})
c.Assert(err, IsNil)
if i > 0 && i < regionLen {
Expand Down Expand Up @@ -798,7 +798,7 @@ func (s *testClientSuite) TestScanRegions(c *C) {
}

// Wait for region heartbeats.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
scanRegions, err := s.client.ScanRegions(context.Background(), []byte{0}, nil, 10)
return err == nil && len(scanRegions) == 10
})
Expand Down Expand Up @@ -865,7 +865,7 @@ func (s *testClientSuite) TestGetRegionByID(c *C) {
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)

testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
r, err := s.client.GetRegionByID(context.Background(), regionID)
c.Assert(err, IsNil)
if r == nil {
Expand Down Expand Up @@ -1146,7 +1146,7 @@ func (s *testClientSuite) TestScatterRegion(c *C) {
err := s.regionHeartbeat.Send(req)
regionsID := []uint64{regionID}
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
scatterResp, err := s.client.ScatterRegions(context.Background(), regionsID, pd.WithGroup("test"), pd.WithRetry(1))
if c.Check(err, NotNil) {
return false
Expand Down
2 changes: 1 addition & 1 deletion tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func (c *TestCluster) WaitAllLeaders(testC *check.C, dcLocations map[string]stri
for _, dcLocation := range dcLocations {
wg.Add(1)
go func(dc string) {
testutil.WaitUntil(testC, func(testC *check.C) bool {
testutil.WaitUntil(testC, func() bool {
leaderName := c.WaitAllocatorLeader(dc)
return leaderName != ""
})
Expand Down
4 changes: 2 additions & 2 deletions tests/pdctl/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *memberTestSuite) TestMember(c *C) {
args = []string{"-u", pdAddr, "member", "leader", "transfer", "pd2"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return c.Check("pd2", Equals, svr.GetLeader().GetName())
})

Expand All @@ -84,7 +84,7 @@ func (s *memberTestSuite) TestMember(c *C) {
output, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(strings.Contains(string(output), "Success"), IsTrue)
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return c.Check("pd2", Not(Equals), svr.GetLeader().GetName())
})

Expand Down
4 changes: 2 additions & 2 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *serverTestSuite) TestReconnect(c *C) {
// Make sure they proxy requests to the new leader.
for name, s := range cluster.GetServers() {
if name != leader {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
res, e := http.Get(s.GetConfig().AdvertiseClientUrls + "/pd/api/v1/version")
c.Assert(e, IsNil)
defer res.Body.Close()
Expand All @@ -103,7 +103,7 @@ func (s *serverTestSuite) TestReconnect(c *C) {
// Request will fail with no leader.
for name, s := range cluster.GetServers() {
if name != leader && name != newLeader {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
res, err := http.Get(s.GetConfig().AdvertiseClientUrls + "/pd/api/v1/version")
c.Assert(err, IsNil)
defer res.Body.Close()
Expand Down
14 changes: 7 additions & 7 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *memberTestSuite) TestMemberDelete(c *C) {
httpClient := &http.Client{Timeout: 15 * time.Second}
for _, t := range table {
c.Log(time.Now(), "try to delete:", t.path)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
addr := leader.GetConfig().ClientUrls + "/pd/api/v1/members/" + t.path
req, err := http.NewRequest(http.MethodDelete, addr, nil)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -187,21 +187,21 @@ func (s *memberTestSuite) TestLeaderPriority(c *C) {
server1 := cluster.GetServer(leader1)
addr := server1.GetConfig().ClientUrls
// PD leader should sync with etcd leader.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return cluster.GetLeader() == leader1
})
// Bind a lower priority to current leader.
s.post(c, addr+"/pd/api/v1/members/name/"+leader1, `{"leader-priority": -1}`)
// Wait etcd leader change.
leader2 := s.waitEtcdLeaderChange(c, server1, leader1)
// PD leader should sync with etcd leader again.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return cluster.GetLeader() == leader2
})
}

func (s *memberTestSuite) post(c *C, url string, body string) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
res, err := http.Post(url, "", bytes.NewBufferString(body)) // #nosec
c.Assert(err, IsNil)
b, err := io.ReadAll(res.Body)
Expand All @@ -214,7 +214,7 @@ func (s *memberTestSuite) post(c *C, url string, body string) {

func (s *memberTestSuite) waitEtcdLeaderChange(c *C, server *tests.TestServer, old string) string {
var leader string
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
var err error
leader, err = server.GetEtcdLeader()
if err != nil {
Expand Down Expand Up @@ -271,7 +271,7 @@ func (s *memberTestSuite) TestLeaderResignWithBlock(c *C) {

func (s *memberTestSuite) waitLeaderChange(c *C, cluster *tests.TestCluster, old string) string {
var leader string
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
leader = cluster.GetLeader()
if leader == old || leader == "" {
return false
Expand Down Expand Up @@ -383,7 +383,7 @@ func (s *leaderTestSuite) sendRequest(c *C, addr string) {

func mustWaitLeader(c *C, svrs []*server.Server) *server.Server {
var leader *server.Server
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
for _, s := range svrs {
if !s.IsClosed() && s.GetMember().IsLeader() {
leader = s
Expand Down
2 changes: 1 addition & 1 deletion tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *regionSyncerTestSuite) TestRegionSyncer(c *C) {
c.Assert(followerServer, NotNil)
cacheRegions := leaderServer.GetServer().GetBasicCluster().GetRegions()
c.Assert(cacheRegions, HasLen, regionLen)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
for _, region := range cacheRegions {
r := followerServer.GetServer().GetBasicCluster().GetRegion(region.GetID())
if !(c.Check(r.GetMeta(), DeepEquals, region.GetMeta()) &&
Expand Down
2 changes: 1 addition & 1 deletion tests/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *serverTestSuite) TestLeader(c *C) {

err = cluster.GetServer(leader1).Stop()
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
leader := cluster.GetLeader()
return leader != leader1
})
Expand Down
6 changes: 3 additions & 3 deletions tests/server/tso/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *testAllocatorSuite) TestPriorityAndDifferentLocalTSO(c *C) {
c.Assert(err, IsNil)
dcLocationConfig["pd4"] = "dc-4"
cluster.CheckClusterDCLocation()
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
leaderName := cluster.WaitAllocatorLeader("dc-4")
return leaderName != ""
})
Expand All @@ -180,7 +180,7 @@ func (s *testAllocatorSuite) TestPriorityAndDifferentLocalTSO(c *C) {
for serverName, dcLocation := range dcLocationConfig {
go func(serName, dc string) {
defer wg.Done()
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
leaderName := cluster.WaitAllocatorLeader(dc)
return leaderName == serName
}, testutil.WithRetryTimes(12), testutil.WithSleepInterval(5*time.Second))
Expand Down Expand Up @@ -231,7 +231,7 @@ func (s *testAllocatorSuite) testTSOSuffix(c *C, cluster *tests.TestCluster, am
allocator, err := am.GetAllocator(dcLocation)
c.Assert(err, IsNil)
var tso pdpb.Timestamp
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
tso, err = allocator.GenerateTSO(1)
c.Assert(err, IsNil)
return tso.GetPhysical() != 0
Expand Down
Loading