diff --git a/client/client.go b/client/client.go
index eaebef7e10c..1865fd0866e 100644
--- a/client/client.go
+++ b/client/client.go
@@ -301,7 +301,7 @@ func (k *serviceModeKeeper) close() {
 		fallthrough
 	case pdpb.ServiceMode_PD_SVC_MODE:
 		if k.tsoClient != nil {
-			k.tsoClient.Close()
+			k.tsoClient.close()
 		}
 	case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
 	}
@@ -651,11 +651,11 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
 		log.Warn("[pd] intend to switch to unknown service mode, just return")
 		return
 	}
-	newTSOCli.Setup()
+	newTSOCli.setup()
 	// Replace the old TSO client.
 	oldTSOClient := c.tsoClient
 	c.tsoClient = newTSOCli
-	oldTSOClient.Close()
+	oldTSOClient.close()
 	// Replace the old TSO service discovery if needed.
 	oldTSOSvcDiscovery := c.tsoSvcDiscovery
 	// If newTSOSvcDiscovery is nil, that's expected, as it means we are switching to PD service mode and
diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go
index d7ba5d7e74b..a713b7a187d 100644
--- a/client/tso_batch_controller.go
+++ b/client/tso_batch_controller.go
@@ -139,9 +139,11 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
 func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
 	for i := 0; i < tbc.collectedRequestCount; i++ {
 		tsoReq := tbc.collectedRequests[i]
+		// Retrieve the request context before the request is done to trace without race.
+		requestCtx := tsoReq.requestCtx
 		tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
-		defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End() // nolint
 		tsoReq.tryDone(err)
+		trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
 	}
 	// Prevent the finished requests from being processed again.
 	tbc.collectedRequestCount = 0
diff --git a/client/tso_client.go b/client/tso_client.go
index e3bdb835901..72b09d8054d 100644
--- a/client/tso_client.go
+++ b/client/tso_client.go
@@ -22,13 +22,11 @@ import (
 	"sync"
 	"time"
 
-	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
 	"github.com/tikv/pd/client/grpcutil"
-	"github.com/tikv/pd/client/tsoutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -36,6 +34,15 @@ import (
 	"google.golang.org/grpc/status"
 )
 
+const (
+	tsoDispatcherCheckInterval = time.Minute
+	// defaultMaxTSOBatchSize is the default max size of the TSO request batch.
+	defaultMaxTSOBatchSize = 10000
+	// retryInterval and maxRetryTimes are used to control the retry interval and max retry times.
+	retryInterval = 500 * time.Millisecond
+	maxRetryTimes = 6
+)
+
 // TSOClient is the client used to get timestamps.
 type TSOClient interface {
 	// GetTS gets a timestamp from PD or TSO microservice.
@@ -70,14 +77,8 @@ type tsoClient struct {
 	// tsoDispatcher is used to dispatch different TSO requests to
 	// the corresponding dc-location TSO channel.
 	tsoDispatcher sync.Map // Same as map[string]*tsoDispatcher
-	// dc-location -> deadline
-	tsDeadline sync.Map // Same as map[string]chan deadline
-	// dc-location -> *tsoInfo while the tsoInfo is the last TSO info
-	lastTSOInfoMap sync.Map // Same as map[string]*tsoInfo
-
-	checkTSDeadlineCh         chan struct{}
-	checkTSODispatcherCh      chan struct{}
-	updateTSOConnectionCtxsCh chan struct{}
+
+	checkTSODispatcherCh chan struct{}
 }
 
 // newTSOClient returns a new TSO client.
@@ -101,49 +102,64 @@ func newTSOClient(
 				}
 			},
 		},
-		checkTSDeadlineCh:         make(chan struct{}),
-		checkTSODispatcherCh:      make(chan struct{}, 1),
-		updateTSOConnectionCtxsCh: make(chan struct{}, 1),
+		checkTSODispatcherCh: make(chan struct{}, 1),
 	}
 
 	eventSrc := svcDiscovery.(tsoAllocatorEventSource)
 	eventSrc.SetTSOLocalServURLsUpdatedCallback(c.updateTSOLocalServURLs)
 	eventSrc.SetTSOGlobalServURLUpdatedCallback(c.updateTSOGlobalServURL)
-	c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs)
+	c.svcDiscovery.AddServiceURLsSwitchedCallback(c.scheduleUpdateAllTSOConnectionCtxs)
 
 	return c
 }
 
-func (c *tsoClient) Setup() {
+func (c *tsoClient) getOption() *option { return c.option }
+
+func (c *tsoClient) getServiceDiscovery() ServiceDiscovery { return c.svcDiscovery }
+
+func (c *tsoClient) setup() {
 	c.svcDiscovery.CheckMemberChanged()
 	c.updateTSODispatcher()
 
 	// Start the daemons.
-	c.wg.Add(2)
+	c.wg.Add(1)
 	go c.tsoDispatcherCheckLoop()
-	go c.tsCancelLoop()
 }
 
-// Close closes the TSO client
-func (c *tsoClient) Close() {
+func (c *tsoClient) tsoDispatcherCheckLoop() {
+	log.Info("[tso] start tso dispatcher check loop")
+	defer log.Info("[tso] exit tso dispatcher check loop")
+	defer c.wg.Done()
+
+	loopCtx, loopCancel := context.WithCancel(c.ctx)
+	defer loopCancel()
+
+	ticker := time.NewTicker(tsoDispatcherCheckInterval)
+	defer ticker.Stop()
+	for {
+		c.updateTSODispatcher()
+		select {
+		case <-ticker.C:
+		case <-c.checkTSODispatcherCh:
+		case <-loopCtx.Done():
+			return
+		}
+	}
+}
+
+// close closes the TSO client
+func (c *tsoClient) close() {
 	if c == nil {
 		return
 	}
-	log.Info("closing tso client")
+	log.Info("[tso] closing tso client")
 	c.cancel()
 	c.wg.Wait()
 
-	log.Info("close tso client")
+	log.Info("[tso] close tso client")
 	c.closeTSODispatcher()
-	log.Info("tso client is closed")
-}
-
-func (c *tsoClient) scheduleCheckTSDeadline() {
-	select {
-	case c.checkTSDeadlineCh <- struct{}{}:
-	default:
-	}
+	log.Info("[tso] tso client is closed")
 }
 
 func (c *tsoClient) scheduleCheckTSODispatcher() {
@@ -153,11 +169,21 @@ func (c *tsoClient) scheduleCheckTSODispatcher() {
 	}
 }
 
-func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() {
-	select {
-	case c.updateTSOConnectionCtxsCh <- struct{}{}:
-	default:
+// scheduleUpdateAllTSOConnectionCtxs updates the TSO connection contexts for all dc-locations.
+func (c *tsoClient) scheduleUpdateAllTSOConnectionCtxs() {
+	c.tsoDispatcher.Range(func(_, dispatcher any) bool {
+		dispatcher.(*tsoDispatcher).scheduleUpdateConnectionCtxs()
+		return true
+	})
+}
+
+// scheduleUpdateTSOConnectionCtxs updates the TSO connection contexts for the given dc-location.
+func (c *tsoClient) scheduleUpdateTSOConnectionCtxs(dcLocation string) {
+	dispatcher, ok := c.getTSODispatcher(dcLocation)
+	if !ok {
+		return
 	}
+	dispatcher.scheduleUpdateConnectionCtxs()
 }
 
 // TSO Follower Proxy only supports the Global TSO proxy now.
@@ -200,14 +226,12 @@ func (c *tsoClient) GetTSOAllocatorServingURLByDCLocation(dcLocation string) (st
 	return url.(string), true
 }
 
-// GetTSOAllocatorClientConnByDCLocation returns the tso allocator grpc client connection
-// of the given dcLocation
+// GetTSOAllocatorClientConnByDCLocation returns the TSO allocator gRPC client connection of the given dcLocation.
 func (c *tsoClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) {
 	url, ok := c.tsoAllocators.Load(dcLocation)
 	if !ok {
-		panic(fmt.Sprintf("the allocator leader in %s should exist", dcLocation))
+		log.Fatal("[tso] the allocator leader should exist", zap.String("dc-location", dcLocation))
 	}
-	// todo: if we support local tso forward, we should get or create client conns.
 	cc, ok := c.svcDiscovery.GetClientConns().Load(url)
 	if !ok {
 		return nil, url.(string)
@@ -250,6 +274,8 @@ func (c *tsoClient) updateTSOLocalServURLs(allocatorMap map[string]string) error
 			zap.String("dc-location", dcLocation),
 			zap.String("new-url", url),
 			zap.String("old-url", oldURL))
+		// Should trigger the update of the connection contexts once the allocator leader is switched.
+		c.scheduleUpdateTSOConnectionCtxs(dcLocation)
 	}
 
 	// Garbage collection of the old TSO allocator primaries
@@ -267,7 +293,7 @@ func (c *tsoClient) updateTSOGlobalServURL(url string) error {
 	log.Info("[tso] switch dc tso global allocator serving url",
 		zap.String("dc-location", globalDCLocation),
 		zap.String("new-url", url))
-	c.scheduleUpdateTSOConnectionCtxs()
+	c.scheduleUpdateTSOConnectionCtxs(globalDCLocation)
 	c.scheduleCheckTSODispatcher()
 	return nil
 }
@@ -315,22 +341,27 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) {
 	return nil, ""
 }
 
+// tsoConnectionContext is used to store the context of a TSO stream connection.
 type tsoConnectionContext struct {
-	streamURL string
-	// Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster,
-	// or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster
-	stream tsoStream
 	ctx    context.Context
 	cancel context.CancelFunc
+	// Current URL of the stream connection.
+	streamURL string
+	// Current stream to send gRPC requests.
+	// - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster.
+	// - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster.
+	stream tsoStream
 }
 
-func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
+// updateConnectionCtxs will choose the proper way to update the connections for the given dc-location.
+// It will return a bool to indicate whether the update is successful.
+func (c *tsoClient) updateConnectionCtxs(ctx context.Context, dc string, connectionCtxs *sync.Map) bool {
 	// Normal connection creating, it will be affected by the `enableForwarding`.
 	createTSOConnection := c.tryConnectToTSO
 	if c.allowTSOFollowerProxy(dc) {
 		createTSOConnection = c.tryConnectToTSOWithProxy
 	}
-	if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
+	if err := createTSOConnection(ctx, dc, connectionCtxs); err != nil {
 		log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
 		return false
 	}
@@ -383,7 +414,7 @@ func (c *tsoClient) tryConnectToTSO(
 			err = status.New(codes.Unavailable, "unavailable").Err()
 		})
 		if stream != nil && err == nil {
-			updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
+			updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream})
 			return nil
 		}
 
@@ -428,7 +459,7 @@ func (c *tsoClient) tryConnectToTSO(
 			// the goroutine is used to check the network and change back to the original stream
 			go c.checkAllocator(ctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
 			requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
-			updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel})
+			updateAndClear(backupURL, &tsoConnectionContext{cctx, cancel, backupURL, stream})
 			return nil
 		}
 		cancel()
@@ -437,6 +468,59 @@ func (c *tsoClient) tryConnectToTSO(
 	return err
 }
 
+func (c *tsoClient) checkAllocator(
+	ctx context.Context,
+	forwardCancel context.CancelFunc,
+	dc, forwardedHostTrim, addr, url string,
+	updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext),
+) {
+	defer func() {
+		// cancel the forward stream
+		forwardCancel()
+		requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(0)
+	}()
+	cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc)
+	var healthCli healthpb.HealthClient
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	for {
+		// If the PD/allocator leader changes, we need to re-establish the stream.
+		if u != url {
+			log.Info("[tso] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u))
+			return
+		}
+		if healthCli == nil && cc != nil {
+			healthCli = healthpb.NewHealthClient(cc)
+		}
+		if healthCli != nil {
+			healthCtx, healthCancel := context.WithTimeout(ctx, c.option.timeout)
+			resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
+			failpoint.Inject("unreachableNetwork", func() {
+				resp.Status = healthpb.HealthCheckResponse_UNKNOWN
+			})
+			healthCancel()
+			if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
+				// create a stream of the original allocator
+				cctx, cancel := context.WithCancel(ctx)
+				stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
+				if err == nil && stream != nil {
+					log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url))
+					updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream})
+					return
+				}
+			}
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			// To ensure we can get the latest allocator leader
+			// and once the leader is changed, we can exit this function.
+			cc, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
+		}
+	}
+}
+
 // tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
 // a TSO proxy to reduce the pressure of the main serving service endpoint.
 func (c *tsoClient) tryConnectToTSOWithProxy(
@@ -484,7 +568,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(
 			addrTrim := trimHTTPPrefix(addr)
 			requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
 		}
-		connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel})
+		connectionCtxs.Store(addr, &tsoConnectionContext{cctx, cancel, addr, stream})
 		continue
 	}
 	log.Error("[tso] create the tso stream failed",
@@ -520,92 +604,90 @@ func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder {
 	return streamBuilders
 }
 
-type tsoInfo struct {
-	tsoServer           string
-	reqKeyspaceGroupID  uint32
-	respKeyspaceGroupID uint32
-	respReceivedAt      time.Time
-	physical            int64
-	logical             int64
+func (c *tsoClient) createTSODispatcher(dcLocation string) {
+	dispatcher := newTSODispatcher(c.ctx, dcLocation, defaultMaxTSOBatchSize, c)
+	if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok {
+		// Create a new dispatcher for the dc-location to handle the TSO requests.
+		c.wg.Add(1)
+		go dispatcher.handleDispatcher(&c.wg)
+	} else {
+		dispatcher.close()
+	}
 }
 
-func (c *tsoClient) processRequests(
-	stream tsoStream, dcLocation string, tbc *tsoBatchController,
-) error {
-	requests := tbc.getCollectedRequests()
-	// nolint
-	for _, req := range requests {
-		defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End()
-		if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil {
-			span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context()))
-			defer span.Finish()
+func (c *tsoClient) closeTSODispatcher() {
+	c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
+		if dispatcherInterface != nil {
+			dispatcherInterface.(*tsoDispatcher).close()
 		}
-	}
+		return true
+	})
+}
 
-	count := int64(len(requests))
-	reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
-	respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
-		c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
-		dcLocation, count, tbc.batchStartTime)
-	if err != nil {
-		tbc.finishCollectedRequests(0, 0, 0, err)
-		return err
-	}
-	// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
-	firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
-	curTSOInfo := &tsoInfo{
-		tsoServer:           stream.getServerURL(),
-		reqKeyspaceGroupID:  reqKeyspaceGroupID,
-		respKeyspaceGroupID: respKeyspaceGroupID,
-		respReceivedAt:      time.Now(),
-		physical:            physical,
-		logical:             tsoutil.AddLogical(firstLogical, count-1, suffixBits),
-	}
-	c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
-	tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
-	return nil
+func (c *tsoClient) updateTSODispatcher() {
+	// Set up the new TSO dispatcher and batch controller.
+	c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool {
+		dcLocation := dcLocationKey.(string)
+		if _, ok := c.getTSODispatcher(dcLocation); !ok {
+			c.createTSODispatcher(dcLocation)
+		}
+		return true
+	})
+	// Clean up the unused TSO dispatcher
+	c.tsoDispatcher.Range(func(dcLocationKey, dispatcher any) bool {
+		dcLocation := dcLocationKey.(string)
+		// Skip the Global TSO Allocator
+		if dcLocation == globalDCLocation {
+			return true
+		}
+		if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist {
+			log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation))
+			c.tsoDispatcher.Delete(dcLocation)
+			dispatcher.(*tsoDispatcher).close()
+		}
+		return true
+	})
 }
 
-func (c *tsoClient) compareAndSwapTS(
-	dcLocation string,
-	curTSOInfo *tsoInfo,
-	physical, firstLogical int64,
-) {
-	val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
-	if !loaded {
-		return
-	}
-	lastTSOInfo := val.(*tsoInfo)
-	if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
-		log.Info("[tso] keyspace group changed",
-			zap.String("dc-location", dcLocation),
-			zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
-			zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
+// dispatchRequest will send the TSO request to the corresponding TSO dispatcher.
+func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) {
+	dispatcher, ok := c.getTSODispatcher(request.dcLocation)
+	if !ok {
+		err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation))
+		log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err))
+		c.svcDiscovery.ScheduleCheckMemberChanged()
+		// New dispatcher could be created in the meantime, which is retryable.
+		return true, err
 	}
-	// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
-	// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
-	// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
-	// last time.
-	if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
-		log.Panic("[tso] timestamp fallback",
-			zap.String("dc-location", dcLocation),
-			zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
-			zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
-			zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
-			zap.String("last-tso-server", lastTSOInfo.tsoServer),
-			zap.String("cur-tso-server", curTSOInfo.tsoServer),
-			zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
-			zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
-			zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
-			zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
-			zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
-			zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
+	defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End()
+	select {
+	case <-request.requestCtx.Done():
+		// Caller cancelled the request, no need to retry.
+		return false, request.requestCtx.Err()
+	case <-request.clientCtx.Done():
+		// Client is closed, no need to retry.
+		return false, request.clientCtx.Err()
+	case <-c.ctx.Done():
+		// tsoClient is closed due to the PD service mode switch, which is retryable.
+		return true, c.ctx.Err()
+	default:
+		// This failpoint will increase the possibility that the request is sent to a closed dispatcher.
+		failpoint.Inject("delayDispatchTSORequest", func() {
+			time.Sleep(time.Second)
+		})
+		dispatcher.push(request)
+	}
+	// Check the contexts again to make sure the request has not been sent to a closed dispatcher.
+	// Never retry on these conditions to prevent unexpected data race.
+	select {
+	case <-request.requestCtx.Done():
+		return false, request.requestCtx.Err()
+	case <-request.clientCtx.Done():
+		return false, request.clientCtx.Err()
+	case <-c.ctx.Done():
+		return false, c.ctx.Err()
+	default:
 	}
-	lastTSOInfo.tsoServer = curTSOInfo.tsoServer
-	lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
-	lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
-	lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
-	lastTSOInfo.physical = curTSOInfo.physical
-	lastTSOInfo.logical = curTSOInfo.logical
+	return false, nil
 }
diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go
index c82ec777eca..d5b52ad6039 100644
--- a/client/tso_dispatcher.go
+++ b/client/tso_dispatcher.go
@@ -22,113 +22,19 @@ import (
 	"sync"
 	"time"
 
+	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/client/errs"
 	"github.com/tikv/pd/client/retry"
 	"github.com/tikv/pd/client/timerpool"
+	"github.com/tikv/pd/client/tsoutil"
 	"go.uber.org/zap"
-	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 )
 
-const (
-	tsLoopDCCheckInterval  = time.Minute
-	defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
-	retryInterval          = 500 * time.Millisecond
-	maxRetryTimes          = 6
-)
-
-type tsoDispatcher struct {
-	dispatcherCancel   context.CancelFunc
-	tsoBatchController *tsoBatchController
-}
-
-func (td *tsoDispatcher) close() {
-	td.dispatcherCancel()
-	td.tsoBatchController.clear()
-}
-
-func (td *tsoDispatcher) push(request *tsoRequest) {
-	td.tsoBatchController.tsoRequestCh <- request
-}
-
-func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) {
-	dispatcher, ok := c.getTSODispatcher(request.dcLocation)
-	if !ok {
-		err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation))
-		log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err))
-		c.svcDiscovery.ScheduleCheckMemberChanged()
-		// New dispatcher could be created in the meantime, which is retryable.
-		return true, err
-	}
-
-	defer trace.StartRegion(request.requestCtx, "pdclient.tsoReqEnqueue").End()
-	select {
-	case <-request.requestCtx.Done():
-		// Caller cancelled the request, no need to retry.
-		return false, request.requestCtx.Err()
-	case <-request.clientCtx.Done():
-		// Client is closed, no need to retry.
-		return false, request.clientCtx.Err()
-	case <-c.ctx.Done():
-		// tsoClient is closed due to the PD service mode switch, which is retryable.
-		return true, c.ctx.Err()
-	default:
-		// This failpoint will increase the possibility that the request is sent to a closed dispatcher.
-		failpoint.Inject("delayDispatchTSORequest", func() {
-			time.Sleep(time.Second)
-		})
-		dispatcher.push(request)
-	}
-	// Check the contexts again to make sure the request is not been sent to a closed dispatcher.
-	// Never retry on these conditions to prevent unexpected data race.
-	select {
-	case <-request.requestCtx.Done():
-		return false, request.requestCtx.Err()
-	case <-request.clientCtx.Done():
-		return false, request.clientCtx.Err()
-	case <-c.ctx.Done():
-		return false, c.ctx.Err()
-	default:
-	}
-	return false, nil
-}
-
-func (c *tsoClient) closeTSODispatcher() {
-	c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
-		if dispatcherInterface != nil {
-			dispatcherInterface.(*tsoDispatcher).close()
-		}
-		return true
-	})
-}
-
-func (c *tsoClient) updateTSODispatcher() {
-	// Set up the new TSO dispatcher and batch controller.
-	c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool {
-		dcLocation := dcLocationKey.(string)
-		if _, ok := c.getTSODispatcher(dcLocation); !ok {
-			c.createTSODispatcher(dcLocation)
-		}
-		return true
-	})
-	// Clean up the unused TSO dispatcher
-	c.tsoDispatcher.Range(func(dcLocationKey, dispatcher any) bool {
-		dcLocation := dcLocationKey.(string)
-		// Skip the Global TSO Allocator
-		if dcLocation == globalDCLocation {
-			return true
-		}
-		if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist {
-			log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation))
-			c.tsoDispatcher.Delete(dcLocation)
-			dispatcher.(*tsoDispatcher).close()
-		}
-		return true
-	})
-}
-
+// deadline is used to control the TS request timeout manually; it will be sent
+// to the `tsDeadlineCh` to be handled by the `watchTSDeadline` goroutine.
 type deadline struct {
 	timer *time.Timer
 	done  chan struct{}
@@ -148,173 +54,118 @@ func newTSDeadline(
 	}
 }
 
-func (c *tsoClient) tsCancelLoop() {
-	defer c.wg.Done()
-
-	tsCancelLoopCtx, tsCancelLoopCancel := context.WithCancel(c.ctx)
-	defer tsCancelLoopCancel()
-
-	ticker := time.NewTicker(tsLoopDCCheckInterval)
-	defer ticker.Stop()
-	for {
-		// Watch every dc-location's tsDeadlineCh
-		c.GetTSOAllocators().Range(func(dcLocation, _ any) bool {
-			c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string))
-			return true
-		})
-		select {
-		case <-c.checkTSDeadlineCh:
-			continue
-		case <-ticker.C:
-			continue
-		case <-tsCancelLoopCtx.Done():
-			log.Info("exit tso requests cancel loop")
-			return
-		}
-	}
+type tsoInfo struct {
+	tsoServer           string
+	reqKeyspaceGroupID  uint32
+	respKeyspaceGroupID uint32
+	respReceivedAt      time.Time
+	physical            int64
+	logical             int64
 }
 
-func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) {
-	if _, exist := c.tsDeadline.Load(dcLocation); !exist {
-		tsDeadlineCh := make(chan *deadline, 1)
-		c.tsDeadline.Store(dcLocation, tsDeadlineCh)
-		go func(dc string, tsDeadlineCh <-chan *deadline) {
-			for {
-				select {
-				case d := <-tsDeadlineCh:
-					select {
-					case <-d.timer.C:
-						log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
-						d.cancel()
-						timerpool.GlobalTimerPool.Put(d.timer)
-					case <-d.done:
-						timerpool.GlobalTimerPool.Put(d.timer)
-					case <-ctx.Done():
-						timerpool.GlobalTimerPool.Put(d.timer)
-						return
-					}
-				case <-ctx.Done():
-					return
-				}
-			}
-		}(dcLocation, tsDeadlineCh)
-	}
+type tsoServiceProvider interface {
+	getOption() *option
+	getServiceDiscovery() ServiceDiscovery
+	updateConnectionCtxs(ctx context.Context, dc string, connectionCtxs *sync.Map) bool
 }
 
-func (c *tsoClient) tsoDispatcherCheckLoop() {
-	defer c.wg.Done()
+type tsoDispatcher struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+	dc     string
 
-	loopCtx, loopCancel := context.WithCancel(c.ctx)
-	defer loopCancel()
+	provider tsoServiceProvider
+	// URL -> *connectionContext
+	connectionCtxs  *sync.Map
+	batchController *tsoBatchController
+	tsDeadlineCh    chan *deadline
+	lastTSOInfo     *tsoInfo
 
-	ticker := time.NewTicker(tsLoopDCCheckInterval)
-	defer ticker.Stop()
-	for {
-		c.updateTSODispatcher()
-		select {
-		case <-ticker.C:
-		case <-c.checkTSODispatcherCh:
-		case <-loopCtx.Done():
-			log.Info("exit tso dispatcher loop")
-			return
-		}
-	}
+	updateConnectionCtxsCh chan struct{}
 }
 
-func (c *tsoClient) checkAllocator(
+func newTSODispatcher(
 	ctx context.Context,
-	forwardCancel context.CancelFunc,
-	dc, forwardedHostTrim, addr, url string,
-	updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext)) {
-	defer func() {
-		// cancel the forward stream
-		forwardCancel()
-		requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(0)
-	}()
-	cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc)
-	var healthCli healthpb.HealthClient
-	ticker := time.NewTicker(time.Second)
-	defer ticker.Stop()
-	for {
-		// the pd/allocator leader change, we need to re-establish the stream
-		if u != url {
-			log.Info("[tso] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u))
-			return
-		}
-		if healthCli == nil && cc != nil {
-			healthCli = healthpb.NewHealthClient(cc)
-		}
-		if healthCli != nil {
-			healthCtx, healthCancel := context.WithTimeout(ctx, c.option.timeout)
-			resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
-			failpoint.Inject("unreachableNetwork", func() {
-				resp.Status = healthpb.HealthCheckResponse_UNKNOWN
-			})
-			healthCancel()
-			if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
-				// create a stream of the original allocator
-				cctx, cancel := context.WithCancel(ctx)
-				stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
-				if err == nil && stream != nil {
-					log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url))
-					updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
-					return
-				}
-			}
-		}
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			// To ensure we can get the latest allocator leader
-			// and once the leader is changed, we can exit this function.
-			cc, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
-		}
-	}
-}
-
-func (c *tsoClient) createTSODispatcher(dcLocation string) {
-	dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx)
+	dc string,
+	maxBatchSize int,
+	provider tsoServiceProvider,
+) *tsoDispatcher {
+	dispatcherCtx, dispatcherCancel := context.WithCancel(ctx)
 	tsoBatchController := newTSOBatchController(
-		make(chan *tsoRequest, defaultMaxTSOBatchSize*2),
-		defaultMaxTSOBatchSize,
+		make(chan *tsoRequest, maxBatchSize*2),
+		maxBatchSize,
 	)
 	failpoint.Inject("shortDispatcherChannel", func() {
 		tsoBatchController = newTSOBatchController(
 			make(chan *tsoRequest, 1),
-			defaultMaxTSOBatchSize,
+			maxBatchSize,
 		)
 	})
-	dispatcher := &tsoDispatcher{dispatcherCancel, tsoBatchController}
+	td := &tsoDispatcher{
+		ctx:                    dispatcherCtx,
+		cancel:                 dispatcherCancel,
+		dc:                     dc,
+		provider:               provider,
+		connectionCtxs:         &sync.Map{},
+		batchController:        tsoBatchController,
+		tsDeadlineCh:           make(chan *deadline, 1),
+		updateConnectionCtxsCh: make(chan struct{}, 1),
+	}
+	go td.watchTSDeadline()
+	return td
+}
 
-	if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok {
-		// Successfully stored the value. Start the following goroutine.
-		// Each goroutine is responsible for handling the tso stream request for its dc-location.
-		// The only case that will make the dispatcher goroutine exit
-		// is that the loopCtx is done, otherwise there is no circumstance
-		// this goroutine should exit.
-		c.wg.Add(1)
-		go c.handleDispatcher(dispatcherCtx, dcLocation, dispatcher.tsoBatchController)
-		log.Info("[tso] tso dispatcher created", zap.String("dc-location", dcLocation))
-	} else {
-		dispatcherCancel()
+func (td *tsoDispatcher) watchTSDeadline() {
+	log.Info("[tso] start tso deadline watcher", zap.String("dc-location", td.dc))
+	defer log.Info("[tso] exit tso deadline watcher", zap.String("dc-location", td.dc))
+	for {
+		select {
+		case d := <-td.tsDeadlineCh:
+			select {
+			case <-d.timer.C:
+				log.Error("[tso] tso request is canceled due to timeout",
+					zap.String("dc-location", td.dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
+				d.cancel()
+				timerpool.GlobalTimerPool.Put(d.timer)
+			case <-d.done:
+				timerpool.GlobalTimerPool.Put(d.timer)
+			case <-td.ctx.Done():
+				timerpool.GlobalTimerPool.Put(d.timer)
+				return
+			}
+		case <-td.ctx.Done():
+			return
+		}
 	}
 }
 
-func (c *tsoClient) handleDispatcher(
-	ctx context.Context,
-	dc string,
-	tbc *tsoBatchController,
-) {
+func (td *tsoDispatcher) scheduleUpdateConnectionCtxs() {
+	select {
+	case td.updateConnectionCtxsCh <- struct{}{}:
+	default:
+	}
+}
+
+func (td *tsoDispatcher) close() {
+	td.cancel()
+	td.batchController.clear()
+}
+
+func (td *tsoDispatcher) push(request *tsoRequest) {
+	td.batchController.tsoRequestCh <- request
+}
+
+func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) {
 	var (
-		err       error
-		streamURL string
-		stream    tsoStream
-		streamCtx context.Context
-		cancel    context.CancelFunc
-		// url -> connectionContext
-		connectionCtxs sync.Map
+		ctx             = td.ctx
+		dc              = td.dc
+		provider        = td.provider
+		svcDiscovery    = provider.getServiceDiscovery()
+		option          = provider.getOption()
+		connectionCtxs  = td.connectionCtxs
+		batchController = td.batchController
 	)
+	log.Info("[tso] tso dispatcher created", zap.String("dc-location", dc))
 	// Clean up the connectionCtxs when the dispatcher exits.
 	defer func() {
 		log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc))
@@ -324,14 +175,21 @@ func (c *tsoClient) handleDispatcher(
 			return true
 		})
 		// Clear the tso batch controller.
-		tbc.clear()
-		c.wg.Done()
+		batchController.clear()
+		wg.Done()
 	}()
 	// Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event.
-	go c.connectionCtxsUpdater(ctx, dc, &connectionCtxs)
+	go td.connectionCtxsUpdater()
+	var (
+		err       error
+		streamCtx context.Context
+		cancel    context.CancelFunc
+		streamURL string
+		stream    tsoStream
+	)
 
 	// Loop through each batch of TSO requests and send them for processing.
-	streamLoopTimer := time.NewTimer(c.option.timeout)
+	streamLoopTimer := time.NewTimer(option.timeout)
 	defer streamLoopTimer.Stop()
 	bo := retry.InitialBackoffer(updateMemberBackOffBaseTime, updateMemberTimeout, updateMemberBackOffBaseTime)
tsoBatchLoop:
@@ -342,12 +200,12 @@ tsoBatchLoop:
 		default:
 		}
 		// Start to collect the TSO requests.
-		maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval()
+		maxBatchWaitInterval := option.getMaxTSOBatchWaitInterval()
 		// Once the TSO requests are collected, must make sure they could be finished or revoked eventually,
 		// otherwise the upper caller may get blocked on waiting for the results.
-		if err = tbc.fetchPendingRequests(ctx, maxBatchWaitInterval); err != nil {
+		if err = batchController.fetchPendingRequests(ctx, maxBatchWaitInterval); err != nil {
 			// Finish the collected requests if the fetch failed.
-			tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
+			batchController.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
 			if err == context.Canceled {
 				log.Info("[tso] stop fetching the pending tso requests due to context canceled",
 					zap.String("dc-location", dc))
@@ -359,7 +217,7 @@ tsoBatchLoop:
 			return
 		}
 		if maxBatchWaitInterval >= 0 {
-			tbc.adjustBestBatchSize()
+			batchController.adjustBestBatchSize()
 		}
 		// Stop the timer if it's not stopped.
 		if !streamLoopTimer.Stop() {
 			select {
 			case <-streamLoopTimer.C:
 			default:
 			}
 		}
 		// We need be careful here, see more details in the comments of Timer.Reset.
 		// https://pkg.go.dev/time@master#Timer.Reset
-		streamLoopTimer.Reset(c.option.timeout)
+		streamLoopTimer.Reset(option.timeout)
 		// Choose a stream to send the TSO gRPC request.
	streamChoosingLoop:
 		for {
-			connectionCtx := chooseStream(&connectionCtxs)
+			connectionCtx := chooseStream(connectionCtxs)
 			if connectionCtx != nil {
-				streamURL, stream, streamCtx, cancel = connectionCtx.streamURL, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel
+				streamCtx, cancel, streamURL, stream = connectionCtx.ctx, connectionCtx.cancel, connectionCtx.streamURL, connectionCtx.stream
 			}
 			// Check stream and retry if necessary.
 			if stream == nil {
 				log.Info("[tso] tso stream is not ready", zap.String("dc", dc))
-				if c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) {
+				if provider.updateConnectionCtxs(ctx, dc, connectionCtxs) {
 					continue streamChoosingLoop
 				}
 				timer := time.NewTimer(retryInterval)
 				select {
 				case <-ctx.Done():
 					// Finish the collected requests if the context is canceled.
-					tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err()))
+					batchController.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err()))
 					timer.Stop()
 					return
 				case <-streamLoopTimer.C:
 					err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
 					log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
-					c.svcDiscovery.ScheduleCheckMemberChanged()
+					svcDiscovery.ScheduleCheckMemberChanged()
 					// Finish the collected requests if the stream is failed to be created.
-					tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
+					batchController.finishCollectedRequests(0, 0, 0, errors.WithStack(err))
 					timer.Stop()
 					continue tsoBatchLoop
 				case <-timer.C:
@@ -417,22 +275,16 @@ tsoBatchLoop:
 			}
 		}
 		done := make(chan struct{})
-		dl := newTSDeadline(c.option.timeout, done, cancel)
-		tsDeadlineCh, ok := c.tsDeadline.Load(dc)
-		for !ok || tsDeadlineCh == nil {
-			c.scheduleCheckTSDeadline()
-			time.Sleep(time.Millisecond * 100)
-			tsDeadlineCh, ok = c.tsDeadline.Load(dc)
-		}
+		dl := newTSDeadline(option.timeout, done, cancel)
 		select {
 		case <-ctx.Done():
 			// Finish the collected requests if the context is canceled.
-			tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err()))
+			batchController.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err()))
 			return
-		case tsDeadlineCh.(chan *deadline) <- dl:
+		case td.tsDeadlineCh <- dl:
 		}
 		// processRequests guarantees that the collected requests could be finished properly.
-		err = c.processRequests(stream, dc, tbc)
+		err = td.processRequests(stream, dc, td.batchController)
 		close(done)
 		// If error happens during tso stream handling, reset stream and run the next trial.
 		if err != nil {
@@ -441,7 +293,7 @@ tsoBatchLoop:
 			select {
 			case <-ctx.Done():
 				return
 			default:
 			}
-			c.svcDiscovery.ScheduleCheckMemberChanged()
+			svcDiscovery.ScheduleCheckMemberChanged()
 			log.Error("[tso] getTS error after processing requests",
 				zap.String("dc-location", dc),
 				zap.String("stream-url", streamURL),
@@ -452,7 +304,7 @@ tsoBatchLoop:
 			stream = nil
 			// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
 			if IsLeaderChange(err) {
-				if err := bo.Exec(ctx, c.svcDiscovery.CheckMemberChanged); err != nil {
+				if err := bo.Exec(ctx, svcDiscovery.CheckMemberChanged); err != nil {
 					select {
 					case <-ctx.Done():
 						return
@@ -460,28 +312,28 @@ tsoBatchLoop:
 				}
 			}
 			// Because the TSO Follower Proxy could be configured online,
-			// If we change it from on -> off, background updateTSOConnectionCtxs
+			// If we change it from on -> off, background updateConnectionCtxs
 			// will cancel the current stream, then the EOF error caused by cancel()
-			// should not trigger the updateTSOConnectionCtxs here.
+			// should not trigger the updateConnectionCtxs here.
 			// So we should only call it when the leader changes.
-			c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs)
+			provider.updateConnectionCtxs(ctx, dc, connectionCtxs)
 		}
 	}
 }
 
-// updateTSOConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly.
-// TODO: implement support for the Local TSO.
-func (c *tsoClient) connectionCtxsUpdater(
-	ctx context.Context,
-	dc string,
-	connectionCtxs *sync.Map,
-) {
-	if dc != globalDCLocation {
-		return
-	}
+// updateConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly.
+func (td *tsoDispatcher) connectionCtxsUpdater() {
+	var (
+		ctx            = td.ctx
+		dc             = td.dc
+		connectionCtxs = td.connectionCtxs
+		provider       = td.provider
+		option         = td.provider.getOption()
+		updateTicker   = &time.Ticker{}
+	)
+
+	log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc))
-	var updateTicker = &time.Ticker{}
 	setNewUpdateTicker := func(ticker *time.Ticker) {
 		if updateTicker.C != nil {
 			updateTicker.Stop()
 		}
@@ -492,13 +344,17 @@ func (c *tsoClient) connectionCtxsUpdater(
 	defer setNewUpdateTicker(nil)
 
 	for {
-		c.updateTSOConnectionCtxs(ctx, dc, connectionCtxs)
+		provider.updateConnectionCtxs(ctx, dc, connectionCtxs)
 		select {
 		case <-ctx.Done():
 			log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc))
 			return
-		case <-c.option.enableTSOFollowerProxyCh:
-			enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy()
+		case <-option.enableTSOFollowerProxyCh:
+			// TODO: implement support of TSO Follower Proxy for the Local TSO.
+			if dc != globalDCLocation {
+				continue
+			}
+			enableTSOFollowerProxy := option.getEnableTSOFollowerProxy()
 			log.Info("[tso] tso follower proxy status changed",
 				zap.String("dc-location", dc),
 				zap.Bool("enable", enableTSOFollowerProxy))
@@ -515,7 +371,7 @@ func (c *tsoClient) connectionCtxsUpdater(
 			}
 		case <-updateTicker.C:
 			// Triggered periodically when the TSO Follower Proxy is enabled.
-		case <-c.updateTSOConnectionCtxsCh:
+		case <-td.updateConnectionCtxsCh:
 			// Triggered by the leader/follower change.
 		}
 	}
@@ -535,3 +391,94 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext
 	})
 	return connectionCtx
 }
+
+func (td *tsoDispatcher) processRequests(
+	stream tsoStream, dcLocation string, tbc *tsoBatchController,
+) error {
+	var (
+		requests     = tbc.getCollectedRequests()
+		traceRegions = make([]*trace.Region, 0, len(requests))
+		spans        = make([]opentracing.Span, 0, len(requests))
+	)
+	for _, req := range requests {
+		traceRegions = append(traceRegions, trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend"))
+		if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil {
+			spans = append(spans, span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context())))
+		}
+	}
+	defer func() {
+		for i := range spans {
+			spans[i].Finish()
+		}
+		for i := range traceRegions {
+			traceRegions[i].End()
+		}
+	}()
+
+	var (
+		count              = int64(len(requests))
+		svcDiscovery       = td.provider.getServiceDiscovery()
+		clusterID          = svcDiscovery.GetClusterID()
+		keyspaceID         = svcDiscovery.GetKeyspaceID()
+		reqKeyspaceGroupID = svcDiscovery.GetKeyspaceGroupID()
+	)
+	respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
+		clusterID, keyspaceID, reqKeyspaceGroupID,
+		dcLocation, count, tbc.batchStartTime)
+	if err != nil {
+		tbc.finishCollectedRequests(0, 0, 0, err)
+		return err
+	}
+	curTSOInfo := &tsoInfo{
+		tsoServer:           stream.getServerURL(),
+		reqKeyspaceGroupID:  reqKeyspaceGroupID,
+		respKeyspaceGroupID: respKeyspaceGroupID,
+		respReceivedAt:      time.Now(),
+		physical:            physical,
+		logical:             logical,
+	}
+	// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
+	firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
+	td.compareAndSwapTS(curTSOInfo, firstLogical)
+	tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
+	return nil
+}
+
+func (td *tsoDispatcher) compareAndSwapTS(
+	curTSOInfo *tsoInfo, firstLogical int64,
+) {
+	if td.lastTSOInfo != nil {
+		var (
+			lastTSOInfo = td.lastTSOInfo
+			dc          = td.dc
+			physical    = curTSOInfo.physical
+			keyspaceID  = td.provider.getServiceDiscovery().GetKeyspaceID()
+		)
+		if td.lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
+			log.Info("[tso] keyspace group changed",
+				zap.String("dc-location", dc),
+				zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
+				zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
+		}
+		// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
+		// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
+		// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
+		// last time.
+		if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
+			log.Panic("[tso] timestamp fallback",
+				zap.String("dc-location", dc),
+				zap.Uint32("keyspace", keyspaceID),
+				zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
+				zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
+				zap.String("last-tso-server", lastTSOInfo.tsoServer),
+				zap.String("cur-tso-server", curTSOInfo.tsoServer),
+				zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
+				zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
+				zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
+				zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
+				zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
+				zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
+		}
+	}
+	td.lastTSOInfo = curTSOInfo
+}
diff --git a/client/tso_request.go b/client/tso_request.go
index f30ceb5268a..b912fa35497 100644
--- a/client/tso_request.go
+++ b/client/tso_request.go
@@ -63,8 +63,8 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
 	cmdDurationTSOAsyncWait.Observe(start.Sub(req.start).Seconds())
 	select {
 	case err = <-req.done:
-		defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
 		defer req.pool.Put(req)
+		defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqDone").End()
 		err = errors.WithStack(err)
 		if err != nil {
 			cmdFailDurationTSO.Observe(time.Since(req.start).Seconds())
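A note on the wake-up channels this refactor moves around: `checkTSODispatcherCh` and the new per-dispatcher `updateConnectionCtxsCh` are both capacity-1 channels written with a `select`/`default`, so any burst of scheduling calls collapses into at most one pending signal. A self-contained sketch of the pattern (illustrative names, not the client's API):

```go
package main

import (
	"fmt"
	"time"
)

// notifier coalesces any number of concurrent Schedule calls into at most one
// pending wake-up, the same shape as scheduleCheckTSODispatcher and
// scheduleUpdateConnectionCtxs in the patch.
type notifier struct{ ch chan struct{} }

func newNotifier() *notifier { return &notifier{ch: make(chan struct{}, 1)} }

// Schedule never blocks: if a signal is already pending, the new one is dropped.
func (n *notifier) Schedule() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

func main() {
	n := newNotifier()
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range n.ch {
			// One pass of work handles all coalesced wake-ups.
			fmt.Println("update connection contexts")
			time.Sleep(10 * time.Millisecond)
		}
	}()
	for i := 0; i < 100; i++ {
		n.Schedule() // the burst collapses into a handful of passes
	}
	time.Sleep(50 * time.Millisecond)
	close(n.ch)
	<-done
}
```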
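`dispatchRequest` deliberately checks the contexts twice: once before pushing into the dispatcher channel, where a retryable error may be returned, and once after the push, where it never retries. A condensed sketch of why the post-push check must not retry (hypothetical types, not the PR's code): once the request is in the channel, a consumer may already own it, and retrying would hand the same request to two goroutines.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// dispatch sends req to ch, then re-checks ctx. The second check matters:
// the default branch of the first select can win even if ctx is being
// cancelled concurrently, so the request may land in a channel that no
// consumer will ever drain. Reporting a non-retryable error after the push
// avoids two goroutines racing on one request.
func dispatch(ctx context.Context, ch chan int, req int) (retryable bool, err error) {
	select {
	case <-ctx.Done():
		// Not pushed yet, so it is safe to let the caller retry.
		return true, ctx.Err()
	default:
		ch <- req
	}
	// Re-check after the push; never retry past this point, since the
	// consumer side may already own the request.
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	default:
	}
	return false, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan int, 1)
	if _, err := dispatch(ctx, ch, 1); err != nil {
		fmt.Println(err)
	}
	cancel()
	if retry, err := dispatch(ctx, ch, 2); errors.Is(err, context.Canceled) {
		fmt.Println("cancelled, retryable:", retry)
	}
}
```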
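`processRequests` receives the batch's largest logical timestamp from the stream and derives the first one with `tsoutil.AddLogical(logical, -count+1, suffixBits)`; `compareAndSwapTS` then compares that first timestamp against the previous batch's largest to detect fallback. A worked sketch with `suffixBits = 0`, where the helper reduces to plain addition (the stand-ins below are simplified for illustration, not the real `tsoutil` implementations):

```go
package main

import "fmt"

// addLogical is a simplified stand-in for tsoutil.AddLogical; with
// suffixBits = 0 it is plain addition.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

// tsLessEqual reports whether ts1 <= ts2, comparing the physical part first.
func tsLessEqual(p1, l1, p2, l2 int64) bool {
	if p1 == p2 {
		return l1 <= l2
	}
	return p1 < p2
}

func main() {
	// The stream returns the largest logical of the batch: logical 10 with
	// count 5 means the batch covers logicals [6, 7, 8, 9, 10].
	var (
		physical   int64 = 42
		logical    int64 = 10 // largest logical in the new batch
		count      int64 = 5
		suffixBits uint32
	)
	firstLogical := addLogical(logical, -count+1, suffixBits) // 10 - 5 + 1 = 6
	fmt.Printf("new batch: physical=%d logicals=[%d..%d]\n", physical, firstLogical, logical)

	// Previous batch ended at (42, 5): the new first TS is strictly greater,
	// so no fallback.
	fmt.Println("fallback?", tsLessEqual(physical, firstLogical, 42, 5)) // false

	// Previous batch ended at (42, 6): the new first TS is not strictly
	// greater, which is exactly the condition compareAndSwapTS panics on.
	fmt.Println("fallback?", tsLessEqual(physical, firstLogical, 42, 6)) // true
}
```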
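Both client/tso_batch_controller.go and client/tso_request.go above fix the same class of problem: a trace region must end before the request it describes is completed or recycled. In the batch controller, the per-request `defer` inside the loop would only fire when the whole function returned; in `Wait`, the two `defer`s are swapped so LIFO ordering ends the region before `req.pool.Put(req)` runs. A minimal sketch of the loop pitfall (illustrative names only, not the client's types):

```go
package main

import (
	"context"
	"fmt"
	"runtime/trace"
)

// buggyFinish mirrors the old shape: the deferred End() calls accumulate and
// only run when buggyFinish itself returns, i.e. after every request has
// already been completed and possibly handed back to a sync.Pool.
func buggyFinish(reqCtxs []context.Context) {
	for i, ctx := range reqCtxs {
		defer trace.StartRegion(ctx, "tsoReqDequeue").End() // nolint: fires at function exit
		fmt.Println("request completed:", i)
	}
	// All regions end here, long after each request was "done".
}

// fixedFinish mirrors the new shape: capture what the region needs before
// completing the request, then end the region immediately in the same iteration.
func fixedFinish(reqCtxs []context.Context) {
	for i, ctx := range reqCtxs {
		requestCtx := ctx // captured before the request is marked done
		fmt.Println("request completed:", i)
		trace.StartRegion(requestCtx, "tsoReqDequeue").End()
	}
}

func main() {
	ctxs := []context.Context{context.Background(), context.Background()}
	buggyFinish(ctxs)
	fixedFinish(ctxs)
}
```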