From a6800a9dfad52f228f242fec2d335ea9bbaa170b Mon Sep 17 00:00:00 2001
From: iosmanthus
Date: Thu, 16 Nov 2023 17:42:46 +0800
Subject: [PATCH] etcdutil: remove stale client endpoints for `healthyChecker` (#7227)

close tikv/pd#7226

remove stale client endpoints for `healthyChecker`

Signed-off-by: iosmanthus
Co-authored-by: lhy1024
---
 pkg/utils/etcdutil/etcdutil.go | 26 +++++++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index e004247c6d0..03c2374efc6 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -382,13 +382,18 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string {
 }
 
 func (checker *healthyChecker) update(eps []string) {
+	epMap := make(map[string]struct{})
 	for _, ep := range eps {
+		epMap[ep] = struct{}{}
+	}
+
+	for ep := range epMap {
 		// check if client exists, if not, create one, if exists, check if it's offline or disconnected.
 		if client, ok := checker.Load(ep); ok {
 			lastHealthy := client.(*healthyClient).lastHealth
 			if time.Since(lastHealthy) > etcdServerOfflineTimeout {
 				log.Info("some etcd server maybe offline", zap.String("endpoint", ep))
-				checker.Delete(ep)
+				checker.removeClient(ep)
 			}
 			if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
 				// try to reset client endpoint to trigger reconnect
@@ -399,6 +404,16 @@ func (checker *healthyChecker) update(eps []string) {
 		}
 		checker.addClient(ep, time.Now())
 	}
+
+	// check if there are some stale clients, if exists, remove them.
+	checker.Range(func(key, value interface{}) bool {
+		ep := key.(string)
+		if _, ok := epMap[ep]; !ok {
+			log.Info("remove stale etcd client", zap.String("endpoint", ep))
+			checker.removeClient(ep)
+		}
+		return true
+	})
 }
 
 func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
@@ -413,6 +428,15 @@ func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
 	})
 }
 
+func (checker *healthyChecker) removeClient(ep string) {
+	if client, ok := checker.LoadAndDelete(ep); ok {
+		err := client.(*healthyClient).Close()
+		if err != nil {
+			log.Error("failed to close etcd healthy client", zap.Error(err))
+		}
+	}
+}
+
 func syncUrls(client *clientv3.Client) []string {
 	// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183
 	ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), DefaultRequestTimeout)