client/tso: improve the switching of TSO stream (tikv#8123)
close tikv#7997, ref tikv#8047

Previously, when the TSO Follower Proxy was disabled, the client only passively updated its TSO stream after a TSO request had already failed.
As a result, the TSO stream could not be refreshed automatically and promptly after a service switch.
This PR strengthens the update logic so that the stream is switched proactively, improving the success rate of TSO requests during service switching.

Signed-off-by: JmPotato <[email protected]>
JmPotato authored Apr 28, 2024
1 parent 50c8040 commit aae410f
Showing 3 changed files with 169 additions and 115 deletions.
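
The gist of the change: instead of waiting for a request to fail, the client now signals its background loop to rebuild the TSO connection contexts as soon as a service switch is observed (see the new `c.scheduleUpdateTSOConnectionCtxs()` call in the diff below). A minimal sketch of this kind of coalescing, non-blocking schedule signal — the `updateScheduler` type and all names here are illustrative, not the PD client's actual internals:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// updateScheduler coalesces update requests into a 1-slot channel so that
// callers never block and repeated signals collapse into one pending update.
type updateScheduler struct {
	ch chan struct{}
}

func newUpdateScheduler() *updateScheduler {
	return &updateScheduler{ch: make(chan struct{}, 1)}
}

// schedule requests an update; it is safe to call from any goroutine.
func (s *updateScheduler) schedule() {
	select {
	case s.ch <- struct{}{}: // signal queued
	default: // an update is already pending; coalesce
	}
}

// run applies updates when signaled, with a periodic fallback tick.
func (s *updateScheduler) run(ctx context.Context, update func()) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-s.ch:
			update()
		case <-ticker.C:
			update()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := newUpdateScheduler()
	go s.run(ctx, func() { fmt.Println("updating TSO connection contexts") })
	s.schedule() // e.g. triggered on a serving-URL switch
	time.Sleep(100 * time.Millisecond)
}
```

A 1-slot buffered channel means a burst of schedule calls collapses into a single pending update, so signaling stays cheap and never blocks the caller.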
68 changes: 41 additions & 27 deletions client/tso_client.go
@@ -178,6 +178,14 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest
 	return req
 }
 
+func (c *tsoClient) getTSODispatcher(dcLocation string) (*tsoDispatcher, bool) {
+	dispatcher, ok := c.tsoDispatcher.Load(dcLocation)
+	if !ok || dispatcher == nil {
+		return nil, false
+	}
+	return dispatcher.(*tsoDispatcher), true
+}
+
 // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
 func (c *tsoClient) GetTSOAllocators() *sync.Map {
 	return &c.tsoAllocators
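
The new `getTSODispatcher` wraps the untyped `sync.Map` lookup so callers get a typed dispatcher plus a single `ok` flag; the extra `dispatcher == nil` check guards against a stored nil value. The same pattern in isolation, with a hypothetical `dispatcher` type standing in for the client's real one:

```go
package main

import (
	"fmt"
	"sync"
)

type dispatcher struct{ dc string }

var dispatchers sync.Map // map[string]*dispatcher

// getDispatcher mirrors the typed-load pattern: one ok flag covers both
// "key absent" and "nil value stored".
func getDispatcher(dc string) (*dispatcher, bool) {
	v, ok := dispatchers.Load(dc)
	if !ok || v == nil {
		return nil, false
	}
	return v.(*dispatcher), true
}

func main() {
	dispatchers.Store("global", &dispatcher{dc: "global"})
	if d, ok := getDispatcher("global"); ok {
		fmt.Println("found dispatcher for", d.dc)
	}
	if _, ok := getDispatcher("dc-1"); !ok {
		fmt.Println("no dispatcher for dc-1")
	}
}
```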
@@ -259,6 +267,7 @@ func (c *tsoClient) updateTSOGlobalServURL(url string) error {
 	log.Info("[tso] switch dc tso global allocator serving url",
 		zap.String("dc-location", globalDCLocation),
 		zap.String("new-url", url))
+	c.scheduleUpdateTSOConnectionCtxs()
 	c.scheduleCheckTSODispatcher()
 	return nil
 }
@@ -333,40 +342,41 @@ func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string
 // while a new daemon will be created also to switch back to a normal leader connection ASAP the
 // connection comes back to normal.
 func (c *tsoClient) tryConnectToTSO(
-	dispatcherCtx context.Context,
+	ctx context.Context,
 	dc string,
 	connectionCtxs *sync.Map,
 ) error {
 	var (
-		networkErrNum uint64
-		err           error
-		stream        tsoStream
-		url           string
-		cc            *grpc.ClientConn
-	)
-	updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) {
-		if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded {
-			// If the previous connection still exists, we should close it first.
-			cc.(*tsoConnectionContext).cancel()
-			connectionCtxs.Store(newURL, connectionCtx)
+		networkErrNum  uint64
+		err            error
+		stream         tsoStream
+		url            string
+		cc             *grpc.ClientConn
+		updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
+			// Only store the `connectionCtx` if it does not exist before.
+			connectionCtxs.LoadOrStore(newURL, connectionCtx)
+			// Remove all other `connectionCtx`s.
+			connectionCtxs.Range(func(url, cc any) bool {
+				if url.(string) != newURL {
+					cc.(*tsoConnectionContext).cancel()
+					connectionCtxs.Delete(url)
+				}
+				return true
+			})
 		}
-		connectionCtxs.Range(func(url, cc any) bool {
-			if url.(string) != newURL {
-				cc.(*tsoConnectionContext).cancel()
-				connectionCtxs.Delete(url)
-			}
-			return true
-		})
-	}
-	// retry several times before falling back to the follower when the network problem happens
+	)
 
 	ticker := time.NewTicker(retryInterval)
 	defer ticker.Stop()
+	// Retry several times before falling back to the follower when the network problem happens
 	for i := 0; i < maxRetryTimes; i++ {
 		c.svcDiscovery.ScheduleCheckMemberChanged()
 		cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
 		if _, ok := connectionCtxs.Load(url); ok {
 			return nil
 		}
 		if cc != nil {
-			cctx, cancel := context.WithCancel(dispatcherCtx)
+			cctx, cancel := context.WithCancel(ctx)
 			stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
 			failpoint.Inject("unreachableNetwork", func() {
 				stream = nil
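
The reworked `updateAndClear` above establishes a simple invariant: after it runs, `connectionCtxs` holds exactly one entry, the one for `newURL`. `LoadOrStore` keeps an existing healthy connection rather than replacing it, and the `Range` pass cancels and deletes everything else. A standalone sketch of that invariant, with a simplified `conn` type in place of `tsoConnectionContext`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type conn struct {
	url    string
	cancel context.CancelFunc
}

// updateAndClear stores the connection for newURL (if absent) and evicts
// every connection for any other URL, cancelling it first.
func updateAndClear(conns *sync.Map, newURL string, c *conn) {
	conns.LoadOrStore(newURL, c) // keep an existing entry for newURL as-is
	conns.Range(func(k, v any) bool {
		if k.(string) != newURL {
			v.(*conn).cancel()
			conns.Delete(k)
		}
		return true
	})
}

func main() {
	var conns sync.Map
	mk := func(url string) *conn {
		_, cancel := context.WithCancel(context.Background())
		return &conn{url: url, cancel: cancel}
	}
	conns.Store("http://pd-1", mk("http://pd-1"))
	conns.Store("http://pd-2", mk("http://pd-2"))
	updateAndClear(&conns, "http://pd-3", mk("http://pd-3"))
	conns.Range(func(k, _ any) bool { fmt.Println("kept:", k); return true })
	// prints: kept: http://pd-3
}
```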
@@ -392,7 +402,7 @@ func (c *tsoClient) tryConnectToTSO(
 			networkErrNum++
 		}
 		select {
-		case <-dispatcherCtx.Done():
+		case <-ctx.Done():
 			return err
 		case <-ticker.C:
 		}
@@ -409,14 +419,14 @@ func (c *tsoClient) tryConnectToTSO(
 	}
 
 	// create the follower stream
-	cctx, cancel := context.WithCancel(dispatcherCtx)
+	cctx, cancel := context.WithCancel(ctx)
 	cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
 	stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout)
 	if err == nil {
 		forwardedHostTrim := trimHTTPPrefix(forwardedHost)
 		addr := trimHTTPPrefix(backupURL)
 		// the goroutine is used to check the network and change back to the original stream
-		go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
+		go c.checkAllocator(ctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear)
 		requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
 		updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel})
 		return nil
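
In the follower fallback above, `grpcutil.BuildForwardContext` tags the outgoing context so the follower knows which host to forward the stream to. As a hedged approximation, a helper like this can be built on the standard gRPC metadata API — the header key below is made up, PD defines its own:

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

// forwardMetadataKey is illustrative; PD uses its own header key.
const forwardMetadataKey = "pd-forwarded-host"

// buildForwardContext tags the outgoing context with the host the
// follower should forward requests to.
func buildForwardContext(ctx context.Context, forwardedHost string) context.Context {
	return metadata.AppendToOutgoingContext(ctx, forwardMetadataKey, forwardedHost)
}

func main() {
	ctx := buildForwardContext(context.Background(), "http://pd-leader:2379")
	md, _ := metadata.FromOutgoingContext(ctx)
	fmt.Println(md.Get(forwardMetadataKey)) // [http://pd-leader:2379]
}
```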
@@ -429,7 +439,11 @@ func (c *tsoClient) tryConnectToTSO(
 
 // tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
 // a TSO proxy to reduce the pressure of the main serving service endpoint.
-func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
+func (c *tsoClient) tryConnectToTSOWithProxy(
+	ctx context.Context,
+	dc string,
+	connectionCtxs *sync.Map,
+) error {
 	tsoStreamBuilders := c.getAllTSOStreamBuilders()
 	leaderAddr := c.svcDiscovery.GetServingURL()
 	forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc)
@@ -455,7 +469,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error {
 		}
 		log.Info("[tso] try to create tso stream",
 			zap.String("dc", dc), zap.String("addr", addr))
-		cctx, cancel := context.WithCancel(dispatcherCtx)
+		cctx, cancel := context.WithCancel(ctx)
 		// Do not proxy the leader client.
 		if addr != leaderAddr {
 			log.Info("[tso] use follower to forward tso stream to do the proxy",
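
As the "Do not proxy the leader client" comment above indicates, the proxy path opens a stream to every service endpoint but only wraps non-leader connections in forwarding. That selection logic in miniature — endpoint addresses and the function name are invented for illustration:

```go
package main

import "fmt"

// pickProxyTargets splits the endpoints for a TSO proxy: every endpoint
// carries a stream, but only non-leader ones forward to the leader.
func pickProxyTargets(endpoints []string, leader string) (forwarding, direct []string) {
	for _, addr := range endpoints {
		if addr == leader {
			direct = append(direct, addr) // leader: plain stream, no proxy
			continue
		}
		forwarding = append(forwarding, addr) // follower: forward to leader
	}
	return forwarding, direct
}

func main() {
	endpoints := []string{"http://pd-1", "http://pd-2", "http://pd-3"}
	forwarding, direct := pickProxyTargets(endpoints, "http://pd-1")
	fmt.Println("forwarding via:", forwarding)
	fmt.Println("direct:", direct)
}
```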