From f0c84e49b4aea4104ff4151b2abdc9712a6b2db7 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Wed, 16 Oct 2024 17:45:26 +0800 Subject: [PATCH] client: reuse `initRetry` function (#8707) ref tikv/pd#4399 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 39 +++++++-------------------------- client/pd_service_discovery.go | 32 +++++++++++++-------------- client/tso_service_discovery.go | 8 +++---- 3 files changed, 28 insertions(+), 51 deletions(-) diff --git a/client/client.go b/client/client.go index 27952df13cd..f8c8d32cee8 100644 --- a/client/client.go +++ b/client/client.go @@ -442,6 +442,7 @@ func newClientWithKeyspaceName( } clientCtx, clientCancel := context.WithCancel(ctx) c := &client{ + keyspaceID: nullKeyspaceID, updateTokenConnectionCh: make(chan struct{}, 1), ctx: clientCtx, cancel: clientCancel, @@ -455,10 +456,12 @@ func newClientWithKeyspaceName( opt(c) } - updateKeyspaceIDCb := func() error { - if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil { + updateKeyspaceIDFunc := func() error { + keyspaceMeta, err := c.LoadKeyspace(clientCtx, keyspaceName) + if err != nil { return err } + c.keyspaceID = keyspaceMeta.GetId() // c.keyspaceID is the source of truth for keyspace id. c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID) return nil @@ -466,8 +469,8 @@ func newClientWithKeyspaceName( // Create a PD service discovery with null keyspace id, then query the real id with the keyspace name, // finally update the keyspace id to the PD service discovery for the following interactions. - c.pdSvcDiscovery = newPDServiceDiscovery( - clientCtx, clientCancel, &c.wg, c.setServiceMode, updateKeyspaceIDCb, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) + c.pdSvcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, + c.setServiceMode, updateKeyspaceIDFunc, nullKeyspaceID, c.svrUrls, c.tlsCfg, c.option) if err := c.setup(); err != nil { c.cancel() if c.pdSvcDiscovery != nil { @@ -482,32 +485,6 @@ func newClientWithKeyspaceName( return c, nil } -func (c *client) initRetry(f func(s string) error, str string) error { - var err error - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for i := 0; i < c.option.maxRetryTimes; i++ { - if err = f(str); err == nil { - return nil - } - select { - case <-c.ctx.Done(): - return err - case <-ticker.C: - } - } - return errors.WithStack(err) -} - -func (c *client) loadKeyspaceMeta(keyspace string) error { - keyspaceMeta, err := c.LoadKeyspace(context.TODO(), keyspace) - if err != nil { - return err - } - c.keyspaceID = keyspaceMeta.GetId() - return nil -} - func (c *client) setup() error { // Init the metrics. if c.option.initMetrics { @@ -579,7 +556,7 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { case pdpb.ServiceMode_API_SVC_MODE: newTSOSvcDiscovery = newTSOServiceDiscovery( c.ctx, MetaStorageClient(c), c.pdSvcDiscovery, - c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option) + c.keyspaceID, c.tlsCfg, c.option) // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, // and will be updated later. newTSOCli = newTSOClient(c.ctx, c.option, diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index c34a5bebac6..f42ae7fea4a 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -440,9 +440,9 @@ type pdServiceDiscovery struct { cancel context.CancelFunc closeOnce sync.Once - updateKeyspaceIDCb updateKeyspaceIDFunc - keyspaceID uint32 - tlsCfg *tls.Config + updateKeyspaceIDFunc updateKeyspaceIDFunc + keyspaceID uint32 + tlsCfg *tls.Config // Client option. option *option } @@ -461,21 +461,21 @@ func newPDServiceDiscovery( ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, serviceModeUpdateCb func(pdpb.ServiceMode), - updateKeyspaceIDCb updateKeyspaceIDFunc, + updateKeyspaceIDFunc updateKeyspaceIDFunc, keyspaceID uint32, urls []string, tlsCfg *tls.Config, option *option, ) *pdServiceDiscovery { pdsd := &pdServiceDiscovery{ - checkMembershipCh: make(chan struct{}, 1), - ctx: ctx, - cancel: cancel, - wg: wg, - apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)}, - serviceModeUpdateCb: serviceModeUpdateCb, - updateKeyspaceIDCb: updateKeyspaceIDCb, - keyspaceID: keyspaceID, - tlsCfg: tlsCfg, - option: option, + checkMembershipCh: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, + wg: wg, + apiCandidateNodes: [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn), newPDServiceBalancer(regionAPIErrorFn)}, + serviceModeUpdateCb: serviceModeUpdateCb, + updateKeyspaceIDFunc: updateKeyspaceIDFunc, + keyspaceID: keyspaceID, + tlsCfg: tlsCfg, + option: option, } urls = addrsToURLs(urls, tlsCfg) pdsd.urls.Store(urls) @@ -500,8 +500,8 @@ func (c *pdServiceDiscovery) Init() error { // We need to update the keyspace ID before we discover and update the service mode // so that TSO in API mode can be initialized with the correct keyspace ID. - if c.updateKeyspaceIDCb != nil { - if err := c.updateKeyspaceIDCb(); err != nil { + if c.keyspaceID == nullKeyspaceID && c.updateKeyspaceIDFunc != nil { + if err := c.initRetry(c.updateKeyspaceIDFunc); err != nil { return err } } diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 443d455e911..617b709ca76 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -158,7 +158,7 @@ type tsoServiceDiscovery struct { // newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service. func newTSOServiceDiscovery( ctx context.Context, metacli MetaStorageClient, apiSvcDiscovery ServiceDiscovery, - clusterID uint64, keyspaceID uint32, tlsCfg *tls.Config, option *option, + keyspaceID uint32, tlsCfg *tls.Config, option *option, ) ServiceDiscovery { ctx, cancel := context.WithCancel(ctx) c := &tsoServiceDiscovery{ @@ -166,7 +166,7 @@ func newTSOServiceDiscovery( cancel: cancel, metacli: metacli, apiSvcDiscovery: apiSvcDiscovery, - clusterID: clusterID, + clusterID: apiSvcDiscovery.GetClusterID(), tlsCfg: tlsCfg, option: option, checkMembershipCh: make(chan struct{}, 1), @@ -180,10 +180,10 @@ func newTSOServiceDiscovery( c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)} // Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs, // will be discovered later. - c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, clusterID, defaultKeySpaceGroupID) + c.defaultDiscoveryKey = fmt.Sprintf(tsoSvcDiscoveryFormat, c.clusterID, defaultKeySpaceGroupID) log.Info("created tso service discovery", - zap.Uint64("cluster-id", clusterID), + zap.Uint64("cluster-id", c.clusterID), zap.Uint32("keyspace-id", keyspaceID), zap.String("default-discovery-key", c.defaultDiscoveryKey))