diff --git a/cmd/accumulated-http/main.go b/cmd/accumulated-http/main.go index 22308d8a3..e3a634fd6 100644 --- a/cmd/accumulated-http/main.go +++ b/cmd/accumulated-http/main.go @@ -33,6 +33,7 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p/dial" "gitlab.com/accumulatenetwork/accumulate/protocol" "golang.org/x/crypto/acme/autocert" "golang.org/x/exp/slog" @@ -118,10 +119,27 @@ func run(_ *cobra.Command, args []string) { fmt.Printf("We are %v\n", node.ID()) - fmt.Println("Waiting for a live network service") - svcAddr, err := api.ServiceTypeNetwork.AddressFor(protocol.Directory).MultiaddrFor(args[0]) - Check(err) - Check(node.WaitForService(ctx, svcAddr)) + dirNetSvc := api.ServiceTypeNetwork.AddressFor(protocol.Directory) + tr, ok := node.Tracker().(*dial.PersistentTracker) + if !ok { + fmt.Println("Waiting for a live network service") + svcAddr, err := dirNetSvc.MultiaddrFor(args[0]) + Check(err) + Check(node.WaitForService(ctx, svcAddr)) + + } else { + var found bool + for _, peer := range tr.DB().Peers() { + if peer.Network(args[0]).Service(dirNetSvc).Last.Success != nil { + found = true + } + } + + if !found { + fmt.Println("Scanning for peers") + tr.ScanPeers(5 * time.Minute) + } + } fmt.Println("Fetching routing information") router := new(routing.MessageRouter) diff --git a/pkg/api/v3/address.go b/pkg/api/v3/address.go index 1802b54ae..c70a2c1eb 100644 --- a/pkg/api/v3/address.go +++ b/pkg/api/v3/address.go @@ -321,18 +321,8 @@ func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress, return true }) - // The address must contain a /acc-svc component and must not contain any - // unexpected components - if bad || cService == nil { - return "", "", nil, nil, errors.BadRequest.WithFormat("invalid address %v", addr) - } - - // Parse the /acc-svc component - sa := new(ServiceAddress) - err := sa.UnmarshalBinary(cService.RawValue()) - if err != nil { - return "", "", nil, nil, errors.BadRequest.WithCauseAndFormat(err, "invalid address %v", addr) - } else if sa.Type == ServiceTypeUnknown { + // The address must not contain any unexpected components + if bad { return "", "", nil, nil, errors.BadRequest.WithFormat("invalid address %v", addr) } @@ -346,5 +336,18 @@ func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress, net = string(cNetwork.RawValue()) } + if cService == nil { + return net, peerID, nil, netAddr, nil + } + + // Parse the /acc-svc component + sa := new(ServiceAddress) + err := sa.UnmarshalBinary(cService.RawValue()) + if err != nil { + return "", "", nil, nil, errors.BadRequest.WithCauseAndFormat(err, "invalid address %v", addr) + } else if sa.Type == ServiceTypeUnknown { + return "", "", nil, nil, errors.BadRequest.WithFormat("invalid address %v", addr) + } + return net, peerID, sa, netAddr, nil } diff --git a/pkg/api/v3/p2p/dial/dialer.go b/pkg/api/v3/p2p/dial/dialer.go index 1e4d9484b..e0128e7f0 100644 --- a/pkg/api/v3/p2p/dial/dialer.go +++ b/pkg/api/v3/p2p/dial/dialer.go @@ -72,6 +72,9 @@ func (d *dialer) Dial(ctx context.Context, addr multiaddr.Multiaddr) (stream mes if err != nil { return nil, errors.UnknownError.Wrap(err) } + if sa == nil { + return nil, errors.BadRequest.WithFormat("invalid address %v", addr) + } if ip != nil && peer == "" { return nil, errors.BadRequest.WithFormat("cannot specify address without peer ID") @@ -119,7 +122,7 @@ func (d *dialer) newNetworkStream(ctx context.Context, service *api.ServiceAddre return nil, errors.UnknownError.Wrap(err) } - // Query the DHT for peers that provide the service + // Discover peers that provide the service callCtx, cancel := context.WithCancel(ctx) defer cancel() resp, err := d.peers.Discover(callCtx, &DiscoveryRequest{ @@ -290,12 +293,14 @@ func (d *dialer) dial(ctx context.Context, peer peer.ID, service *api.ServiceAdd // Context was canceled, don't mark the peer case errors.Is(err, network.ErrNoConn), - errors.Is(err, network.ErrNoRemoteAddrs): + errors.Is(err, network.ErrNoRemoteAddrs), + errors.Is(err, swarm.ErrNoAddresses): // Mark the peer as dead d.tracker.Mark(peer, addr, api.PeerStatusIsUnknown) slog.InfoCtx(ctx, "Unable to dial peer", "peer", peer, "service", service, "error", err) case errors.Is(err, swarm.ErrDialBackoff), + errors.Is(err, swarm.ErrNoGoodAddresses), errors.As(err, &timeoutError) && timeoutError.Timeout(): // Mark the peer bad d.tracker.Mark(peer, addr, api.PeerStatusIsKnownBad) diff --git a/pkg/api/v3/p2p/dial/tracker_persistent.go b/pkg/api/v3/p2p/dial/tracker_persistent.go index 336dcb636..8cbddb667 100644 --- a/pkg/api/v3/p2p/dial/tracker_persistent.go +++ b/pkg/api/v3/p2p/dial/tracker_persistent.go @@ -8,6 +8,7 @@ package dial import ( "context" + "net" "os" "sort" "sync" @@ -86,16 +87,18 @@ func NewPersistentTracker(ctx context.Context, opts PersistentTrackerOptions) (* // Launch async jobs t.runJob(t.writeDb, opts.PersistFrequency, defaultPersistFrequency, false) - - if opts.Network == "" { - slog.Info("Scanning disabled, network unspecified") - } else { - t.runJob(t.scanPeers, opts.ScanFrequency, defaultScanFrequency, true) - } + t.runJob(t.scanPeers, opts.ScanFrequency, defaultScanFrequency, true) return t, nil } +func (t *PersistentTracker) DB() *peerdb.DB { return t.db } + +func (t *PersistentTracker) ScanPeers(duration time.Duration) { + t.scanPeers(duration) + t.writeDb(0) +} + func (t *PersistentTracker) Stop() { t.cancel() t.stopwg.Wait() @@ -178,13 +181,41 @@ func (t *PersistentTracker) scanPeers(duration time.Duration) { } func (t *PersistentTracker) scanPeer(ctx context.Context, peer peer.AddrInfo) { - slog.InfoCtx(t.context, "Scanning peer", "id", peer.ID) + slog.DebugCtx(ctx, "Scanning peer", "id", peer.ID) // TODO Check addresses ctx, cancel := context.WithCancel(ctx) defer cancel() + t.scanPeerAddresses(ctx, peer) + t.scanPeerServices(ctx, peer) +} + +func (t *PersistentTracker) scanPeerAddresses(ctx context.Context, peer peer.AddrInfo) { + for _, addr := range peer.Addrs { + // Ignore private IPs + if s, err := addr.ValueForProtocol(multiaddr.P_IP4); err == nil { + if ip := net.ParseIP(s); ip != nil && isPrivateIP(ip) { + continue + } + } + + t.db.Peer(peer.ID).Address(addr).Last.DidAttempt() + _, err := t.host.Connect(ctx, &ConnectionRequest{ + Service: api.ServiceTypeNode.Address(), + PeerID: peer.ID, + PeerAddr: addr, + }) + if err != nil { + slog.InfoCtx(ctx, "Unable to connect to peer", "peer", peer.ID, "error", err, "address", addr) + continue + } + t.db.Peer(peer.ID).Address(addr).Last.DidSucceed() + } +} + +func (t *PersistentTracker) scanPeerServices(ctx context.Context, peer peer.AddrInfo) { creq := &ConnectionRequest{ Service: api.ServiceTypeNode.Address(), PeerID: peer.ID, @@ -196,40 +227,40 @@ func (t *PersistentTracker) scanPeer(ctx context.Context, peer peer.AddrInfo) { t.db.Peer(peer.ID).Network(t.network).Service(creq.Service).Last.DidAttempt() s, err := t.host.Connect(ctx, creq) if err != nil { - slog.Info("Unable to connect to peer", "peer", peer.ID, "error", err) + slog.InfoCtx(ctx, "Unable to connect to peer", "peer", peer.ID, "error", err) return } t.db.Peer(peer.ID).Network(t.network).Service(creq.Service).Last.DidSucceed() err = s.Write(&message.NodeInfoRequest{}) if err != nil { - slog.Info("Failed to request node info", "peer", peer.ID, "error", err) + slog.InfoCtx(ctx, "Failed to request node info", "peer", peer.ID, "error", err) return } res, err := s.Read() if err != nil { - slog.Info("Failed to request node info", "peer", peer.ID, "error", err) + slog.InfoCtx(ctx, "Failed to request node info", "peer", peer.ID, "error", err) return } var ni *message.NodeInfoResponse switch res := res.(type) { case *message.ErrorResponse: - slog.Info("Failed to request node info", "peer", peer.ID, "error", res.Error) + slog.InfoCtx(ctx, "Failed to request node info", "peer", peer.ID, "error", res.Error) return case *message.NodeInfoResponse: ni = res default: - slog.Info("Invalid node info response", "peer", peer.ID, "want", message.TypeNodeInfoResponse, "got", res.Type()) + slog.InfoCtx(ctx, "Invalid node info response", "peer", peer.ID, "want", message.TypeNodeInfoResponse, "got", res.Type()) return } for _, svc := range ni.Value.Services { - slog.InfoCtx(t.context, "Attempting to conenct to service", "id", peer.ID, "service", svc) + slog.DebugCtx(ctx, "Attempting to conenct to service", "id", peer.ID, "service", svc) t.db.Peer(peer.ID).Network(t.network).Service(svc).Last.DidAttempt() _, err := t.host.Connect(ctx, &ConnectionRequest{Service: svc, PeerID: peer.ID}) if err != nil { - slog.Info("Unable to connect to peer", "peer", peer.ID, "error", err) + slog.InfoCtx(ctx, "Unable to connect to peer", "peer", peer.ID, "error", err) return } t.db.Peer(peer.ID).Network(t.network).Service(svc).Last.DidSucceed() @@ -241,6 +272,9 @@ func (t *PersistentTracker) Mark(peer peer.ID, addr multiaddr.Multiaddr, status if err != nil { panic(err) } + if service == nil { + return // Cannot mark if there's no service + } switch status { case api.PeerStatusIsKnownGood: @@ -266,6 +300,10 @@ func (t *PersistentTracker) Status(peer peer.ID, addr multiaddr.Multiaddr) api.K if err != nil { panic(err) } + if service == nil { + // If there's no service, the status is unknown + return api.PeerStatusIsUnknown + } s := t.db.Peer(peer).Network(netName).Service(service) return t.statusForLast(s.Last) @@ -326,6 +364,10 @@ func (t *PersistentTracker) Next(addr multiaddr.Multiaddr, status api.KnownPeerS if err != nil { panic(err) } + if service == nil { + // Don't answer if the service is unspecified + return "", false + } // Get all the candidates with the given status var candidates []*peerdb.PeerStatus @@ -359,7 +401,6 @@ func (t *PersistentTracker) Next(addr multiaddr.Multiaddr, status api.KnownPeerS }) } - candidates[0].Network(netName).Service(service).Last.DidAttempt() return candidates[0].ID, true } @@ -374,10 +415,85 @@ func (t *PersistentTracker) All(addr multiaddr.Multiaddr, status api.KnownPeerSt var peers []peer.ID for _, p := range t.db.Peers() { - s := p.Network(netName).Service(service) - if t.statusForLast(s.Last) == status { - peers = append(peers, p.ID) + n := p.Network(netName) + if service != nil { + s := n.Service(service) + if t.statusForLast(s.Last) == status { + peers = append(peers, p.ID) + } + continue + } + + for _, s := range n.Services.Load() { + if t.statusForLast(s.Last) == status { + peers = append(peers, p.ID) + break + } } } return peers } + +func (t *PersistentTracker) Discover(ctx context.Context, req *DiscoveryRequest) (DiscoveryResponse, error) { + if req.Network == "" { + req.Network = t.network + } + if req.Service == nil { + return nil, errors.BadRequest.With("missing service") + } + + ch := make(chan peer.AddrInfo) + go func() { + defer close(ch) + + for _, p := range t.db.Peers() { + if p.Network(req.Network).Service(req.Service).Last.SinceSuccess() > t.successThreshold { + continue + } + + info := peer.AddrInfo{ID: p.ID} + for _, a := range p.Addresses.Load() { + if a.Last.SinceSuccess() < t.successThreshold { + info.Addrs = append(info.Addrs, a.Address) + } + } + ch <- info + } + }() + + return DiscoveredPeers(ch), nil +} + +func (t *PersistentTracker) Connect(ctx context.Context, req *ConnectionRequest) (message.Stream, error) { + if req.Service == nil { + return nil, errors.BadRequest.With("missing service") + } + if req.PeerID == "" { + return nil, errors.BadRequest.With("missing peer") + } + + // Try to provide a good address + peer := t.db.Peer(req.PeerID) + if req.PeerAddr == nil { + for _, addr := range peer.Addresses.Load() { + if t.statusForLast(addr.Last) == api.PeerStatusIsKnownGood { + req.PeerAddr = addr.Address + break + } + } + } + + peer.Network(t.network).Service(req.Service).Last.DidAttempt() + if req.PeerAddr != nil { + peer.Address(req.PeerAddr).Last.DidAttempt() + } + s, err := t.host.Connect(ctx, req) + if err != nil { + return nil, err + } + peer.Network(t.network).Service(req.Service).Last.DidSucceed() + if req.PeerAddr != nil { + peer.Address(req.PeerAddr).Last.DidSucceed() + } + return s, nil +} diff --git a/pkg/api/v3/p2p/dial/utils.go b/pkg/api/v3/p2p/dial/utils.go new file mode 100644 index 000000000..2c8b61c6d --- /dev/null +++ b/pkg/api/v3/p2p/dial/utils.go @@ -0,0 +1,49 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package dial + +import ( + "fmt" + "net" + "sync" +) + +var initIsPrivate sync.Once +var privateIPBlocks []*net.IPNet + +// https://stackoverflow.com/questions/41240761/check-if-ip-address-is-in-private-network-space +func isPrivateIP(ip net.IP) bool { + if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() { + return true + } + + initIsPrivate.Do(func() { + for _, cidr := range []string{ + "127.0.0.0/8", // IPv4 loopback + "10.0.0.0/8", // RFC1918 + "172.16.0.0/12", // RFC1918 + "192.168.0.0/16", // RFC1918 + "169.254.0.0/16", // RFC3927 link-local + "::1/128", // IPv6 loopback + "fe80::/10", // IPv6 link-local + "fc00::/7", // IPv6 unique local addr + } { + _, block, err := net.ParseCIDR(cidr) + if err != nil { + panic(fmt.Errorf("parse error on %q: %v", cidr, err)) + } + privateIPBlocks = append(privateIPBlocks, block) + } + }) + + for _, block := range privateIPBlocks { + if block.Contains(ip) { + return true + } + } + return false +} diff --git a/pkg/api/v3/p2p/dial_network.go b/pkg/api/v3/p2p/dial_network.go index b69a03644..8b70333e0 100644 --- a/pkg/api/v3/p2p/dial_network.go +++ b/pkg/api/v3/p2p/dial_network.go @@ -16,15 +16,57 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/errors" ) +func (n *Node) Tracker() dial.Tracker { return n.tracker } + // DialNetwork returns a [message.MultiDialer] that opens a stream to a node // that can provides a given service. func (n *Node) DialNetwork() message.Dialer { - return dial.New(n.dialOpts...) + var host dial.Connector + var peers dial.Discoverer + + tr, ok := n.tracker.(*dial.PersistentTracker) + if ok { + // Use the persistent tracker as the host and for discovery + host, peers = tr, tr + } else { + // Use the basic host and discovery + host = (*connector)(n) + peers = (*dhtDiscoverer)(n) + } + + // Always use self-discovery + peers = &selfDiscoverer{n, peers} + + return dial.New( + dial.WithConnector(host), + dial.WithDiscoverer(peers), + dial.WithTracker(n.tracker), + ) +} + +type selfDiscoverer struct { + n *Node + d dial.Discoverer +} + +func (d *selfDiscoverer) Discover(ctx context.Context, req *dial.DiscoveryRequest) (dial.DiscoveryResponse, error) { + if req.Service == nil { + return d.d.Discover(ctx, req) + } + + s, ok := d.n.getOwnService(req.Network, req.Service) + if !ok { + return d.d.Discover(ctx, req) + } + + return dial.DiscoveredLocal(func(ctx context.Context) (message.Stream, error) { + return handleLocally(ctx, s), nil + }), nil } -type discoverer Node +type dhtDiscoverer Node -func (d *discoverer) Discover(ctx context.Context, req *dial.DiscoveryRequest) (dial.DiscoveryResponse, error) { +func (d *dhtDiscoverer) Discover(ctx context.Context, req *dial.DiscoveryRequest) (dial.DiscoveryResponse, error) { var addr multiaddr.Multiaddr if req.Network != "" { c, err := multiaddr.NewComponent(api.N_ACC, req.Network) @@ -48,15 +90,6 @@ func (d *discoverer) Discover(ctx context.Context, req *dial.DiscoveryRequest) ( return nil, errors.BadRequest.With("no network or service specified") } - if req.Service != nil { - s, ok := (*Node)(d).getOwnService(req.Network, req.Service) - if ok { - return dial.DiscoveredLocal(func(ctx context.Context) (message.Stream, error) { - return handleLocally(ctx, s), nil - }), nil - } - } - ch, err := (*Node)(d).peermgr.getPeers(ctx, addr, req.Limit, req.Timeout) return dial.DiscoveredPeers(ch), err } diff --git a/pkg/api/v3/p2p/p2p.go b/pkg/api/v3/p2p/p2p.go index 35978f3d1..d2b8e46cc 100644 --- a/pkg/api/v3/p2p/p2p.go +++ b/pkg/api/v3/p2p/p2p.go @@ -11,7 +11,6 @@ import ( "crypto/ed25519" "net" "strings" - "time" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -35,7 +34,6 @@ type Node struct { cancel context.CancelFunc peermgr *peerManager host host.Host - dialOpts []dial.Option tracker dial.Tracker services []*serviceHandler } @@ -151,13 +149,13 @@ func New(opts Options) (_ *Node, err error) { util.Advertise(n.context, n.peermgr.routing, c.String()) } + // Set up the peer tracker if opts.PeerDatabase != "" { n.tracker, err = dial.NewPersistentTracker(n.context, dial.PersistentTrackerOptions{ - Network: opts.Network, - Filename: opts.PeerDatabase, - Host: (*connector)(n), - Peers: (*discoverer)(n), - PersistFrequency: 10 * time.Second, + Network: opts.Network, + Filename: opts.PeerDatabase, + Host: (*connector)(n), + Peers: (*dhtDiscoverer)(n), }) if err != nil { return nil, err @@ -167,11 +165,6 @@ func New(opts Options) (_ *Node, err error) { } else { n.tracker = dial.FakeTracker } - n.dialOpts = []dial.Option{ - dial.WithConnector((*connector)(n)), - dial.WithDiscoverer((*discoverer)(n)), - dial.WithTracker(n.tracker), - } return n, nil } diff --git a/pkg/api/v3/p2p/peerdb/types.go b/pkg/api/v3/p2p/peerdb/types.go index 46576fb0d..05a611099 100644 --- a/pkg/api/v3/p2p/peerdb/types.go +++ b/pkg/api/v3/p2p/peerdb/types.go @@ -8,6 +8,7 @@ package peerdb import ( "encoding/json" + "math" "strings" "time" ) @@ -66,3 +67,17 @@ func (l *LastStatus) DidSucceed() { now := time.Now() l.Success = &now } + +func (l *LastStatus) SinceAttempt() time.Duration { + if l.Attempt == nil { + return math.MaxInt64 + } + return time.Since(*l.Attempt) +} + +func (l *LastStatus) SinceSuccess() time.Duration { + if l.Success == nil { + return math.MaxInt64 + } + return time.Since(*l.Success) +} diff --git a/test/simulator/services/services.go b/test/simulator/services/services.go index 4186fa274..e43ff4f07 100644 --- a/test/simulator/services/services.go +++ b/test/simulator/services/services.go @@ -64,6 +64,9 @@ func (s Services) Dial(ctx context.Context, addr multiaddr.Multiaddr) (message.S if err != nil { return nil, errors.UnknownError.Wrap(err) } + if sa == nil { + return nil, errors.BadRequest.WithFormat("invalid address %v", addr) + } m, ok := s[sa.String()] if !ok {