Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: introduce the connection ctx manager #8940

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 86 additions & 57 deletions client/clients/tso/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
cctx "github.com/tikv/pd/client/pkg/connectionctx"
"github.com/tikv/pd/client/pkg/utils/grpcutil"
"github.com/tikv/pd/client/pkg/utils/tlsutil"
sd "github.com/tikv/pd/client/servicediscovery"
Expand Down Expand Up @@ -80,7 +81,9 @@ type Cli struct {
svcDiscovery sd.ServiceDiscovery
tsoStreamBuilderFactory
// leaderURL is the URL of the TSO leader.
leaderURL atomic.Value
leaderURL atomic.Value
conCtxMgr *cctx.Manager[*tsoStream]
updateConCtxsCh chan struct{}

// tsoReqPool is the pool to recycle `*tsoRequest`.
tsoReqPool *sync.Pool
Expand All @@ -100,6 +103,8 @@ func NewClient(
option: option,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
conCtxMgr: cctx.NewManager[*tsoStream](),
updateConCtxsCh: make(chan struct{}, 1),
tsoReqPool: &sync.Pool{
New: func() any {
return &Request{
Expand All @@ -122,6 +127,8 @@ func (c *Cli) getOption() *opt.Option { return c.option }

func (c *Cli) getServiceDiscovery() sd.ServiceDiscovery { return c.svcDiscovery }

func (c *Cli) getConnectionCtxMgr() *cctx.Manager[*tsoStream] { return c.conCtxMgr }

func (c *Cli) getDispatcher() *tsoDispatcher {
return c.dispatcher.Load()
}
Expand All @@ -133,6 +140,8 @@ func (c *Cli) GetRequestPool() *sync.Pool {

// Setup initializes the TSO client.
func (c *Cli) Setup() {
// Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event.
go c.connectionCtxsUpdater()
if err := c.svcDiscovery.CheckMemberChanged(); err != nil {
log.Warn("[tso] failed to check member changed", errs.ZapError(err))
}
Expand All @@ -154,9 +163,12 @@ func (c *Cli) Close() {
log.Info("[tso] tso client is closed")
}

// scheduleUpdateTSOConnectionCtxs update the TSO connection contexts.
// scheduleUpdateTSOConnectionCtxs schedules the update of the TSO connection contexts.
func (c *Cli) scheduleUpdateTSOConnectionCtxs() {
c.getDispatcher().scheduleUpdateConnectionCtxs()
select {
case c.updateConCtxsCh <- struct{}{}:
default:
}
}

// GetTSORequest gets a TSO request from the pool.
Expand Down Expand Up @@ -231,25 +243,66 @@ func (c *Cli) backupClientConn() (*grpc.ClientConn, string) {
return nil, ""
}

// tsoConnectionContext is used to store the context of a TSO stream connection.
type tsoConnectionContext struct {
ctx context.Context
cancel context.CancelFunc
// Current URL of the stream connection.
streamURL string
// Current stream to send gRPC requests.
stream *tsoStream
// connectionCtxsUpdater updates the `connectionCtxs` regularly.
func (c *Cli) connectionCtxsUpdater() {
log.Info("[tso] start tso connection contexts updater")

var updateTicker = &time.Ticker{}
setNewUpdateTicker := func(interval time.Duration) {
if updateTicker.C != nil {
updateTicker.Stop()
}
if interval == 0 {
updateTicker = &time.Ticker{}
} else {
updateTicker = time.NewTicker(interval)
}
}
// If the TSO Follower Proxy is enabled, set the update interval to the member update interval.
if c.option.GetEnableTSOFollowerProxy() {
setNewUpdateTicker(sd.MemberUpdateInterval)
}
// Set to nil before returning to ensure that the existing ticker can be GC.
defer setNewUpdateTicker(0)

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
for {
c.updateConnectionCtxs(ctx)
select {
case <-ctx.Done():
log.Info("[tso] exit tso connection contexts updater")
return
case <-c.option.EnableTSOFollowerProxyCh:
enableTSOFollowerProxy := c.option.GetEnableTSOFollowerProxy()
log.Info("[tso] tso follower proxy status changed",
zap.Bool("enable", enableTSOFollowerProxy))
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
setNewUpdateTicker(sd.MemberUpdateInterval)
} else if !enableTSOFollowerProxy && updateTicker.C != nil {
// Because the TSO Follower Proxy is disabled,
// the periodic check needs to be turned off.
setNewUpdateTicker(0)
}
case <-updateTicker.C:
// Triggered periodically when the TSO Follower Proxy is enabled.
case <-c.updateConCtxsCh:
// Triggered by the leader/follower change.
}
}
}

// updateConnectionCtxs will choose the proper way to update the connections.
// It will return a bool to indicate whether the update is successful.
func (c *Cli) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool {
func (c *Cli) updateConnectionCtxs(ctx context.Context) bool {
// Normal connection creating, it will be affected by the `enableForwarding`.
createTSOConnection := c.tryConnectToTSO
if c.option.GetEnableTSOFollowerProxy() {
createTSOConnection = c.tryConnectToTSOWithProxy
}
if err := createTSOConnection(ctx, connectionCtxs); err != nil {
if err := createTSOConnection(ctx); err != nil {
log.Error("[tso] update connection contexts failed", errs.ZapError(err))
return false
}
Expand All @@ -260,30 +313,13 @@ func (c *Cli) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map
// and enableForwarding is true, it will create a new connection to a follower to do the forwarding,
// while a new daemon will be created also to switch back to a normal leader connection ASAP the
// connection comes back to normal.
func (c *Cli) tryConnectToTSO(
ctx context.Context,
connectionCtxs *sync.Map,
) error {
func (c *Cli) tryConnectToTSO(ctx context.Context) error {
var (
networkErrNum uint64
err error
stream *tsoStream
url string
cc *grpc.ClientConn
updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
// Only store the `connectionCtx` if it does not exist before.
if connectionCtx != nil {
connectionCtxs.LoadOrStore(newURL, connectionCtx)
}
// Remove all other `connectionCtx`s.
connectionCtxs.Range(func(url, cc any) bool {
if url.(string) != newURL {
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(url)
}
return true
})
}
networkErrNum uint64
err error
stream *tsoStream
url string
cc *grpc.ClientConn
)

ticker := time.NewTicker(constants.RetryInterval)
Expand All @@ -292,9 +328,9 @@ func (c *Cli) tryConnectToTSO(
for range constants.MaxRetryTimes {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.getTSOLeaderClientConn()
if _, ok := connectionCtxs.Load(url); ok {
if c.conCtxMgr.Exist(url) {
// Just trigger the clean up of the stale connection contexts.
updateAndClear(url, nil)
c.conCtxMgr.CleanAllAndStore(ctx, url)
return nil
}
if cc != nil {
Expand All @@ -305,7 +341,7 @@ func (c *Cli) tryConnectToTSO(
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream})
c.conCtxMgr.CleanAllAndStore(ctx, url, stream)
return nil
}

Expand Down Expand Up @@ -348,9 +384,9 @@ func (c *Cli) tryConnectToTSO(
forwardedHostTrim := tlsutil.TrimHTTPPrefix(forwardedHost)
addr := tlsutil.TrimHTTPPrefix(backupURL)
// the goroutine is used to check the network and change back to the original stream
go c.checkLeader(ctx, cancel, forwardedHostTrim, addr, url, updateAndClear)
go c.checkLeader(ctx, cancel, forwardedHostTrim, addr, url)
metrics.RequestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1)
updateAndClear(backupURL, &tsoConnectionContext{cctx, cancel, backupURL, stream})
c.conCtxMgr.CleanAllAndStore(ctx, backupURL, stream)
return nil
}
cancel()
Expand All @@ -363,7 +399,6 @@ func (c *Cli) checkLeader(
ctx context.Context,
forwardCancel context.CancelFunc,
forwardedHostTrim, addr, url string,
updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext),
) {
defer func() {
// cancel the forward stream
Expand Down Expand Up @@ -396,7 +431,7 @@ func (c *Cli) checkLeader(
stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout)
if err == nil && stream != nil {
log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("url", url))
updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream})
c.conCtxMgr.CleanAllAndStore(ctx, url, stream)
return
}
}
Expand All @@ -413,31 +448,25 @@ func (c *Cli) checkLeader(

// tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as
// a TSO proxy to reduce the pressure of the main serving service endpoint.
func (c *Cli) tryConnectToTSOWithProxy(
ctx context.Context,
connectionCtxs *sync.Map,
) error {
func (c *Cli) tryConnectToTSOWithProxy(ctx context.Context) error {
tsoStreamBuilders := c.getAllTSOStreamBuilders()
leaderAddr := c.svcDiscovery.GetServingURL()
forwardedHost := c.getLeaderURL()
if len(forwardedHost) == 0 {
return errors.Errorf("cannot find the tso leader")
}
// GC the stale one.
connectionCtxs.Range(func(addr, cc any) bool {
addrStr := addr.(string)
if _, ok := tsoStreamBuilders[addrStr]; !ok {
c.conCtxMgr.GC(func(addr string) bool {
_, ok := tsoStreamBuilders[addr]
if !ok {
log.Info("[tso] remove the stale tso stream",
zap.String("addr", addrStr))
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(addr)
zap.String("addr", addr))
}
return true
return !ok
})
// Update the missing one.
for addr, tsoStreamBuilder := range tsoStreamBuilders {
_, ok := connectionCtxs.Load(addr)
if ok {
if c.conCtxMgr.Exist(addr) {
continue
}
log.Info("[tso] try to create tso stream", zap.String("addr", addr))
Expand All @@ -456,7 +485,7 @@ func (c *Cli) tryConnectToTSOWithProxy(
addrTrim := tlsutil.TrimHTTPPrefix(addr)
metrics.RequestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
}
connectionCtxs.Store(addr, &tsoConnectionContext{cctx, cancel, addr, stream})
c.conCtxMgr.Store(ctx, addr, stream)
continue
}
log.Error("[tso] create the tso stream failed",
Expand Down
Loading
Loading