diff --git a/.github/workflows/pd-tests.yaml b/.github/workflows/pd-tests.yaml index 418fcfdfc10..3674e41cf8a 100644 --- a/.github/workflows/pd-tests.yaml +++ b/.github/workflows/pd-tests.yaml @@ -7,6 +7,7 @@ on: - release-5.* - release-6.* - release-7.* + - release-8.* pull_request: branches: - master @@ -14,6 +15,7 @@ on: - release-5.* - release-6.* - release-7.* + - release-8.* concurrency: group: ${{ github.ref }}-${{ github.workflow }} cancel-in-progress: true diff --git a/.github/workflows/tso-function-test.yaml b/.github/workflows/tso-function-test.yaml index ee7679602f5..d7780425d30 100644 --- a/.github/workflows/tso-function-test.yaml +++ b/.github/workflows/tso-function-test.yaml @@ -6,12 +6,14 @@ on: - release-5.* - release-6.* - release-7.* + - release-8.* pull_request: branches: - master - release-5.* - release-6.* - release-7.* + - release-8.* concurrency: group: ${{ github.ref }}-${{ github.workflow }} cancel-in-progress: true diff --git a/client/client.go b/client/client.go index b2c5cc425bb..8838c184d92 100644 --- a/client/client.go +++ b/client/client.go @@ -606,12 +606,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { log.Info("[pd] changing service mode", zap.String("old-mode", c.serviceMode.String()), zap.String("new-mode", newMode.String())) + c.resetTSOClientLocked(newMode) + oldMode := c.serviceMode + c.serviceMode = newMode + log.Info("[pd] service mode changed", + zap.String("old-mode", oldMode.String()), + zap.String("new-mode", newMode.String())) +} + +// Reset a new TSO client. +func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { // Re-create a new TSO client. var ( newTSOCli *tsoClient newTSOSvcDiscovery ServiceDiscovery ) - switch newMode { + switch mode { case pdpb.ServiceMode_PD_SVC_MODE: newTSOCli = newTSOClient(c.ctx, c.option, c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{}) @@ -649,11 +659,6 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { // We are switching from API service mode to PD service mode, so delete the old tso microservice discovery. oldTSOSvcDiscovery.Close() } - oldMode := c.serviceMode - c.serviceMode = newMode - log.Info("[pd] service mode changed", - zap.String("old-mode", oldMode.String()), - zap.String("new-mode", newMode.String())) } func (c *client) getTSOClient() *tsoClient { @@ -662,6 +667,13 @@ func (c *client) getTSOClient() *tsoClient { return c.tsoClient } +// ResetTSOClient resets the TSO client, only for test. +func (c *client) ResetTSOClient() { + c.Lock() + defer c.Unlock() + c.resetTSOClientLocked(c.serviceMode) +} + func (c *client) getServiceMode() pdpb.ServiceMode { c.RLock() defer c.RUnlock() @@ -779,26 +791,52 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur defer span.Finish() } + req := c.getTSORequest(ctx, dcLocation) + if err := c.dispatchTSORequestWithRetry(req); err != nil { + req.done <- err + } + return req +} + +func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { req := tsoReqPool.Get().(*tsoRequest) - req.requestCtx = ctx - req.clientCtx = c.ctx - tsoClient := c.getTSOClient() + // Set needed fields in the request before using it. 
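+	// The request object is recycled through tsoReqPool (the Get().(*tsoRequest)
+	// assertion above implies a pool), so stale physical/logical values from a
+	// previous use must be cleared before the request is handed out again.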
req.start = time.Now() + req.clientCtx = c.ctx + req.requestCtx = ctx + req.physical = 0 + req.logical = 0 req.dcLocation = dcLocation + return req +} - if tsoClient == nil { - req.done <- errs.ErrClientGetTSO.FastGenByArgs("tso client is nil") - return req - } +const ( + dispatchRetryDelay = 50 * time.Millisecond + dispatchRetryCount = 2 +) - if err := tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil { - // Wait for a while and try again - time.Sleep(50 * time.Millisecond) - if err = tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil { - req.done <- err +func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error { + var ( + retryable bool + err error + ) + for i := 0; i < dispatchRetryCount; i++ { + // Do not delay for the first time. + if i > 0 { + time.Sleep(dispatchRetryDelay) + } + // Get the tsoClient each time, as it may be initialized or switched during the process. + tsoClient := c.getTSOClient() + if tsoClient == nil { + err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil") + continue + } + retryable, err = tsoClient.dispatchRequest(req) + if !retryable { + break } } - return req + return err } func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) { diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 326f564b1df..a695aaf82bc 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -564,7 +564,6 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { - failedRequestCounter.WithLabelValues(resourceGroupName).Inc() return false } @@ -577,7 +576,6 @@ func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, resourceGroupName := "default" gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { - failedRequestCounter.WithLabelValues(resourceGroupName).Inc() return false } bg = gc.getMeta().BackgroundSettings diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go index 842c772abd9..bd7a440fb08 100644 --- a/client/tso_batch_controller.go +++ b/client/tso_batch_controller.go @@ -16,7 +16,10 @@ package pd import ( "context" + "runtime/trace" "time" + + "github.com/tikv/pd/client/tsoutil" ) type tsoBatchController struct { @@ -130,7 +133,18 @@ func (tbc *tsoBatchController) adjustBestBatchSize() { } } -func (tbc *tsoBatchController) revokePendingRequest(err error) { +func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) { + for i := 0; i < tbc.collectedRequestCount; i++ { + tsoReq := tbc.collectedRequests[i] + tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) + defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End() + tsoReq.done <- err + } + // Prevent the finished requests from being processed again. 
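+	// finishCollectedRequests is reachable from several error paths in the
+	// dispatcher loop, so zeroing the count makes any repeated call on the same
+	// batch a harmless no-op instead of re-sending to already-finished requests.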
+ tbc.collectedRequestCount = 0 +} + +func (tbc *tsoBatchController) revokePendingRequests(err error) { for i := 0; i < len(tbc.tsoRequestCh); i++ { req := <-tbc.tsoRequestCh req.done <- err diff --git a/client/tso_client.go b/client/tso_client.go index 158d84e043a..c563df0efdb 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -80,7 +80,7 @@ type tsoClient struct { // tsoDispatcher is used to dispatch different TSO requests to // the corresponding dc-location TSO channel. - tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest + tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher // dc-location -> deadline tsDeadline sync.Map // Same as map[string]chan deadline // dc-location -> *tsoInfo while the tsoInfo is the last TSO info @@ -141,7 +141,7 @@ func (c *tsoClient) Close() { if dispatcherInterface != nil { dispatcher := dispatcherInterface.(*tsoDispatcher) tsoErr := errors.WithStack(errClosing) - dispatcher.tsoBatchController.revokePendingRequest(tsoErr) + dispatcher.tsoBatchController.revokePendingRequests(tsoErr) dispatcher.dispatcherCancel() } return true diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index defe7de2afd..a625f8dbbe1 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -73,22 +73,31 @@ func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { } } -func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, request *tsoRequest) error { - dispatcher, ok := c.tsoDispatcher.Load(dcLocation) +func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { + dispatcher, ok := c.tsoDispatcher.Load(request.dcLocation) if !ok { - err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation)) - log.Error("[tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err)) + err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation)) + log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err)) c.svcDiscovery.ScheduleCheckMemberChanged() - return err + // New dispatcher could be created in the meantime, which is retryable. + return true, err } defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End() select { - case <-ctx.Done(): - return ctx.Err() - case dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request: + case <-request.requestCtx.Done(): + // Caller cancelled the request, no need to retry. + return false, request.requestCtx.Err() + case <-request.clientCtx.Done(): + // Client is closed, no need to retry. + return false, request.clientCtx.Err() + case <-c.ctx.Done(): + // tsoClient is closed due to the PD service mode switch, which is retryable. + return true, c.ctx.Err() + default: + dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request } - return nil + return false, nil } // TSFuture is a future which promises to return a TSO. @@ -341,7 +350,8 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) { func (c *tsoClient) handleDispatcher( dispatcherCtx context.Context, dc string, - tbc *tsoBatchController) { + tbc *tsoBatchController, +) { var ( err error streamURL string @@ -419,7 +429,11 @@ tsoBatchLoop: } // Start to collect the TSO requests. 
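// fetchPendingRequests blocks until the first request arrives, then keeps
// batching whatever else shows up within maxBatchWaitInterval (if non-zero).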
maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() + // Once the TSO requests are collected, must make sure they could be finished or revoked eventually, + // otherwise the upper caller may get blocked on waiting for the results. if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { + // Finish the collected requests if the fetch failed. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) if err == context.Canceled { log.Info("[tso] stop fetching the pending tso requests due to context canceled", zap.String("dc-location", dc)) @@ -459,13 +473,16 @@ tsoBatchLoop: timer := time.NewTimer(retryInterval) select { case <-dispatcherCtx.Done(): + // Finish the collected requests if the context is canceled. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) timer.Stop() return case <-streamLoopTimer.C: err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) c.svcDiscovery.ScheduleCheckMemberChanged() - c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) + // Finish the collected requests if the stream is failed to be created. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) timer.Stop() continue tsoBatchLoop case <-timer.C: @@ -495,9 +512,12 @@ tsoBatchLoop: } select { case <-dispatcherCtx.Done(): + // Finish the collected requests if the context is canceled. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) return case tsDeadlineCh.(chan *deadline) <- dl: } + // processRequests guarantees that the collected requests could be finished properly. err = c.processRequests(stream, dc, tbc) close(done) // If error happens during tso stream handling, reset stream and run the next trial. @@ -767,13 +787,14 @@ func (c *tsoClient) processRequests( defer span.Finish() } } + count := int64(len(requests)) reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, - dcLocation, requests, tbc.batchStartTime) + dcLocation, count, tbc.batchStartTime) if err != nil { - c.finishRequest(requests, 0, 0, 0, err) + tbc.finishCollectedRequests(0, 0, 0, err) return err } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. 
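// For example, with suffixBits = 0 and a batch of count = 3 whose largest
// logical part is 10, the subtraction gives firstLogical = 10 - (3-1) = 8, and
// finishCollectedRequests then assigns logical parts 8, 9 and 10 in order.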
@@ -787,7 +808,7 @@ func (c *tsoClient) processRequests( logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), } c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) - c.finishRequest(requests, physical, firstLogical, suffixBits, nil) + tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil) return nil } @@ -834,11 +855,3 @@ func (c *tsoClient) compareAndSwapTS( lastTSOInfo.physical = curTSOInfo.physical lastTSOInfo.logical = curTSOInfo.logical } - -func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) { - for i := 0; i < len(requests); i++ { - requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) - defer trace.StartRegion(requests[i].requestCtx, "pdclient.tsoReqDequeue").End() - requests[i].done <- err - } -} diff --git a/client/tso_stream.go b/client/tso_stream.go index acefa19d21c..83c0f08d4e0 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -106,7 +106,7 @@ type tsoStream interface { // processRequests processes TSO requests in streaming mode to get timestamps processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - requests []*tsoRequest, batchStartTime time.Time, + count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) } @@ -120,10 +120,9 @@ func (s *pdTSOStream) getServerURL() string { } func (s *pdTSOStream) processRequests( - clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, + clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() - count := int64(len(requests)) req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ ClusterId: clusterID, @@ -175,10 +174,9 @@ func (s *tsoTSOStream) getServerURL() string { func (s *tsoTSOStream) processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - requests []*tsoRequest, batchStartTime time.Time, + count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() - count := int64(len(requests)) req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: clusterID, diff --git a/go.mod b/go.mod index e6101c7cc5e..733246a1e99 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b + github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/common v0.46.0 github.com/sasha-s/go-deadlock v0.2.0 diff --git a/go.sum b/go.sum index 197b98ca7f4..3f618ac0af8 100644 --- a/go.sum +++ b/go.sum @@ -424,8 +424,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= 
-github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b h1:18oWO4GTxxaCwvt2zYyA49GiM5Jp0kOI53g8FptI7YI= -github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= +github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/pkg/schedule/labeler/labeler_test.go b/pkg/schedule/labeler/labeler_test.go index 910a5558eb3..364f79b7a14 100644 --- a/pkg/schedule/labeler/labeler_test.go +++ b/pkg/schedule/labeler/labeler_test.go @@ -31,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/testutil" ) func TestAdjustRule(t *testing.T) { @@ -382,8 +383,10 @@ func TestLabelerRuleTTL(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/labeler/regionLabelExpireSub1Minute", "return(true)")) // rule2 should expire and only 2 labels left. - labels := labeler.GetRegionLabels(region) - re.Len(labels, 2) + testutil.Eventually(re, func() bool { + labels := labeler.GetRegionLabels(region) + return len(labels) == 2 + }) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/labeler/regionLabelExpireSub1Minute")) // rule2 should be existed since `GetRegionLabels` won't clear it physically. diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index d56d0e662e3..6bb3dba1609 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -418,6 +418,8 @@ type LoopWatcher struct { // updateClientCh is used to update the etcd client. // It's only used for testing. updateClientCh chan *clientv3.Client + // watchChTimeoutDuration is the timeout duration for a watchChan. + watchChTimeoutDuration time.Duration } // NewLoopWatcher creates a new LoopWatcher. @@ -448,6 +450,7 @@ func NewLoopWatcher( loadRetryTimes: defaultLoadFromEtcdRetryTimes, loadBatchSize: maxLoadBatchSize, watchChangeRetryInterval: defaultEtcdRetryInterval, + watchChTimeoutDuration: WatchChTimeoutDuration, } } @@ -597,7 +600,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision cancel() // If no message comes from an etcd watchChan for WatchChTimeoutDuration, // create a new one and need not to reset lastReceivedResponseTime. 
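// The timeout is read from the watcher instance rather than the package-level
// constant so that tests can shorten it, as TestWatcherRequestProgress does below.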
- if time.Since(lastReceivedResponseTime) >= WatchChTimeoutDuration { + if time.Since(lastReceivedResponseTime) >= lw.watchChTimeoutDuration { log.Warn("watch channel is blocked for a long time, recreating a new one in watch loop", zap.Duration("timeout", time.Since(lastReceivedResponseTime)), zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 63fa50fd800..4fb96895942 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -583,9 +583,25 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() { count := 65536 ctx, cancel := context.WithCancel(suite.ctx) defer cancel() + + // create data + var wg sync.WaitGroup + tasks := make(chan int, count) + for w := 0; w < 16; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := range tasks { + suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") + } + }() + } for i := 0; i < count; i++ { - suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") + tasks <- i } + close(tasks) + wg.Wait() + cache := make([]string, 0) watcher := NewLoopWatcher( ctx, @@ -724,6 +740,7 @@ func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { func([]*clientv3.Event) error { return nil }, false, /* withPrefix */ ) + watcher.watchChTimeoutDuration = 2 * RequestProgressInterval suite.wg.Add(1) go func() { diff --git a/scripts/dashboard-version b/scripts/dashboard-version index 6a4b729af9f..2bef159c1ae 100644 --- a/scripts/dashboard-version +++ b/scripts/dashboard-version @@ -1,3 +1,3 @@ # This file is updated by running scripts/update-dashboard.sh # Don't edit it manullay -8.0.0-df2799d9 +8.0.0-ab48e09f diff --git a/server/api/pprof_test.go b/server/api/pprof_test.go index a1acd84dcb6..b43feeab108 100644 --- a/server/api/pprof_test.go +++ b/server/api/pprof_test.go @@ -53,7 +53,7 @@ func (suite *profTestSuite) TearDownSuite() { func (suite *profTestSuite) TestGetZip() { re := suite.Require() - rsp, err := testDialClient.Get(suite.urlPrefix + "/pprof/zip?" + "seconds=5s") + rsp, err := testDialClient.Get(suite.urlPrefix + "/pprof/zip?" 
+ "seconds=5") re.NoError(err) defer rsp.Body.Close() body, err := io.ReadAll(rsp.Body) diff --git a/server/api/status_test.go b/server/api/status_test.go index 065618efb6c..5444fda77b4 100644 --- a/server/api/status_test.go +++ b/server/api/status_test.go @@ -33,7 +33,7 @@ func checkStatusResponse(re *require.Assertions, body []byte) { func TestStatus(t *testing.T) { re := require.New(t) - cfgs, _, clean := mustNewCluster(re, 3) + cfgs, _, clean := mustNewCluster(re, 1) defer clean() for _, cfg := range cfgs { diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index 51668ab9ea8..209a4c95445 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -40,7 +40,7 @@ func RegisterMicroService(r *gin.RouterGroup) { func GetMembers(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) if !svr.IsAPIServiceMode() { - c.AbortWithStatusJSON(http.StatusServiceUnavailable, "not support micro service") + c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service") return } @@ -66,7 +66,7 @@ func GetMembers(c *gin.Context) { func GetPrimary(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) if !svr.IsAPIServiceMode() { - c.AbortWithStatusJSON(http.StatusServiceUnavailable, "not support micro service") + c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service") return } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 5b33a9c5bca..e91e549d797 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -1062,9 +1062,23 @@ func TestCloseClient(t *testing.T) { defer cluster.Destroy() endpoints := runServer(re, cluster) cli := setupCli(re, ctx, endpoints) - cli.GetTSAsync(context.TODO()) + ts := cli.GetTSAsync(context.TODO()) time.Sleep(time.Second) cli.Close() + physical, logical, err := ts.Wait() + if err == nil { + re.Greater(physical, int64(0)) + re.Greater(logical, int64(0)) + } else { + re.ErrorIs(err, context.Canceled) + re.Zero(physical) + re.Zero(logical) + } + ts = cli.GetTSAsync(context.TODO()) + physical, logical, err = ts.Wait() + re.ErrorIs(err, context.Canceled) + re.Zero(physical) + re.Zero(logical) } type idAllocator struct { diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index 3ac9df728b4..d689298c314 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -126,7 +126,7 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 29197f89432..852e8fab5fa 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -406,8 +406,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= 
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b h1:18oWO4GTxxaCwvt2zYyA49GiM5Jp0kOI53g8FptI7YI= -github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= +github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index aa7a264f5e6..aea0441c7d7 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -1343,6 +1343,7 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "")) + re.False(c.IsBackgroundRequest(suite.ctx, "none", "none")) resourceGroupName = enableBackgroundGroup(true) re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_br")) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 3d7b099f342..b0bd6f1d4e5 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -21,6 +21,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "testing" "time" @@ -66,6 +67,10 @@ type tsoClientTestSuite struct { clients []pd.Client } +func (suite *tsoClientTestSuite) getBackendEndpoints() []string { + return strings.Split(suite.backendEndpoints, ",") +} + func TestLegacyTSOClient(t *testing.T) { suite.Run(t, &tsoClientTestSuite{ legacy: true, @@ -98,7 +103,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.keyspaceIDs = make([]uint32, 0) if suite.legacy { - client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}, pd.WithForwardingOption(true)) + client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, pd.WithForwardingOption(true)) re.NoError(err) innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -173,7 +178,7 @@ func (suite *tsoClientTestSuite) waitForAllKeyspaceGroupsInServing(re *require.A // Create clients and make sure they all have discovered the tso service. 
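// WaitForMultiKeyspacesTSOAvailable returns one ready client per keyspace ID,
// which the length assertion below depends on.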
suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( - suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) + suite.ctx, re, suite.keyspaceIDs, suite.getBackendEndpoints()) re.Equal(len(suite.keyspaceIDs), len(suite.clients)) } @@ -254,7 +259,7 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { ctx, cancel := context.WithCancel(suite.ctx) defer cancel() client := mcs.SetupClientWithKeyspaceID( - ctx, re, keyspaceID, strings.Split(suite.backendEndpoints, ",")) + ctx, re, keyspaceID, suite.getBackendEndpoints()) defer client.Close() var lastTS uint64 for j := 0; j < tsoRequestRound; j++ { @@ -420,6 +425,50 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")) } +func (suite *tsoClientTestSuite) TestGetTSWhileRestingTSOClient() { + re := suite.Require() + var ( + clients []pd.Client + stopSignal atomic.Bool + wg sync.WaitGroup + ) + // Create independent clients to prevent interfering with other tests. + if suite.legacy { + client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, pd.WithForwardingOption(true)) + re.NoError(err) + clients = []pd.Client{client} + } else { + clients = mcs.WaitForMultiKeyspacesTSOAvailable(suite.ctx, re, suite.keyspaceIDs, suite.getBackendEndpoints()) + } + wg.Add(tsoRequestConcurrencyNumber * len(clients)) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + for _, client := range clients { + go func(client pd.Client) { + defer wg.Done() + var lastTS uint64 + for !stopSignal.Load() { + physical, logical, err := client.GetTS(suite.ctx) + if err != nil { + re.ErrorContains(err, context.Canceled.Error()) + } else { + ts := tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + } + }(client) + } + } + // Reset the TSO clients while requesting TSO concurrently. + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + for _, client := range clients { + client.(interface{ ResetTSOClient() }).ResetTSOClient() + } + } + stopSignal.Store(true) + wg.Wait() +} + // When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time. 
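// This test simulates such a mixed deployment and checks that TSO requests
// keep succeeding while both generations run side by side.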
func TestMixedTSODeployment(t *testing.T) { re := require.New(t) diff --git a/tools/go.mod b/tools/go.mod index 97c25a852ee..077f9377728 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -126,7 +126,7 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect diff --git a/tools/go.sum b/tools/go.sum index 2c2bf0e0e29..6b46e5bfbfb 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -402,8 +402,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b h1:18oWO4GTxxaCwvt2zYyA49GiM5Jp0kOI53g8FptI7YI= -github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= +github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
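The behavioral core of this change is the bounded-retry dispatch added in client/client.go. Below is a minimal, self-contained sketch of that pattern using only the standard library; the dispatch closure and its failure mode are hypothetical stand-ins, not the PD client's actual tsoClient. It shows how the loop sleeps only between attempts and stops as soon as an attempt reports a non-retryable result.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Mirrors the constants introduced in client/client.go.
const (
	dispatchRetryDelay = 50 * time.Millisecond
	dispatchRetryCount = 2
)

// dispatchWithRetry retries a dispatch attempt a bounded number of times,
// sleeping only between attempts and stopping early once the attempt
// reports a non-retryable result.
func dispatchWithRetry(dispatch func() (retryable bool, err error)) error {
	var (
		retryable bool
		err       error
	)
	for i := 0; i < dispatchRetryCount; i++ {
		// Do not delay before the first attempt.
		if i > 0 {
			time.Sleep(dispatchRetryDelay)
		}
		retryable, err = dispatch()
		if !retryable {
			break
		}
	}
	return err
}

func main() {
	attempts := 0
	err := dispatchWithRetry(func() (bool, error) {
		attempts++
		if attempts == 1 {
			// Hypothetical first-attempt failure, e.g. the TSO client is
			// still being initialized after a service mode switch.
			return true, errors.New("tso client is nil")
		}
		return false, nil // dispatched successfully
	})
	fmt.Println(attempts, err) // Output: 2 <nil>
}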