Skip to content

Commit

Permalink
fix: handle timeouts on etcd client by using context approriately; fi…
Browse files Browse the repository at this point in the history
…xes panic caused by prematurely closing channel before Revoke returns
  • Loading branch information
gibertoni committed Jul 8, 2024
1 parent 68da1bc commit 89e6d09
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import (
"strings"
"sync"
"time"

"github.com/topfreegames/pitaya/v2/config"
"github.com/topfreegames/pitaya/v2/constants"
"github.com/topfreegames/pitaya/v2/logger"
"github.com/topfreegames/pitaya/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
logutil "go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/namespace"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -81,14 +82,14 @@ func NewEtcdServiceDiscovery(
client = cli[0]
}
sd := &etcdServiceDiscovery{
running: false,
server: server,
serverMapByType: make(map[string]map[string]*Server),
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
stopLeaseChan: make(chan bool),
appDieChan: appDieChan,
cli: client,
running: false,
server: server,
serverMapByType: make(map[string]map[string]*Server),
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
stopLeaseChan: make(chan bool),
appDieChan: appDieChan,
cli: client,
syncServersRunning: make(chan bool),
}

Expand Down Expand Up @@ -300,7 +301,7 @@ func (sd *etcdServiceDiscovery) GetServersByType(serverType string) (map[string]
// Create a new map to avoid concurrent read and write access to the
// map, this also prevents accidental changes to the list of servers
// kept by the service discovery.
ret := make(map[string]*Server,len(sd.serverMapByType[serverType]))
ret := make(map[string]*Server, len(sd.serverMapByType[serverType]))
for k, v := range sd.serverMapByType[serverType] {
ret[k] = v
}
Expand Down Expand Up @@ -615,16 +616,15 @@ func (sd *etcdServiceDiscovery) revoke() error {
go func() {
defer close(c)
logger.Log.Debug("waiting for etcd revoke")
_, err := sd.cli.Revoke(context.TODO(), sd.leaseID)
ctx, cancel := context.WithTimeout(context.Background(), sd.revokeTimeout)
_, err := sd.cli.Revoke(ctx, sd.leaseID)
cancel()
c <- err
logger.Log.Debug("finished waiting for etcd revoke")
}()
select {
case err := <-c:
return err // completed normally
case <-time.After(sd.revokeTimeout):
logger.Log.Warn("timed out waiting for etcd revoke")
return nil // timed out
}
}

Expand Down

0 comments on commit 89e6d09

Please sign in to comment.