Skip to content

Commit

Permalink
etcdutil: remove stale client endpoints for healthyChecker (#7227)
Browse files Browse the repository at this point in the history
close #7226

remove stale client endpoints for `healthyChecker`

Signed-off-by: iosmanthus <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
  • Loading branch information
iosmanthus and lhy1024 authored Nov 16, 2023
1 parent 9555784 commit a6800a9
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,18 @@ func (checker *healthyChecker) patrol(ctx context.Context) []string {
}

func (checker *healthyChecker) update(eps []string) {
epMap := make(map[string]struct{})
for _, ep := range eps {
epMap[ep] = struct{}{}
}

for ep := range epMap {
// check if client exists, if not, create one, if exists, check if it's offline or disconnected.
if client, ok := checker.Load(ep); ok {
lastHealthy := client.(*healthyClient).lastHealth
if time.Since(lastHealthy) > etcdServerOfflineTimeout {
log.Info("some etcd server maybe offline", zap.String("endpoint", ep))
checker.Delete(ep)
checker.removeClient(ep)
}
if time.Since(lastHealthy) > etcdServerDisconnectedTimeout {
// try to reset client endpoint to trigger reconnect
Expand All @@ -399,6 +404,16 @@ func (checker *healthyChecker) update(eps []string) {
}
checker.addClient(ep, time.Now())
}

// check if there are some stale clients, if exists, remove them.
checker.Range(func(key, value interface{}) bool {
ep := key.(string)
if _, ok := epMap[ep]; !ok {
log.Info("remove stale etcd client", zap.String("endpoint", ep))
checker.removeClient(ep)
}
return true
})
}

func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
Expand All @@ -413,6 +428,15 @@ func (checker *healthyChecker) addClient(ep string, lastHealth time.Time) {
})
}

func (checker *healthyChecker) removeClient(ep string) {
if client, ok := checker.LoadAndDelete(ep); ok {
err := client.(*healthyClient).Close()
if err != nil {
log.Error("failed to close etcd healthy client", zap.Error(err))
}
}
}

func syncUrls(client *clientv3.Client) []string {
// See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183
ctx, cancel := context.WithTimeout(clientv3.WithRequireLeader(client.Ctx()), DefaultRequestTimeout)
Expand Down

0 comments on commit a6800a9

Please sign in to comment.