client/tso: organize the methods of TSO dispatcher #8121

Merged · 2 commits · Apr 24, 2024
334 changes: 325 additions & 9 deletions client/tso_client.go
@@ -18,14 +18,22 @@
"context"
"fmt"
"math/rand"
"runtime/trace"
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

// TSOClient is the client used to get timestamps.
@@ -127,18 +135,36 @@
c.wg.Wait()

log.Info("close tso client")
c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool {
if dispatcherInterface != nil {
dispatcher := dispatcherInterface.(*tsoDispatcher)
dispatcher.dispatcherCancel()
dispatcher.tsoBatchController.clear()
}
return true
})

c.closeTSODispatcher()
log.Info("tso client is closed")
}

func (c *tsoClient) scheduleCheckTSDeadline() {
select {
case c.checkTSDeadlineCh <- struct{}{}:
default:
}
}

func (c *tsoClient) scheduleCheckTSODispatcher() {
select {
case c.checkTSODispatcherCh <- struct{}{}:
default:
}
}

func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() {
select {
case c.updateTSOConnectionCtxsCh <- struct{}{}:
default:
}
}
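
A minimal sketch (not part of this diff) of the non-blocking notification pattern the schedule* helpers above share: a buffered channel of capacity 1 plus select/default, so repeated signals coalesce instead of blocking the caller. All names below are hypothetical.

package main

import "fmt"

type notifier struct {
	ch chan struct{} // buffered with capacity 1
}

func newNotifier() *notifier {
	return &notifier{ch: make(chan struct{}, 1)}
}

// notify never blocks: if a signal is already pending, the new one is dropped.
func (n *notifier) notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

func main() {
	n := newNotifier()
	n.notify()
	n.notify() // coalesced with the first signal, still only one pending
	<-n.ch
	fmt.Println("woken up once despite two notifications")
}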

// TSO Follower Proxy only supports the Global TSO proxy now.
func (c *tsoClient) allowTSOFollowerProxy(dc string) bool {
return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy()
}

func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := c.tsoReqPool.Get().(*tsoRequest)
// Set needed fields in the request before using it.
@@ -279,3 +305,293 @@
}
return nil, ""
}

type tsoConnectionContext struct {
streamURL string
// Current stream to send gRPC requests, pdpb.PD_TsoClient for a leader/follower in the PD cluster,
// or tsopb.TSO_TsoClient for a primary/secondary in the TSO cluster
stream tsoStream
ctx context.Context
cancel context.CancelFunc
}

func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
// Normal connection creation; it is affected by `enableForwarding`.
createTSOConnection := c.tryConnectToTSO
if c.allowTSOFollowerProxy(dc) {
createTSOConnection = c.tryConnectToTSOWithProxy
}
if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
return false
}
return true
}

// tryConnectToTSO will try to connect to the TSO allocator leader. If the connection becomes unreachable
// and enableForwarding is true, it will create a new connection to a follower to do the forwarding,
// and also start a daemon that switches back to a normal leader connection as soon as the
// connection recovers.
func (c *tsoClient) tryConnectToTSO(
dispatcherCtx context.Context,
dc string,
connectionCtxs *sync.Map,
) error {
var (
networkErrNum uint64
err error
stream tsoStream
url string
cc *grpc.ClientConn
)
updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded {
// If the previous connection still exists, we should close it first.
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Store(newURL, connectionCtx)
}
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
// Retry several times before falling back to a follower when a network problem happens.
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
if cc != nil {
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
failpoint.Inject("unreachableNetwork", func() {
stream = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
return nil
}

if err != nil && c.option.enableForwarding {
// The reason we need to check whether the error code is "Canceled" here is that when we create a stream,
// we use a goroutine to manually control the connection timeout instead of waiting for the transport-layer
// timeout, which reduces the time of unavailability. However, this conflicts with the retry mechanism,
// since we rely on the error code to decide whether the failure was caused by a network error, and the
// `Canceled` error can in fact be regarded as a kind of network error.
if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) {
networkErrNum++
}
}
cancel()
} else {
networkErrNum++
}
select {
case <-dispatcherCtx.Done():
return err
case <-ticker.C:
}
}

if networkErrNum == maxRetryTimes {
// Encountered network errors on every retry; fall back to a follower.
backupClientConn, backupURL := c.backupClientConn()
if backupClientConn != nil {
log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("follower-url", backupURL))
forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
if !ok {
return errors.Errorf("cannot find the allocator leader in %s", dc)
}

// create the follower stream
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout)
if err == nil {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addr := trimHTTPPrefix(backupURL)
// This goroutine checks the network and switches back to the original stream once it recovers.
go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel})
return nil
}
cancel()
}
}
return err
}
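
A minimal sketch (not part of this diff) of the retry-then-fallback flow that tryConnectToTSO follows: retry the primary connection on a ticker up to a bounded number of attempts, abort early if the context is cancelled, and only then invoke the fallback path. The connect and fallback functions are hypothetical stand-ins.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectWithRetry retries connect up to maxRetryTimes, pacing attempts with a ticker and
// aborting early when ctx is cancelled; only after all attempts fail does it invoke fallback.
func connectWithRetry(ctx context.Context, maxRetryTimes int, retryInterval time.Duration,
	connect func() error, fallback func() error) error {
	ticker := time.NewTicker(retryInterval)
	defer ticker.Stop()
	var err error
	for i := 0; i < maxRetryTimes; i++ {
		if err = connect(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return err
		case <-ticker.C:
		}
	}
	return fallback()
}

func main() {
	attempts := 0
	err := connectWithRetry(context.Background(), 3, 10*time.Millisecond,
		func() error { attempts++; return errors.New("unreachable") },
		func() error { fmt.Printf("falling back after %d failed attempts\n", attempts); return nil })
	fmt.Println("result:", err)
}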

// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy to reduce the pressure on the main serving service endpoint.
func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
tsoStreamBuilders := c.getAllTSOStreamBuilders()
leaderAddr := c.svcDiscovery.GetServingURL()
forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
if !ok {
return errors.Errorf("cannot find the allocator leader in %s", dc)
}
// GC the stale ones.
connectionCtxs.Range(func(addr, cc any) bool {
addrStr := addr.(string)
if _, ok := tsoStreamBuilders[addrStr]; !ok {
log.Info("[tso] remove the stale tso stream",
zap.String("dc", dc),
zap.String("addr", addrStr))
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(addr)
}
return true
})
// Update the missing ones.
for addr, tsoStreamBuilder := range tsoStreamBuilders {
if _, ok = connectionCtxs.Load(addr); ok {
continue
}
log.Info("[tso] try to create tso stream",
zap.String("dc", dc), zap.String("addr", addr))
cctx, cancel := context.WithCancel(dispatcherCtx)
// Do not proxy the leader client.
if addr != leaderAddr {
log.Info("[tso] use follower to forward tso stream to do the proxy",
zap.String("dc", dc), zap.String("addr", addr))
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
}
// Create the TSO stream.
stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout)
if err == nil {
if addr != leaderAddr {
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addrTrim := trimHTTPPrefix(addr)
requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
}
connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel})
continue
}
log.Error("[tso] create the tso stream failed",
zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err))
cancel()
}
return nil
}
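
A minimal sketch (not part of this diff) of the reconcile pattern used above on connectionCtxs: walk the sync.Map and delete entries whose key is no longer desired, then create the entries that are missing. The reconcile helper and its string values are hypothetical.

package main

import (
	"fmt"
	"sync"
)

// reconcile makes the keys of current match the desired set: stale keys are GC'ed,
// missing keys are added with a placeholder value.
func reconcile(current *sync.Map, desired map[string]struct{}) {
	// GC the stale entries.
	current.Range(func(key, _ any) bool {
		if _, ok := desired[key.(string)]; !ok {
			current.Delete(key)
		}
		return true
	})
	// Add the missing ones.
	for addr := range desired {
		if _, ok := current.Load(addr); !ok {
			current.Store(addr, struct{}{})
		}
	}
}

func main() {
	var m sync.Map
	m.Store("pd-1:2379", struct{}{})
	m.Store("stale:2379", struct{}{})
	reconcile(&m, map[string]struct{}{"pd-1:2379": {}, "pd-2:2379": {}})
	m.Range(func(k, _ any) bool { fmt.Println(k); return true })
}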

// getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers
// or of keyspace group primary/secondaries.
func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder {
var (
addrs = c.svcDiscovery.GetServiceURLs()
streamBuilders = make(map[string]tsoStreamBuilder, len(addrs))
cc *grpc.ClientConn
err error
)
for _, addr := range addrs {
if len(addr) == 0 {
continue
}
if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil {
continue
}
healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
healthCancel()
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
streamBuilders[addr] = c.tsoStreamBuilderFactory.makeBuilder(cc)
}
}
return streamBuilders
}

type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
respKeyspaceGroupID uint32
respReceivedAt time.Time
physical int64
logical int64
}

func (c *tsoClient) processRequests(
stream tsoStream, dcLocation string, tbc *tsoBatchController,
) error {
requests := tbc.getCollectedRequests()
// nolint
for _, req := range requests {
defer trace.StartRegion(req.requestCtx, "pdclient.tsoReqSend").End()
if span := opentracing.SpanFromContext(req.requestCtx); span != nil && span.Tracer() != nil {
span = span.Tracer().StartSpan("pdclient.processRequests", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
}

count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, count, tbc.batchStartTime)
if err != nil {
tbc.finishCollectedRequests(0, 0, 0, err)
return err
}
// `logical` is the logical part of the largest TS here; we need to do the subtraction before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerURL(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil)
return nil
}
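
A minimal worked example (not part of this diff) of the logical-range arithmetic above, assuming tsoutil.AddLogical adds the count shifted left by suffixBits, i.e. the low suffixBits are reserved for the keyspace group suffix; the local addLogical below mirrors that assumption.

package main

import "fmt"

// addLogical mirrors the assumed behavior of tsoutil.AddLogical: counts are added
// above the low suffixBits, which are reserved for the keyspace group suffix.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

func main() {
	const suffixBits = 0
	largest, count := int64(10), int64(5)
	// The response carries the largest logical; the first one is largest - count + 1.
	first := addLogical(largest, -count+1, suffixBits)
	fmt.Printf("TSO logical range: [%d, %d]\n", first, largest) // prints [6, 10]
}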

func (c *tsoClient) compareAndSwapTS(
dcLocation string,
curTSOInfo *tsoInfo,
physical, firstLogical int64,
) {
val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
if !loaded {
return
}
lastTSOInfo := val.(*tsoInfo)
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))

Check warning on line 589 in client/tso_client.go

View check run for this annotation

Codecov / codecov/patch

client/tso_client.go#L577-L589

Added lines #L577 - L589 were not covered by tests
}
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}
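
A minimal sketch (not part of this diff) of the fallback check above: timestamps compare lexicographically on (physical, logical), so the new first logical must be strictly greater than the last largest logical at the same physical time. tsLessEqual is a hypothetical stand-in for tsoutil.TSLessEqual.

package main

import "fmt"

// tsLessEqual reports whether (p1, l1) <= (p2, l2), comparing physical first and logical second.
func tsLessEqual(p1, l1, p2, l2 int64) bool {
	if p1 == p2 {
		return l1 <= l2
	}
	return p1 < p2
}

func main() {
	lastPhysical, lastLogical := int64(100), int64(10)  // largest TS handed out last time
	curPhysical, curFirstLogical := int64(100), int64(11) // first TS of the new batch
	if tsLessEqual(curPhysical, curFirstLogical, lastPhysical, lastLogical) {
		fmt.Println("timestamp fallback detected")
	} else {
		fmt.Println("monotonic: new first logical is strictly after the last largest logical")
	}
}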