diff --git a/.dockerignore b/.dockerignore index 0c3538442..9b8e0b283 100644 --- a/.dockerignore +++ b/.dockerignore @@ -2,3 +2,4 @@ .factom .genesis *.exe +.git \ No newline at end of file diff --git a/cmd/accumulated-http/main.go b/cmd/accumulated-http/main.go index 7034835f0..6241a305a 100644 --- a/cmd/accumulated-http/main.go +++ b/cmd/accumulated-http/main.go @@ -82,6 +82,23 @@ func init() { cmd.Flags().BoolVar(&jsonrpc2.DebugMethodFunc, "debug", false, "Print out a stack trace if an API method fails") } +var mainnetAddrs = func() []multiaddr.Multiaddr { + s := []string{ + "/dns/apollo-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj", + "/dns/yutu-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWDqFDwjHEog1bNbxai2dKSaR1aFvq2LAZ2jivSohgoSc7", + "/dns/chandrayaan-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWHzjkoeAqe7L55tAaepCbMbhvNu9v52ayZNVQobdEE1RL", + } + addrs := make([]multiaddr.Multiaddr, len(s)) + for i, s := range s { + addr, err := multiaddr.NewMultiaddr(s) + if err != nil { + panic(err) + } + addrs[i] = addr + } + return addrs +}() + func run(_ *cobra.Command, args []string) { ctx, cancel := context.WithCancel(context.Background()) sigs := make(chan os.Signal, 1) @@ -111,10 +128,11 @@ func run(_ *cobra.Command, args []string) { Check(err) node, err := p2p.New(p2p.Options{ - Key: loadOrGenerateKey(), - Network: args[0], - Listen: flag.P2pListen, - BootstrapPeers: flag.Peers, + Key: loadOrGenerateKey(), + Network: args[0], + Listen: flag.P2pListen, + // BootstrapPeers: flag.Peers, + BootstrapPeers: mainnetAddrs, EnablePeerTracker: true, }) Check(err) diff --git a/internal/core/healing/anchors.go b/internal/core/healing/anchors.go index be3844610..078c035f9 100644 --- a/internal/core/healing/anchors.go +++ b/internal/core/healing/anchors.go @@ -9,6 +9,7 @@ package healing import ( "context" "encoding/hex" + "fmt" "strings" "time" @@ -28,6 +29,7 @@ type HealAnchorArgs struct { NetInfo *NetworkInfo Known map[[32]byte]*protocol.Transaction Pretend bool + Wait bool } func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error { @@ -54,7 +56,8 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro // Fetch the transaction and signatures var sigSets []*api.SignatureSetRecord - res, err := api.Querier2{Querier: args.Querier}.QueryMessage(ctx, si.ID, nil) + Q := api.Querier2{Querier: args.Querier} + res, err := Q.QueryMessage(ctx, si.ID, nil) switch { case err == nil: switch msg := res.Message.(type) { @@ -144,10 +147,10 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro continue } - ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - slog.InfoCtx(ctx, "Querying node for its signature", "seq.ID", peer) + slog.InfoCtx(ctx, "Querying node for its signature", "id", peer) res, err := args.Client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, si.Number) if err != nil { slog.ErrorCtx(ctx, "Query failed", "error", err) @@ -156,7 +159,7 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro myTxn, ok := res.Message.(*messaging.TransactionMessage) if !ok { - slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "seq.ID", info, "type", res.Message.Type()) + slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "id", info, "type", res.Message.Type()) continue } if theAnchorTxn == 
nil { @@ -165,7 +168,7 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro Transaction: theAnchorTxn, } } else if !myTxn.Transaction.Equal(theAnchorTxn) { - slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "seq.ID", info, + slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "id", info, "expected", hex.EncodeToString(theAnchorTxn.GetHash()), "got", hex.EncodeToString(myTxn.Transaction.GetHash())) continue @@ -205,12 +208,12 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro case protocol.UserSignature: // Filter out bad signatures if !sig.Verify(nil, theAnchorTxn.GetHash()) { - slog.ErrorCtx(ctx, "Node gave us an invalid signature", "seq.ID", info) + slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info) continue } default: - slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "seq.ID", info, "type", sig.Type()) + slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "id", info, "type", sig.Type()) continue } } @@ -233,43 +236,74 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro // sent if we have more than 1 signature if gotPartSig && len(signatures) <= 1 || !gotPartSig && len(signatures) == 0 { slog.InfoCtx(ctx, "Nothing to send") - return nil - } - slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures)) - env := new(messaging.Envelope) - if args.NetInfo.Status.ExecutorVersion.V2() { - for i, sig := range signatures { - seq := seq.Copy() - if i > 0 { - seq.Message = &messaging.TransactionMessage{ - Transaction: &protocol.Transaction{ - Body: &protocol.RemoteTransaction{ - Hash: theAnchorTxn.ID().Hash(), - }, - }, + } else { + slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures)) + + submit := func(env *messaging.Envelope) { + // addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr() + sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{}) + if err != nil { + slog.ErrorCtx(ctx, "Submission failed", "error", err) + } + for _, sub := range sub { + if sub.Success { + slog.InfoCtx(ctx, "Submission succeeded", "id", sub.Status.TxID) + } else { + slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status) } } - env.Messages = append(env.Messages, &messaging.BlockAnchor{ - Signature: sig.(protocol.KeySignature), - Anchor: seq, - }) } - } else { - env.Transaction = []*protocol.Transaction{theAnchorTxn} - env.Signatures = signatures + + if args.NetInfo.Status.ExecutorVersion.V2() { + for _, sig := range signatures { + blk := &messaging.BlockAnchor{ + Signature: sig.(protocol.KeySignature), + Anchor: seq, + } + submit(&messaging.Envelope{Messages: []messaging.Message{blk}}) + } + } else { + env := new(messaging.Envelope) + env.Transaction = []*protocol.Transaction{theAnchorTxn} + env.Signatures = signatures + submit(env) + } } - // addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr() - sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{}) - if err != nil { - slog.ErrorCtx(ctx, "Submission failed", "error", err) + if !args.Wait { + return nil } - for _, sub := range sub { - if !sub.Success { - slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status) + + slog.InfoCtx(ctx, "Waiting", "for", si.ID) + for i := 0; i < 10; i++ { + r, err := Q.QueryMessage(ctx, si.ID, nil) + switch { + case errors.Is(err, errors.NotFound): + // Not found, wait + slog.Info("Status", "id", si.ID, "code", 
errors.NotFound) + break + + case err != nil: + // Unknown error + return err + + case !r.Status.Delivered(): + // Pending, wait + slog.Info("Status", "id", si.ID, "code", r.Status) + break + + case r.Error != nil: + slog.Error("Failed", "id", si.ID, "error", r.Error) + return r.AsError() + + default: + slog.Info("Delivered", "id", si.ID) + return nil } + time.Sleep(time.Second / 2) } - - return nil + return ErrRetry } + +var ErrRetry = fmt.Errorf("retry") diff --git a/internal/core/healing/scan.go b/internal/core/healing/scan.go index 9eca4d67c..795c0acfa 100644 --- a/internal/core/healing/scan.go +++ b/internal/core/healing/scan.go @@ -11,21 +11,14 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/libp2p/go-libp2p/core/peer" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/errors" - "gitlab.com/accumulatenetwork/accumulate/pkg/url" "golang.org/x/exp/slog" ) -type PeerInfo struct { - ID peer.ID `json:"-"` - Status *api.ConsensusStatus `json:"status"` - Key [32]byte `json:"key"` - Operator *url.URL `json:"operator"` -} - func (p *PeerInfo) String() string { if p.Operator != nil { return fmt.Sprintf("%v (%v)", p.Operator, p.ID) @@ -41,6 +34,18 @@ type NetworkInfo struct { type PeerList map[peer.ID]*PeerInfo +func (i *NetworkInfo) PeerByID(id peer.ID) *PeerInfo { + if i == nil { + return nil + } + for _, part := range i.Peers { + if p, ok := part[id]; ok { + return p + } + } + return nil +} + func (l PeerList) MarshalJSON() ([]byte, error) { m := make(map[string]*PeerInfo, len(l)) for id, info := range l { @@ -101,6 +106,7 @@ func ScanNetwork(ctx context.Context, endpoint ScanServices) (*NetworkInfo, erro find := api.FindServiceOptions{ Network: epNodeInfo.Network, Service: api.ServiceTypeConsensus.AddressFor(part.ID), + Timeout: 10 * time.Second, } res, err := endpoint.FindService(ctx, find) if err != nil { @@ -108,6 +114,9 @@ func ScanNetwork(ctx context.Context, endpoint ScanServices) (*NetworkInfo, erro } for _, peer := range res { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + slog.InfoCtx(ctx, "Getting identity of", "peer", peer.PeerID) info, err := endpoint.ConsensusStatus(ctx, api.ConsensusStatusOptions{NodeID: peer.PeerID.String(), Partition: part.ID}) if err != nil { @@ -120,9 +129,10 @@ func ScanNetwork(ctx context.Context, endpoint ScanServices) (*NetworkInfo, erro continue // Not a validator } pi := &PeerInfo{ - ID: peer.PeerID, - Status: info, - Key: key, + ID: peer.PeerID, + Status: info, + Key: key, + Addresses: peer.Addresses, } partPeers[peer.PeerID] = pi @@ -139,3 +149,47 @@ func ScanNetwork(ctx context.Context, endpoint ScanServices) (*NetworkInfo, erro Peers: peers, }, nil } + +func ScanNode(ctx context.Context, endpoint ScanServices) (*PeerInfo, error) { + ctx, cancel, _ := api.ContextWithBatchData(ctx) + defer cancel() + + nodeInfo, err := endpoint.NodeInfo(ctx, api.NodeInfoOptions{}) + if err != nil { + return nil, errors.UnknownError.WithFormat("query node info: %w", err) + } + + netStatus, err := endpoint.NetworkStatus(ctx, api.NetworkStatusOptions{}) + if err != nil { + return nil, errors.UnknownError.WithFormat("query network status: %w", err) + } + + hash2key := map[[32]byte][32]byte{} + for _, val := range netStatus.Network.Validators { + hash2key[val.PublicKeyHash] = *(*[32]byte)(val.PublicKey) + } + + slog.InfoCtx(ctx, "Getting identity of", "peer", nodeInfo.PeerID) + info, err := endpoint.ConsensusStatus(ctx, api.ConsensusStatusOptions{}) + if err != nil { + return 
nil, errors.UnknownError.WithFormat("query consensus status: %w", err) + } + + key, ok := hash2key[info.ValidatorKeyHash] + if !ok { + return nil, errors.UnknownError.With("not a validator") + } + + pi := &PeerInfo{ + ID: nodeInfo.PeerID, + Status: info, + Key: key, + } + + _, val, ok := netStatus.Network.ValidatorByHash(info.ValidatorKeyHash[:]) + if ok { + pi.Operator = val.Operator + } + + return pi, nil +} diff --git a/internal/core/healing/sequenced.go b/internal/core/healing/sequenced.go index 0a10d551d..a0408c551 100644 --- a/internal/core/healing/sequenced.go +++ b/internal/core/healing/sequenced.go @@ -60,7 +60,7 @@ func ResolveSequenced[T messaging.Message](ctx context.Context, client message.A // Otherwise try each node until one succeeds slog.InfoCtx(ctx, "Resolving the message ID", "source", srcId, "destination", dstId, "number", seqNum) for peer := range net.Peers[strings.ToLower(srcId)] { - ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() slog.InfoCtx(ctx, "Querying node", "id", peer) diff --git a/internal/core/healing/synthetic.go b/internal/core/healing/synthetic.go index 1d9bf3dd0..82b1d282b 100644 --- a/internal/core/healing/synthetic.go +++ b/internal/core/healing/synthetic.go @@ -43,6 +43,12 @@ func HealSynthetic(ctx context.Context, args HealSyntheticArgs, si SequencedInfo return err } + // Has it already been delivered? + Q := api.Querier2{Querier: args.Client} + if r, err := Q.QueryMessage(ctx, r.ID, nil); err == nil && r.Status.Delivered() { + return nil + } + slog.InfoCtx(ctx, "Resubmitting", "source", si.Source, "destination", si.Destination, "number", si.Number, "id", r.Message.ID()) // Submit the synthetic transaction directly to the destination partition @@ -78,6 +84,16 @@ func HealSynthetic(ctx context.Context, args HealSyntheticArgs, si SequencedInfo return fmt.Errorf("signature is not valid") } + env := new(messaging.Envelope) + env.Messages = []messaging.Message{msg} + if msg, ok := r.Message.(messaging.MessageForTransaction); ok { + r, err := Q.QueryTransaction(ctx, msg.GetTxID(), nil) + if err != nil { + return errors.InternalError.WithFormat("query transaction for message: %w", err) + } + env.Messages = append(env.Messages, r.Message) + } + if args.Pretend { return nil } @@ -90,7 +106,7 @@ func HealSynthetic(ctx context.Context, args HealSyntheticArgs, si SequencedInfo } } - sub, err := args.Client.Submit(ctx, &messaging.Envelope{Messages: []messaging.Message{msg}}, api.SubmitOptions{}) + sub, err := args.Client.Submit(ctx, env, api.SubmitOptions{}) if err != nil { slog.ErrorCtx(ctx, "Submission failed", "error", err) } @@ -103,16 +119,15 @@ func HealSynthetic(ctx context.Context, args HealSyntheticArgs, si SequencedInfo return nil } - Q := api.Querier2{Querier: args.Client} - txid := r.Sequence.Destination.WithTxID(msg.Hash()) - slog.InfoCtx(ctx, "Waiting", "for", txid) - for { - time.Sleep(time.Second) - r, err := Q.QueryMessage(ctx, txid, nil) + si.ID = r.ID + slog.InfoCtx(ctx, "Waiting", "for", si.ID) + for i := 0; i < 10; i++ { + r, err := Q.QueryMessage(ctx, si.ID, nil) switch { case errors.Is(err, errors.NotFound): // Not found, wait - continue + slog.Info("Status", "id", si.ID, "code", errors.NotFound) + break case err != nil: // Unknown error @@ -120,11 +135,18 @@ func HealSynthetic(ctx context.Context, args HealSyntheticArgs, si SequencedInfo case !r.Status.Delivered(): // Pending, wait - continue + slog.Info("Status", "id", si.ID, "code", r.Status) + break - 
default: - // Failed? + case r.Error != nil: + slog.Error("Failed", "id", si.ID, "error", r.Error) return r.AsError() + + default: + slog.Info("Delivered", "id", si.ID) + return nil } + time.Sleep(time.Second / 2) } + return ErrRetry } diff --git a/internal/core/healing/types.go b/internal/core/healing/types.go new file mode 100644 index 000000000..e80d98665 --- /dev/null +++ b/internal/core/healing/types.go @@ -0,0 +1,9 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package healing + +//go:generate go run gitlab.com/accumulatenetwork/accumulate/tools/cmd/gen-types --package healing types.yml diff --git a/internal/core/healing/types.yml b/internal/core/healing/types.yml new file mode 100644 index 000000000..a8fdbea13 --- /dev/null +++ b/internal/core/healing/types.yml @@ -0,0 +1,19 @@ +PeerInfo: + non-binary: true + fields: + - name: ID + type: p2p.PeerID + marshal-as: none + - name: Operator + type: url + pointer: true + - name: Key + type: hash + - name: Status + type: api.ConsensusStatus + marshal-as: reference + pointer: true + - name: Addresses + type: p2p.Multiaddr + marshal-as: union + repeatable: true diff --git a/internal/core/healing/types_gen.go b/internal/core/healing/types_gen.go new file mode 100644 index 000000000..ae1ce03e5 --- /dev/null +++ b/internal/core/healing/types_gen.go @@ -0,0 +1,136 @@ +// Copyright 2022 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package healing + +// GENERATED BY go run ./tools/cmd/gen-types. DO NOT EDIT. + +//lint:file-ignore S1001,S1002,S1008,SA4013 generated code + +import ( + "encoding/json" + "fmt" + + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/encoding" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/p2p" + "gitlab.com/accumulatenetwork/accumulate/pkg/url" +) + +type PeerInfo struct { + ID p2p.PeerID + Operator *url.URL `json:"operator,omitempty" form:"operator" query:"operator" validate:"required"` + Key [32]byte `json:"key,omitempty" form:"key" query:"key" validate:"required"` + Status *api.ConsensusStatus `json:"status,omitempty" form:"status" query:"status" validate:"required"` + Addresses []p2p.Multiaddr `json:"addresses,omitempty" form:"addresses" query:"addresses" validate:"required"` +} + +func (v *PeerInfo) Copy() *PeerInfo { + u := new(PeerInfo) + + if v.Operator != nil { + u.Operator = v.Operator + } + u.Key = v.Key + if v.Status != nil { + u.Status = (v.Status).Copy() + } + u.Addresses = make([]p2p.Multiaddr, len(v.Addresses)) + for i, v := range v.Addresses { + v := v + if v != nil { + u.Addresses[i] = p2p.CopyMultiaddr(v) + } + } + + return u +} + +func (v *PeerInfo) CopyAsInterface() interface{} { return v.Copy() } + +func (v *PeerInfo) Equal(u *PeerInfo) bool { + switch { + case v.Operator == u.Operator: + // equal + case v.Operator == nil || u.Operator == nil: + return false + case !((v.Operator).Equal(u.Operator)): + return false + } + if !(v.Key == u.Key) { + return false + } + switch { + case v.Status == u.Status: + // equal + case v.Status == nil || u.Status == nil: + return false + case !((v.Status).Equal(u.Status)): + return false + } + if len(v.Addresses) != len(u.Addresses) { + return false + } + for i := range v.Addresses { + if 
!(p2p.EqualMultiaddr(v.Addresses[i], u.Addresses[i])) { + return false + } + } + + return true +} + +func (v *PeerInfo) MarshalJSON() ([]byte, error) { + u := struct { + Operator *url.URL `json:"operator,omitempty"` + Key string `json:"key,omitempty"` + Status *api.ConsensusStatus `json:"status,omitempty"` + Addresses *encoding.JsonUnmarshalListWith[p2p.Multiaddr] `json:"addresses,omitempty"` + }{} + if !(v.Operator == nil) { + u.Operator = v.Operator + } + if !(v.Key == ([32]byte{})) { + u.Key = encoding.ChainToJSON(v.Key) + } + if !(v.Status == nil) { + u.Status = v.Status + } + if !(len(v.Addresses) == 0) { + u.Addresses = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.Addresses, Func: p2p.UnmarshalMultiaddrJSON} + } + return json.Marshal(&u) +} + +func (v *PeerInfo) UnmarshalJSON(data []byte) error { + u := struct { + Operator *url.URL `json:"operator,omitempty"` + Key string `json:"key,omitempty"` + Status *api.ConsensusStatus `json:"status,omitempty"` + Addresses *encoding.JsonUnmarshalListWith[p2p.Multiaddr] `json:"addresses,omitempty"` + }{} + u.Operator = v.Operator + u.Key = encoding.ChainToJSON(v.Key) + u.Status = v.Status + u.Addresses = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.Addresses, Func: p2p.UnmarshalMultiaddrJSON} + if err := json.Unmarshal(data, &u); err != nil { + return err + } + v.Operator = u.Operator + if x, err := encoding.ChainFromJSON(u.Key); err != nil { + return fmt.Errorf("error decoding Key: %w", err) + } else { + v.Key = x + } + v.Status = u.Status + if u.Addresses != nil { + v.Addresses = make([]p2p.Multiaddr, len(u.Addresses.Value)) + for i, x := range u.Addresses.Value { + v.Addresses[i] = x + } + } + return nil +} diff --git a/pkg/api/v3/address.go b/pkg/api/v3/address.go index 421c99142..1802b54ae 100644 --- a/pkg/api/v3/address.go +++ b/pkg/api/v3/address.go @@ -288,9 +288,10 @@ func (a *ServiceAddress) UnmarshalJSON(b []byte) error { // include an /acc-svc component and may include a /p2p component or an /acc // component. UnpackAddress will return an error if the address includes any // other components. 
-func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress, error) { +func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress, multiaddr.Multiaddr, error) { // Scan the address for /acc, /acc-svc, and /p2p components var cNetwork, cService, cPeer *multiaddr.Component + var netAddr multiaddr.Multiaddr var bad bool multiaddr.ForEach(addr, func(c multiaddr.Component) bool { switch c.Protocol().Code { @@ -300,6 +301,20 @@ func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress, cService = &c case multiaddr.P_P2P: cPeer = &c + case multiaddr.P_DNS, + multiaddr.P_DNS4, + multiaddr.P_DNS6, + multiaddr.P_IP4, + multiaddr.P_IP6, + multiaddr.P_TCP, + multiaddr.P_UDP, + multiaddr.P_QUIC, + multiaddr.P_QUIC_V1: + if netAddr == nil { + netAddr = &c + } else { + netAddr = netAddr.Encapsulate(&c) + } default: bad = true } @@ -309,16 +324,16 @@ func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress, // The address must contain a /acc-svc component and must not contain any // unexpected components if bad || cService == nil { - return "", "", nil, errors.BadRequest.WithFormat("invalid address %v", addr) + return "", "", nil, nil, errors.BadRequest.WithFormat("invalid address %v", addr) } // Parse the /acc-svc component sa := new(ServiceAddress) err := sa.UnmarshalBinary(cService.RawValue()) if err != nil { - return "", "", nil, errors.BadRequest.WithCauseAndFormat(err, "invalid address %v", addr) + return "", "", nil, nil, errors.BadRequest.WithCauseAndFormat(err, "invalid address %v", addr) } else if sa.Type == ServiceTypeUnknown { - return "", "", nil, errors.BadRequest.WithFormat("invalid address %v", addr) + return "", "", nil, nil, errors.BadRequest.WithFormat("invalid address %v", addr) } var peerID peer.ID @@ -331,5 +346,5 @@ func UnpackAddress(addr multiaddr.Multiaddr) (string, peer.ID, *ServiceAddress, net = string(cNetwork.RawValue()) } - return net, peerID, sa, nil + return net, peerID, sa, netAddr, nil } diff --git a/pkg/api/v3/batch.go b/pkg/api/v3/batch.go index e2c21bbb8..cb9a942b4 100644 --- a/pkg/api/v3/batch.go +++ b/pkg/api/v3/batch.go @@ -9,11 +9,13 @@ package api import "context" type BatchData struct { - values map[any]any + context context.Context + values map[any]any } -func (d *BatchData) Get(k any) any { return d.values[k] } -func (d *BatchData) Put(k, v any) { d.values[k] = v } +func (d *BatchData) Context() context.Context { return d.context } +func (d *BatchData) Get(k any) any { return d.values[k] } +func (d *BatchData) Put(k, v any) { d.values[k] = v } type contextKeyBatch struct{} @@ -29,9 +31,10 @@ func ContextWithBatchData(ctx context.Context) (context.Context, context.CancelF return ctx, func() {}, v } + ctx, cancel := context.WithCancel(ctx) bd := new(BatchData) + bd.context = ctx bd.values = map[any]any{} - ctx, cancel := context.WithCancel(ctx) ctx = context.WithValue(ctx, contextKeyBatch{}, bd) return ctx, cancel, bd } diff --git a/pkg/api/v3/message/types_gen.go b/pkg/api/v3/message/types_gen.go index ec4cabf83..b4271b84f 100644 --- a/pkg/api/v3/message/types_gen.go +++ b/pkg/api/v3/message/types_gen.go @@ -3169,6 +3169,7 @@ func (v *FindServiceRequest) MarshalJSON() ([]byte, error) { Network string `json:"network,omitempty"` Service *api.ServiceAddress `json:"service,omitempty"` Known bool `json:"known,omitempty"` + Timeout interface{} `json:"timeout,omitempty"` }{} u.Type = v.Type() if !(len(v.FindServiceOptions.Network) == 0) { @@ -3183,6 +3184,10 @@ func (v 
*FindServiceRequest) MarshalJSON() ([]byte, error) { u.Known = v.FindServiceOptions.Known } + if !(v.FindServiceOptions.Timeout == 0) { + + u.Timeout = encoding.DurationToJSON(v.FindServiceOptions.Timeout) + } return json.Marshal(&u) } @@ -3566,11 +3571,13 @@ func (v *FindServiceRequest) UnmarshalJSON(data []byte) error { Network string `json:"network,omitempty"` Service *api.ServiceAddress `json:"service,omitempty"` Known bool `json:"known,omitempty"` + Timeout interface{} `json:"timeout,omitempty"` }{} u.Type = v.Type() u.Network = v.FindServiceOptions.Network u.Service = v.FindServiceOptions.Service u.Known = v.FindServiceOptions.Known + u.Timeout = encoding.DurationToJSON(v.FindServiceOptions.Timeout) if err := json.Unmarshal(data, &u); err != nil { return err } @@ -3580,6 +3587,11 @@ func (v *FindServiceRequest) UnmarshalJSON(data []byte) error { v.FindServiceOptions.Network = u.Network v.FindServiceOptions.Service = u.Service v.FindServiceOptions.Known = u.Known + if x, err := encoding.DurationFromJSON(u.Timeout); err != nil { + return fmt.Errorf("error decoding Timeout: %w", err) + } else { + v.FindServiceOptions.Timeout = x + } return nil } diff --git a/pkg/api/v3/options.yml b/pkg/api/v3/options.yml index e9672d3b1..6e7f95396 100644 --- a/pkg/api/v3/options.yml +++ b/pkg/api/v3/options.yml @@ -37,6 +37,10 @@ FindServiceOptions: description: restricts the results to known peers type: boolean optional: true + - name: Timeout + description: is the time to wait before stopping, when querying the DHT + type: duration + optional: true ConsensusStatusOptions: fields: diff --git a/pkg/api/v3/p2p/dial.go b/pkg/api/v3/p2p/dial.go index ac90f2021..f6f79291a 100644 --- a/pkg/api/v3/p2p/dial.go +++ b/pkg/api/v3/p2p/dial.go @@ -37,7 +37,7 @@ var _ message.MultiDialer = (*dialer)(nil) type dialerHost interface { selfID() peer.ID getOwnService(network string, sa *api.ServiceAddress) (*serviceHandler, bool) - getPeerService(ctx context.Context, peer peer.ID, service *api.ServiceAddress) (io.ReadWriteCloser, error) + getPeerService(ctx context.Context, peer peer.ID, service *api.ServiceAddress, ip multiaddr.Multiaddr) (io.ReadWriteCloser, error) } // dialerPeers are the parts of [peerManager] required by [dialer]. dialerPeers @@ -65,17 +65,28 @@ func (n *Node) DialNetwork() message.MultiDialer { // find an appropriate peer that can service the address. If no peer can be // found, Dial will return [errors.NoPeer]. func (d *dialer) Dial(ctx context.Context, addr multiaddr.Multiaddr) (stream message.Stream, err error) { - net, peer, sa, err := api.UnpackAddress(addr) + net, peer, sa, ip, err := api.UnpackAddress(addr) if err != nil { return nil, errors.UnknownError.Wrap(err) } + if ip != nil { + if peer == "" { + return nil, errors.BadRequest.WithFormat("cannot specify address without peer ID") + } + c, err := multiaddr.NewComponent("p2p", peer.String()) + if err != nil { + return nil, errors.InternalError.With(err) + } + ip = ip.Encapsulate(c) + } + if peer == "" { // Do not set the wait group return d.newNetworkStream(ctx, sa, net, nil) } - return d.newPeerStream(ctx, sa, peer) + return d.newPeerStream(ctx, sa, peer, ip) } // BadDial notifies the dialer that a transport error was encountered while @@ -95,7 +106,7 @@ func (d *dialer) BadDial(ctx context.Context, addr multiaddr.Multiaddr, s messag // // If the peer ID does not match a peer known by the node, or if the node does // not have an address for the given peer, newPeerStream will fail. 
-func (d *dialer) newPeerStream(ctx context.Context, sa *api.ServiceAddress, peer peer.ID) (message.Stream, error) { +func (d *dialer) newPeerStream(ctx context.Context, sa *api.ServiceAddress, peer peer.ID, ip multiaddr.Multiaddr) (message.Stream, error) { // If the peer ID is our ID if d.host.selfID() == peer { // Check if we have the service @@ -109,7 +120,7 @@ func (d *dialer) newPeerStream(ctx context.Context, sa *api.ServiceAddress, peer } // Open a new stream - return openStreamFor(ctx, d.host, peer, sa) + return openStreamFor(ctx, d.host, peer, sa, ip) } // newNetworkStream opens a stream to the highest priority peer that @@ -269,7 +280,7 @@ func (d *dialer) dial(ctx context.Context, peer peer.ID, service *api.ServiceAdd }() // Open a stream - stream, err := openStreamFor(ctx, d.host, peer, service) + stream, err := openStreamFor(ctx, d.host, peer, service, nil) if err == nil { // Mark the peer good d.tracker.markGood(ctx, peer, addr) diff --git a/pkg/api/v3/p2p/dial_self.go b/pkg/api/v3/p2p/dial_self.go index d951a1b09..d6ccb0c55 100644 --- a/pkg/api/v3/p2p/dial_self.go +++ b/pkg/api/v3/p2p/dial_self.go @@ -26,12 +26,12 @@ func (n *Node) DialSelf() message.Dialer { return (*selfDialer)(n) } // Dial returns a stream for the current node. func (d *selfDialer) Dial(ctx context.Context, addr multiaddr.Multiaddr) (message.Stream, error) { // Parse the address - _, peer, sa, err := api.UnpackAddress(addr) + _, peer, sa, _, err := api.UnpackAddress(addr) if err != nil { return nil, errors.UnknownError.Wrap(err) } if peer != "" && peer != d.host.ID() { - s, err := openStreamFor(ctx, (*Node)(d), peer, sa) + s, err := openStreamFor(ctx, (*Node)(d), peer, sa, nil) return s, errors.UnknownError.Wrap(err) } @@ -46,6 +46,9 @@ func (d *selfDialer) Dial(ctx context.Context, addr multiaddr.Multiaddr) (messag } func handleLocally(ctx context.Context, service *serviceHandler) message.Stream { + if batch := api.GetBatchData(ctx); batch != nil { + ctx = batch.Context() + } p, q := message.DuplexPipe(ctx) go func() { // Panic protection diff --git a/pkg/api/v3/p2p/dial_test.go b/pkg/api/v3/p2p/dial_test.go index 9688a20b9..a6c8c8578 100644 --- a/pkg/api/v3/p2p/dial_test.go +++ b/pkg/api/v3/p2p/dial_test.go @@ -57,7 +57,7 @@ func TestDialAddress(t *testing.T) { host := newMockDialerHost(t) host.EXPECT().selfID().Return("").Maybe() host.EXPECT().getOwnService(mock.Anything, mock.Anything).Return(nil, false).Maybe() - host.EXPECT().getPeerService(mock.Anything, mock.Anything, mock.Anything).Return(fakeStream{}, nil).Maybe() + host.EXPECT().getPeerService(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fakeStream{}, nil).Maybe() peers := newMockDialerPeers(t) peers.EXPECT().getPeers(mock.Anything, mock.Anything, mock.Anything).Return(ch, nil).Run(func(context.Context, multiaddr.Multiaddr, int) { ch <- peer.AddrInfo{ID: pid} @@ -264,7 +264,7 @@ func (h *fakeHost) getOwnService(network string, sa *api.ServiceAddress) (*servi return nil, false } -func (h *fakeHost) getPeerService(ctx context.Context, peer peer.ID, service *api.ServiceAddress) (io.ReadWriteCloser, error) { +func (h *fakeHost) getPeerService(ctx context.Context, peer peer.ID, service *api.ServiceAddress, ip multiaddr.Multiaddr) (io.ReadWriteCloser, error) { h.RLock() defer h.RUnlock() if h.good[peer.String()+"|"+service.String()] { diff --git a/pkg/api/v3/p2p/mock_dialerHost_test.go b/pkg/api/v3/p2p/mock_dialerHost_test.go index 6c42aa47b..0aab414c9 100644 --- a/pkg/api/v3/p2p/mock_dialerHost_test.go +++ 
b/pkg/api/v3/p2p/mock_dialerHost_test.go @@ -7,6 +7,7 @@ import ( io "io" peer "github.com/libp2p/go-libp2p/core/peer" + multiaddr "github.com/multiformats/go-multiaddr" mock "github.com/stretchr/testify/mock" api "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" ) @@ -79,25 +80,25 @@ func (_c *mockDialerHost_getOwnService_Call) RunAndReturn(run func(string, *api. return _c } -// getPeerService provides a mock function with given fields: ctx, _a1, service -func (_m *mockDialerHost) getPeerService(ctx context.Context, _a1 peer.ID, service *api.ServiceAddress) (io.ReadWriteCloser, error) { - ret := _m.Called(ctx, _a1, service) +// getPeerService provides a mock function with given fields: ctx, _a1, service, ip +func (_m *mockDialerHost) getPeerService(ctx context.Context, _a1 peer.ID, service *api.ServiceAddress, ip multiaddr.Multiaddr) (io.ReadWriteCloser, error) { + ret := _m.Called(ctx, _a1, service, ip) var r0 io.ReadWriteCloser var r1 error - if rf, ok := ret.Get(0).(func(context.Context, peer.ID, *api.ServiceAddress) (io.ReadWriteCloser, error)); ok { - return rf(ctx, _a1, service) + if rf, ok := ret.Get(0).(func(context.Context, peer.ID, *api.ServiceAddress, multiaddr.Multiaddr) (io.ReadWriteCloser, error)); ok { + return rf(ctx, _a1, service, ip) } - if rf, ok := ret.Get(0).(func(context.Context, peer.ID, *api.ServiceAddress) io.ReadWriteCloser); ok { - r0 = rf(ctx, _a1, service) + if rf, ok := ret.Get(0).(func(context.Context, peer.ID, *api.ServiceAddress, multiaddr.Multiaddr) io.ReadWriteCloser); ok { + r0 = rf(ctx, _a1, service, ip) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(io.ReadWriteCloser) } } - if rf, ok := ret.Get(1).(func(context.Context, peer.ID, *api.ServiceAddress) error); ok { - r1 = rf(ctx, _a1, service) + if rf, ok := ret.Get(1).(func(context.Context, peer.ID, *api.ServiceAddress, multiaddr.Multiaddr) error); ok { + r1 = rf(ctx, _a1, service, ip) } else { r1 = ret.Error(1) } @@ -114,13 +115,14 @@ type mockDialerHost_getPeerService_Call struct { // - ctx context.Context // - _a1 peer.ID // - service *api.ServiceAddress -func (_e *mockDialerHost_Expecter) getPeerService(ctx interface{}, _a1 interface{}, service interface{}) *mockDialerHost_getPeerService_Call { - return &mockDialerHost_getPeerService_Call{Call: _e.mock.On("getPeerService", ctx, _a1, service)} +// - ip multiaddr.Multiaddr +func (_e *mockDialerHost_Expecter) getPeerService(ctx interface{}, _a1 interface{}, service interface{}, ip interface{}) *mockDialerHost_getPeerService_Call { + return &mockDialerHost_getPeerService_Call{Call: _e.mock.On("getPeerService", ctx, _a1, service, ip)} } -func (_c *mockDialerHost_getPeerService_Call) Run(run func(ctx context.Context, _a1 peer.ID, service *api.ServiceAddress)) *mockDialerHost_getPeerService_Call { +func (_c *mockDialerHost_getPeerService_Call) Run(run func(ctx context.Context, _a1 peer.ID, service *api.ServiceAddress, ip multiaddr.Multiaddr)) *mockDialerHost_getPeerService_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(peer.ID), args[2].(*api.ServiceAddress)) + run(args[0].(context.Context), args[1].(peer.ID), args[2].(*api.ServiceAddress), args[3].(multiaddr.Multiaddr)) }) return _c } @@ -130,7 +132,7 @@ func (_c *mockDialerHost_getPeerService_Call) Return(_a0 io.ReadWriteCloser, _a1 return _c } -func (_c *mockDialerHost_getPeerService_Call) RunAndReturn(run func(context.Context, peer.ID, *api.ServiceAddress) (io.ReadWriteCloser, error)) *mockDialerHost_getPeerService_Call { +func (_c 
*mockDialerHost_getPeerService_Call) RunAndReturn(run func(context.Context, peer.ID, *api.ServiceAddress, multiaddr.Multiaddr) (io.ReadWriteCloser, error)) *mockDialerHost_getPeerService_Call { _c.Call.Return(run) return _c } diff --git a/pkg/api/v3/p2p/p2p.go b/pkg/api/v3/p2p/p2p.go index 1cb4c871d..4a0f9bd08 100644 --- a/pkg/api/v3/p2p/p2p.go +++ b/pkg/api/v3/p2p/p2p.go @@ -214,8 +214,17 @@ func (n *Node) selfID() peer.ID { } // getPeerService returns a new stream for the given peer and service. -func (n *Node) getPeerService(ctx context.Context, peer peer.ID, service *api.ServiceAddress) (io.ReadWriteCloser, error) { - return n.host.NewStream(ctx, peer, idRpc(service)) +func (n *Node) getPeerService(ctx context.Context, peerID peer.ID, service *api.ServiceAddress, ip multiaddr.Multiaddr) (io.ReadWriteCloser, error) { + if ip != nil { + err := n.host.Connect(ctx, peer.AddrInfo{ + ID: peerID, + Addrs: []multiaddr.Multiaddr{ip}, + }) + if err != nil { + return nil, err + } + } + return n.host.NewStream(ctx, peerID, idRpc(service)) } // getOwnService returns a service of this node. diff --git a/pkg/api/v3/p2p/stream.go b/pkg/api/v3/p2p/stream.go index 8f4b19d93..16cbddebc 100644 --- a/pkg/api/v3/p2p/stream.go +++ b/pkg/api/v3/p2p/stream.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/errors" @@ -24,15 +25,22 @@ type stream struct { stream message.Stream } -func openStreamFor(ctx context.Context, host dialerHost, peer peer.ID, sa *api.ServiceAddress) (*stream, error) { - conn, err := host.getPeerService(ctx, peer, sa) +func openStreamFor(ctx context.Context, host dialerHost, peer peer.ID, sa *api.ServiceAddress, ip multiaddr.Multiaddr) (*stream, error) { + conn, err := host.getPeerService(ctx, peer, sa, ip) if err != nil { // Do not wrap as it will clobber the error return nil, err } // Close the stream when the context is canceled - go func() { <-ctx.Done(); _ = conn.Close() }() + go func() { + ctx := ctx + if batch := api.GetBatchData(ctx); batch != nil { + ctx = batch.Context() + } + <-ctx.Done() + _ = conn.Close() + }() s := new(stream) s.peer = peer diff --git a/pkg/api/v3/responses.yml b/pkg/api/v3/responses.yml index d02ff682b..50f50dd6d 100644 --- a/pkg/api/v3/responses.yml +++ b/pkg/api/v3/responses.yml @@ -21,6 +21,10 @@ FindServiceResult: - name: Status type: KnownPeerStatus marshal-as: enum + - name: Addresses + type: p2p.Multiaddr + marshal-as: union + repeatable: true ConsensusStatus: fields: diff --git a/pkg/api/v3/types_gen.go b/pkg/api/v3/types_gen.go index 98161009c..dc9b93880 100644 --- a/pkg/api/v3/types_gen.go +++ b/pkg/api/v3/types_gen.go @@ -180,7 +180,9 @@ type FindServiceOptions struct { Network string `json:"network,omitempty" form:"network" query:"network" validate:"required"` Service *ServiceAddress `json:"service,omitempty" form:"service" query:"service" validate:"required"` // Known restricts the results to known peers. - Known bool `json:"known,omitempty" form:"known" query:"known"` + Known bool `json:"known,omitempty" form:"known" query:"known"` + // Timeout is the time to wait before stopping, when querying the DHT. 
+ Timeout time.Duration `json:"timeout,omitempty" form:"timeout" query:"timeout"` extraData []byte } @@ -188,6 +190,7 @@ type FindServiceResult struct { fieldsSet []bool PeerID p2p.PeerID `json:"peerID,omitempty" form:"peerID" query:"peerID" validate:"required"` Status KnownPeerStatus `json:"status,omitempty" form:"status" query:"status" validate:"required"` + Addresses []p2p.Multiaddr `json:"addresses,omitempty" form:"addresses" query:"addresses" validate:"required"` extraData []byte } @@ -851,6 +854,7 @@ func (v *FindServiceOptions) Copy() *FindServiceOptions { u.Service = (v.Service).Copy() } u.Known = v.Known + u.Timeout = v.Timeout if len(v.extraData) > 0 { u.extraData = make([]byte, len(v.extraData)) copy(u.extraData, v.extraData) @@ -868,6 +872,13 @@ func (v *FindServiceResult) Copy() *FindServiceResult { u.PeerID = p2p.CopyPeerID(v.PeerID) } u.Status = v.Status + u.Addresses = make([]p2p.Multiaddr, len(v.Addresses)) + for i, v := range v.Addresses { + v := v + if v != nil { + u.Addresses[i] = p2p.CopyMultiaddr(v) + } + } if len(v.extraData) > 0 { u.extraData = make([]byte, len(v.extraData)) copy(u.extraData, v.extraData) @@ -1837,6 +1848,9 @@ func (v *FindServiceOptions) Equal(u *FindServiceOptions) bool { if !(v.Known == u.Known) { return false } + if !(v.Timeout == u.Timeout) { + return false + } return true } @@ -1848,6 +1862,14 @@ func (v *FindServiceResult) Equal(u *FindServiceResult) bool { if !(v.Status == u.Status) { return false } + if len(v.Addresses) != len(u.Addresses) { + return false + } + for i := range v.Addresses { + if !(p2p.EqualMultiaddr(v.Addresses[i], u.Addresses[i])) { + return false + } + } return true } @@ -3468,6 +3490,7 @@ var fieldNames_FindServiceOptions = []string{ 1: "Network", 2: "Service", 3: "Known", + 4: "Timeout", } func (v *FindServiceOptions) MarshalBinary() ([]byte, error) { @@ -3487,6 +3510,9 @@ func (v *FindServiceOptions) MarshalBinary() ([]byte, error) { if !(!v.Known) { writer.WriteBool(3, v.Known) } + if !(v.Timeout == 0) { + writer.WriteDuration(4, v.Timeout) + } _, _, err := writer.Reset(fieldNames_FindServiceOptions) if err != nil { @@ -3523,6 +3549,7 @@ func (v *FindServiceOptions) IsValid() error { var fieldNames_FindServiceResult = []string{ 1: "PeerID", 2: "Status", + 3: "Addresses", } func (v *FindServiceResult) MarshalBinary() ([]byte, error) { @@ -3539,6 +3566,11 @@ func (v *FindServiceResult) MarshalBinary() ([]byte, error) { if !(v.Status == 0) { writer.WriteEnum(2, v.Status) } + if !(len(v.Addresses) == 0) { + for _, v := range v.Addresses { + writer.WriteValue(3, v.MarshalBinary) + } + } _, _, err := writer.Reset(fieldNames_FindServiceResult) if err != nil { @@ -3561,6 +3593,11 @@ func (v *FindServiceResult) IsValid() error { } else if v.Status == 0 { errs = append(errs, "field Status is not set") } + if len(v.fieldsSet) > 2 && !v.fieldsSet[2] { + errs = append(errs, "field Addresses is missing") + } else if len(v.Addresses) == 0 { + errs = append(errs, "field Addresses is not set") + } switch len(errs) { case 0: @@ -5922,6 +5959,9 @@ func (v *FindServiceOptions) UnmarshalBinaryFrom(rd io.Reader) error { if x, ok := reader.ReadBool(3); ok { v.Known = x } + if x, ok := reader.ReadDuration(4); ok { + v.Timeout = x + } seen, err := reader.Reset(fieldNames_FindServiceOptions) if err != nil { @@ -5952,6 +5992,18 @@ func (v *FindServiceResult) UnmarshalBinaryFrom(rd io.Reader) error { if x := new(KnownPeerStatus); reader.ReadEnum(2, x) { v.Status = *x } + for { + ok := reader.ReadValue(3, func(r io.Reader) error { + x, err := 
p2p.UnmarshalMultiaddrFrom(r) + if err == nil { + v.Addresses = append(v.Addresses, x) + } + return err + }) + if !ok { + break + } + } seen, err := reader.Reset(fieldNames_FindServiceResult) if err != nil { @@ -7276,10 +7328,33 @@ func (v *ErrorRecord) MarshalJSON() ([]byte, error) { return json.Marshal(&u) } +func (v *FindServiceOptions) MarshalJSON() ([]byte, error) { + u := struct { + Network string `json:"network,omitempty"` + Service *ServiceAddress `json:"service,omitempty"` + Known bool `json:"known,omitempty"` + Timeout interface{} `json:"timeout,omitempty"` + }{} + if !(len(v.Network) == 0) { + u.Network = v.Network + } + if !(v.Service == nil) { + u.Service = v.Service + } + if !(!v.Known) { + u.Known = v.Known + } + if !(v.Timeout == 0) { + u.Timeout = encoding.DurationToJSON(v.Timeout) + } + return json.Marshal(&u) +} + func (v *FindServiceResult) MarshalJSON() ([]byte, error) { u := struct { - PeerID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"peerID,omitempty"` - Status KnownPeerStatus `json:"status,omitempty"` + PeerID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"peerID,omitempty"` + Status KnownPeerStatus `json:"status,omitempty"` + Addresses *encoding.JsonUnmarshalListWith[p2p.Multiaddr] `json:"addresses,omitempty"` }{} if !(v.PeerID == ("")) { u.PeerID = &encoding.JsonUnmarshalWith[p2p.PeerID]{Value: v.PeerID, Func: p2p.UnmarshalPeerIDJSON} @@ -7287,6 +7362,9 @@ func (v *FindServiceResult) MarshalJSON() ([]byte, error) { if !(v.Status == 0) { u.Status = v.Status } + if !(len(v.Addresses) == 0) { + u.Addresses = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.Addresses, Func: p2p.UnmarshalMultiaddrJSON} + } return json.Marshal(&u) } @@ -8058,13 +8136,40 @@ func (v *ErrorRecord) UnmarshalJSON(data []byte) error { return nil } +func (v *FindServiceOptions) UnmarshalJSON(data []byte) error { + u := struct { + Network string `json:"network,omitempty"` + Service *ServiceAddress `json:"service,omitempty"` + Known bool `json:"known,omitempty"` + Timeout interface{} `json:"timeout,omitempty"` + }{} + u.Network = v.Network + u.Service = v.Service + u.Known = v.Known + u.Timeout = encoding.DurationToJSON(v.Timeout) + if err := json.Unmarshal(data, &u); err != nil { + return err + } + v.Network = u.Network + v.Service = u.Service + v.Known = u.Known + if x, err := encoding.DurationFromJSON(u.Timeout); err != nil { + return fmt.Errorf("error decoding Timeout: %w", err) + } else { + v.Timeout = x + } + return nil +} + func (v *FindServiceResult) UnmarshalJSON(data []byte) error { u := struct { - PeerID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"peerID,omitempty"` - Status KnownPeerStatus `json:"status,omitempty"` + PeerID *encoding.JsonUnmarshalWith[p2p.PeerID] `json:"peerID,omitempty"` + Status KnownPeerStatus `json:"status,omitempty"` + Addresses *encoding.JsonUnmarshalListWith[p2p.Multiaddr] `json:"addresses,omitempty"` }{} u.PeerID = &encoding.JsonUnmarshalWith[p2p.PeerID]{Value: v.PeerID, Func: p2p.UnmarshalPeerIDJSON} u.Status = v.Status + u.Addresses = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.Addresses, Func: p2p.UnmarshalMultiaddrJSON} if err := json.Unmarshal(data, &u); err != nil { return err } @@ -8073,6 +8178,12 @@ func (v *FindServiceResult) UnmarshalJSON(data []byte) error { } v.Status = u.Status + if u.Addresses != nil { + v.Addresses = make([]p2p.Multiaddr, len(u.Addresses.Value)) + for i, x := range u.Addresses.Value { + v.Addresses[i] = x + } + } return nil } diff --git a/test/simulator/services/services.go 
b/test/simulator/services/services.go index 673349783..4186fa274 100644 --- a/test/simulator/services/services.go +++ b/test/simulator/services/services.go @@ -60,7 +60,7 @@ func (s Services) Register(id peer.ID, address *api.ServiceAddress, handler Hand } func (s Services) Dial(ctx context.Context, addr multiaddr.Multiaddr) (message.Stream, error) { - _, peer, sa, err := api.UnpackAddress(addr) + _, peer, sa, _, err := api.UnpackAddress(addr) if err != nil { return nil, errors.UnknownError.Wrap(err) } diff --git a/tools/cmd/debug/heal_anchor.go b/tools/cmd/debug/heal_anchor.go index db0c70199..b3b51f50a 100644 --- a/tools/cmd/debug/heal_anchor.go +++ b/tools/cmd/debug/heal_anchor.go @@ -9,6 +9,7 @@ package main import ( "context" "encoding/json" + "errors" "fmt" "os" "strconv" @@ -25,6 +26,7 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" "gitlab.com/accumulatenetwork/accumulate/pkg/url" "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" ) var cmdHealAnchor = &cobra.Command{ @@ -36,19 +38,24 @@ var cmdHealAnchor = &cobra.Command{ func init() { cmdHeal.AddCommand(cmdHealAnchor) - cmdHealAnchor.Flags().BoolVar(&healContinuous, "continuous", false, "Run healing in a loop every second") - _ = cmdHealAnchor.MarkFlagFilename("cached-scan", ".json") } func healAnchor(_ *cobra.Command, args []string) { ctx, cancel, _ := api.ContextWithBatchData(context.Background()) defer cancel() - networkID := args[0] + // We should be able to use only the p2p client but it doesn't work well for + // some reason + C1 := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(args[0])) + C1.Client.Timeout = time.Hour + + ni, err := C1.NodeInfo(ctx, api.NodeInfoOptions{}) + check(err) + node, err := p2p.New(p2p.Options{ - Network: networkID, - BootstrapPeers: api.BootstrapServers, - // BootstrapPeers: mainnetAddrs, + Network: ni.Network, + // BootstrapPeers: api.BootstrapServers, + BootstrapPeers: mainnetAddrs, }) checkf(err, "start p2p node") defer func() { _ = node.Close() }() @@ -58,36 +65,36 @@ func healAnchor(_ *cobra.Command, args []string) { fmt.Fprintln(os.Stderr, "Waiting for addresses") time.Sleep(time.Second) - // We should be able to use only the p2p client but it doesn't work well for - // some reason - C1 := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(networkID)) - C1.Client.Timeout = time.Hour + var net *healing.NetworkInfo + if cachedScan != "" { + data, err := os.ReadFile(cachedScan) + check(err) + check(json.Unmarshal(data, &net)) + } // Use a hack dialer that uses the API for peer discovery router := new(routing.MessageRouter) C2 := &message.Client{ Transport: &message.RoutedTransport{ - Network: networkID, + Network: ni.Network, // Dialer: node.DialNetwork(), - Dialer: &hackDialer{C1, node.DialNetwork(), map[string]peer.ID{}}, + Dialer: &hackDialer{net, C1, node.DialNetwork(), map[string]peer.ID{}}, Router: router, }, } - var net *healing.NetworkInfo if cachedScan == "" { - net, err = healing.ScanNetwork(ctx, C1) - check(err) - } else { - data, err := os.ReadFile(cachedScan) + net, err = healing.ScanNetwork(ctx, C2) check(err) - check(json.Unmarshal(data, &net)) } + router.Router, err = routing.NewStaticRouter(net.Status.Routing, nil) + check(err) + if len(args) > 1 { txid, err := url.ParseTxID(args[1]) if err == nil { - r, err := api.Querier2{Querier: C1}.QueryTransaction(ctx, txid, nil) + r, err := api.Querier2{Querier: C2}.QueryTransaction(ctx, txid, nil) check(err) if r.Sequence == nil { fatalf("%v is not sequenced", txid) @@ -150,7 +157,7 @@ func 
healAnchorSequence(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Cli srcUrl := protocol.PartitionUrl(srcId) dstUrl := protocol.PartitionUrl(dstId) - ids, txns := findPendingAnchors(ctx, C2, api.Querier2{Querier: C1}, srcUrl, dstUrl, true) + ids, txns := findPendingAnchors(ctx, C2, api.Querier2{Querier: C2}, net, srcUrl, dstUrl, true) src2dst.Pending = append(src2dst.Pending, ids...) for i, txid := range src2dst.Pending { @@ -159,18 +166,24 @@ func healAnchorSequence(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Cli } func healSingleAnchor(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Client, net *healing.NetworkInfo, srcId, dstId string, seqNum uint64, txid *url.TxID, txns map[[32]byte]*protocol.Transaction) { +again: err := healing.HealAnchor(ctx, healing.HealAnchorArgs{ Client: C2.ForAddress(nil), - Querier: C1, - Submitter: C1, + Querier: C2, + Submitter: C2, NetInfo: net, Known: txns, Pretend: pretend, + Wait: waitForTxn, }, healing.SequencedInfo{ Source: srcId, Destination: dstId, Number: seqNum, ID: txid, }) + if errors.Is(err, healing.ErrRetry) { + slog.Error("Anchor still pending after 10 attempts, retrying") + goto again + } check(err) } diff --git a/tools/cmd/debug/heal_common.go b/tools/cmd/debug/heal_common.go index 4e18ac118..94946b268 100644 --- a/tools/cmd/debug/heal_common.go +++ b/tools/cmd/debug/heal_common.go @@ -10,6 +10,7 @@ import ( "context" "github.com/multiformats/go-multiaddr" + "github.com/spf13/cobra" "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" @@ -18,6 +19,21 @@ import ( "gitlab.com/accumulatenetwork/accumulate/protocol" ) +var cmdHeal = &cobra.Command{ + Use: "heal", +} + +func init() { + cmd.AddCommand(cmdHeal) + + cmdHeal.PersistentFlags().StringVar(&cachedScan, "cached-scan", "", "A cached network scan") + cmdHeal.PersistentFlags().BoolVarP(&pretend, "pretend", "n", false, "Do not submit envelopes, only scan") + cmdHeal.PersistentFlags().BoolVar(&waitForTxn, "wait", true, "Wait for the message to finalize") + cmdHeal.PersistentFlags().BoolVar(&healContinuous, "continuous", false, "Run healing in a loop every second") + + _ = cmdHeal.MarkFlagFilename("cached-scan", ".json") +} + // resolveSeq resolves an anchor or synthetic message (a sequenced message). If // the client's address is non-nil, the query will be sent to that address. 
// Otherwise, all of the source partition's nodes will be queried in order until diff --git a/tools/cmd/debug/heal_synth.go b/tools/cmd/debug/heal_synth.go index 54830a700..af2b4bb6f 100644 --- a/tools/cmd/debug/heal_synth.go +++ b/tools/cmd/debug/heal_synth.go @@ -22,12 +22,9 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" ) -var cmdHeal = &cobra.Command{ - Use: "heal", -} - var cmdHealSynth = &cobra.Command{ Use: "synth [server]", Short: "Fixup synthetic transactions", @@ -36,13 +33,7 @@ var cmdHealSynth = &cobra.Command{ } func init() { - cmd.AddCommand(cmdHeal) cmdHeal.AddCommand(cmdHealSynth) - - cmdHeal.PersistentFlags().StringVar(&cachedScan, "cached-scan", "", "A cached network scan") - cmdHeal.PersistentFlags().BoolVarP(&pretend, "pretend", "n", false, "Do not submit envelopes, only scan") - cmdHeal.PersistentFlags().BoolVar(&waitForTxn, "wait", false, "Wait for the message to finalize") - cmdHealSynth.Flags().StringVar(&usePeer, "peer", "", "Query a specific peer") } @@ -59,6 +50,13 @@ func healSynth(_ *cobra.Command, args []string) { ni, err := C1.NodeInfo(ctx, api.NodeInfoOptions{}) check(err) + var net *healing.NetworkInfo + if cachedScan != "" { + data, err := os.ReadFile(cachedScan) + check(err) + check(json.Unmarshal(data, &net)) + } + node, err := p2p.New(p2p.Options{ Network: ni.Network, // BootstrapPeers: api.BootstrapServers, @@ -76,10 +74,12 @@ func healSynth(_ *cobra.Command, args []string) { C2 := (&message.Client{ Transport: &message.RoutedTransport{ Network: ni.Network, - Dialer: node.DialNetwork(), - Router: router, + // Dialer: node.DialNetwork(), + Dialer: &hackDialer{net, C1, node.DialNetwork(), map[string]peer.ID{}}, + Router: router, }, }).ForAddress(nil) + Q.Querier = C2 if usePeer != "" { pid, err := peer.Decode(usePeer) @@ -87,23 +87,17 @@ func healSynth(_ *cobra.Command, args []string) { C2 = C2.ForPeer(pid) } - ns, err := C1.NetworkStatus(ctx, api.NetworkStatusOptions{Partition: protocol.Directory}) - check(err) - router.Router, err = routing.NewStaticRouter(ns.Routing, nil) - check(err) - - var net *healing.NetworkInfo if cachedScan == "" { net, err = healing.ScanNetwork(ctx, C1) check(err) - } else { - data, err := os.ReadFile(cachedScan) - check(err) - check(json.Unmarshal(data, &net)) } + router.Router, err = routing.NewStaticRouter(net.Status.Routing, nil) + check(err) + +heal: synths := map[string]*protocol.SyntheticLedger{} - for _, dst := range ns.Network.Partitions { + for _, dst := range net.Status.Network.Partitions { // Get synthetic ledger var synth *protocol.SyntheticLedger u := protocol.PartitionUrl(dst.ID).JoinPath(protocol.Synthetic) @@ -121,8 +115,8 @@ func healSynth(_ *cobra.Command, args []string) { } // Check produced vs received - for i, a := range ns.Network.Partitions { - for _, b := range ns.Network.Partitions[i:] { + for i, a := range net.Status.Network.Partitions { + for _, b := range net.Status.Network.Partitions[i:] { ab := synths[a.ID].Partition(protocol.PartitionUrl(b.ID)) ba := synths[b.ID].Partition(protocol.PartitionUrl(a.ID)) @@ -137,6 +131,12 @@ func healSynth(_ *cobra.Command, args []string) { } } } + + // Heal continuously? 
+ if healContinuous { + time.Sleep(time.Second) + goto heal + } } func resubmitByNumber(ctx context.Context, C2 message.AddressedClient, net *healing.NetworkInfo, source, destination string, number uint64) { @@ -152,5 +152,7 @@ func resubmitByNumber(ctx context.Context, C2 message.AddressedClient, net *heal Destination: destination, Number: number, }) - check(err) + if err != nil { + slog.Error("Failed to heal", "source", source, "destination", destination, "number", number, "error", err) + } } diff --git a/tools/cmd/debug/network.go b/tools/cmd/debug/network.go index e7cecf017..135532d03 100644 --- a/tools/cmd/debug/network.go +++ b/tools/cmd/debug/network.go @@ -10,11 +10,15 @@ import ( "context" "encoding/json" "fmt" + "net/url" "os" + "strconv" "time" + "github.com/multiformats/go-multiaddr" "github.com/spf13/cobra" "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" + "gitlab.com/accumulatenetwork/accumulate/internal/node/config" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" ) @@ -26,14 +30,23 @@ var networkCmd = &cobra.Command{ var networkScanCmd = &cobra.Command{ Use: "scan [network]", Short: "Scan the network for nodes", + Args: cobra.ExactArgs(1), Run: scanNetwork, } +var networkScanNodeCmd = &cobra.Command{ + Use: "scan-node [address]", + Short: "Scan a node", + Args: cobra.ExactArgs(1), + Run: scanNode, +} + func init() { cmd.AddCommand(networkCmd) networkCmd.AddCommand(networkScanCmd) + networkCmd.AddCommand(networkScanNodeCmd) - networkScanCmd.Flags().BoolVarP(&outputJSON, "json", "j", false, "Output result as JSON") + networkCmd.PersistentFlags().BoolVarP(&outputJSON, "json", "j", false, "Output result as JSON") } func scanNetwork(_ *cobra.Command, args []string) { @@ -54,3 +67,30 @@ func scanNetwork(_ *cobra.Command, args []string) { } } } + +func scanNode(_ *cobra.Command, args []string) { + u, err := url.Parse(args[0]) + checkf(err, "invalid URL") + port, err := strconv.ParseUint(u.Port(), 10, 64) + checkf(err, "invalid port") + + client := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(args[0])) + client.Client.Timeout = time.Hour + peer, err := healing.ScanNode(context.Background(), client) + check(err) + + p2pPort := port - uint64(config.PortOffsetAccumulateApi) + uint64(config.PortOffsetAccumulateP2P) + tcp, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", u.Hostname(), p2pPort)) + check(err) + udp, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/%d/quic", u.Hostname(), p2pPort)) + check(err) + peer.Addresses = []multiaddr.Multiaddr{tcp, udp} + + if outputJSON { + fmt.Fprintln(os.Stderr, peer.ID) + check(json.NewEncoder(os.Stdout).Encode(peer)) + return + } + + fmt.Printf(" %v\n", peer) +} diff --git a/tools/cmd/debug/sequence.go b/tools/cmd/debug/sequence.go index 3a94389c2..03c9f187d 100644 --- a/tools/cmd/debug/sequence.go +++ b/tools/cmd/debug/sequence.go @@ -8,13 +8,18 @@ package main import ( "context" + "encoding/json" "fmt" + "os" + "strings" + "time" "github.com/fatih/color" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/spf13/cobra" "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" @@ -23,6 +28,7 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" 
"gitlab.com/accumulatenetwork/accumulate/pkg/url" "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" ) var cmdSequence = &cobra.Command{ @@ -33,7 +39,9 @@ var cmdSequence = &cobra.Command{ func init() { cmd.AddCommand(cmdSequence) - cmdSequence.Flags().BoolVarP(&verbose, "verbose", "v", false, "More verbose outputt") + cmdSequence.Flags().BoolVarP(&verbose, "verbose", "v", false, "More verbose output") + cmdSequence.PersistentFlags().StringVar(&cachedScan, "cached-scan", "", "A cached network scan") + } func sequence(cmd *cobra.Command, args []string) { @@ -41,6 +49,7 @@ func sequence(cmd *cobra.Command, args []string) { defer cancel() c := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(args[0])) + c.Client.Timeout = time.Hour Q := api.Querier2{Querier: c} ni, err := c.NodeInfo(ctx, api.NodeInfoOptions{}) @@ -55,14 +64,22 @@ func sequence(cmd *cobra.Command, args []string) { fmt.Printf("We are %v\n", node.ID()) + var net *healing.NetworkInfo + if cachedScan != "" { + data, err := os.ReadFile(cachedScan) + check(err) + check(json.Unmarshal(data, &net)) + } + router := new(routing.MessageRouter) c2 := &message.Client{ Transport: &message.RoutedTransport{ - Network: args[0], - Dialer: &hackDialer{c, node.DialNetwork(), map[string]peer.ID{}}, + Network: ni.Network, + Dialer: &hackDialer{net, c, node.DialNetwork(), map[string]peer.ID{}}, Router: router, }, } + Q.Querier = c2 ns, err := c.NetworkStatus(ctx, api.NetworkStatusOptions{Partition: protocol.Directory}) check(err) @@ -88,7 +105,7 @@ func sequence(cmd *cobra.Command, args []string) { // Check pending and received vs delivered for _, src := range anchor.Sequence { - ids, _ := findPendingAnchors(ctx, c2, Q, src.Url, dst, verbose) + ids, _ := findPendingAnchors(ctx, c2, Q, net, src.Url, dst, verbose) src.Pending = append(src.Pending, ids...) 
checkSequence1(part, src, bad, "anchors") @@ -157,7 +174,7 @@ func checkSequence1(dst *protocol.PartitionInfo, src *protocol.PartitionSyntheti } } -func findPendingAnchors(ctx context.Context, C *message.Client, Q api.Querier2, src, dst *url.URL, resolve bool) ([]*url.TxID, map[[32]byte]*protocol.Transaction) { +func findPendingAnchors(ctx context.Context, C *message.Client, Q api.Querier2, net *healing.NetworkInfo, src, dst *url.URL, resolve bool) ([]*url.TxID, map[[32]byte]*protocol.Transaction) { srcId, _ := protocol.ParsePartitionUrl(src) dstId, _ := protocol.ParsePartitionUrl(dst) @@ -184,8 +201,27 @@ func findPendingAnchors(ctx context.Context, C *message.Client, Q api.Querier2, var ids []*url.TxID txns := map[[32]byte]*protocol.Transaction{} for i := received + 1; i <= srcDstChain.Count; i++ { - msg, err := C.Private().Sequence(ctx, src.JoinPath(protocol.AnchorPool), dst, i) - checkf(err, "query %v → %v anchor #%d", srcId, dstId, i) + var msg *api.MessageRecord[messaging.Message] + if net == nil { + slog.Info("Checking anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i) + msg, err = C.Private().Sequence(ctx, src.JoinPath(protocol.AnchorPool), dst, i) + checkf(err, "query %v → %v anchor #%d", srcId, dstId, i) + } else { + for _, peer := range net.Peers[strings.ToLower(srcId)] { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + slog.Info("Checking anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i, "peer", peer.ID) + msg, err = C.ForPeer(peer.ID).Private().Sequence(ctx, src.JoinPath(protocol.AnchorPool), dst, i) + if err == nil { + break + } + slog.Error("Failed to check anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i, "peer", peer.ID, "error", err) + } + if msg == nil { + fatalf("query %v → %v anchor #%d failed", srcId, dstId, i) + } + } + ids = append(ids, msg.ID) txn := msg.Message.(*messaging.TransactionMessage) @@ -195,11 +231,16 @@ func findPendingAnchors(ctx context.Context, C *message.Client, Q api.Querier2, } type hackDialer struct { + net *healing.NetworkInfo api api.NodeService node message.Dialer good map[string]peer.ID } +func (h *hackDialer) BadDial(ctx context.Context, addr multiaddr.Multiaddr, stream message.Stream, err error) bool { + return true +} + func (h *hackDialer) Dial(ctx context.Context, addr multiaddr.Multiaddr) (message.Stream, error) { // Have we found a good peer? 
if id, ok := h.good[addr.String()]; ok { @@ -212,13 +253,68 @@ func (h *hackDialer) Dial(ctx context.Context, addr multiaddr.Multiaddr) (messag } // Unpack the service address - network, peer, service, err := api.UnpackAddress(addr) + network, peerID, service, _, err := api.UnpackAddress(addr) if err != nil { return nil, err } + // Check for a recorded address + if peerID != "" { + info := h.net.PeerByID(peerID) + if info != nil { + addr := addr + if peerID == "" { + c, err := multiaddr.NewComponent("p2p", info.ID.String()) + check(err) + addr = c.Encapsulate(addr) + } + for _, paddr := range info.Addresses { + s, err := h.node.Dial(ctx, paddr.Encapsulate(addr)) + if err == nil { + h.good[addr.String()] = info.ID + return s, nil + } + slog.Error("Failed to connect", "peer", info.ID, "address", paddr, "service", addr, "error", err) + } + } + } else if service.Argument != "" { + // In the future not all peers will have all services + part, ok := h.net.Peers[strings.ToLower(service.Argument)] + if ok { + tried := map[string]bool{} + pick := func() (*healing.PeerInfo, multiaddr.Multiaddr) { + for _, p := range part { + for _, addr := range p.Addresses { + if tried[addr.String()] { + continue + } + tried[addr.String()] = true + c, err := multiaddr.NewComponent("p2p", p.ID.String()) + check(err) + addr = c.Encapsulate(addr) + return p, addr + } + } + return nil, nil + } + + for { + info, paddr := pick() + if paddr == nil { + break + } + s, err := h.node.Dial(ctx, paddr.Encapsulate(addr)) + if err == nil { + h.good[addr.String()] = info.ID + return s, nil + } + slog.Error("Failed to connect", "peer", info.ID, "address", paddr, "service", addr, "error", err) + } + } + } + // If it specifies a node, do nothing - if peer != "" { + if peerID != "" { return h.node.Dial(ctx, addr) }
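
A few notes on the larger changes in this patch follow, with short Go sketches; any names flagged as examples are not taken from the diff.

UnpackAddress (pkg/api/v3/address.go) now returns the transport portion of the address — the /dns, /ip4, /tcp, /udp, and /quic components — as an extra value instead of rejecting it. A minimal sketch of the widened signature, reusing one of the mainnet bootstrap addresses from cmd/accumulated-http/main.go; "Apollo" is an example partition ID, and the .Multiaddr() call on the service address follows the commented-out line in HealAnchor:

    package example

    import (
        "fmt"

        "github.com/multiformats/go-multiaddr"
        "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
    )

    func inspect() error {
        base, err := multiaddr.NewMultiaddr(
            "/dns/apollo-mainnet.accumulate.defidevs.io/tcp/16593" +
                "/p2p/12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj")
        if err != nil {
            return err
        }
        full := base.Encapsulate(api.ServiceTypeConsensus.AddressFor("Apollo").Multiaddr())

        // Before this patch the /dns and /tcp components made this an
        // "invalid address"; now they come back as a separate multiaddr so
        // the dialer can connect to the peer directly.
        network, peerID, service, transport, err := api.UnpackAddress(full)
        if err != nil {
            return err
        }
        fmt.Println(network, peerID, service, transport)
        return nil
    }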
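
When Dial is given both a transport address and a peer ID, the address is threaded down to getPeerService, which now seeds the libp2p peerstore with host.Connect before opening the protocol stream. A sketch of the shape of that change; the protocol ID stands in for the value produced by idRpc(service):

    package example

    import (
        "context"

        "github.com/libp2p/go-libp2p/core/host"
        "github.com/libp2p/go-libp2p/core/network"
        "github.com/libp2p/go-libp2p/core/peer"
        "github.com/libp2p/go-libp2p/core/protocol"
        "github.com/multiformats/go-multiaddr"
    )

    // dialKnown connects to a peer at a known transport address, then opens a
    // stream for the given protocol. If no address is supplied it behaves like
    // the old code and relies on whatever the peerstore already knows.
    func dialKnown(ctx context.Context, h host.Host, id peer.ID, addr multiaddr.Multiaddr, proto protocol.ID) (network.Stream, error) {
        if addr != nil {
            err := h.Connect(ctx, peer.AddrInfo{ID: id, Addrs: []multiaddr.Multiaddr{addr}})
            if err != nil {
                return nil, err
            }
        }
        return h.NewStream(ctx, id, proto)
    }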
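
FindServiceOptions gains a Timeout field (a duration) that bounds the DHT lookup; ScanNetwork now sets it to ten seconds. A sketch of a caller, assuming a client that exposes FindService with the signature the scanner uses; "MainNet" and "Apollo" are example values:

    package example

    import (
        "context"
        "time"

        "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
    )

    // serviceFinder is a stand-in for whichever client is used; the JSON-RPC
    // client and the message client both serve FindService.
    type serviceFinder interface {
        FindService(ctx context.Context, opts api.FindServiceOptions) ([]*api.FindServiceResult, error)
    }

    // findConsensus looks up the consensus service of one partition, giving up
    // after ten seconds instead of letting the DHT query run unbounded.
    func findConsensus(ctx context.Context, c serviceFinder) ([]*api.FindServiceResult, error) {
        return c.FindService(ctx, api.FindServiceOptions{
            Network: "MainNet",
            Service: api.ServiceTypeConsensus.AddressFor("Apollo"),
            Timeout: 10 * time.Second,
        })
    }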
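
HealAnchor and HealSynthetic no longer wait indefinitely: after submitting, they poll the message roughly ten times, half a second apart, and return the exported sentinel healing.ErrRetry if it is still pending; heal_anchor.go reacts by looping until the anchor lands. A sketch of that contract from the caller's side (the sleep is an extra pause, not something the command itself adds):

    package example

    import (
        "errors"
        "time"

        "gitlab.com/accumulatenetwork/accumulate/internal/core/healing"
        "golang.org/x/exp/slog"
    )

    // healUntilDone retries a heal closure (wrapping HealAnchor or
    // HealSynthetic) until it returns something other than ErrRetry.
    func healUntilDone(heal func() error) error {
        for {
            err := heal()
            if !errors.Is(err, healing.ErrRetry) {
                return err // nil on success, or a real failure
            }
            slog.Info("Still pending, retrying")
            time.Sleep(time.Second)
        }
    }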
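
The hackDialer in tools/cmd/debug now carries the cached network scan and tries a peer's recorded transport addresses before falling back to ordinary peer discovery. A simplified version of that selection path; addr is assumed to already carry the /p2p and /acc-svc components, as it does when the dialer is invoked:

    package example

    import (
        "context"

        "github.com/libp2p/go-libp2p/core/peer"
        "github.com/multiformats/go-multiaddr"
        "gitlab.com/accumulatenetwork/accumulate/internal/core/healing"
        "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
        "golang.org/x/exp/slog"
    )

    // dialWithScan prefers the transport addresses recorded for the peer in the
    // cached scan; each is encapsulated around the service address and tried in
    // turn, and only if all of them fail does it fall back to the plain dialer.
    func dialWithScan(ctx context.Context, net *healing.NetworkInfo, fallback message.Dialer, id peer.ID, addr multiaddr.Multiaddr) (message.Stream, error) {
        if info := net.PeerByID(id); info != nil {
            for _, transport := range info.Addresses {
                s, err := fallback.Dial(ctx, transport.Encapsulate(addr))
                if err == nil {
                    return s, nil
                }
                slog.Error("Failed to connect", "peer", id, "address", transport, "error", err)
            }
        }
        return fallback.Dial(ctx, addr)
    }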