Skip to content

Commit

Permalink
Restore removePeer method
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed Oct 22, 2023
1 parent 80e56ea commit 73c6c25
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 40 deletions.
11 changes: 10 additions & 1 deletion src/admin/removepeer.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package admin

import (
"fmt"
"net/url"
)

type RemovePeerRequest struct {
Uri string `json:"uri"`
Sintf string `json:"interface,omitempty"`
Expand All @@ -8,5 +13,9 @@ type RemovePeerRequest struct {
type RemovePeerResponse struct{}

func (a *AdminSocket) removePeerHandler(req *RemovePeerRequest, res *RemovePeerResponse) error {
return a.core.RemovePeer(req.Uri, req.Sintf)
u, err := url.Parse(req.Uri)
if err != nil {
return fmt.Errorf("unable to parse peering URI: %w", err)
}
return a.core.RemovePeer(u, req.Sintf)
}
25 changes: 2 additions & 23 deletions src/core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package core
import (
"crypto/ed25519"
"encoding/json"
"fmt"
"net"
"net/url"
"sync/atomic"
Expand Down Expand Up @@ -192,28 +191,8 @@ func (c *Core) AddPeer(u *url.URL, sintf string) error {

// RemovePeer removes a peer. The peer should be specified in URI format, see AddPeer.
// The peer is not disconnected immediately.
func (c *Core) RemovePeer(uri string, sourceInterface string) error {
return fmt.Errorf("not implemented yet")
/*
var err error
phony.Block(c, func() {
peer := Peer{uri, sourceInterface}
linkInfo, ok := c.config._peers[peer]
if !ok {
err = fmt.Errorf("peer not configured")
return
}
if ok && linkInfo != nil {
c.links.Act(nil, func() {
if link := c.links._links[*linkInfo]; link != nil {
_ = link.conn.Close()
}
})
}
delete(c.config._peers, peer)
})
return err
*/
func (c *Core) RemovePeer(u *url.URL, sintf string) error {
return c.links.remove(u, sintf, linkTypePersistent)
}

// CallPeer calls a peer once. This should be specified in the peer URI format,
Expand Down
70 changes: 54 additions & 16 deletions src/core/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ type linkInfo struct {

// link tracks the state of a connection, either persistent or non-persistent
type link struct {
kick chan struct{} // Attempt to reconnect now, if backing off
linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
linkProto string // Protocol carrier of link, e.g. TCP, AWDL
ctx context.Context // Connection context
cancel context.CancelFunc // Stop future redial attempts (when peer removed)
kick chan struct{} // Attempt to reconnect now, if backing off
linkType linkType // Type of link, i.e. outbound/inbound, persistent/ephemeral
linkProto string // Protocol carrier of link, e.g. TCP, AWDL
// The remaining fields can only be modified safely from within the links actor
_conn *linkConn // Connected link, if any, nil if not connected
_err error // Last error on the connection, if any
Expand Down Expand Up @@ -129,6 +131,7 @@ type linkError string
func (e linkError) Error() string { return string(e) }

const ErrLinkAlreadyConfigured = linkError("peer is already configured")
const ErrLinkNotConfigured = linkError("peer is not configured")
const ErrLinkPriorityInvalid = linkError("priority value is invalid")
const ErrLinkPinnedKeyInvalid = linkError("pinned public key is invalid")
const ErrLinkPasswordInvalid = linkError("password is invalid")
Expand Down Expand Up @@ -199,6 +202,7 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
linkProto: strings.ToUpper(u.Scheme),
kick: make(chan struct{}),
}
state.ctx, state.cancel = context.WithCancel(l.core.ctx)

// Store the state of the link so that it can be queried later.
l._links[info] = state
Expand All @@ -218,12 +222,14 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
backoff++
duration := time.Second * time.Duration(math.Exp2(float64(backoff)))
select {
case <-time.After(duration):
return true
case <-state.kick:
return true
case <-state.ctx.Done():
return false
case <-l.core.ctx.Done():
return false
case <-time.After(duration):
return true
}
}

Expand All @@ -232,19 +238,25 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
// then the loop will run endlessly, using backoffs as needed.
// Otherwise the loop will end, cleaning up the link entry.
go func() {
defer func() {
phony.Block(l, func() {
if l._links[info] == state {
delete(l._links, info)
}
})
}()
defer phony.Block(l, func() {
if l._links[info] == state {
delete(l._links, info)
}
})

// This loop will run each and every time we want to attempt
// a connection to this peer.
// TODO get rid of this loop, this is *exactly* what time.AfterFunc is for, we should just send a signal to the links actor to kick off a goroutine as needed
for {
conn, err := l.connect(u, info, options)
select {
case <-state.ctx.Done():
// The peering context has been cancelled, so don't try
// to dial again.
return
default:
}

conn, err := l.connect(state.ctx, u, info, options)
if err != nil {
if linkType == linkTypePersistent {
// If the link is a persistent configured peering,
Expand Down Expand Up @@ -319,13 +331,39 @@ func (l *links) add(u *url.URL, sintf string, linkType linkType) error {
}
return
}
break
}
}()
})
return retErr
}

func (l *links) remove(u *url.URL, sintf string, linkType linkType) error {
var retErr error
phony.Block(l, func() {
// Generate the link info and see whether we think we already
// have an open peering to this peer.
lu := urlForLinkInfo(*u)
info := linkInfo{
uri: lu.String(),
sintf: sintf,
}

// If this peer is already configured then we will close the
// connection and stop it from retrying.
state, ok := l._links[info]
if ok && state != nil {
state.cancel()
if conn := state._conn; conn != nil {
retErr = conn.Close()
}
return
}

retErr = ErrLinkNotConfigured
})
return retErr
}

func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
ctx, cancel := context.WithCancel(l.core.ctx)
var protocol linkProtocol
Expand Down Expand Up @@ -453,7 +491,7 @@ func (l *links) listen(u *url.URL, sintf string) (*Listener, error) {
return li, nil
}

func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Conn, error) {
func (l *links) connect(ctx context.Context, u *url.URL, info linkInfo, options linkOptions) (net.Conn, error) {
var dialer linkProtocol
switch strings.ToLower(u.Scheme) {
case "tcp":
Expand Down Expand Up @@ -485,7 +523,7 @@ func (l *links) connect(u *url.URL, info linkInfo, options linkOptions) (net.Con
default:
return nil, ErrLinkUnrecognisedSchema
}
return dialer.dial(l.core.ctx, u, info, options)
return dialer.dial(ctx, u, info, options)
}

func (l *links) handler(linkType linkType, options linkOptions, conn net.Conn) error {
Expand Down

0 comments on commit 73c6c25

Please sign in to comment.