Skip to content

Commit

Permalink
Use the persistent tracker as the connector [#3446]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 9, 2023
1 parent 99d97b9 commit a18f883
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 60 deletions.
26 changes: 22 additions & 4 deletions cmd/accumulated-http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p/dial"
"gitlab.com/accumulatenetwork/accumulate/protocol"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -118,10 +119,27 @@ func run(_ *cobra.Command, args []string) {

fmt.Printf("We are %v\n", node.ID())

fmt.Println("Waiting for a live network service")
svcAddr, err := api.ServiceTypeNetwork.AddressFor(protocol.Directory).MultiaddrFor(args[0])
Check(err)
Check(node.WaitForService(ctx, svcAddr))
dirNetSvc := api.ServiceTypeNetwork.AddressFor(protocol.Directory)
tr, ok := node.Tracker().(*dial.PersistentTracker)
if !ok {
fmt.Println("Waiting for a live network service")
svcAddr, err := dirNetSvc.MultiaddrFor(args[0])
Check(err)
Check(node.WaitForService(ctx, svcAddr))

} else {
var found bool
for _, peer := range tr.DB().Peers() {
if peer.Network(args[0]).Service(dirNetSvc).Last.Success != nil {
found = true
}
}

if !found {
fmt.Println("Scanning for peers")
tr.ScanPeers(5 * time.Minute)
}
}

fmt.Println("Fetching routing information")
router := new(routing.MessageRouter)
Expand Down
27 changes: 15 additions & 12 deletions pkg/api/v3/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,18 +321,8 @@ func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress,
return true
})

// The address must contain a /acc-svc component and must not contain any
// unexpected components
if bad || cService == nil {
return "", "", nil, nil, errors.BadRequest.WithFormat("invalid address %v", addr)
}

// Parse the /acc-svc component
sa := new(ServiceAddress)
err := sa.UnmarshalBinary(cService.RawValue())
if err != nil {
return "", "", nil, nil, errors.BadRequest.WithCauseAndFormat(err, "invalid address %v", addr)
} else if sa.Type == ServiceTypeUnknown {
// The address must not contain any unexpected components
if bad {
return "", "", nil, nil, errors.BadRequest.WithFormat("invalid address %v", addr)
}

Expand All @@ -346,5 +336,18 @@ func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress,
net = string(cNetwork.RawValue())
}

if cService == nil {
return net, peerID, nil, netAddr, nil
}

// Parse the /acc-svc component
sa := new(ServiceAddress)
err := sa.UnmarshalBinary(cService.RawValue())
if err != nil {
return "", "", nil, nil, errors.BadRequest.WithCauseAndFormat(err, "invalid address %v", addr)
} else if sa.Type == ServiceTypeUnknown {
return "", "", nil, nil, errors.BadRequest.WithFormat("invalid address %v", addr)
}

return net, peerID, sa, netAddr, nil
}
9 changes: 7 additions & 2 deletions pkg/api/v3/p2p/dial/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func (d *dialer) Dial(ctx context.Context, addr multiaddr.Multiaddr) (stream mes
if err != nil {
return nil, errors.UnknownError.Wrap(err)
}
if sa == nil {
return nil, errors.BadRequest.WithFormat("invalid address %v", addr)
}

if ip != nil && peer == "" {
return nil, errors.BadRequest.WithFormat("cannot specify address without peer ID")
Expand Down Expand Up @@ -119,7 +122,7 @@ func (d *dialer) newNetworkStream(ctx context.Context, service *api.ServiceAddre
return nil, errors.UnknownError.Wrap(err)
}

// Query the DHT for peers that provide the service
// Discover peers that provide the service
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
resp, err := d.peers.Discover(callCtx, &DiscoveryRequest{
Expand Down Expand Up @@ -290,12 +293,14 @@ func (d *dialer) dial(ctx context.Context, peer peer.ID, service *api.ServiceAdd
// Context was canceled, don't mark the peer

case errors.Is(err, network.ErrNoConn),
errors.Is(err, network.ErrNoRemoteAddrs):
errors.Is(err, network.ErrNoRemoteAddrs),
errors.Is(err, swarm.ErrNoAddresses):
// Mark the peer as dead
d.tracker.Mark(peer, addr, api.PeerStatusIsUnknown)
slog.InfoCtx(ctx, "Unable to dial peer", "peer", peer, "service", service, "error", err)

case errors.Is(err, swarm.ErrDialBackoff),
errors.Is(err, swarm.ErrNoGoodAddresses),
errors.As(err, &timeoutError) && timeoutError.Timeout():
// Mark the peer bad
d.tracker.Mark(peer, addr, api.PeerStatusIsKnownBad)
Expand Down
152 changes: 134 additions & 18 deletions pkg/api/v3/p2p/dial/tracker_persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package dial

import (
"context"
"net"
"os"
"sort"
"sync"
Expand Down Expand Up @@ -86,16 +87,18 @@ func NewPersistentTracker(ctx context.Context, opts PersistentTrackerOptions) (*

// Launch async jobs
t.runJob(t.writeDb, opts.PersistFrequency, defaultPersistFrequency, false)

if opts.Network == "" {
slog.Info("Scanning disabled, network unspecified")
} else {
t.runJob(t.scanPeers, opts.ScanFrequency, defaultScanFrequency, true)
}
t.runJob(t.scanPeers, opts.ScanFrequency, defaultScanFrequency, true)

return t, nil
}

func (t *PersistentTracker) DB() *peerdb.DB { return t.db }

func (t *PersistentTracker) ScanPeers(duration time.Duration) {
t.scanPeers(duration)
t.writeDb(0)
}

func (t *PersistentTracker) Stop() {
t.cancel()
t.stopwg.Wait()
Expand Down Expand Up @@ -178,13 +181,41 @@ func (t *PersistentTracker) scanPeers(duration time.Duration) {
}

func (t *PersistentTracker) scanPeer(ctx context.Context, peer peer.AddrInfo) {
slog.InfoCtx(t.context, "Scanning peer", "id", peer.ID)
slog.DebugCtx(ctx, "Scanning peer", "id", peer.ID)

// TODO Check addresses

ctx, cancel := context.WithCancel(ctx)
defer cancel()

t.scanPeerAddresses(ctx, peer)
t.scanPeerServices(ctx, peer)
}

func (t *PersistentTracker) scanPeerAddresses(ctx context.Context, peer peer.AddrInfo) {
for _, addr := range peer.Addrs {
// Ignore private IPs
if s, err := addr.ValueForProtocol(multiaddr.P_IP4); err == nil {
if ip := net.ParseIP(s); ip != nil && isPrivateIP(ip) {
continue
}
}

t.db.Peer(peer.ID).Address(addr).Last.DidAttempt()
_, err := t.host.Connect(ctx, &ConnectionRequest{
Service: api.ServiceTypeNode.Address(),
PeerID: peer.ID,
PeerAddr: addr,
})
if err != nil {
slog.InfoCtx(ctx, "Unable to connect to peer", "peer", peer.ID, "error", err, "address", addr)
continue
}
t.db.Peer(peer.ID).Address(addr).Last.DidSucceed()
}
}

func (t *PersistentTracker) scanPeerServices(ctx context.Context, peer peer.AddrInfo) {
creq := &ConnectionRequest{
Service: api.ServiceTypeNode.Address(),
PeerID: peer.ID,
Expand All @@ -196,40 +227,40 @@ func (t *PersistentTracker) scanPeer(ctx context.Context, peer peer.AddrInfo) {
t.db.Peer(peer.ID).Network(t.network).Service(creq.Service).Last.DidAttempt()
s, err := t.host.Connect(ctx, creq)
if err != nil {
slog.Info("Unable to connect to peer", "peer", peer.ID, "error", err)
slog.InfoCtx(ctx, "Unable to connect to peer", "peer", peer.ID, "error", err)
return
}
t.db.Peer(peer.ID).Network(t.network).Service(creq.Service).Last.DidSucceed()

err = s.Write(&message.NodeInfoRequest{})
if err != nil {
slog.Info("Failed to request node info", "peer", peer.ID, "error", err)
slog.InfoCtx(ctx, "Failed to request node info", "peer", peer.ID, "error", err)
return
}
res, err := s.Read()
if err != nil {
slog.Info("Failed to request node info", "peer", peer.ID, "error", err)
slog.InfoCtx(ctx, "Failed to request node info", "peer", peer.ID, "error", err)
return
}
var ni *message.NodeInfoResponse
switch res := res.(type) {
case *message.ErrorResponse:
slog.Info("Failed to request node info", "peer", peer.ID, "error", res.Error)
slog.InfoCtx(ctx, "Failed to request node info", "peer", peer.ID, "error", res.Error)
return
case *message.NodeInfoResponse:
ni = res
default:
slog.Info("Invalid node info response", "peer", peer.ID, "want", message.TypeNodeInfoResponse, "got", res.Type())
slog.InfoCtx(ctx, "Invalid node info response", "peer", peer.ID, "want", message.TypeNodeInfoResponse, "got", res.Type())
return
}

for _, svc := range ni.Value.Services {
slog.InfoCtx(t.context, "Attempting to conenct to service", "id", peer.ID, "service", svc)
slog.DebugCtx(ctx, "Attempting to conenct to service", "id", peer.ID, "service", svc)

t.db.Peer(peer.ID).Network(t.network).Service(svc).Last.DidAttempt()
_, err := t.host.Connect(ctx, &ConnectionRequest{Service: svc, PeerID: peer.ID})
if err != nil {
slog.Info("Unable to connect to peer", "peer", peer.ID, "error", err)
slog.InfoCtx(ctx, "Unable to connect to peer", "peer", peer.ID, "error", err)
return
}
t.db.Peer(peer.ID).Network(t.network).Service(svc).Last.DidSucceed()
Expand All @@ -241,6 +272,9 @@ func (t *PersistentTracker) Mark(peer peer.ID, addr multiaddr.Multiaddr, status
if err != nil {
panic(err)
}
if service == nil {
return // Cannot mark if there's no service
}

switch status {
case api.PeerStatusIsKnownGood:
Expand All @@ -266,6 +300,10 @@ func (t *PersistentTracker) Status(peer peer.ID, addr multiaddr.Multiaddr) api.K
if err != nil {
panic(err)
}
if service == nil {
// If there's no service, the status is unknown
return api.PeerStatusIsUnknown
}

s := t.db.Peer(peer).Network(netName).Service(service)
return t.statusForLast(s.Last)
Expand Down Expand Up @@ -326,6 +364,10 @@ func (t *PersistentTracker) Next(addr multiaddr.Multiaddr, status api.KnownPeerS
if err != nil {
panic(err)
}
if service == nil {
// Don't answer if the service is unspecified
return "", false
}

// Get all the candidates with the given status
var candidates []*peerdb.PeerStatus
Expand Down Expand Up @@ -359,7 +401,6 @@ func (t *PersistentTracker) Next(addr multiaddr.Multiaddr, status api.KnownPeerS
})
}

candidates[0].Network(netName).Service(service).Last.DidAttempt()
return candidates[0].ID, true
}

Expand All @@ -374,10 +415,85 @@ func (t *PersistentTracker) All(addr multiaddr.Multiaddr, status api.KnownPeerSt

var peers []peer.ID
for _, p := range t.db.Peers() {
s := p.Network(netName).Service(service)
if t.statusForLast(s.Last) == status {
peers = append(peers, p.ID)
n := p.Network(netName)
if service != nil {
s := n.Service(service)
if t.statusForLast(s.Last) == status {
peers = append(peers, p.ID)
}
continue
}

for _, s := range n.Services.Load() {
if t.statusForLast(s.Last) == status {
peers = append(peers, p.ID)
break
}
}
}
return peers
}

func (t *PersistentTracker) Discover(ctx context.Context, req *DiscoveryRequest) (DiscoveryResponse, error) {
if req.Network == "" {
req.Network = t.network
}
if req.Service == nil {
return nil, errors.BadRequest.With("missing service")
}

ch := make(chan peer.AddrInfo)
go func() {
defer close(ch)

for _, p := range t.db.Peers() {
if p.Network(req.Network).Service(req.Service).Last.SinceSuccess() > t.successThreshold {
continue
}

info := peer.AddrInfo{ID: p.ID}
for _, a := range p.Addresses.Load() {
if a.Last.SinceSuccess() < t.successThreshold {
info.Addrs = append(info.Addrs, a.Address)
}
}
ch <- info
}
}()

return DiscoveredPeers(ch), nil
}

func (t *PersistentTracker) Connect(ctx context.Context, req *ConnectionRequest) (message.Stream, error) {
if req.Service == nil {
return nil, errors.BadRequest.With("missing service")
}
if req.PeerID == "" {
return nil, errors.BadRequest.With("missing peer")
}

// Try to provide a good address
peer := t.db.Peer(req.PeerID)
if req.PeerAddr == nil {
for _, addr := range peer.Addresses.Load() {
if t.statusForLast(addr.Last) == api.PeerStatusIsKnownGood {
req.PeerAddr = addr.Address
break
}
}
}

peer.Network(t.network).Service(req.Service).Last.DidAttempt()
if req.PeerAddr != nil {
peer.Address(req.PeerAddr).Last.DidAttempt()
}
s, err := t.host.Connect(ctx, req)
if err != nil {
return nil, err
}
peer.Network(t.network).Service(req.Service).Last.DidSucceed()
if req.PeerAddr != nil {
peer.Address(req.PeerAddr).Last.DidSucceed()
}
return s, nil
}
Loading

0 comments on commit a18f883

Please sign in to comment.