From aae410f16f8a16d873a8aad87acee3559db5e321 Mon Sep 17 00:00:00 2001
From: JmPotato
Date: Sun, 28 Apr 2024 15:44:57 +0800
Subject: [PATCH] client/tso: improve the switching of TSO stream (#8123)

close tikv/pd#7997, ref tikv/pd#8047

Previously, without the TSO Follower Proxy enabled, the client only updated its
TSO stream passively when a TSO request failed. This meant the TSO stream update
could not be completed automatically and gradually after a service switch. This
PR strengthens that logic, which improves the success rate of TSO requests during
service switching.

Signed-off-by: JmPotato
---
 client/tso_client.go                     |  68 +++++----
 client/tso_dispatcher.go                 | 181 ++++++++++++-----------
 tests/integrations/client/client_test.go |  35 +++++
 3 files changed, 169 insertions(+), 115 deletions(-)

diff --git a/client/tso_client.go b/client/tso_client.go
index 347d1f6ec0a..e3bdb835901 100644
--- a/client/tso_client.go
+++ b/client/tso_client.go
@@ -178,6 +178,14 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRe
 	return req
 }
 
+func (c *tsoClient) getTSODispatcher(dcLocation string) (*tsoDispatcher, bool) {
+	dispatcher, ok := c.tsoDispatcher.Load(dcLocation)
+	if !ok || dispatcher == nil {
+		return nil, false
+	}
+	return dispatcher.(*tsoDispatcher), true
+}
+
 // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
 func (c *tsoClient) GetTSOAllocators() *sync.Map {
 	return &c.tsoAllocators
@@ -259,6 +267,7 @@ func (c *tsoClient) updateTSOGlobalServURL(url string) error {
 	log.Info("[tso] switch dc tso global allocator serving url",
 		zap.String("dc-location", globalDCLocation),
 		zap.String("new-url", url))
+	c.scheduleUpdateTSOConnectionCtxs()
 	c.scheduleCheckTSODispatcher()
 	return nil
 }
@@ -333,40 +342,41 @@ func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc strin
 // while a new daemon will be created also to switch back to a normal leader connection ASAP the
 // connection comes back to normal.
 func (c *tsoClient) tryConnectToTSO(
-	dispatcherCtx context.Context,
+	ctx context.Context,
 	dc string,
 	connectionCtxs *sync.Map,
 ) error {
 	var (
-		networkErrNum uint64
-		err           error
-		stream        tsoStream
-		url           string
-		cc            *grpc.ClientConn
-	)
-	updateAndClear := func(newURL string, connectionCtx *tsoConnectionContext) {
-		if cc, loaded := connectionCtxs.LoadOrStore(newURL, connectionCtx); loaded {
-			// If the previous connection still exists, we should close it first.
-			cc.(*tsoConnectionContext).cancel()
-			connectionCtxs.Store(newURL, connectionCtx)
+		networkErrNum  uint64
+		err            error
+		stream         tsoStream
+		url            string
+		cc             *grpc.ClientConn
+		updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) {
+			// Only store the `connectionCtx` if it does not exist before.
+			connectionCtxs.LoadOrStore(newURL, connectionCtx)
+			// Remove all other `connectionCtx`s.
+ connectionCtxs.Range(func(url, cc any) bool { + if url.(string) != newURL { + cc.(*tsoConnectionContext).cancel() + connectionCtxs.Delete(url) + } + return true + }) } - connectionCtxs.Range(func(url, cc any) bool { - if url.(string) != newURL { - cc.(*tsoConnectionContext).cancel() - connectionCtxs.Delete(url) - } - return true - }) - } - // retry several times before falling back to the follower when the network problem happens + ) ticker := time.NewTicker(retryInterval) defer ticker.Stop() + // Retry several times before falling back to the follower when the network problem happens for i := 0; i < maxRetryTimes; i++ { c.svcDiscovery.ScheduleCheckMemberChanged() cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) + if _, ok := connectionCtxs.Load(url); ok { + return nil + } if cc != nil { - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) failpoint.Inject("unreachableNetwork", func() { stream = nil @@ -392,7 +402,7 @@ func (c *tsoClient) tryConnectToTSO( networkErrNum++ } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return err case <-ticker.C: } @@ -409,14 +419,14 @@ func (c *tsoClient) tryConnectToTSO( } // create the follower stream - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) if err == nil { forwardedHostTrim := trimHTTPPrefix(forwardedHost) addr := trimHTTPPrefix(backupURL) // the goroutine is used to check the network and change back to the original stream - go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) + go c.checkAllocator(ctx, cancel, dc, forwardedHostTrim, addr, url, updateAndClear) requestForwarded.WithLabelValues(forwardedHostTrim, addr).Set(1) updateAndClear(backupURL, &tsoConnectionContext{backupURL, stream, cctx, cancel}) return nil @@ -429,7 +439,11 @@ func (c *tsoClient) tryConnectToTSO( // tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as // a TSO proxy to reduce the pressure of the main serving service endpoint. -func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { +func (c *tsoClient) tryConnectToTSOWithProxy( + ctx context.Context, + dc string, + connectionCtxs *sync.Map, +) error { tsoStreamBuilders := c.getAllTSOStreamBuilders() leaderAddr := c.svcDiscovery.GetServingURL() forwardedHost, ok := c.GetTSOAllocatorServingURLByDCLocation(dc) @@ -455,7 +469,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s } log.Info("[tso] try to create tso stream", zap.String("dc", dc), zap.String("addr", addr)) - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) // Do not proxy the leader client. 
if addr != leaderAddr { log.Info("[tso] use follower to forward tso stream to do the proxy", diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 7528293a733..c82ec777eca 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -44,8 +44,17 @@ type tsoDispatcher struct { tsoBatchController *tsoBatchController } +func (td *tsoDispatcher) close() { + td.dispatcherCancel() + td.tsoBatchController.clear() +} + +func (td *tsoDispatcher) push(request *tsoRequest) { + td.tsoBatchController.tsoRequestCh <- request +} + func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { - dispatcher, ok := c.tsoDispatcher.Load(request.dcLocation) + dispatcher, ok := c.getTSODispatcher(request.dcLocation) if !ok { err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", request.dcLocation)) log.Error("[tso] dispatch tso request error", zap.String("dc-location", request.dcLocation), errs.ZapError(err)) @@ -70,7 +79,7 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { failpoint.Inject("delayDispatchTSORequest", func() { time.Sleep(time.Second) }) - dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request + dispatcher.push(request) } // Check the contexts again to make sure the request is not been sent to a closed dispatcher. // Never retry on these conditions to prevent unexpected data race. @@ -89,9 +98,7 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { func (c *tsoClient) closeTSODispatcher() { c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { if dispatcherInterface != nil { - dispatcher := dispatcherInterface.(*tsoDispatcher) - dispatcher.dispatcherCancel() - dispatcher.tsoBatchController.clear() + dispatcherInterface.(*tsoDispatcher).close() } return true }) @@ -101,7 +108,7 @@ func (c *tsoClient) updateTSODispatcher() { // Set up the new TSO dispatcher and batch controller. 
c.GetTSOAllocators().Range(func(dcLocationKey, _ any) bool { dcLocation := dcLocationKey.(string) - if !c.checkTSODispatcher(dcLocation) { + if _, ok := c.getTSODispatcher(dcLocation); !ok { c.createTSODispatcher(dcLocation) } return true @@ -115,8 +122,8 @@ func (c *tsoClient) updateTSODispatcher() { } if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist { log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) - dispatcher.(*tsoDispatcher).dispatcherCancel() c.tsoDispatcher.Delete(dcLocation) + dispatcher.(*tsoDispatcher).close() } return true }) @@ -215,7 +222,7 @@ func (c *tsoClient) tsoDispatcherCheckLoop() { } func (c *tsoClient) checkAllocator( - dispatcherCtx context.Context, + ctx context.Context, forwardCancel context.CancelFunc, dc, forwardedHostTrim, addr, url string, updateAndClear func(newAddr string, connectionCtx *tsoConnectionContext)) { @@ -238,7 +245,7 @@ func (c *tsoClient) checkAllocator( healthCli = healthpb.NewHealthClient(cc) } if healthCli != nil { - healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout) + healthCtx, healthCancel := context.WithTimeout(ctx, c.option.timeout) resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) failpoint.Inject("unreachableNetwork", func() { resp.Status = healthpb.HealthCheckResponse_UNKNOWN @@ -246,7 +253,7 @@ func (c *tsoClient) checkAllocator( healthCancel() if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { // create a stream of the original allocator - cctx, cancel := context.WithCancel(dispatcherCtx) + cctx, cancel := context.WithCancel(ctx) stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) if err == nil && stream != nil { log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) @@ -256,7 +263,7 @@ func (c *tsoClient) checkAllocator( } } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return case <-ticker.C: // To ensure we can get the latest allocator leader @@ -266,30 +273,19 @@ func (c *tsoClient) checkAllocator( } } -func (c *tsoClient) checkTSODispatcher(dcLocation string) bool { - dispatcher, ok := c.tsoDispatcher.Load(dcLocation) - if !ok || dispatcher == nil { - return false - } - return true -} - func (c *tsoClient) createTSODispatcher(dcLocation string) { dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) - dispatcher := &tsoDispatcher{ - dispatcherCancel: dispatcherCancel, - tsoBatchController: newTSOBatchController( - make(chan *tsoRequest, defaultMaxTSOBatchSize*2), - defaultMaxTSOBatchSize), - } + tsoBatchController := newTSOBatchController( + make(chan *tsoRequest, defaultMaxTSOBatchSize*2), + defaultMaxTSOBatchSize, + ) failpoint.Inject("shortDispatcherChannel", func() { - dispatcher = &tsoDispatcher{ - dispatcherCancel: dispatcherCancel, - tsoBatchController: newTSOBatchController( - make(chan *tsoRequest, 1), - defaultMaxTSOBatchSize), - } + tsoBatchController = newTSOBatchController( + make(chan *tsoRequest, 1), + defaultMaxTSOBatchSize, + ) }) + dispatcher := &tsoDispatcher{dispatcherCancel, tsoBatchController} if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok { // Successfully stored the value. Start the following goroutine. 
@@ -306,7 +302,7 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) { } func (c *tsoClient) handleDispatcher( - dispatcherCtx context.Context, + ctx context.Context, dc string, tbc *tsoBatchController, ) { @@ -319,6 +315,7 @@ func (c *tsoClient) handleDispatcher( // url -> connectionContext connectionCtxs sync.Map ) + // Clean up the connectionCtxs when the dispatcher exits. defer func() { log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc)) // Cancel all connections. @@ -330,51 +327,8 @@ func (c *tsoClient) handleDispatcher( tbc.clear() c.wg.Done() }() - // Call updateTSOConnectionCtxs once to init the connectionCtxs first. - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) - // Only the Global TSO needs to watch the updateTSOConnectionCtxsCh to sense the - // change of the cluster when TSO Follower Proxy is enabled. - // TODO: support TSO Follower Proxy for the Local TSO. - if dc == globalDCLocation { - go func() { - var updateTicker = &time.Ticker{} - setNewUpdateTicker := func(ticker *time.Ticker) { - if updateTicker.C != nil { - updateTicker.Stop() - } - updateTicker = ticker - } - // Set to nil before returning to ensure that the existing ticker can be GC. - defer setNewUpdateTicker(nil) - - for { - select { - case <-dispatcherCtx.Done(): - return - case <-c.option.enableTSOFollowerProxyCh: - enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() - log.Info("[tso] tso follower proxy status changed", - zap.String("dc-location", dc), - zap.Bool("enable", enableTSOFollowerProxy)) - if enableTSOFollowerProxy && updateTicker.C == nil { - // Because the TSO Follower Proxy is enabled, - // the periodic check needs to be performed. - setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) - } else if !enableTSOFollowerProxy && updateTicker.C != nil { - // Because the TSO Follower Proxy is disabled, - // the periodic check needs to be turned off. - setNewUpdateTicker(&time.Ticker{}) - } else { - // The status of TSO Follower Proxy does not change, and updateTSOConnectionCtxs is not triggered - continue - } - case <-updateTicker.C: - case <-c.updateTSOConnectionCtxsCh: - } - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) - } - }() - } + // Daemon goroutine to update the connectionCtxs periodically and handle the `connectionCtxs` update event. + go c.connectionCtxsUpdater(ctx, dc, &connectionCtxs) // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(c.option.timeout) @@ -383,7 +337,7 @@ func (c *tsoClient) handleDispatcher( tsoBatchLoop: for { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -391,7 +345,7 @@ tsoBatchLoop: maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() // Once the TSO requests are collected, must make sure they could be finished or revoked eventually, // otherwise the upper caller may get blocked on waiting for the results. - if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { + if err = tbc.fetchPendingRequests(ctx, maxBatchWaitInterval); err != nil { // Finish the collected requests if the fetch failed. tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) if err == context.Canceled { @@ -427,14 +381,14 @@ tsoBatchLoop: // Check stream and retry if necessary. 
if stream == nil { log.Info("[tso] tso stream is not ready", zap.String("dc", dc)) - if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { + if c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) { continue streamChoosingLoop } timer := time.NewTimer(retryInterval) select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): // Finish the collected requests if the context is canceled. - tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err())) timer.Stop() return case <-streamLoopTimer.C: @@ -471,9 +425,9 @@ tsoBatchLoop: tsDeadlineCh, ok = c.tsDeadline.Load(dc) } select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): // Finish the collected requests if the context is canceled. - tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(ctx.Err())) return case tsDeadlineCh.(chan *deadline) <- dl: } @@ -483,7 +437,7 @@ tsoBatchLoop: // If error happens during tso stream handling, reset stream and run the next trial. if err != nil { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -498,9 +452,9 @@ tsoBatchLoop: stream = nil // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. if IsLeaderChange(err) { - if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil { + if err := bo.Exec(ctx, c.svcDiscovery.CheckMemberChanged); err != nil { select { - case <-dispatcherCtx.Done(): + case <-ctx.Done(): return default: } @@ -510,8 +464,59 @@ tsoBatchLoop: // will cancel the current stream, then the EOF error caused by cancel() // should not trigger the updateTSOConnectionCtxs here. // So we should only call it when the leader changes. - c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) + c.updateTSOConnectionCtxs(ctx, dc, &connectionCtxs) + } + } + } +} + +// updateTSOConnectionCtxs updates the `connectionCtxs` for the specified DC location regularly. +// TODO: implement support for the Local TSO. +func (c *tsoClient) connectionCtxsUpdater( + ctx context.Context, + dc string, + connectionCtxs *sync.Map, +) { + if dc != globalDCLocation { + return + } + log.Info("[tso] start tso connection contexts updater", zap.String("dc-location", dc)) + var updateTicker = &time.Ticker{} + setNewUpdateTicker := func(ticker *time.Ticker) { + if updateTicker.C != nil { + updateTicker.Stop() + } + updateTicker = ticker + } + // Set to nil before returning to ensure that the existing ticker can be GC. + defer setNewUpdateTicker(nil) + + for { + c.updateTSOConnectionCtxs(ctx, dc, connectionCtxs) + select { + case <-ctx.Done(): + log.Info("[tso] exit tso connection contexts updater", zap.String("dc-location", dc)) + return + case <-c.option.enableTSOFollowerProxyCh: + enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy() + log.Info("[tso] tso follower proxy status changed", + zap.String("dc-location", dc), + zap.Bool("enable", enableTSOFollowerProxy)) + if enableTSOFollowerProxy && updateTicker.C == nil { + // Because the TSO Follower Proxy is enabled, + // the periodic check needs to be performed. + setNewUpdateTicker(time.NewTicker(memberUpdateInterval)) + } else if !enableTSOFollowerProxy && updateTicker.C != nil { + // Because the TSO Follower Proxy is disabled, + // the periodic check needs to be turned off. 
+ setNewUpdateTicker(&time.Ticker{}) + } else { + continue } + case <-updateTicker.C: + // Triggered periodically when the TSO Follower Proxy is enabled. + case <-c.updateTSOConnectionCtxsCh: + // Triggered by the leader/follower change. } } } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 10be418c029..dfe7a6980c7 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -248,6 +249,40 @@ func TestLeaderTransferAndMoveCluster(t *testing.T) { wg.Wait() } +func TestGetTSAfterTransferLeader(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 2) + re.NoError(err) + endpoints := runServer(re, cluster) + leader := cluster.WaitLeader() + re.NotEmpty(leader) + defer cluster.Destroy() + + cli := setupCli(ctx, re, endpoints, pd.WithCustomTimeoutOption(10*time.Second)) + defer cli.Close() + + var leaderSwitched atomic.Bool + cli.GetServiceDiscovery().AddServingURLSwitchedCallback(func() { + leaderSwitched.Store(true) + }) + err = cluster.GetServer(leader).ResignLeader() + re.NoError(err) + newLeader := cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NotEqual(leader, newLeader) + leader = cluster.WaitLeader() + re.NotEmpty(leader) + err = cli.GetServiceDiscovery().CheckMemberChanged() + re.NoError(err) + + testutil.Eventually(re, leaderSwitched.Load) + // The leader stream must be updated after the leader switch is sensed by the client. + _, _, err = cli.GetTS(context.TODO()) + re.NoError(err) +} + func TestTSOAllocatorLeader(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background())
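
Note on the reworked updateAndClear helper in tryConnectToTSO above: outside of TSO Follower Proxy mode the client keeps at most one usable TSO stream, so the helper now stores the new URL's connection context only if it is not already present and then cancels and deletes every other entry in the map. The standalone Go sketch below restates that pattern; the connCtx type, the example URLs, and the main function are illustrative stand-ins rather than code from this patch.

package main

import (
	"context"
	"fmt"
	"sync"
)

// connCtx stands in for the client's tsoConnectionContext: the target URL plus
// the cancel func of the stream bound to it.
type connCtx struct {
	url    string
	cancel context.CancelFunc
}

// updateAndClear keeps exactly one entry in connectionCtxs: store the new URL's
// context only if it is absent, then cancel and drop every other context.
func updateAndClear(connectionCtxs *sync.Map, newURL string, newCtx *connCtx) {
	connectionCtxs.LoadOrStore(newURL, newCtx)
	connectionCtxs.Range(func(url, cc any) bool {
		if url.(string) != newURL {
			cc.(*connCtx).cancel()
			connectionCtxs.Delete(url)
		}
		return true
	})
}

func main() {
	var ctxs sync.Map
	_, cancelOld := context.WithCancel(context.Background())
	ctxs.Store("http://pd-old-leader:2379", &connCtx{"http://pd-old-leader:2379", cancelOld})

	_, cancelNew := context.WithCancel(context.Background())
	updateAndClear(&ctxs, "http://pd-new-leader:2379", &connCtx{"http://pd-new-leader:2379", cancelNew})

	ctxs.Range(func(url, _ any) bool {
		fmt.Println("kept connection:", url) // only the new leader's URL remains
		return true
	})
}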
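
The centerpiece of the dispatcher change is the new connectionCtxsUpdater daemon: rather than refreshing the TSO connection contexts only from inside the request-handling loop, a dedicated goroutine refreshes them once up front and then wakes up on three triggers: updateTSOConnectionCtxsCh (signalled by scheduleUpdateTSOConnectionCtxs when the global allocator serving URL switches), the enableTSOFollowerProxyCh option channel, and a periodic ticker that only runs while the TSO Follower Proxy is enabled. The sketch below is a simplified restatement of that select loop; the client struct, its field names, and the logging stub are stand-ins, not the actual pd client API.

package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// client is a stand-in carrying only what the updater needs.
type client struct {
	updateConnectionCtxsCh   chan struct{} // signalled on a leader/follower switch
	enableTSOFollowerProxyCh chan struct{} // signalled when the proxy option flips
	followerProxyEnabled     func() bool
	memberUpdateInterval     time.Duration
}

// connectionCtxsUpdater mirrors the control flow of the daemon added by the patch:
// refresh the connection contexts before every wait, then block on the triggers below.
func (c *client) connectionCtxsUpdater(ctx context.Context, connectionCtxs *sync.Map) {
	// A zero-value ticker has a nil C, so its case never fires until the
	// follower proxy is enabled and a real ticker is installed.
	var updateTicker = &time.Ticker{}
	setNewUpdateTicker := func(t *time.Ticker) {
		if updateTicker.C != nil {
			updateTicker.Stop()
		}
		updateTicker = t
	}
	// Stop any live ticker on exit so it can be garbage-collected.
	defer setNewUpdateTicker(&time.Ticker{})

	for {
		c.updateConnectionCtxs(ctx, connectionCtxs)
		select {
		case <-ctx.Done():
			return
		case <-c.enableTSOFollowerProxyCh:
			enabled := c.followerProxyEnabled()
			if enabled && updateTicker.C == nil {
				// Proxy switched on: start the periodic refresh.
				setNewUpdateTicker(time.NewTicker(c.memberUpdateInterval))
			} else if !enabled && updateTicker.C != nil {
				// Proxy switched off: stop the periodic refresh.
				setNewUpdateTicker(&time.Ticker{})
			}
		case <-updateTicker.C:
			// Periodic refresh while the follower proxy is enabled.
		case <-c.updateConnectionCtxsCh:
			// A leader/follower switch was observed; refresh immediately.
		}
	}
}

func (c *client) updateConnectionCtxs(_ context.Context, _ *sync.Map) {
	log.Println("refreshing TSO connection contexts")
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	c := &client{
		updateConnectionCtxsCh:   make(chan struct{}, 1),
		enableTSOFollowerProxyCh: make(chan struct{}, 1),
		followerProxyEnabled:     func() bool { return false },
		memberUpdateInterval:     time.Second,
	}
	// Simulate the service discovery layer noticing a leader switch.
	c.updateConnectionCtxsCh <- struct{}{}
	c.connectionCtxsUpdater(ctx, &sync.Map{})
}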