Skip to content

Commit

Permalink
Periodically scan for services [#3445]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 5, 2023
1 parent 731c5d3 commit e4c83b2
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 51 deletions.
185 changes: 155 additions & 30 deletions pkg/api/v3/p2p/dial/tracker_persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p/peerdb"
"golang.org/x/exp/slog"
)
Expand All @@ -25,28 +26,44 @@ type PersistentTracker struct {
cancel context.CancelFunc
db *peerdb.DB
file string
network string
host Connector
peers Discoverer
stopwg *sync.WaitGroup

successThreshold time.Duration
}

type PersistentTrackerOptions struct {
Network string
Filename string
Host Connector
Peers Discoverer
PersistFrequency time.Duration
ScanFrequency time.Duration
}

const defaultPersistFrequency = time.Hour
const defaultScanFrequency = time.Hour

func NewPersistentTracker(ctx context.Context, opts PersistentTrackerOptions) (*PersistentTracker, error) {
t := new(PersistentTracker)
t.db = peerdb.New()
t.file = opts.Filename
t.network = opts.Network
t.host = opts.Host
t.peers = opts.Peers
t.stopwg = new(sync.WaitGroup)

t.context, t.cancel = context.WithCancel(ctx)

// Set the success threshold ~5% higher than the scan frequency
if opts.ScanFrequency == 0 {
t.successThreshold = defaultScanFrequency
} else {
t.successThreshold = opts.ScanFrequency
}
t.successThreshold = 17 * t.successThreshold / 16

// Ensure the file can be created
f, err := os.OpenFile(opts.Filename, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
Expand All @@ -66,11 +83,14 @@ func NewPersistentTracker(ctx context.Context, opts PersistentTrackerOptions) (*
}
}

if opts.PersistFrequency == 0 {
opts.PersistFrequency = time.Hour
// Launch async jobs
t.runJob(t.writeDb, opts.PersistFrequency, defaultPersistFrequency, false)

if opts.Network == "" {
slog.Info("Scanning disabled, network unspecified")
} else {
t.runJob(t.scanPeers, opts.ScanFrequency, defaultScanFrequency, true)
}
t.stopwg.Add(1)
go t.writeDb(opts.PersistFrequency)

return t, nil
}
Expand All @@ -80,27 +100,129 @@ func (t *PersistentTracker) Stop() {
t.stopwg.Wait()
}

func (t *PersistentTracker) writeDb(frequency time.Duration) {
defer t.stopwg.Done()
func (t *PersistentTracker) runJob(fn func(time.Duration), frequency, defaultFrequency time.Duration, immediate bool) {
if frequency == 0 {
frequency = defaultFrequency
}

tick := time.NewTicker(frequency)
go func() { <-t.context.Done(); tick.Stop() }()
t.stopwg.Add(1)

for range tick.C {
slog.InfoCtx(t.context, "Writing peer database")
go func() {
defer t.stopwg.Done()

f, err := os.OpenFile(t.file, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
slog.ErrorCtx(t.context, "Failed to open peer database", "error", err)
continue
if immediate {
fn(frequency)
}

err = t.db.Store(f)
if err != nil {
slog.ErrorCtx(t.context, "Failed to write peer database", "error", err)
tick := time.NewTicker(frequency)
go func() { <-t.context.Done(); tick.Stop() }()

for range tick.C {
fn(frequency)
}
}()
}

func (t *PersistentTracker) writeDb(time.Duration) {
slog.InfoCtx(t.context, "Writing peer database")

f, err := os.OpenFile(t.file, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
slog.ErrorCtx(t.context, "Failed to open peer database", "error", err)
return
}
defer f.Close()

err = t.db.Store(f)
if err != nil {
slog.ErrorCtx(t.context, "Failed to write peer database", "error", err)
}
}

func (t *PersistentTracker) scanPeers(duration time.Duration) {
slog.InfoCtx(t.context, "Scanning for peers")

peers, err := t.peers.Discover(t.context, &DiscoveryRequest{
Timeout: duration,
Network: t.network,
})
if err != nil {
slog.ErrorCtx(t.context, "Failed to scan for peers", "error", err)
return
}

f.Close()
ctx, cancel := context.WithTimeout(t.context, duration/2)
defer cancel()

wg := new(sync.WaitGroup)
for peer := range peers {
wg.Add(1)
peer := peer
go func() {
defer wg.Done()
t.scanPeer(ctx, peer)
}()
}

wg.Wait()
}

func (t *PersistentTracker) scanPeer(ctx context.Context, peer peer.AddrInfo) {
slog.InfoCtx(t.context, "Scanning peer", "id", peer.ID)

// TODO Check addresses

ctx, cancel := context.WithCancel(ctx)
defer cancel()

creq := &ConnectionRequest{
Service: api.ServiceTypeNode.Address(),
PeerID: peer.ID,
}
if len(peer.Addrs) > 0 {
creq.PeerAddr = peer.Addrs[0]
}

t.db.Peer(peer.ID).Network(t.network).Service(creq.Service).Last.DidAttempt()
s, err := t.host.Connect(ctx, creq)
if err != nil {
slog.Info("Unable to connect to peer", "peer", peer.ID, "error", err)
return
}
t.db.Peer(peer.ID).Network(t.network).Service(creq.Service).Last.DidSucceed()

err = s.Write(&message.NodeInfoRequest{})
if err != nil {
slog.Info("Failed to request node info", "peer", peer.ID, "error", err)
return
}
res, err := s.Read()
if err != nil {
slog.Info("Failed to request node info", "peer", peer.ID, "error", err)
return
}
var ni *message.NodeInfoResponse
switch res := res.(type) {
case *message.ErrorResponse:
slog.Info("Failed to request node info", "peer", peer.ID, "error", res.Error)
return
case *message.NodeInfoResponse:
ni = res
default:
slog.Info("Invalid node info response", "peer", peer.ID, "want", message.TypeNodeInfoResponse, "got", res.Type())
return
}

for _, svc := range ni.Value.Services {
slog.InfoCtx(t.context, "Attempting to conenct to service", "id", peer.ID, "service", svc)

t.db.Peer(peer.ID).Network(t.network).Service(svc).Last.DidAttempt()
_, err := t.host.Connect(ctx, &ConnectionRequest{Service: svc, PeerID: peer.ID})
if err != nil {
slog.Info("Unable to connect to peer", "peer", peer.ID, "error", err)
return
}
t.db.Peer(peer.ID).Network(t.network).Service(svc).Last.DidSucceed()
}
}

Expand Down Expand Up @@ -136,10 +258,10 @@ func (t *PersistentTracker) Status(peer peer.ID, addr multiaddr.Multiaddr) api.K
}

s := t.db.Peer(peer).Network(netName).Service(service)
return statusForLast(s.Last)
return t.statusForLast(s.Last)
}

func statusForLast(l peerdb.LastStatus) api.KnownPeerStatus {
func (t *PersistentTracker) statusForLast(l peerdb.LastStatus) api.KnownPeerStatus {
switch {
case l.Attempt == nil:
// No connection attempted
Expand All @@ -149,7 +271,7 @@ func statusForLast(l peerdb.LastStatus) api.KnownPeerStatus {
// No successful connection

// Attempt was too long ago?
if attemptIsTooOld(l) {
if t.attemptIsTooOld(l) {
return api.PeerStatusIsKnownBad
}

Expand All @@ -160,28 +282,28 @@ func statusForLast(l peerdb.LastStatus) api.KnownPeerStatus {
// Connection attempted since the last success

// Attempt was too long ago?
if attemptIsTooOld(l) {
if t.attemptIsTooOld(l) {
return api.PeerStatusIsKnownBad
}

// Last success was too long ago?
return statusForLastSuccess(l)
return t.statusForLastSuccess(l)

default:
// Last attempt was successful

// Was it too long ago?
return statusForLastSuccess(l)
return t.statusForLastSuccess(l)
}
}

func attemptIsTooOld(l peerdb.LastStatus) bool {
func (t *PersistentTracker) attemptIsTooOld(l peerdb.LastStatus) bool {
return time.Since(*l.Attempt) > time.Second
}

func statusForLastSuccess(l peerdb.LastStatus) api.KnownPeerStatus {
func (t *PersistentTracker) statusForLastSuccess(l peerdb.LastStatus) api.KnownPeerStatus {
// Last success was too long ago?
if time.Since(*l.Success) > 10*time.Minute {
if time.Since(*l.Success) > t.successThreshold {
return api.PeerStatusIsUnknown
}

Expand All @@ -199,7 +321,7 @@ func (t *PersistentTracker) Next(addr multiaddr.Multiaddr, status api.KnownPeerS
var candidates []*peerdb.PeerStatus
for _, p := range t.db.Peers() {
s := p.Network(netName).Service(service)
if statusForLast(s.Last) == status {
if t.statusForLast(s.Last) == status {
candidates = append(candidates, p)
}
}
Expand Down Expand Up @@ -236,11 +358,14 @@ func (t *PersistentTracker) All(addr multiaddr.Multiaddr, status api.KnownPeerSt
if err != nil {
panic(err)
}
if netName == "" {
netName = t.network
}

var peers []peer.ID
for _, p := range t.db.Peers() {
s := p.Network(netName).Service(service)
if statusForLast(s.Last) == status {
if t.statusForLast(s.Last) == status {
peers = append(peers, p.ID)
}
}
Expand Down
43 changes: 22 additions & 21 deletions pkg/api/v3/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,6 @@ func New(opts Options) (_ *Node, err error) {
n := new(Node)
n.context, n.cancel = context.WithCancel(context.Background())

if opts.PeerDatabase != "" {
n.tracker, err = dial.NewPersistentTracker(n.context, dial.PersistentTrackerOptions{
Filename: opts.PeerDatabase,
Host: (*connector)(n),
Peers: (*discoverer)(n),
PersistFrequency: 10 * time.Second,
})
if err != nil {
return nil, err
}
} else if opts.EnablePeerTracker {
n.tracker = new(dial.SimpleTracker)
} else {
n.tracker = dial.FakeTracker
}
n.dialOpts = []dial.Option{
dial.WithConnector((*connector)(n)),
dial.WithDiscoverer((*discoverer)(n)),
dial.WithTracker(n.tracker),
}

// Cancel on fail
defer func() {
if err != nil {
Expand Down Expand Up @@ -170,6 +149,28 @@ func New(opts Options) (_ *Node, err error) {
util.Advertise(n.context, n.peermgr.routing, c.String())
}

if opts.PeerDatabase != "" {
n.tracker, err = dial.NewPersistentTracker(n.context, dial.PersistentTrackerOptions{
Network: opts.Network,
Filename: opts.PeerDatabase,
Host: (*connector)(n),
Peers: (*discoverer)(n),
PersistFrequency: 10 * time.Second,
})
if err != nil {
return nil, err
}
} else if opts.EnablePeerTracker {
n.tracker = new(dial.SimpleTracker)
} else {
n.tracker = dial.FakeTracker
}
n.dialOpts = []dial.Option{
dial.WithConnector((*connector)(n)),
dial.WithDiscoverer((*discoverer)(n)),
dial.WithTracker(n.tracker),
}

return n, nil
}

Expand Down

0 comments on commit e4c83b2

Please sign in to comment.