Skip to content

Commit

Permalink
Introduce the connection ctx manager
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 19, 2024
1 parent 7889b67 commit 9a6c85f
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 198 deletions.
137 changes: 80 additions & 57 deletions client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
cctx "github.com/tikv/pd/client/pkg/connectionctx"
"github.com/tikv/pd/client/pkg/utils/grpcutil"
"github.com/tikv/pd/client/pkg/utils/tlsutil"
sd "github.com/tikv/pd/client/servicediscovery"
Expand Down Expand Up @@ -80,7 +81,9 @@ type Cli struct {
svcDiscovery sd.ServiceDiscovery
tsoStreamBuilderFactory
// leaderURL is the URL of the TSO leader.
leaderURL atomic.Value
leaderURL atomic.Value
conCtxMgr *cctx.Manager[*tsoStream]
updateConCtxsCh chan struct{}

// tsoReqPool is the pool to recycle `*tsoRequest`.
tsoReqPool *sync.Pool
Expand All @@ -100,6 +103,8 @@ func NewClient(
option: option,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
conCtxMgr: cctx.NewManager[*tsoStream](),
updateConCtxsCh: make(chan struct{}, 1),
tsoReqPool: &sync.Pool{
New: func() any {
return &Request{
Expand All @@ -122,6 +127,8 @@ func (c *Cli) getOption() *opt.Option { return c.option }

func (c *Cli) getServiceDiscovery() sd.ServiceDiscovery { return c.svcDiscovery }

func (c *Cli) getConnectionCtxMgr() *cctx.Manager[*tsoStream] { return c.conCtxMgr }

func (c *Cli) getDispatcher() *tsoDispatcher {
return c.dispatcher.Load()
}
Expand All @@ -133,6 +140,8 @@ func (c *Cli) GetRequestPool() *sync.Pool {

// Setup initializes the TSO client.
func (c *Cli) Setup() {
// Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event.
go c.connectionCtxsUpdater()
if err := c.svcDiscovery.CheckMemberChanged(); err != nil {
log.Warn("[tso] failed to check member changed", errs.ZapError(err))
}
Expand All @@ -154,9 +163,12 @@ func (c *Cli) Close() {
log.Info("[tso] tso client is closed")
}

// scheduleUpdateTSOConnectionCtxs update the TSO connection contexts.
// scheduleUpdateTSOConnectionCtxs schedules the update of the TSO connection contexts.
func (c *Cli) scheduleUpdateTSOConnectionCtxs() {
c.getDispatcher().scheduleUpdateConnectionCtxs()
select {
case c.updateConCtxsCh <- struct{}{}:
default:
}
}

// GetTSORequest gets a TSO request from the pool.
Expand Down Expand Up @@ -231,25 +243,60 @@ func (c *Cli) backupClientConn() (*grpc.ClientConn, string) {
return nil, ""
}

// tsoConnectionContext is used to store the context of a TSO stream connection.
type tsoConnectionContext struct {
ctx context.Context
cancel context.CancelFunc
// Current URL of the stream connection.
streamURL string
// Current stream to send gRPC requests.
stream *tsoStream
// connectionCtxsUpdater updates the `connectionCtxs` regularly.
func (c *Cli) connectionCtxsUpdater() {
log.Info("[tso] start tso connection contexts updater")

var updateTicker = &time.Ticker{}
setNewUpdateTicker := func(ticker *time.Ticker) {
if updateTicker.C != nil {
updateTicker.Stop()
}
updateTicker = ticker
}
// Set to nil before returning to ensure that the existing ticker can be GC.
defer setNewUpdateTicker(nil)

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
for {
c.updateConnectionCtxs(ctx)
select {
case <-ctx.Done():
log.Info("[tso] exit tso connection contexts updater")
return
case <-c.option.EnableTSOFollowerProxyCh:
enableTSOFollowerProxy := c.option.GetEnableTSOFollowerProxy()
log.Info("[tso] tso follower proxy status changed",
zap.Bool("enable", enableTSOFollowerProxy))
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(time.NewTicker(sd.MemberUpdateInterval))
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
setNewUpdateTicker(&time.Ticker{})
} else {
continue

Check warning on line 281 in client/clients/tso/client.go

View check run for this annotation

Codecov / codecov/patch

client/clients/tso/client.go#L281

Added line #L281 was not covered by tests
}
case <-updateTicker.C:

Check warning on line 283 in client/clients/tso/client.go

View check run for this annotation

Codecov / codecov/patch

client/clients/tso/client.go#L283

Added line #L283 was not covered by tests
// Triggered periodically when the TSO Follower Proxy is enabled.
case <-c.updateConCtxsCh:
// Triggered by the leader/follower change.
}
}
}

// updateConnectionCtxs will choose the proper way to update the connections.
// It will return a bool to indicate whether the update is successful.
func (c *Cli) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool {
func (c *Cli) updateConnectionCtxs(ctx context.Context) bool {
// Normal connection creating, it will be affected by the `enableForwarding`.
createTSOConnection := c.tryConnectToTSO
if c.option.GetEnableTSOFollowerProxy() {
createTSOConnection = c.tryConnectToTSOWithProxy
}
if err := createTSOConnection(ctx, connectionCtxs); err != nil {
if err := createTSOConnection(ctx); err != nil {
log.Error("[tso] update connection contexts failed", errs.ZapError(err))
return false
}
Expand All @@ -260,30 +307,13 @@ func (c *Cli) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map
// and enableForwarding is true, it will create a new connection to a follower to do the forwarding,
// while a new daemon will be created also to switch back to a normal leader connection ASAP the
// connection comes back to normal.
func (c *Cli) tryConnectToTSO(
ctx context.Context,
connectionCtxs *sync.Map,
) error {
func (c *Cli) tryConnectToTSO(ctx context.Context) error {
var (
networkErrNum uint64
err error
stream *tsoStream
url string
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
// Only store the `connectionCtx` if it does not exist before.
if connectionCtx != nil {
connectionCtxs.LoadOrStore(newURL, connectionCtx)
}
// Remove all other `connectionCtx`s.
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
networkErrNum uint64
err error
stream *tsoStream
url string
cc *grpc.ClientConn
)

ticker := time.NewTicker(constants.RetryInterval)
Expand All @@ -292,9 +322,9 @@ func (c *Cli) tryConnectToTSO(
for range constants.MaxRetryTimes {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.getTSOLeaderClientConn()
if _, ok := connectionCtxs.Load(url); ok {
if c.conCtxMgr.Exist(url) {
// Just trigger the clean up of the stale connection contexts.
updateAndClear(url, nil)
c.conCtxMgr.ExclusivelyStore(ctx, url)
return nil
}
if cc != nil {
Expand All @@ -305,7 +335,7 @@ func (c *Cli) tryConnectToTSO(
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream})
c.conCtxMgr.ExclusivelyStore(ctx, url, stream)
return nil
}

Expand Down Expand Up @@ -348,9 +378,9 @@ func (c *Cli) tryConnectToTSO(
forwardedHostTrim := tlsutil.TrimHTTPPrefix(forwardedHost)
addr := tlsutil.TrimHTTPPrefix(backupURL)
// the goroutine is used to check the network and change back to the original stream
go c.checkLeader(ctx, cancel, forwardedHostTrim, addr, url, updateAndClear)
go c.checkLeader(ctx, cancel, forwardedHostTrim, addr, url)
metrics.RequestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
updateAndClear(backupURL, &tsoConnectionContext{cctx, cancel, backupURL, stream})
c.conCtxMgr.ExclusivelyStore(ctx, backupURL, stream)
return nil
}
cancel()
Expand All @@ -363,7 +393,6 @@ func (c *Cli) checkLeader(
ctx context.Context,
forwardCancel context.CancelFunc,
forwardedHostTrim, addr, url string,
updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext),
) {
defer func() {
// cancel the forward stream
Expand Down Expand Up @@ -396,7 +425,7 @@ func (c *Cli) checkLeader(
stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout)
if err == nil && stream != nil {
log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("url", url))
updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream})
c.conCtxMgr.ExclusivelyStore(ctx, url, stream)
return
}
}
Expand All @@ -413,31 +442,25 @@ func (c *Cli) checkLeader(

// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy to reduce the pressure of the main serving service endpoint.
func (c *Cli) tryConnectToTSOWithProxy(
ctx context.Context,
connectionCtxs *sync.Map,
) error {
func (c *Cli) tryConnectToTSOWithProxy(ctx context.Context) error {
tsoStreamBuilders := c.getAllTSOStreamBuilders()
leaderAddr := c.svcDiscovery.GetServingURL()
forwardedHost := c.getLeaderURL()
if len(forwardedHost) == 0 {
return errors.Errorf("cannot find the tso leader")
}
// GC the stale one.
connectionCtxs.Range(func(addr, cc any) bool {
addrStr := addr.(string)
if _, ok := tsoStreamBuilders[addrStr]; !ok {
c.conCtxMgr.GC(func(url string) bool {
_, ok := tsoStreamBuilders[url]
if !ok {
log.Info("[tso] remove the stale tso stream",
zap.String("addr", addrStr))
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(addr)
zap.String("addr", url))

Check warning on line 457 in client/clients/tso/client.go

View check run for this annotation

Codecov / codecov/patch

client/clients/tso/client.go#L457

Added line #L457 was not covered by tests
}
return true
return !ok
})
// Update the missing one.
for addr, tsoStreamBuilder := range tsoStreamBuilders {
_, ok := connectionCtxs.Load(addr)
if ok {
if c.conCtxMgr.Exist(addr) {
continue
}
log.Info("[tso] try to create tso stream", zap.String("addr", addr))
Expand All @@ -456,7 +479,7 @@ func (c *Cli) tryConnectToTSOWithProxy(
addrTrim := tlsutil.TrimHTTPPrefix(addr)
metrics.RequestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
}
connectionCtxs.Store(addr, &tsoConnectionContext{cctx, cancel, addr, stream})
c.conCtxMgr.StoreIfNotExist(ctx, addr, stream)
continue
}
log.Error("[tso] create the tso stream failed",
Expand Down
Loading

0 comments on commit 9a6c85f

Please sign in to comment.