client/tso: fix the bug that collected TSO requests could never be finished (#7951) #8089

Closed
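This change cherry-picks the fix from #7951 (commit c00c42e77). The underlying bug: once the TSO dispatcher has collected a batch of requests, an error or cancellation path could return without completing them, so every caller stayed blocked on its request's `done` channel. The sketch below illustrates that invariant with simplified stand-in types (`tsoRequest` and `finishCollected` here are not the client's real definitions): every collected request must be finished exactly once, even on failure.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// tsoRequest is a simplified stand-in for the client's request object:
// the caller blocks on done until the dispatcher finishes the request.
type tsoRequest struct {
	done chan error
}

// finishCollected mirrors the invariant the fix enforces: once requests are
// collected, every one of them is completed exactly once, even on error.
func finishCollected(reqs []*tsoRequest, err error) {
	for _, r := range reqs {
		r.done <- err
	}
}

func main() {
	reqs := []*tsoRequest{{done: make(chan error, 1)}, {done: make(chan error, 1)}}

	// Simulate a failed stream: without finishCollected, the waits below
	// would block forever, which is exactly the bug being fixed.
	go finishCollected(reqs, errors.New("create tso stream failed"))

	for i, r := range reqs {
		select {
		case err := <-r.done:
			fmt.Printf("request %d finished: %v\n", i, err)
		case <-time.After(time.Second):
			fmt.Printf("request %d is still blocked\n", i)
		}
	}
}
```

The diff below threads exactly this guarantee through each early-exit path of the dispatcher.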
Changes from all commits
84 changes: 83 additions & 1 deletion client/client.go
Expand Up @@ -386,12 +386,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

// Reset a new TSO client.
func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
)
switch newMode {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
Expand Down Expand Up @@ -424,6 +434,7 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
oldTSOSvcDiscovery.Close()
}
}
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", c.serviceMode.String()),
Expand All @@ -435,6 +446,27 @@ func (c *client) getTSOClient() *tsoClient {
return tsoCli.(*tsoClient)
}
return nil
}

func (c *client) getTSOClient() *tsoClient {
c.RLock()
defer c.RUnlock()
return c.tsoClient
}

// ResetTSOClient resets the TSO client, only for test.
func (c *client) ResetTSOClient() {
c.Lock()
defer c.Unlock()
c.resetTSOClientLocked(c.serviceMode)
}

func (c *client) getServiceMode() pdpb.ServiceMode {
c.RLock()
defer c.RUnlock()
return c.serviceMode
}
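The new accessors above (`getTSOClient`, `ResetTSOClient`, `getServiceMode`) read and swap the TSO client under the client's `RWMutex`, so a service-mode switch can rebuild the client without readers observing a half-initialized one. Below is a generic sketch of that holder pattern, using only hypothetical names and the standard library:

```go
package main

import (
	"fmt"
	"sync"
)

// conn stands in for the swappable TSO client.
type conn struct{ mode string }

// holder mirrors the accessor pattern in client.go: reads take the read lock,
// a reset takes the write lock, so readers never see a half-built client.
type holder struct {
	sync.RWMutex
	cur *conn
}

func (h *holder) get() *conn {
	h.RLock()
	defer h.RUnlock()
	return h.cur
}

func (h *holder) reset(c *conn) {
	h.Lock()
	defer h.Unlock()
	h.cur = c
}

func main() {
	h := &holder{}
	h.reset(&conn{mode: "pd"})
	fmt.Println(h.get().mode) // pd
}
```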

func (c *client) scheduleUpdateTokenConnection() {
Expand Down Expand Up @@ -598,6 +630,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
ctx = opentracing.ContextWithSpan(ctx, span)
}

req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
Expand All @@ -617,10 +650,59 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
if err = tsoClient.dispatchRequest(dcLocation, req); err != nil {
req.done <- err
}
req := c.getTSORequest(ctx, dcLocation)
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.done <- err
}
return req
}
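For context, this is how a caller is expected to consume the async API above: `GetTSAsync` hands back a `TSFuture` without blocking, and `Wait` returns only after the dispatcher finishes (or fails) the underlying request through its `done` channel, which is why leaking an unfinished batch blocks callers. A brief usage sketch against the public client interface (the `fetchTS` wrapper is illustrative):

```go
package example

import (
	"context"
	"fmt"

	pd "github.com/tikv/pd/client"
)

// fetchTS sketches the intended calling pattern for the async API: GetTSAsync
// returns a future without blocking, and Wait only returns once the dispatcher
// has finished (or failed) the underlying request via its done channel.
func fetchTS(ctx context.Context, cli pd.Client) (int64, int64, error) {
	future := cli.GetTSAsync(ctx)
	physical, logical, err := future.Wait()
	if err != nil {
		return 0, 0, err
	}
	fmt.Println("acquired tso:", physical, logical)
	return physical, logical, nil
}
```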

func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := tsoReqPool.Get().(*tsoRequest)
// Set needed fields in the request before using it.
req.start = time.Now()
req.clientCtx = c.ctx
req.requestCtx = ctx
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
return req
}

const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
var (
retryable bool
err error
)
for i := 0; i < dispatchRetryCount; i++ {
// Do not delay for the first time.
if i > 0 {
time.Sleep(dispatchRetryDelay)
}
// Get the tsoClient each time, as it may be initialized or switched during the process.
tsoClient := c.getTSOClient()
if tsoClient == nil {
err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
continue
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
break
}
}
return err
}
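`dispatchTSORequestWithRetry` re-fetches the TSO client on every attempt because a service-mode switch may have just replaced it (or it may still be nil during initialization), and it bounds the retries so a caller gets a prompt error on `done` rather than hanging. Below is a generalized, stand-alone version of that loop; `retryBounded` and its parameters are illustrative, not part of the PR:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryBounded is a generalized form of dispatchTSORequestWithRetry: at most
// `attempts` tries, no delay before the first attempt, and a non-retryable
// result stops the loop early.
func retryBounded(attempts int, delay time.Duration, op func() (retryable bool, err error)) error {
	var err error
	for i := 0; i < attempts; i++ {
		if i > 0 {
			time.Sleep(delay)
		}
		var retryable bool
		if retryable, err = op(); !retryable {
			break
		}
	}
	return err
}

func main() {
	calls := 0
	err := retryBounded(2, 50*time.Millisecond, func() (bool, error) {
		calls++
		if calls == 1 {
			// e.g. the tso client was nil because a service-mode switch was in flight
			return true, errors.New("tso client is nil")
		}
		return false, nil
	})
	fmt.Println(calls, err) // 2 <nil>
}
```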

func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
resp := c.GetTSAsync(ctx)
return resp.Wait()
Expand Down
16 changes: 15 additions & 1 deletion client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package pd

import (
"context"
"runtime/trace"
"time"

"github.com/tikv/pd/client/tsoutil"
)

type tsoBatchController struct {
Expand Down Expand Up @@ -130,7 +133,18 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
}
}

func (tbc *tsoBatchController) revokePendingRequest(err error) {
func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < tbc.collectedRequestCount; i++ {
tsoReq := tbc.collectedRequests[i]
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End()
tsoReq.done <- err
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
}
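`finishCollectedRequests` assigns each request its own slot in the batch: request `i` gets `firstLogical` advanced by `i`, where `tsoutil.AddLogical` is assumed to shift the offset left by `suffixBits` before adding (the low bits being reserved for keyspace-group suffixes). A small worked example of that slot arithmetic under this assumption:

```go
package main

import "fmt"

// addLogical mirrors what the client's tsoutil.AddLogical is assumed to do:
// the low suffixBits of the logical part are reserved, so the per-request
// increment is shifted above them.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

func main() {
	const (
		firstLogical = 100
		suffixBits   = 2 // hypothetical value, for illustration only
		batchSize    = 3
	)
	// Each collected request i in the batch gets firstLogical + i<<suffixBits.
	for i := int64(0); i < batchSize; i++ {
		fmt.Printf("request %d -> logical %d\n", i, addLogical(firstLogical, i, suffixBits))
	}
	// Output: 100, 104, 108
}
```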

func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.done <- err
Expand Down
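`revokePendingRequests` drains only the requests already sitting in the channel: `len(tbc.tsoRequestCh)` is read once, so the loop never blocks waiting for new arrivals. The same pattern in isolation, with stand-in types:

```go
package main

import (
	"errors"
	"fmt"
)

// tsoRequest is a stand-in for the client's request object.
type tsoRequest struct{ done chan error }

// drainAndFail is the generic shape of revokePendingRequests: len(ch) is read
// once up front, so only requests already queued at that instant are revoked
// and the loop can never block on an empty channel.
func drainAndFail(ch chan *tsoRequest, err error) {
	for i := 0; i < len(ch); i++ {
		req := <-ch
		req.done <- err
	}
}

func main() {
	ch := make(chan *tsoRequest, 4)
	for i := 0; i < 3; i++ {
		ch <- &tsoRequest{done: make(chan error, 1)}
	}
	drainAndFail(ch, errors.New("client is closing"))
	fmt.Println("queued after drain:", len(ch)) // 0
}
```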
2 changes: 1 addition & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *tsoClient) Close() {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
tsoErr := errors.WithStack(errClosing)
dispatcher.tsoBatchController.revokePendingRequest(tsoErr)
dispatcher.tsoBatchController.revokePendingRequests(tsoErr)
dispatcher.dispatcherCancel()
}
return true
Expand Down
54 changes: 52 additions & 2 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) {
func (c *tsoClient) handleDispatcher(
dispatcherCtx context.Context,
dc string,
tbc *tsoBatchController) {
tbc *tsoBatchController,
) {
var (
err error
streamAddr string
Expand Down Expand Up @@ -377,7 +378,11 @@ tsoBatchLoop:
}
// Start to collect the TSO requests.
maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval()
// Once the TSO requests are collected, must make sure they could be finished or revoked eventually,
// otherwise the upper caller may get blocked on waiting for the results.
if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil {
// Finish the collected requests if the fetch failed.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
if err == context.Canceled {
log.Info("[tso] stop fetching the pending tso requests due to context canceled",
zap.String("dc-location", dc))
Expand Down Expand Up @@ -406,12 +411,24 @@ tsoBatchLoop:
}
select {
case <-dispatcherCtx.Done():
// Finish the collected requests if the context is canceled.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err()))
timer.Stop()
return
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
// Finish the collected requests if the stream is failed to be created.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
timer.Stop()
continue tsoBatchLoop
case <-time.After(retryInterval):
continue streamChoosingLoop
Expand Down Expand Up @@ -443,11 +460,18 @@ tsoBatchLoop:
}
select {
case <-dispatcherCtx.Done():
// Finish the collected requests if the context is canceled.
tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err()))
return
case tsDeadlineCh.(chan deadline) <- dl:
}
opts = extractSpanReference(tbc, opts[:0])
err = c.processRequests(stream, dc, tbc, opts)
// processRequests guarantees that the collected requests could be finished properly.
err = c.processRequests(stream, dc, tbc)
close(done)
// If error happens during tso stream handling, reset stream and run the next trial.
if err != nil {
Expand Down Expand Up @@ -698,6 +722,7 @@ func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanO
opts = append(opts, opentracing.ChildOf(span.Context()))
}
}
return opts
}

Expand All @@ -710,14 +735,36 @@ func (c *tsoClient) processRequests(stream tsoStream, dcLocation string, tbc *ts
requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(c.svcDiscovery.GetClusterID(), dcLocation, requests, tbc.batchStartTime)

count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, count, tbc.batchStartTime)
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
tbc.finishCollectedRequests(0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := addLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerURL(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
return nil
}
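The stream returns the largest logical value of the batch, so `processRequests` steps back `count-1` slots to recover `firstLogical` before handing the batch to `finishCollectedRequests`. A quick numeric check of that arithmetic with made-up values (and `suffixBits = 0` for simplicity):

```go
package main

import "fmt"

func main() {
	// The stream returns the *largest* logical in the batch, so the first
	// request's logical is recovered by stepping back count-1 slots.
	const (
		respLogical = int64(200)
		count       = int64(4)
		suffixBits  = uint32(0)
	)
	firstLogical := respLogical + (-count+1)<<suffixBits
	fmt.Println(firstLogical)                         // 197
	fmt.Println(firstLogical + (count-1)<<suffixBits) // 200, i.e. respLogical again
}
```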

Expand Down Expand Up @@ -757,6 +804,7 @@ func tsLessEqual(physical, logical, thatPhysical, thatLogical int64) bool {
}
return physical < thatPhysical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
for i := 0; i < len(requests); i++ {
Expand All @@ -767,3 +815,5 @@ func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical
requests[i].done <- err
}
}
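With `finishRequest` removed from `tsoClient`, completion lives solely in `tsoBatchController.finishCollectedRequests`, which also zeroes `collectedRequestCount` so a batch can never be finished twice. The toy below shows why that matters, assuming each request's `done` channel is buffered with capacity one as in the client's request pool (an assumption, not shown in this diff):

```go
package main

import "fmt"

// tsoRequest is a stand-in; the real pool allocates done with capacity 1 (assumed).
type tsoRequest struct{ done chan error }

func main() {
	req := &tsoRequest{done: make(chan error, 1)}

	req.done <- nil // first completion: fills the buffer, the waiter will drain it

	select {
	case req.done <- nil:
		fmt.Println("second completion went through (waiter had already drained)")
	default:
		// With nobody draining yet, a second completion would block a real
		// dispatcher forever; zeroing collectedRequestCount avoids ever retrying it.
		fmt.Println("second completion would block")
	}
}
```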
30 changes: 28 additions & 2 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,34 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha

type tsoStream interface {
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error)
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
}

type pdTSOStream struct {
stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error) {
func (s *pdTSOStream) getServerURL() string {
return s.serverURL
}

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{
ClusterId: clusterID,
Expand Down Expand Up @@ -152,10 +168,20 @@ type tsoTSOStream struct {
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) processRequests(clusterID uint64, dcLocation string, requests []*tsoRequest,
batchStartTime time.Time) (physical, logical int64, suffixBits uint32, err error) {
func (s *tsoTSOStream) getServerURL() string {
return s.serverURL
}

func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
count int64, batchStartTime time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: clusterID,
Expand Down
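Passing `count` instead of the request slice narrows the `tsoStream` interface: the stream never touches the request objects, so the batch controller alone decides when they are finished, and the stream becomes trivial to fake. A hypothetical test fake (not part of the PR) that satisfies the same shape:

```go
package main

import (
	"fmt"
	"time"
)

// fakeTSOStream hands out `count` consecutive logical timestamps per call and
// echoes the requested keyspace group ID; it only mirrors the interface shape
// shown above and is not taken from the PR.
type fakeTSOStream struct {
	physical, logical int64
}

func (s *fakeTSOStream) processRequests(
	_ uint64, _, keyspaceGroupID uint32, _ string,
	count int64, _ time.Time,
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
	s.logical += count
	// Like the real protocol, return the largest logical value in the batch.
	return keyspaceGroupID, s.physical, s.logical, 0, nil
}

func main() {
	s := &fakeTSOStream{physical: 1700000000000}
	_, phys, logi, _, _ := s.processRequests(1, 0, 0, "global", 4, time.Now())
	fmt.Println(phys, logi) // 1700000000000 4
}
```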