Skip to content

Commit

Permalink
Merge branch 'master' into record-max
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Mar 20, 2024
2 parents 0583609 + c00c42e commit 082b30a
Show file tree
Hide file tree
Showing 24 changed files with 225 additions and 73 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ on:
- release-5.*
- release-6.*
- release-7.*
- release-8.*
pull_request:
branches:
- master
- release-4.0
- release-5.*
- release-6.*
- release-7.*
- release-8.*
concurrency:
group: ${{ github.ref }}-${{ github.workflow }}
cancel-in-progress: true
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/tso-function-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ on:
- release-5.*
- release-6.*
- release-7.*
- release-8.*
pull_request:
branches:
- master
- release-5.*
- release-6.*
- release-7.*
- release-8.*
concurrency:
group: ${{ github.ref }}-${{ github.workflow }}
cancel-in-progress: true
Expand Down
76 changes: 57 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

// Reset a new TSO client.
func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
)
switch newMode {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
Expand Down Expand Up @@ -649,11 +659,6 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
// We are switching from API service mode to PD service mode, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
Expand All @@ -662,6 +667,13 @@ func (c *client) getTSOClient() *tsoClient {
return c.tsoClient
}

// ResetTSOClient resets the TSO client, only for test.
func (c *client) ResetTSOClient() {
c.Lock()
defer c.Unlock()
c.resetTSOClientLocked(c.serviceMode)
}

func (c *client) getServiceMode() pdpb.ServiceMode {
c.RLock()
defer c.RUnlock()
Expand Down Expand Up @@ -779,26 +791,52 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
defer span.Finish()
}

req := c.getTSORequest(ctx, dcLocation)
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.done <- err
}
return req
}

func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
// Set needed fields in the request before using it.
req.start = time.Now()
req.clientCtx = c.ctx
req.requestCtx = ctx
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
return req
}

if tsoClient == nil {
req.done <- errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
return req
}
const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

if err := tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
// Wait for a while and try again
time.Sleep(50 * time.Millisecond)
if err = tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
req.done <- err
func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
var (
retryable bool
err error
)
for i := 0; i < dispatchRetryCount; i++ {
// Do not delay for the first time.
if i > 0 {
time.Sleep(dispatchRetryDelay)
}
// Get the tsoClient each time, as it may be initialized or switched during the process.
tsoClient := c.getTSOClient()
if tsoClient == nil {
err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
continue
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
break
}
}
return req
return err
}

func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
Expand Down
2 changes: 0 additions & 2 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,6 @@ func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
resourceGroupName, requestResource string) bool {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return false
}

Expand All @@ -577,7 +576,6 @@ func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context,
resourceGroupName := "default"
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return false
}
bg = gc.getMeta().BackgroundSettings
Expand Down
16 changes: 15 additions & 1 deletion client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package pd

import (
"context"
"runtime/trace"
"time"

"github.com/tikv/pd/client/tsoutil"
)

type tsoBatchController struct {
Expand Down Expand Up @@ -130,7 +133,18 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
}
}

func (tbc *tsoBatchController) revokePendingRequest(err error) {
func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < tbc.collectedRequestCount; i++ {
tsoReq := tbc.collectedRequests[i]
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End()
tsoReq.done <- err
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
}

func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.done <- err
Expand Down
4 changes: 2 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type tsoClient struct {

// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding dc-location TSO channel.
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher
// dc-location -> deadline
tsDeadline sync.Map // Same as map[string]chan deadline
// dc-location -> *tsoInfo while the tsoInfo is the last TSO info
Expand Down Expand Up @@ -141,7 +141,7 @@ func (c *tsoClient) Close() {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
tsoErr := errors.WithStack(errClosing)
dispatcher.tsoBatchController.revokePendingRequest(tsoErr)
dispatcher.tsoBatchController.revokePendingRequests(tsoErr)
dispatcher.dispatcherCancel()
}
return true
Expand Down
57 changes: 35 additions & 22 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,31 @@ func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() {
}
}

func (c *tsoClient) dispatchRequest(ctx context.Context, dcLocation string, request *tsoRequest) error {
dispatcher, ok := c.tsoDispatcher.Load(dcLocation)
func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) {
dispatcher, ok := c.tsoDispatcher.Load(request.dcLocation)
if !ok {
err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation))
log.Error("[tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err))
err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation))
log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
return err
// New dispatcher could be created in the meantime, which is retryable.
return true, err
}

defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End()
select {
case <-ctx.Done():
return ctx.Err()
case dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request:
case <-request.requestCtx.Done():
// Caller cancelled the request, no need to retry.
return false, request.requestCtx.Err()
case <-request.clientCtx.Done():
// Client is closed, no need to retry.
return false, request.clientCtx.Err()
case <-c.ctx.Done():
// tsoClient is closed due to the PD service mode switch, which is retryable.
return true, c.ctx.Err()
default:
dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request
}
return nil
return false, nil
}

// TSFuture is a future which promises to return a TSO.
Expand Down Expand Up @@ -341,7 +350,8 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) {
func (c *tsoClient) handleDispatcher(
dispatcherCtx context.Context,
dc string,
tbc *tsoBatchController) {
tbc *tsoBatchController,
) {
var (
err error
streamURL string
Expand Down Expand Up @@ -419,7 +429,11 @@ tsoBatchLoop:
}
// Start to collect the TSO requests.
maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval()
// Once the TSO requests are collected, must make sure they could be finished or revoked eventually,
// otherwise the upper caller may get blocked on waiting for the results.
if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil {
// Finish the collected requests if the fetch failed.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
if err == context.Canceled {
log.Info("[tso] stop fetching the pending tso requests due to context canceled",
zap.String("dc-location", dc))
Expand Down Expand Up @@ -459,13 +473,16 @@ tsoBatchLoop:
timer := time.NewTimer(retryInterval)
select {
case <-dispatcherCtx.Done():
// Finish the collected requests if the context is canceled.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err()))
timer.Stop()
return
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
// Finish the collected requests if the stream is failed to be created.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
timer.Stop()
continue tsoBatchLoop
case <-timer.C:
Expand Down Expand Up @@ -495,9 +512,12 @@ tsoBatchLoop:
}
select {
case <-dispatcherCtx.Done():
// Finish the collected requests if the context is canceled.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err()))
return
case tsDeadlineCh.(chan *deadline) <- dl:
}
// processRequests guarantees that the collected requests could be finished properly.
err = c.processRequests(stream, dc, tbc)
close(done)
// If error happens during tso stream handling, reset stream and run the next trial.
Expand Down Expand Up @@ -767,13 +787,14 @@ func (c *tsoClient) processRequests(
defer span.Finish()
}
}

count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, requests, tbc.batchStartTime)
dcLocation, count, tbc.batchStartTime)
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
tbc.finishCollectedRequests(0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
Expand All @@ -787,7 +808,7 @@ func (c *tsoClient) processRequests(
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
return nil
}

Expand Down Expand Up @@ -834,11 +855,3 @@ func (c *tsoClient) compareAndSwapTS(
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < len(requests); i++ {
requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(requests[i].requestCtx, "pdclient.tsoReqDequeue").End()
requests[i].done <- err
}
}
8 changes: 3 additions & 5 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type tsoStream interface {
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
}

Expand All @@ -120,10 +120,9 @@ func (s *pdTSOStream) getServerURL() string {
}

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time,
clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{
ClusterId: clusterID,
Expand Down Expand Up @@ -175,10 +174,9 @@ func (s *tsoTSOStream) getServerURL() string {

func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b
github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/common v0.46.0
github.com/sasha-s/go-deadlock v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b h1:18oWO4GTxxaCwvt2zYyA49GiM5Jp0kOI53g8FptI7YI=
github.com/pingcap/tidb-dashboard v0.0.0-20240314085625-df2799d91d7b/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c=
github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk=
github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
Loading

0 comments on commit 082b30a

Please sign in to comment.