This is an automated cherry-pick of #7951
close #7849

Signed-off-by: ti-chi-bot <[email protected]>
JmPotato authored and ti-chi-bot committed Apr 18, 2024
1 parent c325311 commit ff570d7
Showing 6 changed files with 340 additions and 7 deletions.
84 changes: 83 additions & 1 deletion client/client.go
@@ -386,12 +386,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

// Reset a new TSO client.
func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
)
switch newMode {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
@@ -424,6 +434,7 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
oldTSOSvcDiscovery.Close()
}
}
<<<<<<< HEAD
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", c.serviceMode.String()),
@@ -435,6 +446,27 @@ func (c *client) getTSOClient() *tsoClient {
return tsoCli.(*tsoClient)
}
return nil
=======
}

func (c *client) getTSOClient() *tsoClient {
c.RLock()
defer c.RUnlock()
return c.tsoClient
}

// ResetTSOClient resets the TSO client, only for test.
func (c *client) ResetTSOClient() {
c.Lock()
defer c.Unlock()
c.resetTSOClientLocked(c.serviceMode)
}

func (c *client) getServiceMode() pdpb.ServiceMode {
c.RLock()
defer c.RUnlock()
return c.serviceMode
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
}

func (c *client) scheduleUpdateTokenConnection() {
@@ -598,6 +630,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
ctx = opentracing.ContextWithSpan(ctx, span)
}

<<<<<<< HEAD
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
@@ -617,10 +650,59 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
if err = tsoClient.dispatchRequest(dcLocation, req); err != nil {
req.done <- err
}
=======
req := c.getTSORequest(ctx, dcLocation)
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.done <- err
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
}
return req
}

<<<<<<< HEAD
=======
func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := tsoReqPool.Get().(*tsoRequest)
// Set needed fields in the request before using it.
req.start = time.Now()
req.clientCtx = c.ctx
req.requestCtx = ctx
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
return req
}

const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
var (
retryable bool
err error
)
for i := 0; i < dispatchRetryCount; i++ {
// Do not delay for the first time.
if i > 0 {
time.Sleep(dispatchRetryDelay)
}
// Get the tsoClient each time, as it may be initialized or switched during the process.
tsoClient := c.getTSOClient()
if tsoClient == nil {
err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
continue
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
break
}
}
return err
}

>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
resp := c.GetTSAsync(ctx)
return resp.Wait()
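Note on the dispatch path added above: dispatchTSORequestWithRetry re-fetches the tsoClient on every attempt because the client can be reset or switched while a request is in flight, and it only retries while dispatchRequest reports the error as retryable. A minimal, self-contained sketch of that pattern follows; fakeDispatch and the surrounding program are illustrative stand-ins, not PD client code.

package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	dispatchRetryDelay = 50 * time.Millisecond // same values as in client/client.go above
	dispatchRetryCount = 2
)

// fakeDispatch stands in for tsoClient.dispatchRequest: it fails with a
// retryable error on the first attempt (e.g. the TSO client is still nil
// right after a service-mode switch) and succeeds on the second.
func fakeDispatch(attempt int) (retryable bool, err error) {
	if attempt == 0 {
		return true, errors.New("tso client is nil")
	}
	return false, nil
}

// dispatchWithRetry mirrors the shape of dispatchTSORequestWithRetry:
// no delay before the first attempt, a fixed delay between retries,
// and it stops as soon as the error is not retryable.
func dispatchWithRetry() error {
	var (
		retryable bool
		err       error
	)
	for i := 0; i < dispatchRetryCount; i++ {
		if i > 0 {
			time.Sleep(dispatchRetryDelay)
		}
		retryable, err = fakeDispatch(i)
		if !retryable {
			break
		}
	}
	return err
}

func main() {
	fmt.Println("dispatch error:", dispatchWithRetry()) // <nil>: the retry succeeded
}

With the values above, at most one 50 ms delay is added before the error is pushed to the request's done channel in GetLocalTSAsync, rather than the request being dropped without an answer.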
16 changes: 15 additions & 1 deletion client/tso_batch_controller.go
@@ -16,7 +16,10 @@ package pd

import (
"context"
"runtime/trace"
"time"

"github.com/tikv/pd/client/tsoutil"
)

type tsoBatchController struct {
@@ -130,7 +133,18 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
}
}

func (tbc *tsoBatchController) revokePendingRequest(err error) {
func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < tbc.collectedRequestCount; i++ {
tsoReq := tbc.collectedRequests[i]
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End()
tsoReq.done <- err
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
}

func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.done <- err
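For readers less familiar with the suffix arithmetic, finishCollectedRequests above hands the i-th collected request the timestamp (physical, AddLogical(firstLogical, i, suffixBits)). A small self-contained sketch of that distribution is below; the local addLogical helper assumes tsoutil.AddLogical behaves as logical + count<<suffixBits, and all values are made up for illustration.

package main

import "fmt"

// addLogical is a local stand-in for tsoutil.AddLogical, assuming the usual
// PD semantics: shift the count past the keyspace-group suffix bits and add it.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

func main() {
	// Hypothetical batch of three collected requests answered by one TSO response.
	var (
		physical     int64  = 1700000000000 // shared physical part of the batch
		firstLogical int64  = 8             // logical assigned to the first request
		suffixBits   uint32 = 2             // hypothetical suffix width
	)
	for i := int64(0); i < 3; i++ {
		fmt.Printf("request %d -> physical=%d logical=%d\n",
			i, physical, addLogical(firstLogical, i, suffixBits))
	}
	// Logical parts come out as 8, 12, 16: each request in the batch gets a
	// distinct timestamp while the suffix bits stay untouched.
}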
2 changes: 1 addition & 1 deletion client/tso_client.go
@@ -141,7 +141,7 @@ func (c *tsoClient) Close() {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
tsoErr := errors.WithStack(errClosing)
dispatcher.tsoBatchController.revokePendingRequest(tsoErr)
dispatcher.tsoBatchController.revokePendingRequests(tsoErr)
dispatcher.dispatcherCancel()
}
return true
54 changes: 52 additions & 2 deletions client/tso_dispatcher.go
@@ -302,7 +302,8 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) {
func (c *tsoClient) handleDispatcher(
dispatcherCtx context.Context,
dc string,
tbc *tsoBatchController) {
tbc *tsoBatchController,
) {
var (
err error
streamAddr string
@@ -377,7 +378,11 @@ tsoBatchLoop:
}
// Start to collect the TSO requests.
maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval()
// Once the TSO requests are collected, must make sure they could be finished or revoked eventually,
// otherwise the upper caller may get blocked on waiting for the results.
if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil {
// Finish the collected requests if the fetch failed.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
if err == context.Canceled {
log.Info("[tso] stop fetching the pending tso requests due to context canceled",
zap.String("dc-location", dc))
@@ -406,12 +411,24 @@
}
select {
case <-dispatcherCtx.Done():
<<<<<<< HEAD
=======
// Finish the collected requests if the context is canceled.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err()))
timer.Stop()
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
return
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
<<<<<<< HEAD
c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
=======
// Finish the collected requests if the stream is failed to be created.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
timer.Stop()
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
continue tsoBatchLoop
case <-time.After(retryInterval):
continue streamChoosingLoop
@@ -443,11 +460,18 @@
}
select {
case <-dispatcherCtx.Done():
// Finish the collected requests if the context is canceled.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err()))
return
case tsDeadlineCh.(chan deadline) <- dl:
}
<<<<<<< HEAD
opts = extractSpanReference(tbc, opts[:0])
err = c.processRequests(stream, dc, tbc, opts)
=======
// processRequests guarantees that the collected requests could be finished properly.
err = c.processRequests(stream, dc, tbc)
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
close(done)
// If error happens during tso stream handling, reset stream and run the next trial.
if err != nil {
@@ -698,6 +722,7 @@ func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanO
opts = append(opts, opentracing.ChildOf(span.Context()))
}
}
<<<<<<< HEAD
return opts
}

@@ -710,14 +735,36 @@ func (c *tsoClient) processRequests(stream tsoStream, dcLocation string, tbc *ts
requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(c.svcDiscovery.GetClusterID(), dcLocation, requests, tbc.batchStartTime)
=======

count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, count, tbc.batchStartTime)
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
tbc.finishCollectedRequests(0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
<<<<<<< HEAD
firstLogical := addLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
=======
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerURL(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
return nil
}

@@ -757,6 +804,7 @@ func tsLessEqual(physical, logical, thatPhysical, thatLogical int64) bool {
}
return physical < thatPhysical
}
<<<<<<< HEAD

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < len(requests); i++ {
@@ -767,3 +815,5 @@ func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical
requests[i].done <- err
}
}
=======
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
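As the comment in processRequests above notes, the stream returns the largest logical of the batch, so firstLogical steps back by count-1 slots and the logical recorded in curTSOInfo steps forward again to the largest. A short sketch of that round trip, again assuming AddLogical is logical + count<<suffixBits and using made-up numbers:

package main

import "fmt"

// addLogical again assumes tsoutil.AddLogical-style semantics: logical + count<<suffixBits.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

func main() {
	var (
		count      int64  = 4  // requests in the batch
		logical    int64  = 20 // largest logical returned by the stream
		suffixBits uint32 = 2  // hypothetical suffix width
	)
	firstLogical := addLogical(logical, -count+1, suffixBits) // 20 - (3<<2) = 8
	largest := addLogical(firstLogical, count-1, suffixBits)  // 8 + (3<<2) = 20
	fmt.Println(firstLogical, largest)                        // 8 20: matches the stream's largest logical
}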
30 changes: 28 additions & 2 deletions client/tso_stream.go
@@ -97,18 +97,34 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha

type tsoStream interface {
// processRequests processes TSO requests in streaming mode to get timestamps
<<<<<<< HEAD
processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error)
=======
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
}

type pdTSOStream struct {
stream pdpb.PD_TsoClient
}

<<<<<<< HEAD
func (s *pdTSOStream) processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error) {
=======
func (s *pdTSOStream) getServerURL() string {
return s.serverURL
}

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{
ClusterId: clusterID,
@@ -152,10 +168,20 @@ type tsoTSOStream struct {
stream tsopb.TSO_TsoClient
}

<<<<<<< HEAD
func (s *tsoTSOStream) processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error) {
=======
func (s *tsoTSOStream) getServerURL() string {
return s.serverURL
}

func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
>>>>>>> c00c42e77 (client/tso: fix the bug that collected TSO requests could never be finished (#7951))
start := time.Now()
count := int64(len(requests))
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,