client/tso: organize the methods of TSO dispatcher #8121

Merged 2 commits on Apr 24, 2024
client/tso_client.go: 334 changes (325 additions & 9 deletions)
@@ -18,14 +18,22 @@ import (
"context"
"fmt"
"math/rand"
"runtime/trace"
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

// TSOClient is the client used to get timestamps.
@@ -127,18 +135,36 @@ func (c *tsoClient) Close() {
c.wg.Wait()

log.Info("close tso client")
c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
dispatcher.dispatcherCancel()
dispatcher.tsoBatchController.clear()
}
return true
})

c.closeTSODispatcher()
log.Info("tso client is closed")
}
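
The teardown that used to be inlined here is now delegated to closeTSODispatcher, which this PR moves alongside the dispatcher code. A minimal sketch of what that helper plausibly does, simply wrapping the removed inline loop above; the real body lives elsewhere in this PR and may differ:

// Hypothetical sketch only: cancel every running dispatcher and clear its
// batch controller, mirroring the loop removed from Close() above.
func (c *tsoClient) closeTSODispatcher() {
	c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
		if dispatcherInterface != nil {
			dispatcher := dispatcherInterface.(*tsoDispatcher)
			dispatcher.dispatcherCancel()
			dispatcher.tsoBatchController.clear()
		}
		return true
	})
}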

func (c *tsoClient) scheduleCheckTSDeadline() {
select {
case c.checkTSDeadlineCh <- struct{}{}:
default:
}
}

func (c *tsoClient) scheduleCheckTSODispatcher() {
select {
case c.checkTSODispatcherCh <- struct{}{}:
default:
}
}

func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() {
select {
case c.updateTSOConnectionCtxsCh <- struct{}{}:
default:
}
}
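
The three schedule* helpers above all use the same non-blocking notification idiom: a channel of struct{} with capacity 1 plus a select with a default branch, so repeated signals coalesce instead of blocking the caller. A self-contained sketch of that idiom; the names notifyCh, scheduleCheck, and worker are illustrative and not from this PR:

package main

import (
	"fmt"
	"time"
)

// notifyCh has capacity 1, so at most one pending signal is kept;
// extra signals are dropped by the default branch.
var notifyCh = make(chan struct{}, 1)

func scheduleCheck() {
	select {
	case notifyCh <- struct{}{}:
	default: // a check is already pending, no need to queue another
	}
}

func worker(stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case <-notifyCh:
			fmt.Println("running the scheduled check")
		}
	}
}

func main() {
	stop := make(chan struct{})
	go worker(stop)
	// A burst of signals collapses into a single pending check.
	for i := 0; i < 3; i++ {
		scheduleCheck()
	}
	time.Sleep(100 * time.Millisecond)
	close(stop)
}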

// TSO Follower Proxy only supports the Global TSO proxy now.
func (c *tsoClient) allowTSOFollowerProxy(dc string) bool {
return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy()
}

func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := c.tsoReqPool.Get().(*tsoRequest)
// Set needed fields in the request before using it.
@@ -279,3 +305,293 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) {
}
return nil, ""
}

type tsoConnectionContext struct {
streamURL string
// Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster,
// or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster
stream tsoStream
ctx context.Context
cancel context.CancelFunc
}

func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
// Normal connection creation; it will be affected by `enableForwarding`.
createTSOConnection := c.tryConnectToTSO
if c.allowTSOFollowerProxy(dc) {
createTSOConnection = c.tryConnectToTSOWithProxy
}
if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
return false
}
return true
}

// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable
// and enableForwarding is true, it will create a new connection to a follower to do the forwarding,
// and a new daemon will also be created to switch back to a normal leader connection as soon as
// the connection recovers.
func (c *tsoClient) tryConnectToTSO(
dispatcherCtx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
var (
networkErrNum uint64
err error
stream tsoStream
url string
cc *grpc.ClientConn
)
updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded {
// If the previous connection still exists, we should close it first.
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Store(newURL, connectionCtx)
}
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
// Retry several times before falling back to the follower when a network problem occurs.

ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
if cc != nil {
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
failpoint.Inject("unreachableNetwork", func() {
stream = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
return nil
}

if err != nil && c.option.enableForwarding {
// The reason we need to check whether the error code equals "Canceled" here is that when we create a stream,
// we use a goroutine to manually control the connection timeout. There is no need to wait for the
// transport-layer timeout, which reduces the window of unavailability. However, this conflicts with the
// retry mechanism, since we use the error code to decide whether the failure was caused by a network error,
// and the `Canceled` error can actually be regarded as a kind of network error.
if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) {
networkErrNum++
}
}
cancel()
} else {
networkErrNum++
}
select {
case <-dispatcherCtx.Done():
return err
case <-ticker.C:
}
}

if networkErrNum == maxRetryTimes {
// encounter the network error
backupClientConn, backupURL := c.backupClientConn()
if backupClientConn != nil {
log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL))
forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
if !ok {
return errors.Errorf("cannot find the allocator leader in %s", dc)
}

// create the follower stream
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout)
if err == nil {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addr := trimHTTPPrefix(backupURL)
// the goroutine is used to check the network and change back to the original stream
go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel})
return nil
}
cancel()
}
}
return err
}
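
The updateAndClear closure above keeps exactly one live entry in connectionCtxs: it stores (or replaces) the context for the new URL, then cancels and deletes every other entry. A standalone sketch of that "single active entry in a sync.Map" pattern; the type connCtx and function keepOnly are illustrative and not from this diff:

package main

import (
	"context"
	"fmt"
	"sync"
)

type connCtx struct {
	url    string
	cancel context.CancelFunc
}

// keepOnly stores newCtx under newURL and cancels/removes every other entry,
// so the map never holds more than one live connection context.
func keepOnly(m *sync.Map, newURL string, newCtx *connCtx) {
	if old, loaded := m.LoadOrStore(newURL, newCtx); loaded {
		// Close the previous connection for this URL before replacing it.
		old.(*connCtx).cancel()
		m.Store(newURL, newCtx)
	}
	m.Range(func(url, cc any) bool {
		if url.(string) != newURL {
			cc.(*connCtx).cancel()
			m.Delete(url)
		}
		return true
	})
}

func main() {
	var m sync.Map
	ctxA, cancelA := context.WithCancel(context.Background())
	keepOnly(&m, "http://a", &connCtx{"http://a", cancelA})
	ctxB, cancelB := context.WithCancel(context.Background())
	keepOnly(&m, "http://b", &connCtx{"http://b", cancelB})
	// The old entry was cancelled and removed; only the new one remains live.
	fmt.Println(ctxA.Err(), ctxB.Err()) // context canceled <nil>
}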

// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy and reduce the pressure on the main serving endpoint.
func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
tsoStreamBuilders := c.getAllTSOStreamBuilders()
leaderAddr := c.svcDiscovery.GetServingURL()
forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
if !ok {
return errors.Errorf("cannot find the allocator leader in %s", dc)
}
// GC the stale one.
connectionCtxs.Range(func(addr, cc any) bool {
addrStr := addr.(string)
if _, ok := tsoStreamBuilders[addrStr]; !ok {
log.Info("[tso] remove the stale tso stream",
zap.String("dc", dc),
zap.String("addr", addrStr))
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(addr)
}
return true
})
// Update the missing one.
for addr, tsoStreamBuilder := range tsoStreamBuilders {
if _, ok = connectionCtxs.Load(addr); ok {
continue
}
log.Info("[tso] try to create tso stream",
zap.String("dc", dc), zap.String("addr", addr))
cctx, cancel := context.WithCancel(dispatcherCtx)
// Do not proxy the leader client.
if addr != leaderAddr {
log.Info("[tso] use follower to forward tso stream to do the proxy",
zap.String("dc", dc), zap.String("addr", addr))
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
}
// Create the TSO stream.
stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout)
if err == nil {
if addr != leaderAddr {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addrTrim := trimHTTPPrefix(addr)
requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
}
connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel})
continue
}
log.Error("[tso] create the tso stream failed",
zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err))
cancel()
}
return nil
}
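
tryConnectToTSOWithProxy reconciles the set of live streams against the set of currently healthy endpoints: entries in connectionCtxs with no matching stream builder are cancelled and removed, and endpoints with no entry get a new stream (forwarded to the allocator leader unless the endpoint is the leader itself). A generic sketch of that reconcile-over-a-sync.Map pattern; the names reconcile, create, and destroy are illustrative, not taken from this diff:

package main

import (
	"fmt"
	"sync"
)

// reconcile makes current (a sync.Map keyed by address) match desired:
// stale entries are torn down, missing ones are created.
func reconcile(current *sync.Map, desired map[string]struct{}, create func(addr string) any, destroy func(v any)) {
	// GC the stale entries.
	current.Range(func(addr, v any) bool {
		if _, ok := desired[addr.(string)]; !ok {
			destroy(v)
			current.Delete(addr)
		}
		return true
	})
	// Add the missing entries.
	for addr := range desired {
		if _, ok := current.Load(addr); ok {
			continue
		}
		current.Store(addr, create(addr))
	}
}

func main() {
	var current sync.Map
	current.Store("http://stale", "old-stream")
	desired := map[string]struct{}{"http://a": {}, "http://b": {}}
	reconcile(&current, desired,
		func(addr string) any { return "stream-to-" + addr },
		func(v any) { fmt.Println("closing", v) })
	current.Range(func(k, v any) bool { fmt.Println(k, "=>", v); return true })
}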

// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers
// or of keyspace group primary/secondaries.
func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder {
var (
addrs = c.svcDiscovery.GetServiceURLs()
streamBuilders = make(map[string]tsoStreamBuilder, len(addrs))
cc *grpc.ClientConn
err error
)
for _, addr := range addrs {
if len(addrs) == 0 {
continue
}
if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil {
continue
}
healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
healthCancel()
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc)
}
}
return streamBuilders
}
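
Endpoints only get a stream builder if they first pass a gRPC health check, as shown above. A standalone sketch of that probe using the standard grpc_health_v1 service; the dial target and the helper isServing are illustrative, and a real client would reuse the connections managed by svcDiscovery:

package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// isServing reports whether the endpoint behind cc answers the standard
// gRPC health check with SERVING within the given timeout.
func isServing(ctx context.Context, cc *grpc.ClientConn, timeout time.Duration) bool {
	healthCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
	return err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING
}

func main() {
	// Illustrative dial only; the address is a placeholder.
	cc, err := grpc.Dial("127.0.0.1:2379", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return
	}
	defer cc.Close()
	_ = isServing(context.Background(), cc, 3*time.Second)
}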

type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
respKeyspaceGroupID uint32
respReceivedAt time.Time
physical int64
logical int64
}

func (c *tsoClient) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
requests := tbc.getCollectedRequests()
// nolint
for _, req := range requests {
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End()
if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
}

count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, count, tbc.batchStartTime)
if err != nil {
tbc.finishCollectedRequests(0, 0, 0, err)
return err
}
// `logical` is the logical part of the largest TS in this batch, so we need to do the subtraction before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerURL(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
return nil
}
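
The response carries the logical part of the largest timestamp in the batch, so the first request's logical is recovered by stepping back count-1 positions (shifted by suffixBits). A minimal sketch of that arithmetic, assuming tsoutil.AddLogical(logical, delta, suffixBits) computes logical + delta<<suffixBits, which matches how it is used here; addLogical below is a stand-in, not the real helper:

package main

import "fmt"

// addLogical mirrors the assumed behavior of tsoutil.AddLogical:
// it advances the logical part by delta positions, shifted by suffixBits.
func addLogical(logical, delta int64, suffixBits uint32) int64 {
	return logical + delta<<suffixBits
}

func main() {
	// With logical=10, count=5, and suffixBits=0, the batch covers
	// logicals [6, 7, 8, 9, 10], matching the comment in compareAndSwapTS.
	var (
		logical    int64  = 10
		count      int64  = 5
		suffixBits uint32 = 0
	)
	firstLogical := addLogical(logical, -count+1, suffixBits)
	largest := addLogical(firstLogical, count-1, suffixBits)
	fmt.Println(firstLogical, largest) // 6 10
}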

func (c *tsoClient) compareAndSwapTS(
dcLocation string,
curTSOInfo *tsoInfo,
physical, firstLogical int64,
) {
val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
if !loaded {
return
}
lastTSOInfo := val.(*tsoInfo)
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
}
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}
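
The fallback check compares (physical, logical) pairs: the first timestamp of the new batch must be strictly greater than the largest timestamp handed out last time. A minimal sketch of that comparison, assuming tsoutil.TSLessEqual(p1, l1, p2, l2) means (p1, l1) <= (p2, l2) under lexicographic order; tsLessEqual below is a stand-in, not the real helper:

package main

import "fmt"

// tsLessEqual mirrors the assumed semantics of tsoutil.TSLessEqual:
// (physical, logical) pairs are ordered lexicographically.
func tsLessEqual(physical, logical, thatPhysical, thatLogical int64) bool {
	if physical == thatPhysical {
		return logical <= thatLogical
	}
	return physical < thatPhysical
}

func main() {
	lastPhysical, lastLogical := int64(100), int64(10) // largest TS returned last time
	curPhysical, curFirstLogical := int64(100), int64(11)
	if tsLessEqual(curPhysical, curFirstLogical, lastPhysical, lastLogical) {
		fmt.Println("timestamp fallback detected") // the client would log.Panic here
	} else {
		fmt.Println("monotonic: new first TS is greater than last largest TS")
	}
}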