diff --git a/client/tso_client.go b/client/tso_client.go index 8185b99d1d0..347d1f6ec0a 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -18,14 +18,22 @@ import ( "context" "fmt" "math/rand" + "runtime/trace" "sync" "time" + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/grpcutil" + "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" ) // TSOClient is the client used to get timestamps. @@ -127,18 +135,36 @@ func (c *tsoClient) Close() { c.wg.Wait() log.Info("close tso client") - c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { - if dispatcherInterface != nil { - dispatcher := dispatcherInterface.(*tsoDispatcher) - dispatcher.dispatcherCancel() - dispatcher.tsoBatchController.clear() - } - return true - }) - + c.closeTSODispatcher() log.Info("tso client is closed") } +func (c *tsoClient) scheduleCheckTSDeadline() { + select { + case c.checkTSDeadlineCh <- struct{}{}: + default: + } +} + +func (c *tsoClient) scheduleCheckTSODispatcher() { + select { + case c.checkTSODispatcherCh <- struct{}{}: + default: + } +} + +func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { + select { + case c.updateTSOConnectionCtxsCh <- struct{}{}: + default: + } +} + +// TSO Follower Proxy only supports the Global TSO proxy now. +func (c *tsoClient) allowTSOFollowerProxy(dc string) bool { + return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy() +} + func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { req := c.tsoReqPool.Get().(*tsoRequest) // Set needed fields in the request before using it. @@ -279,3 +305,293 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { } return nil, "" } + +type tsoConnectionContext struct { + streamURL string + // Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster, + // or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster + stream tsoStream + ctx context.Context + cancel context.CancelFunc +} + +func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool { + // Normal connection creating, it will be affected by the `enableForwarding`. + createTSOConnection := c.tryConnectToTSO + if c.allowTSOFollowerProxy(dc) { + createTSOConnection = c.tryConnectToTSOWithProxy + } + if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil { + log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) + return false + } + return true +} + +// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable +// and enableForwarding is true, it will create a new connection to a follower to do the forwarding, +// while a new daemon will be created also to switch back to a normal leader connection ASAP the +// connection comes back to normal. +func (c *tsoClient) tryConnectToTSO( + dispatcherCtx context.Context, + dc string, + connectionCtxs *sync.Map, +) error { + var ( + networkErrNum uint64 + err error + stream tsoStream + url string + cc *grpc.ClientConn + ) + updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) { + if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded { + // If the previous connection still exists, we should close it first. + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Store(newURL, connectionCtx) + } + connectionCtxs.Range(func(url, cc any) bool { + if url.(string) != newURL { + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(url) + } + return true + }) + } + // retry several times before falling back to the follower when the network problem happens + + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() + for i := 0; i < maxRetryTimes; i++ { + c.svcDiscovery.ScheduleCheckMemberChanged() + cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) + if cc != nil { + cctx, cancel := context.WithCancel(dispatcherCtx) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + failpoint.Inject("unreachableNetwork", func() { + stream = nil + err = status.New(codes.Unavailable, "unavailable").Err() + }) + if stream != nil && err == nil { + updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) + return nil + } + + if err != nil && c.option.enableForwarding { + // The reason we need to judge if the error code is equal to "Canceled" here is that + // when we create a stream we use a goroutine to manually control the timeout of the connection. + // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. + // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. + // And actually the `Canceled` error can be regarded as a kind of network error in some way. + if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { + networkErrNum++ + } + } + cancel() + } else { + networkErrNum++ + } + select { + case <-dispatcherCtx.Done(): + return err + case <-ticker.C: + } + } + + if networkErrNum == maxRetryTimes { + // encounter the network error + backupClientConn, backupURL := c.backupClientConn() + if backupClientConn != nil { + log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL)) + forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) + if !ok { + return errors.Errorf("cannot find the allocator leader in %s", dc) + } + + // create the follower stream + cctx, cancel := context.WithCancel(dispatcherCtx) + cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) + if err == nil { + forwardedHostTrim := trimHTTPPrefix(forwardedHost) + addr := trimHTTPPrefix(backupURL) + // the goroutine is used to check the network and change back to the original stream + go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) + requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) + updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel}) + return nil + } + cancel() + } + } + return err +} + +// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as +// a TSO proxy to reduce the pressure of the main serving service endpoint. +func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { + tsoStreamBuilders := c.getAllTSOStreamBuilders() + leaderAddr := c.svcDiscovery.GetServingURL() + forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) + if !ok { + return errors.Errorf("cannot find the allocator leader in %s", dc) + } + // GC the stale one. + connectionCtxs.Range(func(addr, cc any) bool { + addrStr := addr.(string) + if _, ok := tsoStreamBuilders[addrStr]; !ok { + log.Info("[tso] remove the stale tso stream", + zap.String("dc", dc), + zap.String("addr", addrStr)) + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(addr) + } + return true + }) + // Update the missing one. + for addr, tsoStreamBuilder := range tsoStreamBuilders { + if _, ok = connectionCtxs.Load(addr); ok { + continue + } + log.Info("[tso] try to create tso stream", + zap.String("dc", dc), zap.String("addr", addr)) + cctx, cancel := context.WithCancel(dispatcherCtx) + // Do not proxy the leader client. + if addr != leaderAddr { + log.Info("[tso] use follower to forward tso stream to do the proxy", + zap.String("dc", dc), zap.String("addr", addr)) + cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) + } + // Create the TSO stream. + stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout) + if err == nil { + if addr != leaderAddr { + forwardedHostTrim := trimHTTPPrefix(forwardedHost) + addrTrim := trimHTTPPrefix(addr) + requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) + } + connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) + continue + } + log.Error("[tso] create the tso stream failed", + zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) + cancel() + } + return nil +} + +// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers +// or of keyspace group primary/secondaries. +func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { + var ( + addrs = c.svcDiscovery.GetServiceURLs() + streamBuilders = make(map[string]tsoStreamBuilder, len(addrs)) + cc *grpc.ClientConn + err error + ) + for _, addr := range addrs { + if len(addrs) == 0 { + continue + } + if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { + continue + } + healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) + resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) + healthCancel() + if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { + streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc) + } + } + return streamBuilders +} + +type tsoInfo struct { + tsoServer string + reqKeyspaceGroupID uint32 + respKeyspaceGroupID uint32 + respReceivedAt time.Time + physical int64 + logical int64 +} + +func (c *tsoClient) processRequests( + stream tsoStream, dcLocation string, tbc *tsoBatchController, +) error { + requests := tbc.getCollectedRequests() + // nolint + for _, req := range requests { + defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End() + if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil { + span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + } + + count := int64(len(requests)) + reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() + respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( + c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, + dcLocation, count, tbc.batchStartTime) + if err != nil { + tbc.finishCollectedRequests(0, 0, 0, err) + return err + } + // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. + firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) + curTSOInfo := &tsoInfo{ + tsoServer: stream.getServerURL(), + reqKeyspaceGroupID: reqKeyspaceGroupID, + respKeyspaceGroupID: respKeyspaceGroupID, + respReceivedAt: time.Now(), + physical: physical, + logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), + } + c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) + tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil) + return nil +} + +func (c *tsoClient) compareAndSwapTS( + dcLocation string, + curTSOInfo *tsoInfo, + physical, firstLogical int64, +) { + val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo) + if !loaded { + return + } + lastTSOInfo := val.(*tsoInfo) + if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { + log.Info("[tso] keyspace group changed", + zap.String("dc-location", dcLocation), + zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) + } + + // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical + // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then + // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned + // last time. + if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + log.Panic("[tso] timestamp fallback", + zap.String("dc-location", dcLocation), + zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), + zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("last-tso-server", lastTSOInfo.tsoServer), + zap.String("cur-tso-server", curTSOInfo.tsoServer), + zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), + zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), + zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) + } + lastTSOInfo.tsoServer = curTSOInfo.tsoServer + lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID + lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID + lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt + lastTSOInfo.physical = curTSOInfo.physical + lastTSOInfo.logical = curTSOInfo.logical +} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index d02fdd52af8..7528293a733 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -22,36 +22,16 @@ import ( "sync" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" - "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/client/timerpool" - "github.com/tikv/pd/client/tsoutil" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/status" ) -type tsoDispatcher struct { - dispatcherCancel context.CancelFunc - tsoBatchController *tsoBatchController -} - -type tsoInfo struct { - tsoServer string - reqKeyspaceGroupID uint32 - respKeyspaceGroupID uint32 - respReceivedAt time.Time - physical int64 - logical int64 -} - const ( tsLoopDCCheckInterval = time.Minute defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst @@ -59,18 +39,9 @@ const ( maxRetryTimes = 6 ) -func (c *tsoClient) scheduleCheckTSODispatcher() { - select { - case c.checkTSODispatcherCh <- struct{}{}: - default: - } -} - -func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { - select { - case c.updateTSOConnectionCtxsCh <- struct{}{}: - default: - } +type tsoDispatcher struct { + dispatcherCancel context.CancelFunc + tsoBatchController *tsoBatchController } func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { @@ -115,6 +86,17 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { return false, nil } +func (c *tsoClient) closeTSODispatcher() { + c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { + if dispatcherInterface != nil { + dispatcher := dispatcherInterface.(*tsoDispatcher) + dispatcher.dispatcherCancel() + dispatcher.tsoBatchController.clear() + } + return true + }) +} + func (c *tsoClient) updateTSODispatcher() { // Set up the new TSO dispatcher and batch controller. c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool { @@ -212,13 +194,6 @@ func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) { } } -func (c *tsoClient) scheduleCheckTSDeadline() { - select { - case c.checkTSDeadlineCh <- struct{}{}: - default: - } -} - func (c *tsoClient) tsoDispatcherCheckLoop() { defer c.wg.Done() @@ -445,7 +420,7 @@ tsoBatchLoop: // Choose a stream to send the TSO gRPC request. streamChoosingLoop: for { - connectionCtx := c.chooseStream(&connectionCtxs) + connectionCtx := chooseStream(&connectionCtxs) if connectionCtx != nil { streamURL, stream, streamCtx, cancel = connectionCtx.streamURL, connectionCtx.stream, connectionCtx.ctx, connectionCtx.cancel } @@ -541,14 +516,9 @@ tsoBatchLoop: } } -// TSO Follower Proxy only supports the Global TSO proxy now. -func (c *tsoClient) allowTSOFollowerProxy(dc string) bool { - return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy() -} - // chooseStream uses the reservoir sampling algorithm to randomly choose a connection. // connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off. -func (*tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) { +func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) { idx := 0 connectionCtxs.Range(func(_, cc any) bool { j := rand.Intn(idx + 1) @@ -560,284 +530,3 @@ func (*tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConn }) return connectionCtx } - -type tsoConnectionContext struct { - streamURL string - // Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster, - // or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster - stream tsoStream - ctx context.Context - cancel context.CancelFunc -} - -func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool { - // Normal connection creating, it will be affected by the `enableForwarding`. - createTSOConnection := c.tryConnectToTSO - if c.allowTSOFollowerProxy(dc) { - createTSOConnection = c.tryConnectToTSOWithProxy - } - if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil { - log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) - return false - } - return true -} - -// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable -// and enableForwarding is true, it will create a new connection to a follower to do the forwarding, -// while a new daemon will be created also to switch back to a normal leader connection ASAP the -// connection comes back to normal. -func (c *tsoClient) tryConnectToTSO( - dispatcherCtx context.Context, - dc string, - connectionCtxs *sync.Map, -) error { - var ( - networkErrNum uint64 - err error - stream tsoStream - url string - cc *grpc.ClientConn - ) - updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) { - if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded { - // If the previous connection still exists, we should close it first. - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Store(newURL, connectionCtx) - } - connectionCtxs.Range(func(url, cc any) bool { - if url.(string) != newURL { - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Delete(url) - } - return true - }) - } - // retry several times before falling back to the follower when the network problem happens - - ticker := time.NewTicker(retryInterval) - defer ticker.Stop() - for i := 0; i < maxRetryTimes; i++ { - c.svcDiscovery.ScheduleCheckMemberChanged() - cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) - if cc != nil { - cctx, cancel := context.WithCancel(dispatcherCtx) - stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) - failpoint.Inject("unreachableNetwork", func() { - stream = nil - err = status.New(codes.Unavailable, "unavailable").Err() - }) - if stream != nil && err == nil { - updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) - return nil - } - - if err != nil && c.option.enableForwarding { - // The reason we need to judge if the error code is equal to "Canceled" here is that - // when we create a stream we use a goroutine to manually control the timeout of the connection. - // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. - // But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error. - // And actually the `Canceled` error can be regarded as a kind of network error in some way. - if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) { - networkErrNum++ - } - } - cancel() - } else { - networkErrNum++ - } - select { - case <-dispatcherCtx.Done(): - return err - case <-ticker.C: - } - } - - if networkErrNum == maxRetryTimes { - // encounter the network error - backupClientConn, backupURL := c.backupClientConn() - if backupClientConn != nil { - log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL)) - forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) - if !ok { - return errors.Errorf("cannot find the allocator leader in %s", dc) - } - - // create the follower stream - cctx, cancel := context.WithCancel(dispatcherCtx) - cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) - stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) - if err == nil { - forwardedHostTrim := trimHTTPPrefix(forwardedHost) - addr := trimHTTPPrefix(backupURL) - // the goroutine is used to check the network and change back to the original stream - go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) - requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) - updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel}) - return nil - } - cancel() - } - } - return err -} - -// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers -// or of keyspace group primary/secondaries. -func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { - var ( - addrs = c.svcDiscovery.GetServiceURLs() - streamBuilders = make(map[string]tsoStreamBuilder, len(addrs)) - cc *grpc.ClientConn - err error - ) - for _, addr := range addrs { - if len(addrs) == 0 { - continue - } - if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { - continue - } - healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) - resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) - healthCancel() - if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { - streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc) - } - } - return streamBuilders -} - -// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as -// a TSO proxy to reduce the pressure of the main serving service endpoint. -func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { - tsoStreamBuilders := c.getAllTSOStreamBuilders() - leaderAddr := c.svcDiscovery.GetServingURL() - forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) - if !ok { - return errors.Errorf("cannot find the allocator leader in %s", dc) - } - // GC the stale one. - connectionCtxs.Range(func(addr, cc any) bool { - addrStr := addr.(string) - if _, ok := tsoStreamBuilders[addrStr]; !ok { - log.Info("[tso] remove the stale tso stream", - zap.String("dc", dc), - zap.String("addr", addrStr)) - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Delete(addr) - } - return true - }) - // Update the missing one. - for addr, tsoStreamBuilder := range tsoStreamBuilders { - if _, ok = connectionCtxs.Load(addr); ok { - continue - } - log.Info("[tso] try to create tso stream", - zap.String("dc", dc), zap.String("addr", addr)) - cctx, cancel := context.WithCancel(dispatcherCtx) - // Do not proxy the leader client. - if addr != leaderAddr { - log.Info("[tso] use follower to forward tso stream to do the proxy", - zap.String("dc", dc), zap.String("addr", addr)) - cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) - } - // Create the TSO stream. - stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout) - if err == nil { - if addr != leaderAddr { - forwardedHostTrim := trimHTTPPrefix(forwardedHost) - addrTrim := trimHTTPPrefix(addr) - requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1) - } - connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) - continue - } - log.Error("[tso] create the tso stream failed", - zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) - cancel() - } - return nil -} - -func (c *tsoClient) processRequests( - stream tsoStream, dcLocation string, tbc *tsoBatchController, -) error { - requests := tbc.getCollectedRequests() - // nolint - for _, req := range requests { - defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End() - if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil { - span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context())) - defer span.Finish() - } - } - - count := int64(len(requests)) - reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() - respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( - c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, - dcLocation, count, tbc.batchStartTime) - if err != nil { - tbc.finishCollectedRequests(0, 0, 0, err) - return err - } - // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. - firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) - curTSOInfo := &tsoInfo{ - tsoServer: stream.getServerURL(), - reqKeyspaceGroupID: reqKeyspaceGroupID, - respKeyspaceGroupID: respKeyspaceGroupID, - respReceivedAt: time.Now(), - physical: physical, - logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), - } - c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) - tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil) - return nil -} - -func (c *tsoClient) compareAndSwapTS( - dcLocation string, - curTSOInfo *tsoInfo, - physical, firstLogical int64, -) { - val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo) - if !loaded { - return - } - lastTSOInfo := val.(*tsoInfo) - if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { - log.Info("[tso] keyspace group changed", - zap.String("dc-location", dcLocation), - zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), - zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) - } - - // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical - // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then - // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned - // last time. - if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { - log.Panic("[tso] timestamp fallback", - zap.String("dc-location", dcLocation), - zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), - zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), - zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), - zap.String("last-tso-server", lastTSOInfo.tsoServer), - zap.String("cur-tso-server", curTSOInfo.tsoServer), - zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), - zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), - zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), - zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), - zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), - zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) - } - lastTSOInfo.tsoServer = curTSOInfo.tsoServer - lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID - lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID - lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt - lastTSOInfo.physical = curTSOInfo.physical - lastTSOInfo.logical = curTSOInfo.logical -}