Skip to content

Commit

Permalink
pan: fix pathPool query path procedure + close underlay UDPConn if QU…
Browse files Browse the repository at this point in the history
…IC session was not established successfully (#259)

* fix pathPool query path procedure

* refactor

* pass
  • Loading branch information
JordiSubira authored Oct 7, 2024
1 parent 6c98a8d commit 3f7a7dd
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 39 deletions.
33 changes: 18 additions & 15 deletions pkg/pan/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,20 @@ func (p *pathPool) unsubscribe(dstIA IA, s pathPoolSubscriber) {
}

// paths returns paths to dstIA. This _may_ query paths, unless they have recently been queried.
func (p *pathPool) paths(ctx context.Context, dstIA IA) ([]*Path, error) {
func (p *pathPool) paths(ctx context.Context, dstIA IA) ([]*Path, bool, error) {
p.entriesMutex.RLock()
if entry, ok := p.entries[dstIA]; ok {
if time.Since(entry.lastQuery) > pathRefreshMinInterval {
defer p.entriesMutex.RUnlock()
return append([]*Path{}, entry.paths...), nil
}
}
entry, ok := p.entries[dstIA]
p.entriesMutex.RUnlock()
return p.queryPaths(ctx, dstIA)

if ok && !shouldQuery(time.Now(), entry.earliestExpiry, entry.lastQuery) {
return append([]*Path{}, entry.paths...), false, nil
}

paths, err := p.queryPaths(ctx, dstIA)
if err != nil {
return nil, false, err
}
return paths, true, nil
}

// queryPaths returns paths to dstIA. Unconditionally requests paths from sciond.
Expand All @@ -100,13 +104,6 @@ func (p *pathPool) cachedPaths(dst IA) []*Path {
return append([]*Path{}, p.entries[dst].paths...)
}

func (p *pathPool) entry(dstIA IA) (pathPoolDst, bool) {
p.entriesMutex.RLock()
defer p.entriesMutex.RUnlock()
e, ok := p.entries[dstIA]
return e, ok
}

func (e *pathPoolDst) update(paths []*Path) {
now := time.Now()
expiryDropTime := now.Add(-pathPruneLeadTime)
Expand Down Expand Up @@ -151,3 +148,9 @@ func earliestPathExpiry(paths []*Path) time.Time {
}
return ret
}

func shouldQuery(now, expiry, lastQuery time.Time) bool {
earliestAllowedRefresh := lastQuery.Add(pathRefreshMinInterval)
timeForRefresh := expiry.Add(-pathRefreshLeadTime)
return now.After(earliestAllowedRefresh) && now.After(timeForRefresh)
}
2 changes: 2 additions & 0 deletions pkg/pan/quic_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func DialQUIC(ctx context.Context,

session, err := quic.Dial(ctx, pconn, remote, tlsConf, quicConf)
if err != nil {
// Close the underlying connection if the QUIC session could not be established.
pconn.Close()
return nil, err
}
return &QUICSession{session, conn}, nil
Expand Down
38 changes: 14 additions & 24 deletions pkg/pan/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func makeRefresher(pool *pathPool) refresher {
// subscribe for paths to dst.
func (r *refresher) subscribe(ctx context.Context, dst IA, s refreshee) ([]*Path, error) {
// BUG: oops, this will not inform subscribers of updated paths. Need to explicily check here
paths, err := r.pool.paths(ctx, dst)
paths, _, err := r.pool.paths(ctx, dst)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -105,7 +105,6 @@ func (r *refresher) run() {
}

func (r *refresher) refresh() {
now := time.Now()
// when a refresh is triggered, we batch all
r.subscribersMutex.Lock()
refreshIAs := make([]IA, 0, len(r.subscribers))
Expand All @@ -115,32 +114,23 @@ func (r *refresher) refresh() {
r.subscribersMutex.Unlock()

for _, dstIA := range refreshIAs {
poolEntry, _ := r.pool.entry(dstIA)
if r.shouldRefresh(now, poolEntry.earliestExpiry, poolEntry.lastQuery) {
paths, err := r.pool.queryPaths(context.Background(), dstIA)
if err != nil {
// ignore errors here. The idea is that there is probably a lot of time
// until this manifests as an actual problem to the application (i.e.
// when the paths actually expire).
// TODO: check whether there are errors that could be handled, like try to reconnect
// to sciond or something like that.
continue
}
r.subscribersMutex.Lock()
for _, subscriber := range r.subscribers[dstIA] {
subscriber.refresh(dstIA, paths)
}
r.subscribersMutex.Unlock()
paths, areFresh, err := r.pool.paths(context.Background(), dstIA)
if err != nil || !areFresh {
// ignore errors here. The idea is that there is probably a lot of time
// until this manifests as an actual problem to the application (i.e.
// when the paths actually expire).
// TODO: check whether there are errors that could be handled, like try to reconnect
// to sciond or something like that.
continue
}
r.subscribersMutex.Lock()
for _, subscriber := range r.subscribers[dstIA] {
subscriber.refresh(dstIA, paths)
}
r.subscribersMutex.Unlock()
}
}

func (r *refresher) shouldRefresh(now, expiry, lastQuery time.Time) bool {
earliestAllowedRefresh := lastQuery.Add(pathRefreshMinInterval)
timeForRefresh := expiry.Add(-pathRefreshLeadTime)
return now.After(earliestAllowedRefresh) && now.After(timeForRefresh)
}

func (r *refresher) untilNextRefresh(prevRefresh time.Time) time.Duration {
return time.Until(r.nextRefresh(prevRefresh))
}
Expand Down

0 comments on commit 3f7a7dd

Please sign in to comment.