Skip to content

Commit

Permalink
Persistent peer tracker [#3444]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 9, 2023
1 parent 78f3bfe commit f00bf63
Show file tree
Hide file tree
Showing 12 changed files with 881 additions and 34 deletions.
32 changes: 18 additions & 14 deletions cmd/accumulated-http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
accumulated "gitlab.com/accumulatenetwork/accumulate/internal/node/daemon"
nodehttp "gitlab.com/accumulatenetwork/accumulate/internal/node/http"
. "gitlab.com/accumulatenetwork/accumulate/internal/util/cmd"
"gitlab.com/accumulatenetwork/accumulate/pkg/accumulate"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p"
Expand All @@ -49,18 +50,21 @@ var cmd = &cobra.Command{
}

var flag = struct {
Key string
LogLevel string
HttpListen []multiaddr.Multiaddr
P2pListen []multiaddr.Multiaddr
Peers []multiaddr.Multiaddr
Timeout time.Duration
ConnLimit int
CorsOrigins []string
LetsEncrypt []string
TlsCert string
TlsKey string
}{}
Key string
LogLevel string
HttpListen []multiaddr.Multiaddr
P2pListen []multiaddr.Multiaddr
Peers []multiaddr.Multiaddr
Timeout time.Duration
ConnLimit int
CorsOrigins []string
LetsEncrypt []string
TlsCert string
TlsKey string
PeerDatabase string
}{
Peers: accumulate.BootstrapServers,
}

func init() {
cmd.Flags().StringVar(&flag.Key, "key", "", "The node key - not required but highly recommended. The value can be a key or a file containing a key. The key must be hex, base64, or an Accumulate secret key address.")
Expand All @@ -74,9 +78,8 @@ func init() {
cmd.Flags().StringSliceVar(&flag.LetsEncrypt, "lets-encrypt", nil, "Enable HTTPS on 443 and use Let's Encrypt to retrieve a certificate. Use of this feature implies acceptance of the LetsEncrypt Terms of Service.")
cmd.Flags().StringVar(&flag.TlsCert, "tls-cert", "", "Certificate used for HTTPS")
cmd.Flags().StringVar(&flag.TlsKey, "tls-key", "", "Private key used for HTTPS")
cmd.Flags().StringVar(&flag.PeerDatabase, "peer-db", "peerdb.json", "Track peers using a persistent database")
cmd.Flags().BoolVar(&jsonrpc2.DebugMethodFunc, "debug", false, "Print out a stack trace if an API method fails")

_ = cmd.MarkFlagRequired("peer")
}

func run(_ *cobra.Command, args []string) {
Expand Down Expand Up @@ -107,6 +110,7 @@ func run(_ *cobra.Command, args []string) {
Network: args[0],
Listen: flag.P2pListen,
BootstrapPeers: flag.Peers,
PeerDatabase: flag.PeerDatabase,
EnablePeerTracker: true,
})
Check(err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/v3/p2p/dial/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ func (d *dialer) tryDial(peer peer.ID, service *api.ServiceAddress, addr multiad
}

go func() {
// Panic protection
defer func() {
if r := recover(); r != nil {
slog.Error("Panicked while handling stream", "error", r, "stack", debug.Stack(), "module", "api")
}
}()

if wg != nil {
defer wg.Done()
}
Expand Down
File renamed without changes.
248 changes: 248 additions & 0 deletions pkg/api/v3/p2p/dial/tracker_persistent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
// Copyright 2023 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package dial

import (
"context"
"os"
"sort"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p/peerdb"
"golang.org/x/exp/slog"
)

type PersistentTracker struct {
context context.Context
cancel context.CancelFunc
db *peerdb.DB
file string
host Connector
peers Discoverer
stopwg *sync.WaitGroup
}

type PersistentTrackerOptions struct {
Filename string
Host Connector
Peers Discoverer
PersistFrequency time.Duration
}

func NewPersistentTracker(ctx context.Context, opts PersistentTrackerOptions) (*PersistentTracker, error) {
t := new(PersistentTracker)
t.db = peerdb.New()
t.file = opts.Filename
t.host = opts.Host
t.peers = opts.Peers
t.stopwg = new(sync.WaitGroup)

t.context, t.cancel = context.WithCancel(ctx)

// Ensure the file can be created
f, err := os.OpenFile(opts.Filename, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
defer f.Close()

// If the file is non-empty, read it
st, err := f.Stat()
if err != nil {
return nil, err
}
if st.Size() > 0 {
err = t.db.Load(f)
if err != nil {
return nil, err
}
}

if opts.PersistFrequency == 0 {
opts.PersistFrequency = time.Hour
}
t.stopwg.Add(1)
go t.writeDb(opts.PersistFrequency)

return t, nil
}

func (t *PersistentTracker) Stop() {
t.cancel()
t.stopwg.Wait()
}

func (t *PersistentTracker) writeDb(frequency time.Duration) {
defer t.stopwg.Done()

tick := time.NewTicker(frequency)
go func() { <-t.context.Done(); tick.Stop() }()

for range tick.C {
slog.InfoCtx(t.context, "Writing peer database")

f, err := os.OpenFile(t.file, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
slog.ErrorCtx(t.context, "Failed to open peer database", "error", err)
continue
}

err = t.db.Store(f)
if err != nil {
slog.ErrorCtx(t.context, "Failed to write peer database", "error", err)
}

f.Close()
}
}

func (t *PersistentTracker) Mark(peer peer.ID, addr multiaddr.Multiaddr, status api.KnownPeerStatus) {
netName, _, service, inetAddr, err := api.UnpackAddress(addr)
if err != nil {
panic(err)
}

switch status {
case api.PeerStatusIsKnownGood:
if inetAddr != nil {
// Mark that we connected to the address
t.db.Peer(peer).Address(inetAddr).Last.DidSucceed()
}

// Mark that we connected to the service
t.db.Peer(peer).Network(netName).Service(service).Last.DidSucceed()

case api.PeerStatusIsUnknown:
// Reset the last connected time
t.db.Peer(peer).Network(netName).Service(service).Last.Success = nil

case api.PeerStatusIsKnownBad:
// Don't do anything - last attempt will be greater than last success
}
}

func (t *PersistentTracker) Status(peer peer.ID, addr multiaddr.Multiaddr) api.KnownPeerStatus {
netName, _, service, _, err := api.UnpackAddress(addr)
if err != nil {
panic(err)
}

s := t.db.Peer(peer).Network(netName).Service(service)
return statusForLast(s.Last)
}

func statusForLast(l peerdb.LastStatus) api.KnownPeerStatus {
switch {
case l.Attempt == nil:
// No connection attempted
return api.PeerStatusIsUnknown

case l.Success == nil:
// No successful connection

// Attempt was too long ago?
if attemptIsTooOld(l) {
return api.PeerStatusIsKnownBad
}

// Attempt was recent
return api.PeerStatusIsUnknown

case l.Attempt.After(*l.Success):
// Connection attempted since the last success

// Attempt was too long ago?
if attemptIsTooOld(l) {
return api.PeerStatusIsKnownBad
}

// Last success was too long ago?
return statusForLastSuccess(l)

default:
// Last attempt was successful

// Was it too long ago?
return statusForLastSuccess(l)
}
}

func attemptIsTooOld(l peerdb.LastStatus) bool {
return time.Since(*l.Attempt) > time.Second
}

func statusForLastSuccess(l peerdb.LastStatus) api.KnownPeerStatus {
// Last success was too long ago?
if time.Since(*l.Success) > 10*time.Minute {
return api.PeerStatusIsUnknown
}

// Last success was recent
return api.PeerStatusIsKnownGood
}

func (t *PersistentTracker) Next(addr multiaddr.Multiaddr, status api.KnownPeerStatus) (peer.ID, bool) {
netName, _, service, _, err := api.UnpackAddress(addr)
if err != nil {
panic(err)
}

// Get all the candidates with the given status
var candidates []*peerdb.PeerStatus
for _, p := range t.db.Peers() {
s := p.Network(netName).Service(service)
if statusForLast(s.Last) == status {
candidates = append(candidates, p)
}
}
if len(candidates) == 0 {
return "", false
}

switch status {
case api.PeerStatusIsKnownGood,
api.PeerStatusIsKnownBad:
// Pick the least recently used one
sort.Slice(candidates, func(i, j int) bool {
a := candidates[i].Network(netName).Service(service)
b := candidates[j].Network(netName).Service(service)
switch {
case a.Last.Attempt == nil || b.Last.Attempt == nil:
return false
case a.Last.Attempt == nil:
return true
case b.Last.Attempt == nil:
return false
default:
return a.Last.Attempt.Before(*b.Last.Attempt)
}
})
}

candidates[0].Network(netName).Service(service).Last.DidAttempt()
return candidates[0].ID, true
}

func (t *PersistentTracker) All(addr multiaddr.Multiaddr, status api.KnownPeerStatus) []peer.ID {
netName, _, service, _, err := api.UnpackAddress(addr)
if err != nil {
panic(err)
}

var peers []peer.ID
for _, p := range t.db.Peers() {
s := p.Network(netName).Service(service)
if statusForLast(s.Last) == status {
peers = append(peers, p.ID)
}
}
return peers
}
File renamed without changes.
10 changes: 6 additions & 4 deletions pkg/api/v3/p2p/dial_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ func (d *discoverer) Discover(ctx context.Context, req *dial.DiscoveryRequest) (
return nil, errors.BadRequest.With("no network or service specified")
}

s, ok := (*Node)(d).getOwnService(req.Network, req.Service)
if ok {
s := handleLocally(ctx, s)
return dial.DiscoveredStream{S: s}, nil
if req.Service != nil {
s, ok := (*Node)(d).getOwnService(req.Network, req.Service)
if ok {
s := handleLocally(ctx, s)
return dial.DiscoveredStream{S: s}, nil
}
}

ch, err := (*Node)(d).peermgr.getPeers(ctx, addr, req.Limit, req.Timeout)
Expand Down
30 changes: 14 additions & 16 deletions pkg/api/v3/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"crypto/ed25519"
"net"
"strings"
"time"

"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
Expand All @@ -27,21 +28,6 @@ import (
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
)

var BootstrapNodes = func() []multiaddr.Multiaddr {
p := func(s string) multiaddr.Multiaddr {
addr, err := multiaddr.NewMultiaddr(s)
if err != nil {
panic(err)
}
return addr
}

return []multiaddr.Multiaddr{
// Defi Devs bootstrap node
p("/dns/bootstrap.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWGJTh4aeF7bFnwo9sAYRujCkuVU1Cq8wNeTNGpFgZgXdg"),
}
}()

// Node implements peer-to-peer routing of API v3 messages over via binary
// message transport.
type Node struct {
Expand Down Expand Up @@ -81,6 +67,8 @@ type Options struct {
// EnablePeerTracker enables the peer tracker to reduce the impact of
// mis-configured peers. This is currently experimental.
EnablePeerTracker bool

PeerDatabase string
}

// New creates a node with the given [Options].
Expand All @@ -89,7 +77,17 @@ func New(opts Options) (_ *Node, err error) {
n := new(Node)
n.context, n.cancel = context.WithCancel(context.Background())

if opts.EnablePeerTracker {
if opts.PeerDatabase != "" {
n.tracker, err = dial.NewPersistentTracker(n.context, dial.PersistentTrackerOptions{
Filename: opts.PeerDatabase,
Host: (*connector)(n),
Peers: (*discoverer)(n),
PersistFrequency: 10 * time.Second,
})
if err != nil {
return nil, err
}
} else if opts.EnablePeerTracker {
n.tracker = new(dial.SimpleTracker)
} else {
n.tracker = dial.FakeTracker
Expand Down
Loading

0 comments on commit f00bf63

Please sign in to comment.