From ad76badd72bd9eac6ec55d213e9646423b05eed4 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 7 Nov 2023 14:04:50 +0800 Subject: [PATCH] Fix the integration tests Signed-off-by: JmPotato --- integration_tests/pd_api_test.go | 76 ++++++++++---------------------- tikv/kv.go | 40 ++++++++--------- 2 files changed, 41 insertions(+), 75 deletions(-) diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 4a27abcad..787538373 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -48,14 +48,15 @@ type apiTestSuite struct { } func (s *apiTestSuite) SetupTest() { + re := s.Require() addrs := strings.Split(*pdAddrs, ",") pdClient, err := pd.NewClient(addrs, pd.SecurityOption{}) - s.Require().NoError(err) + re.NoError(err) rpcClient := tikv.NewRPCClient() - s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) + re.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`)) // Set PD HTTP client. - store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs)) - s.store = store + s.store, err = tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(addrs, nil)) + re.NoError(err) storeID := uint64(1) s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, nil) } @@ -123,14 +124,9 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() { s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) // Try to get the minimum resolved timestamp of the stores from PD. require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) - var retryCount int - for s.store.GetMinSafeTS(dcLabel) != 100 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + s.Eventually(func() bool { + return s.store.GetMinSafeTS(dcLabel) == 100 + }, time.Second, 200*time.Millisecond) require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount)) require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel)) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) @@ -143,14 +139,9 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { s.store.SetTiKVClient(&mockClient) // Try to get the minimum resolved timestamp of the cluster from PD. require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) - var retryCount int - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + s.Eventually(func() bool { + return s.store.GetMinSafeTS(oracle.GlobalTxnScope) == 100 + }, time.Second, 100*time.Millisecond) require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0)) require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) @@ -173,15 +164,9 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() { storeID := uint64(1) s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels) // Try to get the minimum resolved timestamp of the store from TiKV. - retryCount = 0 - for s.store.GetMinSafeTS(dcLabel) != 150 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } - + s.Eventually(func() bool { + return s.store.GetMinSafeTS(dcLabel) == 150 + }, 3*time.Second, 200*time.Millisecond) require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1)) require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel)) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) @@ -197,28 +182,18 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { mockClient.SetKVSafeTS(0) // Try to get the minimum resolved timestamp of the cluster from TiKV. require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) - var retryCount int - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + s.Eventually(func() bool { + return s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 + }, time.Second, 200*time.Millisecond) // Make sure the store's min resolved ts is not initialized. require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) // Try to get the minimum resolved timestamp of the cluster from PD. require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(100)`)) - retryCount = 0 - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + s.Eventually(func() bool { + return s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 + }, time.Second, 200*time.Millisecond) // Make sure the store's min resolved ts is not regarded as MaxUint64. require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) @@ -226,14 +201,9 @@ func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() { // Fallback to KV Request when PD server not support get min resolved ts. require.NoError(failpoint.Enable("tikvclient/InjectPDMinResolvedTS", `return(0)`)) mockClient.SetKVSafeTS(150) - retryCount = 0 - for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 { - time.Sleep(100 * time.Millisecond) - if retryCount > 5 { - break - } - retryCount++ - } + s.Eventually(func() bool { + return s.store.GetMinSafeTS(oracle.GlobalTxnScope) == 150 + }, time.Second, 200*time.Millisecond) // Make sure the minSafeTS can advance. require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope)) require.NoError(failpoint.Disable("tikvclient/InjectPDMinResolvedTS")) diff --git a/tikv/kv.go b/tikv/kv.go index 16b7b6992..ccfafad78 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -198,9 +198,9 @@ func WithPool(gp Pool) Option { } // WithPDHTTPClient set the PD HTTP client with the given address and TLS config. -func WithPDHTTPClient(tlsConf *tls.Config, pdaddrs []string) Option { +func WithPDHTTPClient(pdAddrs []string, tlsConf *tls.Config) Option { return func(o *KVStore) { - o.pdHttpClient = pdhttp.NewClient(pdaddrs, pdhttp.WithTLSConfig(tlsConf)) + o.pdHttpClient = pdhttp.NewClient(pdAddrs, pdhttp.WithTLSConfig(tlsConf)) } } @@ -611,7 +611,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { for i, store := range stores { storeIDs[i] = store.StoreID() } - _, storeMinResolvedTSs, err = s.getGetMinResolvedTSByStoresIDs(ctx, storeIDs) + _, storeMinResolvedTSs, err = s.getMinResolvedTSByStoresIDs(ctx, storeIDs) if err != nil { // If getting the minimum resolved timestamp from PD failed, log the error and need to get it from TiKV. logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Any("stores", storeIDs)) @@ -651,8 +651,10 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { return } safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs() + logutil.BgLogger().Info("get min resolved ts from tikv", zap.Uint64("store-id", storeID), zap.Uint64("safe-ts", safeTS)) } else { safeTS = storeMinResolvedTSs[storeID] + logutil.BgLogger().Info("get min resolved ts from pd", zap.Uint64("store-id", storeID), zap.Uint64("safe-ts", safeTS)) } _, preSafeTS := s.getSafeTS(storeID) @@ -683,7 +685,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { wg.Wait() } -func (s *KVStore) getGetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { +func (s *KVStore) getMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { var ( minResolvedTS uint64 storeMinResolvedTSs map[uint64]uint64 @@ -694,25 +696,19 @@ func (s *KVStore) getGetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs [ return 0, nil, err } if val, e := util.EvalFailpoint("InjectPDMinResolvedTS"); e == nil { - // Need to make sure successfully get from real pd. - if storeMinResolvedTSs != nil { - for storeID, v := range storeMinResolvedTSs { - if v != 0 { - // Should be val.(uint64) but failpoint doesn't support that. - if tmp, ok := val.(int); ok { - storeMinResolvedTSs[storeID] = uint64(tmp) - logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(tmp))) - } - } + injectedTS, ok := val.(int) + if !ok { + return minResolvedTS, storeMinResolvedTSs, err + } + minResolvedTS = uint64(injectedTS) + logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(injectedTS))) + // Currently we only have a store 1 in the test, so it's OK to inject the same min resolved TS for all stores here. + for storeID, v := range storeMinResolvedTSs { + if v != 0 && v != math.MaxUint64 { + storeMinResolvedTSs[storeID] = uint64(injectedTS) + logutil.BgLogger().Info("inject store min resolved ts", zap.Uint64("storeID", storeID), zap.Uint64("ts", uint64(injectedTS))) } - } else if tmp, ok := val.(int); ok { - // Should be val.(uint64) but failpoint doesn't support that. - // ci's store id is 1, we can change it if we have more stores. - // but for pool ci it's no need to do that :( - minResolvedTS = uint64(tmp) - logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp))) } - } return minResolvedTS, storeMinResolvedTSs, err } @@ -729,7 +725,7 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool { isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope // Try to get the minimum resolved timestamp of the cluster from PD. if s.pdHttpClient != nil && isGlobal { - clusterMinSafeTS, _, err := s.pdHttpClient.GetMinResolvedTSByStoresIDs(ctx, nil) + clusterMinSafeTS, _, err := s.getMinResolvedTSByStoresIDs(ctx, nil) if err != nil { logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err)) } else if clusterMinSafeTS != 0 {