diff --git a/.dockerignore b/.dockerignore
index 466703db6..04b61b72a 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -4,4 +4,4 @@
 *.exe
 .git
 */Dockerfile
-.test
\ No newline at end of file
+.test
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 48502d4d6..380b5e219 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -539,11 +539,14 @@
             "type": "go",
             "request": "launch",
             "mode": "auto",
-            "program": "${workspaceFolder}/tools/cmd/resend-anchor",
+            "program": "${workspaceFolder}/tools/cmd/debug",
             "cwd": "${workspaceFolder}",
             "args": [
                 "heal",
-                "https://mainnet.accumulatenetwork.io/v3",
+                "anchor",
+                "MainNet",
+                "--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json",
+                "Directory→Apollo",
                 "70911"
             ]
         },
         {
@@ -557,9 +560,27 @@
             "program": "${workspaceFolder}/tools/cmd/debug",
             "cwd": "${workspaceFolder}",
             "args": [
-                "heal-synth",
+                "heal",
+                "synth",
                 "mainnet",
+                "--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json",
+            ]
+        },
+        {
+            "name": "Heal single anchor",
+            "presentation": { "group": "99-Miscellaneous", },
+            "type": "go",
+            "request": "launch",
+            "mode": "auto",
+            "program": "${workspaceFolder}/tools/cmd/debug",
+            "cwd": "${workspaceFolder}",
+            "args": [
+                "heal",
+                "anchor",
+                "mainnet",
+                "Chandrayaan → Apollo",
+                // "acc://f18b9ddd70654849ce063581b7bcbc6947d4b763d7a2fa4668e6fd9129da0e23@dn.acme/anchors",
+                "--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json",
             ]
         },
         {
diff --git a/exp/apiutil/mainnet.go b/exp/apiutil/mainnet.go
new file mode 100644
index 000000000..4c562d49a
--- /dev/null
+++ b/exp/apiutil/mainnet.go
@@ -0,0 +1,58 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package apiutil
+
+import "github.com/multiformats/go-multiaddr"
+
+var MainnetAddrs = func() []multiaddr.Multiaddr {
+    s := []string{
+        "/dns/apollo-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj",
+        "/dns/yutu-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWDqFDwjHEog1bNbxai2dKSaR1aFvq2LAZ2jivSohgoSc7",
+        "/dns/chandrayaan-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWHzjkoeAqe7L55tAaepCbMbhvNu9v52ayZNVQobdEE1RL",
+        "/ip4/116.202.214.38/tcp/16593/p2p/12D3KooWBkJQiuvotpMemWBYfAe4ctsVHi7fLvT8RT83oXJ5dsgV",
+        "/ip4/83.97.19.82/tcp/16593/p2p/12D3KooWHSbqS6K52d4ReauHAg4n8MFbAKkdEAae2fZXnzRYi9ce",
+        "/ip4/206.189.97.165/tcp/16593/p2p/12D3KooWHyA7zgAVqGvCBBJejgvKzv7DQZ3LabJMWqmCQ9wFbT3o",
+        "/ip4/144.76.105.23/tcp/16593/p2p/12D3KooWS2Adojqun5RV1Xy4k6vKXWpRQ3VdzXnW8SbW7ERzqKie",
+        "/ip4/18.190.77.236/tcp/16593/p2p/12D3KooWP1d9vUJCzqX5bTv13tCHmVssJrgK3EnJCC2C5Ep2SXbS",
+        "/ip4/3.28.207.55/tcp/16593/p2p/12D3KooWEzhg3CRvC3xdrUBFsWETF1nG3gyYfEjx4oEJer95y1Rk",
+        "/ip4/38.135.195.81/tcp/16593/p2p/12D3KooWDWCHGAyeUWdP8yuuSYvMoUfaPoGu4p3gJb51diqNQz6j",
+        // "/ip4/50.17.246.3/tcp/16593/p2p/12D3KooWKkNsxkHJqvSje2viyqKVxtqvbTpFrbASD3q1uv6td1pW",
+        "/dns/validator-eu01.acme.sphereon.com/tcp/16593/p2p/12D3KooWKYTWKJ5jeuZmbbwiN7PoinJ2yJLoQtZyfWi2ihjBnSUR",
+        "/ip4/35.86.120.53/tcp/16593/p2p/12D3KooWKJuspMDC5GXzLYJs9nHwYfqst9QAW4m5FakXNHVMNiq7",
+        "/ip4/65.109.48.173/tcp/16593/p2p/12D3KooWHkUtGcHY96bNavZMCP2k5ps5mC7GrF1hBC1CsyGJZSPY",
+        "/dns/accumulate.detroitledger.tech/tcp/16593/p2p/12D3KooWNe1QNh5mKAa8iAEP8vFwvmWFxaCLNcAdE1sH38Bz8sc9",
+        "/ip4/3.135.9.97/tcp/16593/p2p/12D3KooWEQG3X528Ct2Kd3kxhv6WZDBqaAoEw7AKiPoK1NmWJgx1",
+        // "/ip4/3.86.85.133/tcp/16593/p2p/12D3KooWJvReA1SuLkppyXKXq6fifVPLqvNtzsvPUqagVjvYe7qe",
+        "/ip4/193.35.56.176/tcp/16593/p2p/12D3KooWJevZUFLqN7zAamDh2EEYNQZPvxGFwiFVyPXfuXZNjg1J",
+        "/ip4/35.177.70.195/tcp/16593/p2p/12D3KooWPzpRp1UCu4nvXT9h8jKvmBmCADrMnoF72DrEbUrWrB2G",
+        "/ip4/3.99.81.122/tcp/16593/p2p/12D3KooWLL5kAbD7nhv6CM9x9L1zjxSnc6hdMVKcsK9wzMGBo99X",
+        "/ip4/34.219.75.234/tcp/16593/p2p/12D3KooWKHjS5nzG9dipBXn31pYEnfa8g5UzvkSYEsuiukGHzPvt",
+        "/ip4/3.122.254.53/tcp/16593/p2p/12D3KooWRU8obVzgfw6TsUHjoy2FDD3Vd7swrPNTM7DMFs8JG4dx",
+        "/ip4/35.92.228.236/tcp/16593/p2p/12D3KooWQqMqbyJ2Zay9KHeEDgDMAxQpKD1ypiBX5ByQAA2XpsZL",
+        "/ip4/3.135.184.194/tcp/16593/p2p/12D3KooWHcxyiE3AGdPnhtj87tByfLnJZVR6mLefadWccbMByrBa",
+        "/ip4/18.133.170.113/tcp/16593/p2p/12D3KooWFbWY2NhBEWTLHUCwwPmNHm4BoJXbojnrJJfuDCVoqrFY",
+        // "/ip4/44.204.224.126/tcp/16593/p2p/12D3KooWAiJJxdgsB39up5h6fz6TSfBz4HsLKTFiBXUrbwA8o54m",
+        "/ip4/35.92.21.90/tcp/16593/p2p/12D3KooWLTV3pTN2NbKeFeseCGHyMXuAkQv68KfCeK4uqJzJMfhZ",
+        "/ip4/3.99.166.147/tcp/16593/p2p/12D3KooWGYUf93iYWsUibSvKdxsYUY1p7fC1nQotCpUcDXD1ABvR",
+        "/ip4/16.171.4.135/tcp/16593/p2p/12D3KooWEMpAxKnXJPkcEXpDmrnjrZ5iFMZvvQtimmTTxuoRGkXV",
+        "/ip4/54.237.244.42/tcp/16593/p2p/12D3KooWLoMkrgW862Gs152jLt6FiZZs4GkY24Su4QojnvMoSNaQ",
+        // "/ip4/3.238.124.43/tcp/16593/p2p/12D3KooWJ8CA8pacTnKWVgBSEav4QG1zJpyeSSME47RugpDUrZp8",
+        "/ip4/13.53.125.115/tcp/16593/p2p/12D3KooWBJk52fQExXHWhFNk692hP7JvTxNTvUMdVne8tbJ3DBf3",
+        "/ip4/13.59.241.224/tcp/16593/p2p/12D3KooWKjYKqg2TgUSLq8CZAP8G6LhjXUWTcQBd9qYL2JHug9HW",
+        "/ip4/18.168.202.86/tcp/16593/p2p/12D3KooWDiKGbUZg1rB5EufRCkRPiDCEPMjyvTfTVR9qsKVVkcuC",
+        "/ip4/35.183.112.161/tcp/16593/p2p/12D3KooWFPKeXzKMd3jtoeG6ts6ADKmVV8rVkXR9k9YkQPgpLzd6",
+    }
+    addrs := make([]multiaddr.Multiaddr, len(s))
+    for i, s := range s {
+        addr, err := multiaddr.NewMultiaddr(s)
+        if err != nil {
+            panic(err)
+        }
+        addrs[i] = addr
+    }
+    return addrs
+}()
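Note for reviewers: MainnetAddrs is a hard-coded bootstrap list for the public network; the heal commands later in this diff pass it to p2p.New when the default bootstrap servers misbehave. A minimal sketch of that usage, assuming only the exp/apiutil import path introduced here:

package main

import (
    "fmt"

    "gitlab.com/accumulatenetwork/accumulate/exp/apiutil"
    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p"
)

func main() {
    // Start a libp2p node that bootstraps off the hard-coded mainnet
    // validators (mirrors the node setup in tools/cmd/debug/heal_anchor.go).
    node, err := p2p.New(p2p.Options{
        Network:        "MainNet",
        BootstrapPeers: apiutil.MainnetAddrs,
    })
    if err != nil {
        panic(err)
    }
    defer func() { _ = node.Close() }()
    fmt.Println("we are", node.ID())
}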
diff --git a/exp/apiutil/scan.go b/exp/apiutil/scan.go
new file mode 100644
index 000000000..d5303bc40
--- /dev/null
+++ b/exp/apiutil/scan.go
@@ -0,0 +1,202 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package apiutil
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "os"
+    "strings"
+    "sync"
+
+    "github.com/libp2p/go-libp2p/core/peer"
+    "github.com/multiformats/go-multiaddr"
+    "gitlab.com/accumulatenetwork/accumulate/internal/api/routing"
+    "gitlab.com/accumulatenetwork/accumulate/internal/core/healing"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/errors"
+    "golang.org/x/exp/slog"
+)
+
+type NetworkScan = healing.NetworkInfo
+
+func LoadNetworkScan(file string) (*NetworkScan, error) {
+    f, err := os.Open(file)
+    if err != nil {
+        return nil, err
+    }
+    defer f.Close()
+
+    var net *NetworkScan
+    err = json.NewDecoder(f).Decode(&net)
+    if err != nil {
+        return nil, err
+    }
+    return net, nil
+}
+
+func NewMessageRouter(scan *NetworkScan) (message.Router, error) {
+    var err error
+    router := new(routing.MessageRouter)
+    router.Router, err = routing.NewStaticRouter(scan.Status.Routing, nil)
+    return router, err
+}
+
+type StaticDialer struct {
+    Scan   *healing.NetworkInfo
+    Dialer message.Dialer
+    Nodes  api.NodeService
+
+    mu   sync.RWMutex
+    good map[string]peer.ID
+}
+
+func (h *StaticDialer) BadDial(ctx context.Context, addr multiaddr.Multiaddr, stream message.Stream, err error) bool {
+    return true
+}
+
+func (h *StaticDialer) Dial(ctx context.Context, addr multiaddr.Multiaddr) (message.Stream, error) {
+    // Have we found a good peer?
+    s := h.dialKnownGood(ctx, addr)
+    if s != nil {
+        return s, nil
+    }
+
+    // Unpack the service address
+    network, peerID, service, _, err := api.UnpackAddress(addr)
+    if err != nil {
+        return nil, err
+    }
+
+    // Check for a recorded address
+    if h.Scan != nil {
+        if peerID != "" {
+            info := h.Scan.PeerByID(peerID)
+            if info != nil {
+                addr := addr
+                if peerID == "" {
+                    c, err := multiaddr.NewComponent("p2p", info.ID.String())
+                    if err != nil {
+                        panic(err)
+                    }
+                    addr = c.Encapsulate(addr)
+                }
+                for _, paddr := range info.Addresses {
+                    s, err := h.Dialer.Dial(ctx, paddr.Encapsulate(addr))
+                    if err == nil {
+                        h.markGood(addr, info.ID)
+                        return s, nil
+                    }
+                    slog.Error("Failed to connect", "peer", info.ID, "address", paddr, "service", addr, "error", err)
+                }
+            }
+        } else if service.Argument != "" {
+            // In the future not all peers will have all services
+            part, ok := h.Scan.Peers[strings.ToLower(service.Argument)]
+            if ok {
+                tried := map[string]bool{}
+                pick := func() (*healing.PeerInfo, multiaddr.Multiaddr) {
+                    for _, p := range part {
+                        for _, addr := range p.Addresses {
+                            if tried[addr.String()] {
+                                continue
+                            }
+                            tried[addr.String()] = true
+                            c, err := multiaddr.NewComponent("p2p", p.ID.String())
+                            if err != nil {
+                                panic(err)
+                            }
+                            addr = c.Encapsulate(addr)
+                            return p, addr
+                        }
+                    }
+                    return nil, nil
+                }
+
+                for {
+                    info, paddr := pick()
+                    if paddr == nil {
+                        break
+                    }
+                    s, err := h.Dialer.Dial(ctx, paddr.Encapsulate(addr))
+                    if err == nil {
+                        h.markGood(addr, info.ID)
+                        return s, nil
+                    }
+                    slog.Error("Failed to connect", "peer", info.ID, "address", paddr, "service", addr, "error", err)
+                }
+            }
+        }
+    }
+
+    // If the address specifies a node, or there is no node service to query,
+    // fall back to the underlying dialer
+    if h.Nodes == nil || peerID != "" {
+        return h.Dialer.Dial(ctx, addr)
+    }
+
+    // Use the API to find a node
+    nodes, err := h.Nodes.FindService(ctx, api.FindServiceOptions{Network: network, Service: service})
+    if err != nil {
+        return nil, errors.UnknownError.WithFormat("locate nodes for %v: %w", addr, err)
+    }
+    if len(nodes) == 0 {
+        return nil, errors.NoPeer.WithFormat("cannot locate a peer for %v", addr)
+    }
+
+    // Try all the nodes
+    for _, n := range nodes {
+        s, err := h.dial(ctx, addr, n.PeerID)
+        if err == nil {
+            h.markGood(addr, n.PeerID)
+            return s, nil
+        }
+        fmt.Printf("%v failed with %v\n", n.PeerID, err)
+    }
+    return nil, errors.NoPeer.WithFormat("no peers are responding for %v", addr)
+}
+
+func (h *StaticDialer) dial(ctx context.Context, addr multiaddr.Multiaddr, peer peer.ID) (message.Stream, error) {
+    c, err := multiaddr.NewComponent("p2p", peer.String())
+    if err != nil {
+        return nil, err
+    }
+    addr = addr.Encapsulate(c)
+    return h.Dialer.Dial(ctx, addr)
+}
+
+func (h *StaticDialer) dialKnownGood(ctx context.Context, addr multiaddr.Multiaddr) message.Stream {
+    h.mu.RLock()
+    id, ok := h.good[addr.String()]
+    h.mu.RUnlock()
+    if !ok {
+        return nil
+    }
+
+    s, err := h.dial(ctx, addr, id)
+    if err == nil {
+        return s
+    }
+
+    slog.Info("Failed to dial previously good node", "id", id, "error", err)
+
+    h.mu.Lock()
+    defer h.mu.Unlock()
+    delete(h.good, addr.String())
+
+    return nil
+}
+
+func (h *StaticDialer) markGood(addr multiaddr.Multiaddr, id peer.ID) {
+    h.mu.Lock()
+    defer h.mu.Unlock()
+
+    if h.good == nil {
+        h.good = map[string]peer.ID{}
+    }
+    h.good[addr.String()] = id
+}
diff --git a/internal/api/routing/router.go b/internal/api/routing/router.go
index ff74df23a..3a6d37d49 100644
--- a/internal/api/routing/router.go
+++ b/internal/api/routing/router.go
@@ -143,6 +143,9 @@ func routeMessage(routeAccount func(*url.URL) (string, error), route *string, ms
 	case *messaging.SequencedMessage:
 		r, err = routeAccount(msg.Destination)
 
+	case *messaging.SyntheticMessage:
+		return routeMessage(routeAccount, route, msg.Message)
+
 	case *messaging.BlockAnchor:
 		return routeMessage(routeAccount, route, msg.Anchor)
 
diff --git a/internal/core/healing/anchors.go b/internal/core/healing/anchors.go
new file mode 100644
index 000000000..a64ebb850
--- /dev/null
+++ b/internal/core/healing/anchors.go
@@ -0,0 +1,309 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package healing
+
+import (
+    "context"
+    "encoding/hex"
+    "fmt"
+    "strings"
+    "time"
+
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/errors"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/types/network"
+    "gitlab.com/accumulatenetwork/accumulate/protocol"
+    "golang.org/x/exp/slog"
+)
+
+type HealAnchorArgs struct {
+    Client    message.AddressedClient
+    Querier   api.Querier
+    Submitter api.Submitter
+    NetInfo   *NetworkInfo
+    Known     map[[32]byte]*protocol.Transaction
+    Pretend   bool
+    Wait      bool
+}
+
+func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error {
+    srcUrl := protocol.PartitionUrl(si.Source)
+    dstUrl := protocol.PartitionUrl(si.Destination)
+
+    if args.Querier == nil {
+        args.Querier = args.Client
+    }
+    if args.Submitter == nil {
+        args.Submitter = args.Client
+    }
+
+    // If the message ID is not known, resolve it
+    var theAnchorTxn *protocol.Transaction
+    if si.ID == nil {
+        r, err := ResolveSequenced[*messaging.TransactionMessage](ctx, args.Client, args.NetInfo, si.Source, si.Destination, si.Number, true)
+        if err != nil {
+            return err
+        }
+        si.ID = r.ID
+        theAnchorTxn = r.Message.Transaction
+    }
+
+    // Fetch the transaction and signatures
+    var sigSets []*api.SignatureSetRecord
+    Q := api.Querier2{Querier: args.Querier}
+    res, err := Q.QueryMessage(ctx, si.ID, nil)
+    switch {
+    case err == nil:
+        switch msg := res.Message.(type) {
+        case *messaging.SequencedMessage:
+            txn, ok := msg.Message.(*messaging.TransactionMessage)
+            if !ok {
+                return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeTransaction, msg.Message.Type())
+            }
+            theAnchorTxn = txn.Transaction
+        case *messaging.TransactionMessage:
+            theAnchorTxn = msg.Transaction
+        default:
+            return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeSequenced, res.Message.Type())
+        }
+
+        sigSets = res.Signatures.Records
+
+    case !errors.Is(err, errors.NotFound):
+        return err
+
+    case theAnchorTxn == nil:
+        var ok bool
+        theAnchorTxn, ok = args.Known[si.ID.Hash()]
+        if !ok {
+            return err
+        }
+    }
+
+    // Mark which validators have signed
+    signed := map[[32]byte]bool{}
+    for _, sigs := range sigSets {
+        for _, sig := range sigs.Signatures.Records {
+            msg, ok := sig.Message.(*messaging.BlockAnchor)
+            if !ok {
+                continue
+            }
+            k := msg.Signature.GetPublicKey()
+            slog.DebugCtx(ctx, "Anchor has been signed by", "validator", hex.EncodeToString(k[:4]))
+            signed[*(*[32]byte)(k)] = true
+        }
+    }
+
+    g := &network.GlobalValues{
+        Oracle:          args.NetInfo.Status.Oracle,
+        Globals:         args.NetInfo.Status.Globals,
+        Network:         args.NetInfo.Status.Network,
+        Routing:         args.NetInfo.Status.Routing,
+        ExecutorVersion: args.NetInfo.Status.ExecutorVersion,
+    }
+    threshold := g.ValidatorThreshold(si.Source)
+
+    lkv := []any{
+        "source", si.Source,
+        "destination", si.Destination,
+        "sequence-number", si.Number,
+        "want", threshold,
+        "have", len(signed),
+    }
+    if theAnchorTxn != nil {
+        lkv = append(lkv,
+            "txid", theAnchorTxn.ID(),
+        )
+    }
+    slog.InfoCtx(ctx, "Healing anchor", lkv...)
+
+    if len(signed) >= int(threshold) {
+        slog.InfoCtx(ctx, "Sufficient signatures have been received")
+        return nil
+    }
+
+    seq := &messaging.SequencedMessage{
+        Source:      srcUrl,
+        Destination: dstUrl,
+        Number:      si.Number,
+    }
+    if theAnchorTxn != nil {
+        seq.Message = &messaging.TransactionMessage{
+            Transaction: theAnchorTxn,
+        }
+    }
+
+    // Get a signature from each node that hasn't signed
+    var gotPartSig bool
+    var signatures []protocol.Signature
+    for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Source)] {
+        if signed[info.Key] {
+            continue
+        }
+
+        ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+        defer cancel()
+
+        slog.InfoCtx(ctx, "Querying node for its signature", "id", peer)
+        res, err := args.Client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, si.Number)
+        if err != nil {
+            slog.ErrorCtx(ctx, "Query failed", "error", err)
+            continue
+        }
+
+        myTxn, ok := res.Message.(*messaging.TransactionMessage)
+        if !ok {
+            slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "id", info, "type", res.Message.Type())
+            continue
+        }
+        if theAnchorTxn == nil {
+            theAnchorTxn = myTxn.Transaction
+            seq.Message = &messaging.TransactionMessage{
+                Transaction: theAnchorTxn,
+            }
+        } else if !myTxn.Transaction.Equal(theAnchorTxn) {
+            slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "id", info,
+                "expected", hex.EncodeToString(theAnchorTxn.GetHash()),
+                "got", hex.EncodeToString(myTxn.Transaction.GetHash()))
+            continue
+        }
+
+        for _, sigs := range res.Signatures.Records {
+            for _, sig := range sigs.Signatures.Records {
+                msg, ok := sig.Message.(*messaging.SignatureMessage)
+                if !ok {
+                    slog.ErrorCtx(ctx, "Node gave us a signature that is not a signature message", "id", info, "type", sig.Message.Type())
+                    continue
+                }
+
+                if args.NetInfo.Status.ExecutorVersion.V2Enabled() {
+                    sig, ok := msg.Signature.(protocol.KeySignature)
+                    if !ok {
+                        slog.ErrorCtx(ctx, "Node gave us a signature that is not a key signature", "id", info, "type", sig.Type())
+                        continue
+                    }
+
+                    // Filter out bad signatures
+                    h := seq.Hash()
+                    if !sig.Verify(nil, h[:]) {
+                        slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info)
+                        continue
+                    }
+
+                } else {
+                    switch sig := msg.Signature.(type) {
+                    case *protocol.PartitionSignature:
+                        // We only want one partition signature
+                        if gotPartSig {
+                            continue
+                        }
+                        gotPartSig = true
+
+                    case protocol.UserSignature:
+                        // Filter out bad signatures
+                        if !sig.Verify(nil, theAnchorTxn.GetHash()) {
+                            slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info)
+                            continue
+                        }
+
+                    default:
+                        slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "id", info, "type", sig.Type())
+                        continue
+                    }
+                }
+
+                signatures = append(signatures, msg.Signature)
+            }
+        }
+    }
+
+    if args.Pretend {
+        b, err := theAnchorTxn.MarshalBinary()
+        if err != nil {
+            panic(err)
+        }
+        slog.InfoCtx(ctx, "Would have submitted anchor", "signatures", len(signatures), "source", si.Source, "destination", si.Destination, "number", si.Number, "txn-size", len(b))
+        return nil
+    }
+
+    // We should always have a partition signature, so there's only something to
+    // send if we have more than 1 signature
+    if gotPartSig && len(signatures) <= 1 || !gotPartSig && len(signatures) == 0 {
+        slog.InfoCtx(ctx, "Nothing to send")
+
+    } else {
+        slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures))
+
+        submit := func(env *messaging.Envelope) {
+            // addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr()
+            sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{})
+            if err != nil {
+                slog.ErrorCtx(ctx, "Submission failed", "error", err)
+            }
+            for _, sub := range sub {
+                if sub.Success {
+                    slog.InfoCtx(ctx, "Submission succeeded", "id", sub.Status.TxID)
+                } else {
+                    slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status)
+                }
+            }
+        }
+
+        if args.NetInfo.Status.ExecutorVersion.V2Enabled() {
+            for _, sig := range signatures {
+                blk := &messaging.BlockAnchor{
+                    Signature: sig.(protocol.KeySignature),
+                    Anchor:    seq,
+                }
+                submit(&messaging.Envelope{Messages: []messaging.Message{blk}})
+            }
+        } else {
+            env := new(messaging.Envelope)
+            env.Transaction = []*protocol.Transaction{theAnchorTxn}
+            env.Signatures = signatures
+            submit(env)
+        }
+    }
+
+    if !args.Wait {
+        return nil
+    }
+
+    slog.InfoCtx(ctx, "Waiting", "for", si.ID)
+    for i := 0; i < 10; i++ {
+        r, err := Q.QueryMessage(ctx, si.ID, nil)
+        switch {
+        case errors.Is(err, errors.NotFound):
+            // Not found, wait
+            slog.Info("Status", "id", si.ID, "code", errors.NotFound)
+            break
+
+        case err != nil:
+            // Unknown error
+            return err
+
+        case !r.Status.Delivered():
+            // Pending, wait
+            slog.Info("Status", "id", si.ID, "code", r.Status)
+            break
+
+        case r.Error != nil:
+            slog.Error("Failed", "id", si.ID, "error", r.Error)
+            return r.AsError()
+
+        default:
+            slog.Info("Delivered", "id", si.ID)
+            return nil
+        }
+        time.Sleep(time.Second / 2)
+    }
+    return ErrRetry
+}
+
+var ErrRetry = fmt.Errorf("retry")
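Note for reviewers: HealAnchor collects missing validator signatures and resubmits them, returning ErrRetry if the anchor is still pending after ten polls. A minimal sketch of a caller, assuming a message client and network scan wired up as in tools/cmd/debug/heal_anchor.go below (healOne is a hypothetical helper, not part of this change):

// healOne heals one anchor by sequence number, retrying while the anchor
// remains pending. client and net come from the surrounding setup.
func healOne(ctx context.Context, client *message.Client, net *healing.NetworkInfo, src, dst string, num uint64) error {
    for {
        err := healing.HealAnchor(ctx, healing.HealAnchorArgs{
            Client:    client.ForAddress(nil),
            Querier:   client,
            Submitter: client,
            NetInfo:   net,
            Wait:      true,
        }, healing.SequencedInfo{Source: src, Destination: dst, Number: num})
        if !errors.Is(err, healing.ErrRetry) {
            return err // nil on success, or a real failure
        }
    }
}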
diff --git a/internal/core/healing/scan.go b/internal/core/healing/scan.go
new file mode 100644
index 000000000..795c0acfa
--- /dev/null
+++ b/internal/core/healing/scan.go
@@ -0,0 +1,195 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package healing
+
+import (
+    "context"
+    "encoding/json"
+    "fmt"
+    "strings"
+    "time"
+
+    "github.com/libp2p/go-libp2p/core/peer"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/errors"
+    "golang.org/x/exp/slog"
+)
+
+func (p *PeerInfo) String() string {
+    if p.Operator != nil {
+        return fmt.Sprintf("%v (%v)", p.Operator, p.ID)
+    }
+    return p.ID.String()
+}
+
+type NetworkInfo struct {
+    Status *api.NetworkStatus  `json:"status"`
+    ID     string              `json:"id"`
+    Peers  map[string]PeerList `json:"peers"`
+}
+
+type PeerList map[peer.ID]*PeerInfo
+
+func (i *NetworkInfo) PeerByID(id peer.ID) *PeerInfo {
+    if i == nil {
+        return nil
+    }
+    for _, part := range i.Peers {
+        if p, ok := part[id]; ok {
+            return p
+        }
+    }
+    return nil
+}
+
+func (l PeerList) MarshalJSON() ([]byte, error) {
+    m := make(map[string]*PeerInfo, len(l))
+    for id, info := range l {
+        m[id.String()] = info
+    }
+    return json.Marshal(m)
+}
+
+func (l *PeerList) UnmarshalJSON(data []byte) error {
+    var m map[string]*PeerInfo
+    err := json.Unmarshal(data, &m)
+    if err != nil {
+        return err
+    }
+    *l = make(PeerList, len(m))
+    for s, info := range m {
+        id, err := peer.Decode(s)
+        if err != nil {
+            return err
+        }
+        info.ID = id
+        (*l)[id] = info
+    }
+    return nil
+}
+
+type ScanServices = interface {
+    api.NodeService
+    api.ConsensusService
+    api.NetworkService
+}
+
+func ScanNetwork(ctx context.Context, endpoint ScanServices) (*NetworkInfo, error) {
+    ctx, cancel, _ := api.ContextWithBatchData(ctx)
+    defer cancel()
+
+    epNodeInfo, err := endpoint.NodeInfo(ctx, api.NodeInfoOptions{})
+    if err != nil {
+        return nil, errors.UnknownError.WithFormat("query endpoint node info: %w", err)
+    }
+
+    netStatus, err := endpoint.NetworkStatus(ctx, api.NetworkStatusOptions{})
+    if err != nil {
+        return nil, errors.UnknownError.WithFormat("query network status: %w", err)
+    }
+
+    hash2key := map[[32]byte][32]byte{}
+    for _, val := range netStatus.Network.Validators {
+        hash2key[val.PublicKeyHash] = *(*[32]byte)(val.PublicKey)
+    }
+
+    peers := map[string]PeerList{}
+    for _, part := range netStatus.Network.Partitions {
+        partPeers := PeerList{}
+        peers[strings.ToLower(part.ID)] = partPeers
+
+        slog.InfoCtx(ctx, "Finding peers for", "partition", part.ID)
+        find := api.FindServiceOptions{
+            Network: epNodeInfo.Network,
+            Service: api.ServiceTypeConsensus.AddressFor(part.ID),
+            Timeout: 10 * time.Second,
+        }
+        res, err := endpoint.FindService(ctx, find)
+        if err != nil {
+            return nil, errors.UnknownError.WithFormat("find %s: %w", find.Service.String(), err)
+        }
+
+        for _, peer := range res {
+            ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+            defer cancel()
+
+            slog.InfoCtx(ctx, "Getting identity of", "peer", peer.PeerID)
+            info, err := endpoint.ConsensusStatus(ctx, api.ConsensusStatusOptions{NodeID: peer.PeerID.String(), Partition: part.ID})
+            if err != nil {
+                slog.ErrorCtx(ctx, "Query failed", "error", err)
+                continue
+            }
+
+            key, ok := hash2key[info.ValidatorKeyHash]
+            if !ok {
+                continue // Not a validator
+            }
+            pi := &PeerInfo{
+                ID:        peer.PeerID,
+                Status:    info,
+                Key:       key,
+                Addresses: peer.Addresses,
+            }
+            partPeers[peer.PeerID] = pi
+
+            _, val, ok := netStatus.Network.ValidatorByHash(info.ValidatorKeyHash[:])
+            if ok {
+                pi.Operator = val.Operator
+            }
+        }
+    }
+
+    return &NetworkInfo{
+        Status: netStatus,
+        ID:     epNodeInfo.Network,
+        Peers:  peers,
+    }, nil
+}
+
+func ScanNode(ctx context.Context, endpoint ScanServices) (*PeerInfo, error) {
+    ctx, cancel, _ := api.ContextWithBatchData(ctx)
+    defer cancel()
+
+    nodeInfo, err := endpoint.NodeInfo(ctx, api.NodeInfoOptions{})
+    if err != nil {
+        return nil, errors.UnknownError.WithFormat("query node info: %w", err)
+    }
+
+    netStatus, err := endpoint.NetworkStatus(ctx, api.NetworkStatusOptions{})
+    if err != nil {
+        return nil, errors.UnknownError.WithFormat("query network status: %w", err)
+    }
+
+    hash2key := map[[32]byte][32]byte{}
+    for _, val := range netStatus.Network.Validators {
+        hash2key[val.PublicKeyHash] = *(*[32]byte)(val.PublicKey)
+    }
+
+    slog.InfoCtx(ctx, "Getting identity of", "peer", nodeInfo.PeerID)
+    info, err := endpoint.ConsensusStatus(ctx, api.ConsensusStatusOptions{})
+    if err != nil {
+        return nil, errors.UnknownError.WithFormat("query consensus status: %w", err)
+    }
+
+    key, ok := hash2key[info.ValidatorKeyHash]
+    if !ok {
+        return nil, errors.UnknownError.With("not a validator")
+    }
+
+    pi := &PeerInfo{
+        ID:     nodeInfo.PeerID,
+        Status: info,
+        Key:    key,
+    }
+
+    _, val, ok := netStatus.Network.ValidatorByHash(info.ValidatorKeyHash[:])
+    if ok {
+        pi.Operator = val.Operator
+    }
+
+    return pi, nil
+}
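Note for reviewers: ScanNetwork walks every partition's consensus service and records each validator's peer ID, key, and addresses; NetworkInfo round-trips through JSON (see PeerList.MarshalJSON above), which is what the --cached-scan flag consumes. A sketch of producing that cache file, assuming any v3 endpoint implementing the node, consensus, and network services (writeScan is hypothetical):

// writeScan scans the network once and caches the result, e.g. as
// ~/.accumulate/cache/mainnet.json for the heal commands' --cached-scan flag.
func writeScan(ctx context.Context, endpoint healing.ScanServices, file string) error {
    net, err := healing.ScanNetwork(ctx, endpoint)
    if err != nil {
        return err
    }
    data, err := json.Marshal(net)
    if err != nil {
        return err
    }
    return os.WriteFile(file, data, 0600)
}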
diff --git a/internal/core/healing/sequenced.go b/internal/core/healing/sequenced.go
new file mode 100644
index 000000000..a0408c551
--- /dev/null
+++ b/internal/core/healing/sequenced.go
@@ -0,0 +1,83 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package healing
+
+import (
+    "context"
+    "strings"
+    "time"
+
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/errors"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/url"
+    "gitlab.com/accumulatenetwork/accumulate/protocol"
+    "golang.org/x/exp/slog"
+)
+
+type SequencedInfo struct {
+    Source      string
+    Destination string
+    Number      uint64
+    ID          *url.TxID
+}
+
+// ResolveSequenced resolves an anchor or synthetic message (a sequenced
+// message). If the client's address is non-nil, the query will be sent to that
+// address. Otherwise, all of the source partition's nodes will be queried in
+// order until one responds.
+func ResolveSequenced[T messaging.Message](ctx context.Context, client message.AddressedClient, net *NetworkInfo, srcId, dstId string, seqNum uint64, anchor bool) (*api.MessageRecord[T], error) {
+    srcUrl := protocol.PartitionUrl(srcId)
+    dstUrl := protocol.PartitionUrl(dstId)
+
+    var account string
+    if anchor {
+        account = protocol.AnchorPool
+    } else {
+        account = protocol.Synthetic
+    }
+
+    // If the client has an address, use that
+    if client.Address != nil {
+        slog.InfoCtx(ctx, "Querying node", "address", client.Address)
+        res, err := client.Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum)
+        if err != nil {
+            return nil, err
+        }
+
+        r2, err := api.MessageRecordAs[T](res)
+        if err != nil {
+            return nil, err
+        }
+        return r2, nil
+    }
+
+    // Otherwise try each node until one succeeds
+    slog.InfoCtx(ctx, "Resolving the message ID", "source", srcId, "destination", dstId, "number", seqNum)
+    for peer := range net.Peers[strings.ToLower(srcId)] {
+        ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+        defer cancel()
+
+        slog.InfoCtx(ctx, "Querying node", "id", peer)
+        res, err := client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum)
+        if err != nil {
+            slog.ErrorCtx(ctx, "Query failed", "error", err)
+            continue
+        }
+
+        r2, err := api.MessageRecordAs[T](res)
+        if err != nil {
+            slog.ErrorCtx(ctx, "Query failed", "error", err)
+            continue
+        }
+
+        return r2, nil
+    }
+
+    return nil, errors.UnknownError.WithFormat("cannot resolve %s→%s #%d", srcId, dstId, seqNum)
+}
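Note for reviewers: because ResolveSequenced is generic over the message type, callers pick the record type they expect back. A sketch of resolving a directory anchor by sequence number (the values are illustrative, taken from the launch configuration above; client and net come from the usual setup):

// Resolve Directory→Apollo anchor #70911 to a transaction record. Passing
// anchor=true queries the anchor pool; false would query the synthetic ledger.
r, err := healing.ResolveSequenced[*messaging.TransactionMessage](
    ctx, client.ForAddress(nil), net, "Directory", "Apollo", 70911, true)
if err != nil {
    return err // no node could resolve the sequence entry
}
fmt.Println("resolved", r.ID, "txn", r.Message.Transaction.ID())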
diff --git a/internal/core/healing/synthetic.go b/internal/core/healing/synthetic.go
new file mode 100644
index 000000000..82b1d282b
--- /dev/null
+++ b/internal/core/healing/synthetic.go
@@ -0,0 +1,152 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package healing
+
+import (
+    "context"
+    "fmt"
+    "strings"
+    "time"
+
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/errors"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
+    "gitlab.com/accumulatenetwork/accumulate/protocol"
+    "golang.org/x/exp/slog"
+)
+
+type HealSyntheticArgs struct {
+    Client    message.AddressedClient
+    Querier   api.Querier
+    Submitter api.Submitter
+    NetInfo   *NetworkInfo
+    Pretend   bool
+    Wait      bool
+}
+
+func HealSynthetic(ctx context.Context, args HealSyntheticArgs, si SequencedInfo) error {
+    if args.Querier == nil {
+        args.Querier = args.Client
+    }
+    if args.Submitter == nil {
+        args.Submitter = args.Client
+    }
+
+    // Query the synthetic transaction
+    r, err := ResolveSequenced[messaging.Message](ctx, args.Client, args.NetInfo, si.Source, si.Destination, si.Number, false)
+    if err != nil {
+        return err
+    }
+
+    // Has it already been delivered?
+    Q := api.Querier2{Querier: args.Client}
+    if r, err := Q.QueryMessage(ctx, r.ID, nil); err == nil && r.Status.Delivered() {
+        return nil
+    }
+
+    slog.InfoCtx(ctx, "Resubmitting", "source", si.Source, "destination", si.Destination, "number", si.Number, "id", r.Message.ID())
+
+    // Submit the synthetic transaction directly to the destination partition
+    h := r.Sequence.Hash()
+    msg := &messaging.SyntheticMessage{
+        Message: r.Sequence,
+        Proof: &protocol.AnnotatedReceipt{
+            Receipt: r.SourceReceipt,
+            Anchor: &protocol.AnchorMetadata{
+                Account: protocol.DnUrl(),
+            },
+        },
+    }
+    for _, sigs := range r.Signatures.Records {
+        for _, sig := range sigs.Signatures.Records {
+            sig, ok := sig.Message.(*messaging.SignatureMessage)
+            if !ok {
+                continue
+            }
+            ks, ok := sig.Signature.(protocol.KeySignature)
+            if !ok {
+                continue
+            }
+            msg.Signature = ks
+        }
+    }
+    if msg.Signature == nil {
+        return fmt.Errorf("synthetic message is not signed")
+    }
+
+    h = msg.Message.Hash()
+    if !msg.Signature.Verify(nil, h[:]) {
+        return fmt.Errorf("signature is not valid")
+    }
+
+    env := new(messaging.Envelope)
+    env.Messages = []messaging.Message{msg}
+    if msg, ok := r.Message.(messaging.MessageForTransaction); ok {
+        r, err := Q.QueryTransaction(ctx, msg.GetTxID(), nil)
+        if err != nil {
+            return errors.InternalError.WithFormat("query transaction for message: %w", err)
+        }
+        env.Messages = append(env.Messages, r.Message)
+    }
+
+    if args.Pretend {
+        return nil
+    }
+
+    // Submit directly to an appropriate node
+    if args.Client.Address == nil {
+        for peer := range args.NetInfo.Peers[strings.ToLower(si.Destination)] {
+            args.Client = args.Client.ForPeer(peer)
+            break
+        }
+    }
+
+    sub, err := args.Client.Submit(ctx, env, api.SubmitOptions{})
+    if err != nil {
+        slog.ErrorCtx(ctx, "Submission failed", "error", err)
+    }
+    for _, sub := range sub {
+        if !sub.Success {
+            slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status)
+        }
+    }
+
+    if !args.Wait {
+        return nil
+    }
+
+    si.ID = r.ID
+    slog.InfoCtx(ctx, "Waiting", "for", si.ID)
+    for i := 0; i < 10; i++ {
+        r, err := Q.QueryMessage(ctx, si.ID, nil)
+        switch {
+        case errors.Is(err, errors.NotFound):
+            // Not found, wait
+            slog.Info("Status", "id", si.ID, "code", errors.NotFound)
+            break
+
+        case err != nil:
+            // Unknown error
+            return err
+
+        case !r.Status.Delivered():
+            // Pending, wait
+            slog.Info("Status", "id", si.ID, "code", r.Status)
+            break
+
+        case r.Error != nil:
+            slog.Error("Failed", "id", si.ID, "error", r.Error)
+            return r.AsError()
+
+        default:
+            slog.Info("Delivered", "id", si.ID)
+            return nil
+        }
+        time.Sleep(time.Second / 2)
+    }
+    return ErrRetry
+}
diff --git a/internal/core/healing/types.go b/internal/core/healing/types.go
new file mode 100644
index 000000000..e80d98665
--- /dev/null
+++ b/internal/core/healing/types.go
@@ -0,0 +1,9 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package healing
+
+//go:generate go run gitlab.com/accumulatenetwork/accumulate/tools/cmd/gen-types --package healing types.yml
diff --git a/internal/core/healing/types.yml b/internal/core/healing/types.yml
new file mode 100644
index 000000000..a8fdbea13
--- /dev/null
+++ b/internal/core/healing/types.yml
@@ -0,0 +1,19 @@
+PeerInfo:
+  non-binary: true
+  fields:
+    - name: ID
+      type: p2p.PeerID
+      marshal-as: none
+    - name: Operator
+      type: url
+      pointer: true
+    - name: Key
+      type: hash
+    - name: Status
+      type: api.ConsensusStatus
+      marshal-as: reference
+      pointer: true
+    - name: Addresses
+      type: p2p.Multiaddr
+      marshal-as: union
+      repeatable: true
diff --git a/internal/core/healing/types_gen.go b/internal/core/healing/types_gen.go
new file mode 100644
index 000000000..ae1ce03e5
--- /dev/null
+++ b/internal/core/healing/types_gen.go
@@ -0,0 +1,136 @@
+// Copyright 2022 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package healing
+
+// GENERATED BY go run ./tools/cmd/gen-types. DO NOT EDIT.
+
+//lint:file-ignore S1001,S1002,S1008,SA4013 generated code
+
+import (
+    "encoding/json"
+    "fmt"
+
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/types/encoding"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/types/p2p"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/url"
+)
+
+type PeerInfo struct {
+    ID        p2p.PeerID
+    Operator  *url.URL             `json:"operator,omitempty" form:"operator" query:"operator" validate:"required"`
+    Key       [32]byte             `json:"key,omitempty" form:"key" query:"key" validate:"required"`
+    Status    *api.ConsensusStatus `json:"status,omitempty" form:"status" query:"status" validate:"required"`
+    Addresses []p2p.Multiaddr      `json:"addresses,omitempty" form:"addresses" query:"addresses" validate:"required"`
+}
+
+func (v *PeerInfo) Copy() *PeerInfo {
+    u := new(PeerInfo)
+
+    if v.Operator != nil {
+        u.Operator = v.Operator
+    }
+    u.Key = v.Key
+    if v.Status != nil {
+        u.Status = (v.Status).Copy()
+    }
+    u.Addresses = make([]p2p.Multiaddr, len(v.Addresses))
+    for i, v := range v.Addresses {
+        v := v
+        if v != nil {
+            u.Addresses[i] = p2p.CopyMultiaddr(v)
+        }
+    }
+
+    return u
+}
+
+func (v *PeerInfo) CopyAsInterface() interface{} { return v.Copy() }
+
+func (v *PeerInfo) Equal(u *PeerInfo) bool {
+    switch {
+    case v.Operator == u.Operator:
+        // equal
+    case v.Operator == nil || u.Operator == nil:
+        return false
+    case !((v.Operator).Equal(u.Operator)):
+        return false
+    }
+    if !(v.Key == u.Key) {
+        return false
+    }
+    switch {
+    case v.Status == u.Status:
+        // equal
+    case v.Status == nil || u.Status == nil:
+        return false
+    case !((v.Status).Equal(u.Status)):
+        return false
+    }
+    if len(v.Addresses) != len(u.Addresses) {
+        return false
+    }
+    for i := range v.Addresses {
+        if !(p2p.EqualMultiaddr(v.Addresses[i], u.Addresses[i])) {
+            return false
+        }
+    }
+
+    return true
+}
+
+func (v *PeerInfo) MarshalJSON() ([]byte, error) {
+    u := struct {
+        Operator  *url.URL                                       `json:"operator,omitempty"`
+        Key       string                                         `json:"key,omitempty"`
+        Status    *api.ConsensusStatus                           `json:"status,omitempty"`
+        Addresses *encoding.JsonUnmarshalListWith[p2p.Multiaddr] `json:"addresses,omitempty"`
+    }{}
+    if !(v.Operator == nil) {
+        u.Operator = v.Operator
+    }
+    if !(v.Key == ([32]byte{})) {
+        u.Key = encoding.ChainToJSON(v.Key)
+    }
+    if !(v.Status == nil) {
+        u.Status = v.Status
+    }
+    if !(len(v.Addresses) == 0) {
+        u.Addresses = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.Addresses, Func: p2p.UnmarshalMultiaddrJSON}
+    }
+    return json.Marshal(&u)
+}
+
+func (v *PeerInfo) UnmarshalJSON(data []byte) error {
+    u := struct {
+        Operator  *url.URL                                       `json:"operator,omitempty"`
+        Key       string                                         `json:"key,omitempty"`
+        Status    *api.ConsensusStatus                           `json:"status,omitempty"`
+        Addresses *encoding.JsonUnmarshalListWith[p2p.Multiaddr] `json:"addresses,omitempty"`
+    }{}
+    u.Operator = v.Operator
+    u.Key = encoding.ChainToJSON(v.Key)
+    u.Status = v.Status
+    u.Addresses = &encoding.JsonUnmarshalListWith[p2p.Multiaddr]{Value: v.Addresses, Func: p2p.UnmarshalMultiaddrJSON}
+    if err := json.Unmarshal(data, &u); err != nil {
+        return err
+    }
+    v.Operator = u.Operator
+    if x, err := encoding.ChainFromJSON(u.Key); err != nil {
+        return fmt.Errorf("error decoding Key: %w", err)
+    } else {
+        v.Key = x
+    }
+    v.Status = u.Status
+    if u.Addresses != nil {
+        v.Addresses = make([]p2p.Multiaddr, len(u.Addresses.Value))
+        for i, x := range u.Addresses.Value {
+            v.Addresses[i] = x
+        }
+    }
+    return nil
+}
diff --git a/pkg/accumulate/api.go b/pkg/accumulate/api.go
index cf601aa09..8adb2b32f 100644
--- a/pkg/accumulate/api.go
+++ b/pkg/accumulate/api.go
@@ -54,5 +54,6 @@ func ResolveWellKnownEndpoint(name string, version string) string {
 	if u.Path == "" {
 		addr += "/"
 	}
+	version = strings.TrimPrefix(version, "/")
 	return addr + version
 }
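Note for reviewers: the TrimPrefix guard makes the version segment of ResolveWellKnownEndpoint tolerant of a leading slash, so both spellings below should now resolve to the same URL (a small sketch, assuming the mainnet well-known name):

a := accumulate.ResolveWellKnownEndpoint("mainnet", "v3")
b := accumulate.ResolveWellKnownEndpoint("mainnet", "/v3")
fmt.Println(a == b) // true: "/v3" no longer yields a double slash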
diff --git a/pkg/api/v3/message/client.go b/pkg/api/v3/message/client.go
index 0ca7df2e5..baa58fd15 100644
--- a/pkg/api/v3/message/client.go
+++ b/pkg/api/v3/message/client.go
@@ -42,18 +42,29 @@ var _ api.Submitter = (*Client)(nil)
 var _ api.Validator = (*Client)(nil)
 var _ api.Faucet = (*Client)(nil)
 
-func (c *Client) ForAddress(addr multiaddr.Multiaddr) AddressedClient {
-	return AddressedClient{c, addr}
+func (c AddressedClient) ForAddress(addr multiaddr.Multiaddr) AddressedClient {
+	if c.Address != nil {
+		addr = c.Address.Encapsulate(addr)
+	}
+	return AddressedClient{c.Client, addr}
 }
 
-func (c *Client) ForPeer(peer peer.ID) AddressedClient {
+func (c AddressedClient) ForPeer(peer peer.ID) AddressedClient {
 	addr, err := multiaddr.NewComponent("p2p", peer.String())
 	if err != nil {
 		panic(err)
 	}
+	return c.ForAddress(addr)
+}
+
+func (c *Client) ForAddress(addr multiaddr.Multiaddr) AddressedClient {
 	return AddressedClient{c, addr}
 }
 
+func (c *Client) ForPeer(peer peer.ID) AddressedClient {
+	return c.ForAddress(nil).ForPeer(peer)
+}
+
 // NodeInfo implements [api.NodeService.NodeInfo].
 func (c *Client) NodeInfo(ctx context.Context, opts NodeInfoOptions) (*api.NodeInfo, error) {
 	return c.ForAddress(nil).NodeInfo(ctx, opts)
diff --git a/pkg/api/v3/p2p/p2p.go b/pkg/api/v3/p2p/p2p.go
index d2b8e46cc..c928fc712 100644
--- a/pkg/api/v3/p2p/p2p.go
+++ b/pkg/api/v3/p2p/p2p.go
@@ -171,6 +171,8 @@ func New(opts Options) (_ *Node, err error) {
 func (n *Node) ID() peer.ID { return n.host.ID() }
 
+func (n *Node) Services() *nodeService { return (*nodeService)(n) }
+
 // Addresses lists the node's addresses.
 func (n *Node) Addresses() []multiaddr.Multiaddr {
 	// Wrap the TCP/IP address with /p2p/{id}
diff --git a/tools/cmd/debug/comet.go b/tools/cmd/debug/comet.go
index 538402fac..4d61222f8 100644
--- a/tools/cmd/debug/comet.go
+++ b/tools/cmd/debug/comet.go
@@ -124,9 +124,3 @@ func downloadCometGenesis(_ *cobra.Command, args []string) {
 		check(err)
 	}
 }
-
-func checkf(err error, format string, otherArgs ...interface{}) {
-	if err != nil {
-		fatalf(format+": %v", append(otherArgs, err)...)
-	}
-}
diff --git a/tools/cmd/debug/heal_anchor.go b/tools/cmd/debug/heal_anchor.go
new file mode 100644
index 000000000..821923d9f
--- /dev/null
+++ b/tools/cmd/debug/heal_anchor.go
@@ -0,0 +1,194 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package main
+
+import (
+    "context"
+    "encoding/json"
+    "errors"
+    "fmt"
+    "os"
+    "strconv"
+    "strings"
+    "time"
+
+    "github.com/spf13/cobra"
+    "gitlab.com/accumulatenetwork/accumulate/exp/apiutil"
+    "gitlab.com/accumulatenetwork/accumulate/internal/api/routing"
+    "gitlab.com/accumulatenetwork/accumulate/internal/core/healing"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/accumulate"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/url"
+    "gitlab.com/accumulatenetwork/accumulate/protocol"
+    "golang.org/x/exp/slog"
+)
+
+var cmdHealAnchor = &cobra.Command{
+    Use:   "anchor [network] [txid or part→part (optional) [sequence number (optional)]]",
+    Short: "Heal anchoring",
+    Args:  cobra.RangeArgs(1, 3),
+    Run:   healAnchor,
+}
+
+func init() {
+    cmdHeal.AddCommand(cmdHealAnchor)
+}
+
+func healAnchor(_ *cobra.Command, args []string) {
+    ctx, cancel, _ := api.ContextWithBatchData(context.Background())
+    defer cancel()
+
+    // We should be able to use only the p2p client but it doesn't work well for
+    // some reason
+    C1 := jsonrpc.NewClient(accumulate.ResolveWellKnownEndpoint(args[0], "v3"))
+    C1.Client.Timeout = time.Hour
+
+    ni, err := C1.NodeInfo(ctx, api.NodeInfoOptions{})
+    check(err)
+
+    node, err := p2p.New(p2p.Options{
+        Network: ni.Network,
+        // BootstrapPeers: api.BootstrapServers,
+        BootstrapPeers: apiutil.MainnetAddrs,
+    })
+    checkf(err, "start p2p node")
+    defer func() { _ = node.Close() }()
+
+    fmt.Fprintf(os.Stderr, "We are %v\n", node.ID())
+
+    fmt.Fprintln(os.Stderr, "Waiting for addresses")
+    time.Sleep(time.Second)
+
+    var net *healing.NetworkInfo
+    if cachedScan != "" {
+        data, err := os.ReadFile(cachedScan)
+        check(err)
+        check(json.Unmarshal(data, &net))
+    }
+
+    // Use a hack dialer that uses the API for peer discovery
+    router := new(routing.MessageRouter)
+    C2 := &message.Client{
+        Transport: &message.RoutedTransport{
+            Network: ni.Network,
+            // Dialer: node.DialNetwork(),
+            Dialer: &apiutil.StaticDialer{
+                Scan:   net,
+                Nodes:  C1,
+                Dialer: node.DialNetwork(),
+            },
+            Router: router,
+        },
+    }
+
+    if cachedScan == "" {
+        net, err = healing.ScanNetwork(ctx, C2)
+        check(err)
+    }
+
+    router.Router, err = routing.NewStaticRouter(net.Status.Routing, nil)
+    check(err)
+
+    if len(args) > 1 {
+        txid, err := url.ParseTxID(args[1])
+        if err == nil {
+            r, err := api.Querier2{Querier: C2}.QueryTransaction(ctx, txid, nil)
+            check(err)
+            if r.Sequence == nil {
+                fatalf("%v is not sequenced", txid)
+            }
+            srcId, _ := protocol.ParsePartitionUrl(r.Sequence.Source)
+            dstId, _ := protocol.ParsePartitionUrl(r.Sequence.Destination)
+            healSingleAnchor(ctx, C1, C2, net, srcId, dstId, r.Sequence.Number, txid, nil)
+            return
+        }
+
+        parts := strings.Split(args[1], "→")
+        if len(parts) != 2 {
+            fatalf("invalid transaction ID or sequence specifier: %q", args[1])
+        }
+        srcId, dstId := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
+        srcUrl := protocol.PartitionUrl(srcId)
+        dstUrl := protocol.PartitionUrl(dstId)
+
+        var seqNo uint64
+        if len(args) > 2 {
+            seqNo, err = strconv.ParseUint(args[2], 10, 64)
+            check(err)
+        }
+
+        if seqNo > 0 {
+            healSingleAnchor(ctx, C1, C2, net, srcId, dstId, seqNo, nil, map[[32]byte]*protocol.Transaction{})
+            return
+        }
+
+        ledger := getAccount[*protocol.AnchorLedger](C1, ctx, dstUrl.JoinPath(protocol.AnchorPool))
+        healAnchorSequence(ctx, C1, C2, net, srcId, dstId, ledger.Anchor(srcUrl))
+        return
+    }
+
+heal:
+    for _, dst := range net.Status.Network.Partitions {
+        dstUrl := protocol.PartitionUrl(dst.ID)
+        dstLedger := getAccount[*protocol.AnchorLedger](C1, ctx, dstUrl.JoinPath(protocol.AnchorPool))
+
+        for _, src := range net.Status.Network.Partitions {
+            // Anchors are always from and/or to the DN
+            if dst.Type != protocol.PartitionTypeDirectory && src.Type != protocol.PartitionTypeDirectory {
+                continue
+            }
+
+            srcUrl := protocol.PartitionUrl(src.ID)
+            src2dst := dstLedger.Partition(srcUrl)
+            healAnchorSequence(ctx, C1, C2, net, src.ID, dst.ID, src2dst)
+        }
+    }
+
+    // Heal continuously?
+    if healContinuous {
+        time.Sleep(time.Second)
+        goto heal
+    }
+}
+
+func healAnchorSequence(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Client, net *healing.NetworkInfo, srcId, dstId string, src2dst *protocol.PartitionSyntheticLedger) {
+    srcUrl := protocol.PartitionUrl(srcId)
+    dstUrl := protocol.PartitionUrl(dstId)
+
+    ids, txns := findPendingAnchors(ctx, C2, api.Querier2{Querier: C2}, net, srcUrl, dstUrl, true)
+    src2dst.Pending = append(src2dst.Pending, ids...)
+
+    for i, txid := range src2dst.Pending {
+        healSingleAnchor(ctx, C1, C2, net, srcId, dstId, src2dst.Delivered+1+uint64(i), txid, txns)
+    }
+}
+
+func healSingleAnchor(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Client, net *healing.NetworkInfo, srcId, dstId string, seqNum uint64, txid *url.TxID, txns map[[32]byte]*protocol.Transaction) {
+again:
+    err := healing.HealAnchor(ctx, healing.HealAnchorArgs{
+        Client:    C2.ForAddress(nil),
+        Querier:   C2,
+        Submitter: C2,
+        NetInfo:   net,
+        Known:     txns,
+        Pretend:   pretend,
+        Wait:      waitForTxn,
+    }, healing.SequencedInfo{
+        Source:      srcId,
+        Destination: dstId,
+        Number:      seqNum,
+        ID:          txid,
+    })
+    if errors.Is(err, healing.ErrRetry) {
+        slog.Error("Anchor still pending after 10 attempts, retrying")
+        goto again
+    }
+    check(err)
+}
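Note for reviewers: with heal_anchor.go in place, the launch configurations at the top of this diff correspond to invocations roughly like:

go run ./tools/cmd/debug heal anchor MainNet --cached-scan=$HOME/.accumulate/cache/mainnet.json Directory→Apollo 70911

Omitting the sequence number heals every pending anchor for the pair, and omitting the pair scans all partitions; --pretend only scans, and --continuous re-runs the healing loop every second (the flags are defined in heal_common.go below).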
diff --git a/tools/cmd/debug/heal_common.go b/tools/cmd/debug/heal_common.go
new file mode 100644
index 000000000..b6e7b913e
--- /dev/null
+++ b/tools/cmd/debug/heal_common.go
@@ -0,0 +1,52 @@
+// Copyright 2023 The Accumulate Authors
+//
+// Use of this source code is governed by an MIT-style
+// license that can be found in the LICENSE file or at
+// https://opensource.org/licenses/MIT.
+
+package main
+
+import (
+    "context"
+
+    "github.com/spf13/cobra"
+    "gitlab.com/accumulatenetwork/accumulate/internal/core/healing"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
+    "gitlab.com/accumulatenetwork/accumulate/pkg/url"
+    "gitlab.com/accumulatenetwork/accumulate/protocol"
+)
+
+var cmdHeal = &cobra.Command{
+    Use: "heal",
+}
+
+func init() {
+    cmd.AddCommand(cmdHeal)
+
+    cmdHeal.PersistentFlags().StringVar(&cachedScan, "cached-scan", "", "A cached network scan")
+    cmdHeal.PersistentFlags().BoolVarP(&pretend, "pretend", "n", false, "Do not submit envelopes, only scan")
+    cmdHeal.PersistentFlags().BoolVar(&waitForTxn, "wait", true, "Wait for the message to finalize")
+    cmdHeal.PersistentFlags().BoolVar(&healContinuous, "continuous", false, "Run healing in a loop every second")
+
+    _ = cmdHeal.MarkFlagFilename("cached-scan", ".json")
+}
+
+// resolveSeq resolves an anchor or synthetic message (a sequenced message). If
+// the client's address is non-nil, the query will be sent to that address.
+// Otherwise, all of the source partition's nodes will be queried in order until
+// one responds.
+func resolveSeq[T messaging.Message](ctx context.Context, client message.AddressedClient, net *healing.NetworkInfo, srcId, dstId string, seqNum uint64, anchor bool) *api.MessageRecord[T] {
+    r, err := healing.ResolveSequenced[T](ctx, client, net, srcId, dstId, seqNum, anchor)
+    check(err)
+    return r
+}
+
+// getAccount fetches the given account.
+func getAccount[T protocol.Account](C api.Querier, ctx context.Context, u *url.URL) T {
+    var v T
+    _, err := api.Querier2{Querier: C}.QueryAccountAs(ctx, u, nil, &v)
+    checkf(err, "get %v", u)
+    return v
+}
diff --git a/tools/cmd/debug/heal_synth.go b/tools/cmd/debug/heal_synth.go
index 02a7fc351..20ff443d6 100644
--- a/tools/cmd/debug/heal_synth.go
+++ b/tools/cmd/debug/heal_synth.go
@@ -8,197 +8,157 @@
 package main
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
-	"net/url"
 	"os"
-	"strconv"
-	"strings"
+	"time"
 
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/spf13/cobra"
+	"gitlab.com/accumulatenetwork/accumulate/exp/apiutil"
 	"gitlab.com/accumulatenetwork/accumulate/internal/api/routing"
-	v2 "gitlab.com/accumulatenetwork/accumulate/internal/api/v2"
-	"gitlab.com/accumulatenetwork/accumulate/internal/node/config"
+	"gitlab.com/accumulatenetwork/accumulate/internal/core/healing"
 	"gitlab.com/accumulatenetwork/accumulate/pkg/accumulate"
+	"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+	"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc"
 	"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
 	"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p"
-	client "gitlab.com/accumulatenetwork/accumulate/pkg/client/api/v2"
-	"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
 	"gitlab.com/accumulatenetwork/accumulate/protocol"
-	"gitlab.com/accumulatenetwork/accumulate/test/testing"
+	"golang.org/x/exp/slog"
 )
 
 var cmdHealSynth = &cobra.Command{
-	Use:   "heal-synth [network] [server]",
+	Use:   "synth [server]",
 	Short: "Fixup synthetic transactions",
-	Args:  cobra.ExactArgs(2),
+	Args:  cobra.ExactArgs(1),
 	Run:   healSynth,
 }
 
-var flagHealSynth = struct {
-	Peer string
-}{}
-
 func init() {
-	cmd.AddCommand(cmdHealSynth)
-	cmdHealSynth.Flags().StringVar(&flagHealSynth.Peer, "peer", "", "Query a specific peer")
+	cmdHeal.AddCommand(cmdHealSynth)
+	cmdHealSynth.Flags().StringVar(&usePeer, "peer", "", "Query a specific peer")
 }
 
 func healSynth(_ *cobra.Command, args []string) {
-	testing.EnableDebugFeatures()
-	c, err := client.New(args[1])
-	check(err)
+	// testing.EnableDebugFeatures()
+
+	ctx, cancel, _ := api.ContextWithBatchData(cmd.Context())
+	defer cancel()
+
+	C1 := jsonrpc.NewClient(accumulate.ResolveWellKnownEndpoint(args[0], "v3"))
+	Q := api.Querier2{Querier: C1}
+	C1.Client.Timeout = time.Hour
 
-	desc, err := c.Describe(context.Background())
+	ni, err := C1.NodeInfo(ctx, api.NodeInfoOptions{})
 	check(err)
 
+	var net *healing.NetworkInfo
+	if cachedScan != "" {
+		data, err := os.ReadFile(cachedScan)
+		check(err)
+		check(json.Unmarshal(data, &net))
+	}
+
 	node, err := p2p.New(p2p.Options{
-		Network:        args[0],
+		Network:        ni.Network,
 		BootstrapPeers: accumulate.BootstrapServers,
+		// BootstrapPeers: apiutil.MainnetAddrs,
 	})
 	check(err)
 	defer func() { _ = node.Close() }()
 
 	fmt.Printf("We are %v\n", node.ID())
 
+	fmt.Fprintln(os.Stderr, "Waiting for addresses")
+	time.Sleep(time.Second)
+
 	router := new(routing.MessageRouter)
-	c2 := &message.Client{
+	C2 := (&message.Client{
 		Transport: &message.RoutedTransport{
-			Network: args[0],
-			Dialer:  node.DialNetwork(),
-			Router:  router,
+			Network: ni.Network,
+			// Dialer: node.DialNetwork(),
+			Dialer: &apiutil.StaticDialer{
+				Scan:   net,
+				Nodes:  C1,
+				Dialer: node.DialNetwork(),
+			},
+			Router: router,
 		},
+	}).ForAddress(nil)
+	Q.Querier = C2
+
+	if usePeer != "" {
+		pid, err := peer.Decode(usePeer)
+		check(err)
+		C2 = C2.ForPeer(pid)
 	}
-	router.Router, err = routing.NewStaticRouter(desc.Values.Routing, nil)
+
+	if cachedScan == "" {
+		net, err = healing.ScanNetwork(ctx, C1)
+		check(err)
+	}
+
+	router.Router, err = routing.NewStaticRouter(net.Status.Routing, nil)
 	check(err)
 
+heal:
 	synths := map[string]*protocol.SyntheticLedger{}
-	for _, part := range desc.Values.Network.Partitions {
+	for _, dst := range net.Status.Network.Partitions {
 		// Get synthetic ledger
-		req := new(v2.GeneralQuery)
-		req.Url = protocol.PartitionUrl(part.ID).JoinPath(protocol.Synthetic)
-		synth := new(protocol.SyntheticLedger)
-		res := new(v2.ChainQueryResponse)
-		res.Data = synth
-		err = c.RequestAPIv2(context.Background(), "query", req, res)
+		var synth *protocol.SyntheticLedger
+		u := protocol.PartitionUrl(dst.ID).JoinPath(protocol.Synthetic)
+		_, err = Q.QueryAccountAs(ctx, u, nil, &synth)
 		check(err)
-		synths[part.ID] = synth
-
-		for _, src := range synth.Sequence {
-			for _, txid := range src.Pending {
-				req.Url = txid.AsUrl()
-				res := new(v2.TransactionQueryResponse)
-				err = c.RequestAPIv2(context.Background(), "query", req, res)
-				check(err)
-
-				xreq := new(v2.ExecuteRequest)
-				xreq.Envelope = new(messaging.Envelope)
-				xreq.Envelope.Transaction = []*protocol.Transaction{res.Transaction}
-				var partSig *protocol.PartitionSignature
-				for _, sig := range res.Signatures {
-					sig, ok := sig.(*protocol.PartitionSignature)
-					if ok {
-						partSig = sig
-						xreq.Envelope.Signatures = []protocol.Signature{sig}
-					}
-				}
-
-				p := c2.Private()
-				if flagHealSynth.Peer != "" {
-					pid, err := peer.Decode(flagHealSynth.Peer)
-					check(err)
-					p = c2.ForPeer(pid).Private()
-				}
-
-				// Get a signature
-				r, err := p.Sequence(context.Background(), partSig.SourceNetwork.JoinPath(protocol.Synthetic), partSig.DestinationNetwork, partSig.SequenceNumber)
-				check(err)
-				var note string
-				for _, sigs := range r.Signatures.Records {
-					for _, sig := range sigs.Signatures.Records {
-						if sig, ok := sig.Message.(*messaging.SignatureMessage); ok {
-							switch sig := sig.Signature.(type) {
-							case protocol.KeySignature:
-								xreq.Envelope.Signatures = append(xreq.Envelope.Signatures, sig)
-							case *protocol.ReceiptSignature:
-								if !res.Status.GotDirectoryReceipt {
-									note = " with DN receipt"
-									xreq.Envelope.Signatures = append(xreq.Envelope.Signatures, sig)
-								}
-							}
-						}
-					}
-				}
-
-				fmt.Printf("Resubmitting %v%s\n", txid, note)
-				// b, _ := json.Marshal(xreq.Envelope)
-				// fmt.Printf("%s\n", b)
-				// return
-
-				xres, err := c.ExecuteDirect(context.Background(), xreq)
-				check(err)
-				if xres.Message != "" {
-					fmt.Fprintf(os.Stderr, "Warning: %s\n", xres.Message)
-				}
+		synths[dst.ID] = synth
+
+		for _, src2dst := range synth.Sequence {
+			srcId, _ := protocol.ParsePartitionUrl(src2dst.Url)
+
+			for i := range src2dst.Pending {
+				resubmitByNumber(ctx, C2, net, srcId, dst.ID, src2dst.Delivered+1+uint64(i))
 			}
 		}
 	}
 
 	// Check produced vs received
-	for i, a := range desc.Values.Network.Partitions {
-		for _, b := range desc.Values.Network.Partitions[i:] {
+	for i, a := range net.Status.Network.Partitions {
+		for _, b := range net.Status.Network.Partitions[i:] {
 			ab := synths[a.ID].Partition(protocol.PartitionUrl(b.ID))
 			ba := synths[b.ID].Partition(protocol.PartitionUrl(a.ID))
 			for i := ba.Received + 1; i <= ab.Produced; i++ {
-				resubmitByNumber(desc, c, a.ID, b.ID, i, false)
+				resubmitByNumber(ctx, C2, net, a.ID, b.ID, i)
 			}
 			if a == b {
 				continue
 			}
 			for i := ab.Received + 1; i <= ba.Produced; i++ {
-				resubmitByNumber(desc, c, b.ID, a.ID, i, false)
+				resubmitByNumber(ctx, C2, net, b.ID, a.ID, i)
 			}
 		}
 	}
-}
 
-func resubmitByNumber(desc *v2.DescriptionResponse, c *client.Client, source, destination string, number uint64, anchor bool) {
-	// Get a client for the destination partition
-	var d *client.Client
-	for _, p := range desc.Network.Partitions {
-		if !strings.EqualFold(p.Id, destination) {
-			continue
-		}
-		if len(p.Nodes) == 0 {
-			fatalf("no nodes for %v", p.Id)
-		}
-		u, err := url.Parse(p.Nodes[0].Address)
-		check(err)
-		port, err := strconv.ParseUint(u.Port(), 10, 16)
-		check(err)
-		d, err = client.New(fmt.Sprintf("http://%s:%d", u.Hostname(), port+config.PortOffsetAccumulateApi.GetEnumValue()))
-		check(err)
 	}
 
+	// Heal continuously?
+	if healContinuous {
+		time.Sleep(time.Second)
+		goto heal
 	}
 }
 
-	// Query the synthetic transaction
-	req := new(v2.SyntheticTransactionRequest)
-	req.Source = protocol.PartitionUrl(source)
-	req.Destination = protocol.PartitionUrl(destination)
-	req.SequenceNumber = number
-	req.Anchor = anchor
-	res, err := c.QuerySynth(context.Background(), req)
-	check(err)
-
-	// Submit the synthetic transaction directly to the destination partition
-	fmt.Printf("Resubmitting %v\n", res.Txid)
-	xreq := new(v2.ExecuteRequest)
-	xreq.Envelope = new(messaging.Envelope)
-	xreq.Envelope.Transaction = []*protocol.Transaction{res.Transaction}
-	xreq.Envelope.Signatures = res.Signatures
-	xres, err := d.ExecuteLocal(context.Background(), xreq)
-	check(err)
-	if xres.Message != "" {
-		fmt.Fprintf(os.Stderr, "Warning: %s\n", xres.Message)
+func resubmitByNumber(ctx context.Context, C2 message.AddressedClient, net *healing.NetworkInfo, source, destination string, number uint64) {
+	err := healing.HealSynthetic(ctx, healing.HealSyntheticArgs{
+		Client:    C2,
+		Querier:   C2,
+		Submitter: C2,
+		NetInfo:   net,
+		Pretend:   pretend,
+		Wait:      waitForTxn,
+	}, healing.SequencedInfo{
+		Source:      source,
+		Destination: destination,
+		Number:      number,
+	})
+	if err != nil {
+		slog.Error("Failed to heal", "source", source, "destination", destination, "number", number, "error", err)
 	}
 }
diff --git a/tools/cmd/debug/heal_test.go b/tools/cmd/debug/heal_test.go
index fe07b7926..87466a0fd 100644
--- a/tools/cmd/debug/heal_test.go
+++ b/tools/cmd/debug/heal_test.go
@@ -11,23 +11,160 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"net"
+	"os"
+	"strings"
 	"testing"
+	"time"
 
 	abci "github.com/cometbft/cometbft/abci/types"
 	"github.com/cometbft/cometbft/rpc/client/http"
 	coretypes "github.com/cometbft/cometbft/rpc/core/types"
+	"github.com/fatih/color"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/multiformats/go-multiaddr"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"gitlab.com/accumulatenetwork/accumulate/internal/api/routing"
+	"gitlab.com/accumulatenetwork/accumulate/internal/core/healing"
+	"gitlab.com/accumulatenetwork/accumulate/pkg/accumulate"
 	"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
+	"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc"
 	"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
 	"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p"
+	"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
 	"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
+	"gitlab.com/accumulatenetwork/accumulate/pkg/url"
 	"gitlab.com/accumulatenetwork/accumulate/protocol"
 )
 
+func TestCheckTransactions(t *testing.T) {
+	node, err := p2p.New(p2p.Options{
+		Network:        "MainNet",
+		BootstrapPeers: accumulate.BootstrapServers,
+	})
+	check(err)
+	defer func() { _ = node.Close() }()
+
+	var netinfo *healing.NetworkInfo
+	data, err := os.ReadFile("/home/firelizzard/src/Accumulate/accumulate/mainnet.json")
+	check(err)
+	check(json.Unmarshal(data, &netinfo))
+
+	router := new(routing.MessageRouter)
+	router.Router, err = routing.NewStaticRouter(netinfo.Status.Routing, nil)
+	require.NoError(t, err)
+
+	req := []struct {
+		Partition string
+		Account   string
+		Name      string
+		Start     int
+	}{
+		{"apollo", "reesor.acme/rewards", "main", 77},
+		{"yutu", "ethan.acme/tokens", "main", 2},
+		{"chandrayaan", "tfa.acme/staking-yield", "main", 39},
+		{"directory", "ACME", "main", 28774},
+	}
+
+	for _, r := range req {
+		fmt.Println(r.Partition)
+
+		nodes, err := node.Services().FindService(context.Background(), api.FindServiceOptions{Network: "MainNet", Service: api.ServiceTypeConsensus.AddressFor(r.Partition), Timeout: 10 * time.Second})
+		require.NoError(t, err)
+
+		for _, p := range nodes {
+			var addr string
+			for _, a := range p.Addresses {
+				s, err := a.ValueForProtocol(multiaddr.P_IP4)
+				if err != nil {
+					continue
+				}
+				ip := net.ParseIP(s)
+				if ip == nil || isPrivateIP(ip) {
+					continue
+				}
+				addr = s
+				break
+			}
+			if addr == "" {
+				fmt.Printf(" ! %v (no address)\n", p.PeerID)
+				continue
+			}
+
+			var port = 16595
+			if !strings.EqualFold(r.Partition, protocol.Directory) {
+				port = 16695
+			}
+			c := jsonrpc.NewClient(fmt.Sprintf("http://%s:%d/v3", addr, port))
+			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+			defer cancel()
+
+			var expand = true
+			var count uint64 = 2
+			var failure error
+			r, err := api.Querier2{Querier: c}.QueryChainEntries(ctx, url.MustParse(r.Account), &api.ChainQuery{Name: r.Name, Range: &api.RangeOptions{Start: uint64(r.Start), Expand: &expand, Count: &count}})
+			switch {
+			case err == nil:
+				for _, r := range r.Records {
+					err, ok := r.Value.(*api.ErrorRecord)
+					if ok {
+						failure = errors.Code(err.Value)
+					}
+				}
+			default:
+				failure = err
+			}
+
+			s := " "
+			if failure == nil {
+				s += " " + color.GreenString("✔")
+			} else {
+				s += " " + color.RedString("🗴")
+			}
+			s += " " + p.PeerID.String()
+			if failure != nil {
+				s += " (" + failure.Error() + ")"
+			}
+			fmt.Println(s)
+		}
+	}
+}
+
+var privateIPBlocks []*net.IPNet
+
+func init() {
+	for _, cidr := range []string{
+		"127.0.0.0/8",    // IPv4 loopback
+		"10.0.0.0/8",     // RFC1918
+		"172.16.0.0/12",  // RFC1918
+		"192.168.0.0/16", // RFC1918
+		"169.254.0.0/16", // RFC3927 link-local
+		"::1/128",        // IPv6 loopback
+		"fe80::/10",      // IPv6 link-local
+		"fc00::/7",       // IPv6 unique local addr
+	} {
+		_, block, err := net.ParseCIDR(cidr)
+		if err != nil {
+			panic(fmt.Errorf("parse error on %q: %v", cidr, err))
+		}
+		privateIPBlocks = append(privateIPBlocks, block)
+	}
+}
+
+func isPrivateIP(ip net.IP) bool {
+	if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
+		return true
+	}
+
+	for _, block := range privateIPBlocks {
+		if block.Contains(ip) {
+			return true
+		}
+	}
+	return false
+}
+
 func TestHealSynth(t *testing.T) {
 	t.Skip("Manual")
 
@@ -78,6 +215,106 @@ func TestHealSynth(t *testing.T) {
 	}
 }
 
+func TestHealQueryAnchor(t *testing.T) {
+	// t.Skip("Manual")
+
+	peer, err := peer.Decode("12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj")
+	require.NoError(t, err)
+
+	var mainnetAddrs = func() []multiaddr.Multiaddr {
+		s := []string{
+			"/dns/apollo-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj",
+			"/dns/yutu-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWDqFDwjHEog1bNbxai2dKSaR1aFvq2LAZ2jivSohgoSc7",
+			"/dns/chandrayaan-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWHzjkoeAqe7L55tAaepCbMbhvNu9v52ayZNVQobdEE1RL",
+			"/ip4/116.202.214.38/tcp/16593/p2p/12D3KooWBkJQiuvotpMemWBYfAe4ctsVHi7fLvT8RT83oXJ5dsgV",
+			"/ip4/83.97.19.82/tcp/16593/p2p/12D3KooWHSbqS6K52d4ReauHAg4n8MFbAKkdEAae2fZXnzRYi9ce",
+			"/ip4/206.189.97.165/tcp/16593/p2p/12D3KooWHyA7zgAVqGvCBBJejgvKzv7DQZ3LabJMWqmCQ9wFbT3o",
+			"/ip4/144.76.105.23/tcp/16593/p2p/12D3KooWS2Adojqun5RV1Xy4k6vKXWpRQ3VdzXnW8SbW7ERzqKie",
+			"/ip4/18.190.77.236/tcp/16593/p2p/12D3KooWP1d9vUJCzqX5bTv13tCHmVssJrgK3EnJCC2C5Ep2SXbS",
+			"/ip4/3.28.207.55/tcp/16593/p2p/12D3KooWEzhg3CRvC3xdrUBFsWETF1nG3gyYfEjx4oEJer95y1Rk",
"/ip4/38.135.195.81/tcp/16593/p2p/12D3KooWDWCHGAyeUWdP8yuuSYvMoUfaPoGu4p3gJb51diqNQz6j", + // "/ip4/50.17.246.3/tcp/16593/p2p/12D3KooWKkNsxkHJqvSje2viyqKVxtqvbTpFrbASD3q1uv6td1pW", + "/dns/validator-eu01.acme.sphereon.com/tcp/16593/p2p/12D3KooWKYTWKJ5jeuZmbbwiN7PoinJ2yJLoQtZyfWi2ihjBnSUR", + "/ip4/35.86.120.53/tcp/16593/p2p/12D3KooWKJuspMDC5GXzLYJs9nHwYfqst9QAW4m5FakXNHVMNiq7", + "/ip4/65.109.48.173/tcp/16593/p2p/12D3KooWHkUtGcHY96bNavZMCP2k5ps5mC7GrF1hBC1CsyGJZSPY", + "/dns/accumulate.detroitledger.tech/tcp/16593/p2p/12D3KooWNe1QNh5mKAa8iAEP8vFwvmWFxaCLNcAdE1sH38Bz8sc9", + "/ip4/3.135.9.97/tcp/16593/p2p/12D3KooWEQG3X528Ct2Kd3kxhv6WZDBqaAoEw7AKiPoK1NmWJgx1", + // "/ip4/3.86.85.133/tcp/16593/p2p/12D3KooWJvReA1SuLkppyXKXq6fifVPLqvNtzsvPUqagVjvYe7qe", + "/ip4/193.35.56.176/tcp/16593/p2p/12D3KooWJevZUFLqN7zAamDh2EEYNQZPvxGFwiFVyPXfuXZNjg1J", + "/ip4/35.177.70.195/tcp/16593/p2p/12D3KooWPzpRp1UCu4nvXT9h8jKvmBmCADrMnoF72DrEbUrWrB2G", + "/ip4/3.99.81.122/tcp/16593/p2p/12D3KooWLL5kAbD7nhv6CM9x9L1zjxSnc6hdMVKcsK9wzMGBo99X", + "/ip4/34.219.75.234/tcp/16593/p2p/12D3KooWKHjS5nzG9dipBXn31pYEnfa8g5UzvkSYEsuiukGHzPvt", + "/ip4/3.122.254.53/tcp/16593/p2p/12D3KooWRU8obVzgfw6TsUHjoy2FDD3Vd7swrPNTM7DMFs8JG4dx", + "/ip4/35.92.228.236/tcp/16593/p2p/12D3KooWQqMqbyJ2Zay9KHeEDgDMAxQpKD1ypiBX5ByQAA2XpsZL", + "/ip4/3.135.184.194/tcp/16593/p2p/12D3KooWHcxyiE3AGdPnhtj87tByfLnJZVR6mLefadWccbMByrBa", + "/ip4/18.133.170.113/tcp/16593/p2p/12D3KooWFbWY2NhBEWTLHUCwwPmNHm4BoJXbojnrJJfuDCVoqrFY", + // "/ip4/44.204.224.126/tcp/16593/p2p/12D3KooWAiJJxdgsB39up5h6fz6TSfBz4HsLKTFiBXUrbwA8o54m", + "/ip4/35.92.21.90/tcp/16593/p2p/12D3KooWLTV3pTN2NbKeFeseCGHyMXuAkQv68KfCeK4uqJzJMfhZ", + "/ip4/3.99.166.147/tcp/16593/p2p/12D3KooWGYUf93iYWsUibSvKdxsYUY1p7fC1nQotCpUcDXD1ABvR", + "/ip4/16.171.4.135/tcp/16593/p2p/12D3KooWEMpAxKnXJPkcEXpDmrnjrZ5iFMZvvQtimmTTxuoRGkXV", + "/ip4/54.237.244.42/tcp/16593/p2p/12D3KooWLoMkrgW862Gs152jLt6FiZZs4GkY24Su4QojnvMoSNaQ", + // "/ip4/3.238.124.43/tcp/16593/p2p/12D3KooWJ8CA8pacTnKWVgBSEav4QG1zJpyeSSME47RugpDUrZp8", + "/ip4/13.53.125.115/tcp/16593/p2p/12D3KooWBJk52fQExXHWhFNk692hP7JvTxNTvUMdVne8tbJ3DBf3", + "/ip4/13.59.241.224/tcp/16593/p2p/12D3KooWKjYKqg2TgUSLq8CZAP8G6LhjXUWTcQBd9qYL2JHug9HW", + "/ip4/18.168.202.86/tcp/16593/p2p/12D3KooWDiKGbUZg1rB5EufRCkRPiDCEPMjyvTfTVR9qsKVVkcuC", + "/ip4/35.183.112.161/tcp/16593/p2p/12D3KooWFPKeXzKMd3jtoeG6ts6ADKmVV8rVkXR9k9YkQPgpLzd6", + } + addrs := make([]multiaddr.Multiaddr, len(s)) + for i, s := range s { + addr, err := multiaddr.NewMultiaddr(s) + if err != nil { + panic(err) + } + addrs[i] = addr + } + return addrs + }() + + node, err := p2p.New(p2p.Options{ + Network: "MainNet", + // BootstrapPeers: api.BootstrapServers, + BootstrapPeers: mainnetAddrs, + }) + require.NoError(t, err) + defer func() { _ = node.Close() }() + + fmt.Printf("We are %v\n", node.ID()) + time.Sleep(time.Second) + + // fmt.Println("Waiting for a live network service") + // svcAddr, err := api.ServiceTypeNetwork.AddressFor(protocol.Directory).MultiaddrFor("MainNet") + // require.NoError(t, err) + // require.NoError(t, node.WaitForService(context.Background(), svcAddr)) + + router := new(routing.MessageRouter) + client := &message.Client{ + Transport: &message.RoutedTransport{ + Network: "MainNet", + Dialer: node.DialNetwork(), + Router: router, + }, + } + // ns, err := client.NetworkStatus(context.Background(), api.NetworkStatusOptions{}) + // require.NoError(t, err) + // router.Router, err = routing.NewStaticRouter(ns.Routing, nil) + // require.NoError(t, err) + + r, err := 
client.ForPeer(peer).Private().Sequence(context.Background(), + protocol.PartitionUrl("Apollo").JoinPath(protocol.Synthetic), + protocol.PartitionUrl("Apollo"), + 3044) + require.NoError(t, err) + b, err := json.Marshal(r.Message) + require.NoError(t, err) + fmt.Println(string(b)) + // for _, set := range r.Signatures.Records { + // for _, sig := range set.Signatures.Records { + // _ = sig + // fmt.Println("got signature") + // } + // } +} + func TestHealAnchors(t *testing.T) { t.Skip("Manual") diff --git a/tools/cmd/debug/main.go b/tools/cmd/debug/main.go index f920e690d..94b93f38b 100644 --- a/tools/cmd/debug/main.go +++ b/tools/cmd/debug/main.go @@ -14,6 +14,16 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/errors" ) +var ( + outputJSON bool + healContinuous bool + cachedScan string + verbose bool + pretend bool + waitForTxn bool + usePeer string +) + var cmd = &cobra.Command{ Use: "debug", Short: "Accumulate debug utilities", @@ -34,3 +44,9 @@ func check(err error) { fatalf("%+v", err) } } + +func checkf(err error, format string, otherArgs ...interface{}) { + if err != nil { + fatalf(format+": %v", append(otherArgs, err)...) + } +} diff --git a/tools/cmd/debug/network.go b/tools/cmd/debug/network.go new file mode 100644 index 000000000..1fec29841 --- /dev/null +++ b/tools/cmd/debug/network.go @@ -0,0 +1,96 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "os" + "strconv" + "time" + + "github.com/multiformats/go-multiaddr" + "github.com/spf13/cobra" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" + "gitlab.com/accumulatenetwork/accumulate/internal/node/config" + "gitlab.com/accumulatenetwork/accumulate/pkg/accumulate" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" +) + +var networkCmd = &cobra.Command{ + Use: "network", +} + +var networkScanCmd = &cobra.Command{ + Use: "scan [network]", + Short: "Scan the network for nodes", + Args: cobra.ExactArgs(1), + Run: scanNetwork, +} + +var networkScanNodeCmd = &cobra.Command{ + Use: "scan-node [address]", + Short: "Scan a node", + Args: cobra.ExactArgs(1), + Run: scanNode, +} + +func init() { + cmd.AddCommand(networkCmd) + networkCmd.AddCommand(networkScanCmd) + networkCmd.AddCommand(networkScanNodeCmd) + + networkCmd.PersistentFlags().BoolVarP(&outputJSON, "json", "j", false, "Output result as JSON") +} + +func scanNetwork(_ *cobra.Command, args []string) { + client := jsonrpc.NewClient(accumulate.ResolveWellKnownEndpoint(args[0], "v3")) + client.Client.Timeout = time.Hour + net, err := healing.ScanNetwork(context.Background(), client) + check(err) + + if outputJSON { + check(json.NewEncoder(os.Stdout).Encode(net)) + return + } + + for _, part := range net.Status.Network.Partitions { + fmt.Println(part.ID) + for _, peer := range net.Peers[part.ID] { + fmt.Printf(" %v\n", peer) + } + } +} + +func scanNode(_ *cobra.Command, args []string) { + u, err := url.Parse(args[0]) + checkf(err, "invalid URL") + port, err := strconv.ParseUint(u.Port(), 10, 64) + checkf(err, "invalid port") + + client := jsonrpc.NewClient(accumulate.ResolveWellKnownEndpoint(args[0], "v3")) + client.Client.Timeout = time.Hour + peer, err := healing.ScanNode(context.Background(), client) + check(err) + + p2pPort := port - uint64(config.PortOffsetAccumulateApi) + uint64(config.PortOffsetAccumulateP2P) + 
tcp, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", u.Hostname(), p2pPort)) + check(err) + udp, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/%d/quic", u.Hostname(), p2pPort)) + check(err) + peer.Addresses = []multiaddr.Multiaddr{tcp, udp} + + if outputJSON { + fmt.Fprintln(os.Stderr, peer.ID) + check(json.NewEncoder(os.Stdout).Encode(peer)) + return + } + + fmt.Printf(" %v\n", peer) +} diff --git a/tools/cmd/debug/node.go b/tools/cmd/debug/node.go index 8cbf6ec44..c3230a454 100644 --- a/tools/cmd/debug/node.go +++ b/tools/cmd/debug/node.go @@ -16,9 +16,11 @@ import ( "github.com/fatih/color" "github.com/multiformats/go-multiaddr" "github.com/spf13/cobra" + "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" "gitlab.com/accumulatenetwork/accumulate/pkg/accumulate" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" ) @@ -40,6 +42,8 @@ func checkNode(_ *cobra.Command, args []string) { jc := jsonrpc.NewClient("http://" + args[0] + ":16595/v3") ns, err := jc.NetworkStatus(ctx, api.NetworkStatusOptions{}) check(err) + router, err := routing.NewStaticRouter(ns.Routing, nil) + check(err) ni, err := jc.NodeInfo(ctx, api.NodeInfoOptions{}) check(err) @@ -79,12 +83,11 @@ func checkNode(_ *cobra.Command, args []string) { check(err) _, err = pc.DialNetwork().Dial(ctx, svcAddr) if err == nil { - fmt.Println(color.GreenString("✔"), "Direct connection") + fmt.Println(color.GreenString("✔"), "Can connect") } else { - fmt.Println(color.RedString("🗴"), "Direct connection") + fmt.Println(color.RedString("🗴"), "Can connect") fmt.Println(err) } - check(pc.Close()) // Indirect pc, err = p2p.New(p2p.Options{ @@ -93,14 +96,18 @@ func checkNode(_ *cobra.Command, args []string) { BootstrapPeers: accumulate.BootstrapServers, }) check(err) - time.Sleep(time.Second) - - _, err = pc.DialNetwork().Dial(ctx, svcAddr) + mc := &message.Client{Transport: &message.RoutedTransport{ + Network: ni.Network, + Dialer: pc.DialNetwork(), + Router: routing.MessageRouter{Router: router}, + }} + _, err = mc.NodeInfo(ctx, api.NodeInfoOptions{PeerID: ni.PeerID}) if err == nil { - fmt.Println(color.GreenString("✔"), "Indirect connection") + fmt.Println(color.GreenString("✔"), "Can query") } else { - fmt.Println(color.RedString("🗴"), "Indirect connection") + fmt.Println(color.RedString("🗴"), "Can query") fmt.Println(err) } + check(pc.Close()) } diff --git a/tools/cmd/debug/sequence.go b/tools/cmd/debug/sequence.go index 4e3747266..6e03b553c 100644 --- a/tools/cmd/debug/sequence.go +++ b/tools/cmd/debug/sequence.go @@ -8,12 +8,26 @@ package main import ( "context" + "encoding/json" + "fmt" + "os" + "strings" + "time" "github.com/fatih/color" "github.com/spf13/cobra" - "gitlab.com/accumulatenetwork/accumulate/internal/api/v2" - client "gitlab.com/accumulatenetwork/accumulate/pkg/client/api/v2" + "gitlab.com/accumulatenetwork/accumulate/exp/apiutil" + "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" + "gitlab.com/accumulatenetwork/accumulate/pkg/accumulate" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" + 
"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/pkg/url" "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" ) var cmdSequence = &cobra.Command{ @@ -24,39 +38,78 @@ var cmdSequence = &cobra.Command{ func init() { cmd.AddCommand(cmdSequence) + cmdSequence.Flags().BoolVarP(&verbose, "verbose", "v", false, "More verbose output") + cmdSequence.PersistentFlags().StringVar(&cachedScan, "cached-scan", "", "A cached network scan") } -func sequence(_ *cobra.Command, args []string) { - c, err := client.New(args[0]) +func sequence(cmd *cobra.Command, args []string) { + ctx, cancel, _ := api.ContextWithBatchData(cmd.Context()) + defer cancel() + + c := jsonrpc.NewClient(accumulate.ResolveWellKnownEndpoint(args[0], "v3")) + c.Client.Timeout = time.Hour + Q := api.Querier2{Querier: c} + + ni, err := c.NodeInfo(ctx, api.NodeInfoOptions{}) check(err) - desc, err := c.Describe(context.Background()) + node, err := p2p.New(p2p.Options{ + Network: ni.Network, + BootstrapPeers: accumulate.BootstrapServers, + }) + check(err) + defer func() { _ = node.Close() }() + + fmt.Printf("We are %v\n", node.ID()) + + var net *healing.NetworkInfo + if cachedScan != "" { + data, err := os.ReadFile(cachedScan) + check(err) + check(json.Unmarshal(data, &net)) + } + + router := new(routing.MessageRouter) + c2 := &message.Client{ + Transport: &message.RoutedTransport{ + Network: ni.Network, + Dialer: &apiutil.StaticDialer{ + Scan: net, + Nodes: c, + Dialer: node.DialNetwork(), + }, + Router: router, + }, + } + Q.Querier = c2 + + ns, err := c.NetworkStatus(ctx, api.NetworkStatusOptions{Partition: protocol.Directory}) + check(err) + router.Router, err = routing.NewStaticRouter(ns.Routing, nil) check(err) anchors := map[string]*protocol.AnchorLedger{} synths := map[string]*protocol.SyntheticLedger{} bad := map[Dir]bool{} - for _, part := range desc.Values.Network.Partitions { + for _, part := range ns.Network.Partitions { // Get anchor ledger - req := new(api.GeneralQuery) - req.Url = protocol.PartitionUrl(part.ID).JoinPath(protocol.AnchorPool) - anchor := new(protocol.AnchorLedger) - res := new(api.ChainQueryResponse) - res.Data = anchor - err = c.RequestAPIv2(context.Background(), "query", req, res) + dst := protocol.PartitionUrl(part.ID) + var anchor *protocol.AnchorLedger + _, err = Q.QueryAccountAs(ctx, dst.JoinPath(protocol.AnchorPool), nil, &anchor) check(err) anchors[part.ID] = anchor // Get synthetic ledger - req.Url = protocol.PartitionUrl(part.ID).JoinPath(protocol.Synthetic) - synth := new(protocol.SyntheticLedger) - res.Data = synth - err = c.RequestAPIv2(context.Background(), "query", req, res) + var synth *protocol.SyntheticLedger + _, err = Q.QueryAccountAs(ctx, dst.JoinPath(protocol.Synthetic), nil, &synth) check(err) synths[part.ID] = synth // Check pending and received vs delivered for _, src := range anchor.Sequence { + ids, _ := findPendingAnchors(ctx, c2, Q, net, src.Url, dst, verbose) + src.Pending = append(src.Pending, ids...) 
+ checkSequence1(part, src, bad, "anchors") } @@ -66,8 +119,8 @@ func sequence(_ *cobra.Command, args []string) { } // Check produced vs received - for i, a := range desc.Values.Network.Partitions { - for _, b := range desc.Values.Network.Partitions[i:] { + for i, a := range ns.Network.Partitions { + for _, b := range ns.Network.Partitions[i:] { checkSequence2(a, b, bad, "anchors", anchors[a.ID].Anchor(protocol.PartitionUrl(b.ID)), anchors[b.ID].Anchor(protocol.PartitionUrl(a.ID)), @@ -79,8 +132,8 @@ func sequence(_ *cobra.Command, args []string) { } } - for _, a := range desc.Values.Network.Partitions { - for _, b := range desc.Values.Network.Partitions { + for _, a := range ns.Network.Partitions { + for _, b := range ns.Network.Partitions { if !bad[Dir{From: a.ID, To: b.ID}] { color.Green("✔ %s → %s\n", a.ID, b.ID) } @@ -111,9 +164,70 @@ func checkSequence1(dst *protocol.PartitionInfo, src *protocol.PartitionSyntheti if len(src.Pending) > 0 { color.Red("🗴 %s → %s has %d pending %s\n", id, dst.ID, len(src.Pending), kind) bad[Dir{From: id, To: dst.ID}] = true + if verbose { + for _, id := range src.Pending { + fmt.Printf(" %v\n", id) + } + } } if src.Received > src.Delivered { color.Red("🗴 %s → %s has %d unprocessed %s\n", id, dst.ID, src.Received-src.Delivered, kind) bad[Dir{From: id, To: dst.ID}] = true } } + +func findPendingAnchors(ctx context.Context, C *message.Client, Q api.Querier2, net *healing.NetworkInfo, src, dst *url.URL, resolve bool) ([]*url.TxID, map[[32]byte]*protocol.Transaction) { + srcId, _ := protocol.ParsePartitionUrl(src) + dstId, _ := protocol.ParsePartitionUrl(dst) + + // Check how many have been received + var dstLedger *protocol.AnchorLedger + _, err := Q.QueryAccountAs(ctx, dst.JoinPath(protocol.AnchorPool), nil, &dstLedger) + checkf(err, "query %v → %v anchor ledger", srcId, dstId) + dstSrcLedger := dstLedger.Partition(src) + received := dstSrcLedger.Received + + // Check how many should have been sent + srcDstChain, err := Q.QueryChain(ctx, src.JoinPath(protocol.AnchorPool), &api.ChainQuery{Name: "anchor-sequence"}) + checkf(err, "query %v anchor sequence chain", srcId) + + if received >= srcDstChain.Count-1 { + return nil, nil + } + + // Non-verbose mode doesn't care about the actual IDs + if !resolve { + return make([]*url.TxID, srcDstChain.Count-received-1), nil + } + + var ids []*url.TxID + txns := map[[32]byte]*protocol.Transaction{} + for i := received + 1; i <= srcDstChain.Count; i++ { + var msg *api.MessageRecord[messaging.Message] + if net == nil { + slog.Info("Checking anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i) + msg, err = C.Private().Sequence(ctx, src.JoinPath(protocol.AnchorPool), dst, i) + checkf(err, "query %v → %v anchor #%d", srcId, dstId, i) + } else { + for _, peer := range net.Peers[strings.ToLower(srcId)] { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + slog.Info("Checking anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i, "peer", peer.ID) + msg, err = C.ForPeer(peer.ID).Private().Sequence(ctx, src.JoinPath(protocol.AnchorPool), dst, i) + if err == nil { + break + } + slog.Error("Failed to check anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i, "peer", peer.ID, "error", err) + } + if msg == nil { + fatalf("query %v → %v anchor #%d failed", srcId, dstId, i) + } + } + + ids = append(ids, msg.ID) + + txn := msg.Message.(*messaging.TransactionMessage) + txns[txn.Hash()] = txn.Transaction 
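+		// Keep the resolved transaction keyed by hash as well; callers that
+		// go on to heal these anchors can reuse it rather than repeating the
+		// sequence query.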
+ } + return ids, txns +} diff --git a/tools/cmd/resend-anchor/Dockerfile b/tools/cmd/resend-anchor/Dockerfile index 8a8b788ba..7180b50c3 100644 --- a/tools/cmd/resend-anchor/Dockerfile +++ b/tools/cmd/resend-anchor/Dockerfile @@ -16,4 +16,4 @@ RUN apk add --no-cache bash curl COPY --from=build /go/bin/resend-anchor /go/bin/dlv /bin/ ENTRYPOINT ["resend-anchor"] -CMD ["heal", "--help"] \ No newline at end of file +CMD ["--help"] \ No newline at end of file diff --git a/tools/cmd/resend-anchor/heal.go b/tools/cmd/resend-anchor/heal.go deleted file mode 100644 index 48b6d63e8..000000000 --- a/tools/cmd/resend-anchor/heal.go +++ /dev/null @@ -1,370 +0,0 @@ -// Copyright 2023 The Accumulate Authors -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -package main - -import ( - "context" - "encoding/hex" - "encoding/json" - "fmt" - "log" - "os" - "strings" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/spf13/cobra" - "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" - "gitlab.com/accumulatenetwork/accumulate/internal/core" - "gitlab.com/accumulatenetwork/accumulate/pkg/accumulate" - "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" - "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" - "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" - "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" - client "gitlab.com/accumulatenetwork/accumulate/pkg/client/api/v2" - "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" - "gitlab.com/accumulatenetwork/accumulate/pkg/url" - "gitlab.com/accumulatenetwork/accumulate/protocol" -) - -var healCmd = &cobra.Command{ - Use: "heal [network]", - Args: cobra.ExactArgs(1), - Run: heal, -} - -func init() { - cmd.AddCommand(healCmd) - healCmd.Flags().BoolVar(&healFlag.Continuous, "continuous", false, "Run healing in a loop every second") -} - -var healFlag = struct { - Continuous bool -}{} - -func heal(_ *cobra.Command, args []string) { - ctx, cancel, _ := api.ContextWithBatchData(context.Background()) - defer cancel() - - C2 := jsonrpc.NewClient(args[0]) - apiNode, err := C2.NodeInfo(ctx, api.NodeInfoOptions{}) - checkf(err, "query node info") - - status, err := C2.NetworkStatus(ctx, api.NetworkStatusOptions{}) - checkf(err, "query network status") - - node, err := p2p.New(p2p.Options{ - Network: apiNode.Network, - BootstrapPeers: accumulate.BootstrapServers, - }) - checkf(err, "start p2p node") - defer func() { _ = node.Close() }() - - fmt.Printf("We are %v\n", node.ID()) - - router := new(routing.MessageRouter) - C := &message.Client{ - Transport: &message.RoutedTransport{ - Network: apiNode.Network, - Dialer: node.DialNetwork(), - Router: router, - }, - } - router.Router, err = routing.NewStaticRouter(status.Routing, nil) - check(err) - - peers := getPeers(C2, ctx) - -heal: - // Heal BVN -> DN - for _, part := range status.Network.Partitions { - if part.Type != protocol.PartitionTypeBlockValidator { - continue - } - - partUrl := protocol.PartitionUrl(part.ID) - ledger := getAccount[*protocol.AnchorLedger](C2, ctx, partUrl.JoinPath(protocol.AnchorPool)) - partLedger := ledger.Anchor(protocol.DnUrl()) - - for i, txid := range ledger.Anchor(protocol.DnUrl()).Pending { - healAnchor(C, C2, ctx, protocol.DnUrl(), partUrl, txid, partLedger.Delivered+1+uint64(i), peers[protocol.Directory]) - } - } - - // Heal DN -> BVN, DN -> DN - { - ledger := getAccount[*protocol.AnchorLedger](C2, ctx, 
protocol.DnUrl().JoinPath(protocol.AnchorPool)) - - for _, part := range status.Network.Partitions { - partUrl := protocol.PartitionUrl(part.ID) - partLedger := ledger.Anchor(partUrl) - for i, txid := range ledger.Anchor(partUrl).Pending { - healAnchor(C, C2, ctx, partUrl, protocol.DnUrl(), txid, partLedger.Delivered+1+uint64(i), peers[part.ID]) - } - } - } - - // Heal continuously? - if healFlag.Continuous { - time.Sleep(time.Second) - goto heal - } -} - -type PeerInfo struct { - api.ConsensusStatus - Key [32]byte - Operator *url.URL -} - -func (p *PeerInfo) String() string { - if p.Operator != nil { - return fmt.Sprintf("%v (%x)", p.Operator, p.Key) - } - return hex.EncodeToString(p.Key[:]) -} - -func getPeers(C2 *jsonrpc.Client, ctx context.Context) map[string]map[peer.ID]*PeerInfo { - apiNode, err := C2.NodeInfo(ctx, api.NodeInfoOptions{}) - checkf(err, "query node info") - - status, err := C2.NetworkStatus(ctx, api.NetworkStatusOptions{}) - checkf(err, "query network status") - - hash2key := map[[32]byte][32]byte{} - for _, val := range status.Network.Validators { - hash2key[val.PublicKeyHash] = *(*[32]byte)(val.PublicKey) - } - - peers := map[string]map[peer.ID]*PeerInfo{} - for _, part := range status.Network.Partitions { - peers[part.ID] = map[peer.ID]*PeerInfo{} - - fmt.Printf("Getting peers for %s\n", part.ID) - find := api.FindServiceOptions{ - Network: apiNode.Network, - Service: api.ServiceTypeConsensus.AddressFor(part.ID), - } - res, err := C2.FindService(ctx, find) - checkf(err, "find %s on %s", find.Service.String(), find.Network) - - for _, peer := range res { - fmt.Printf("Getting identity of %v\n", peer.PeerID) - info, err := C2.ConsensusStatus(ctx, api.ConsensusStatusOptions{NodeID: peer.PeerID.String(), Partition: part.ID}) - if err != nil { - fmt.Printf("%+v\n", err) - continue - } - - key, ok := hash2key[info.ValidatorKeyHash] - if !ok { - continue // Not a validator - } - pi := &PeerInfo{ - ConsensusStatus: *info, - Key: key, - } - peers[part.ID][peer.PeerID] = pi - - _, val, ok := status.Network.ValidatorByHash(info.ValidatorKeyHash[:]) - if ok { - pi.Operator = val.Operator - } - } - } - return peers -} - -func getLedger(c *client.Client, part *url.URL) *protocol.AnchorLedger { //nolint:unused - ledger := new(protocol.AnchorLedger) - res := new(client.ChainQueryResponse) - res.Data = ledger - req := new(client.GeneralQuery) - req.Url = part.JoinPath(protocol.AnchorPool) - err := c.RequestAPIv2(context.Background(), "query", req, res) - checkf(err, "query %s anchor ledger", part) - return ledger -} - -func healTx(g *core.GlobalValues, nodes map[string][]*NodeData, netClient *client.Client, srcUrl, dstUrl *url.URL, txid *url.TxID) { //nolint:unused - // dstId, _ := protocol.ParsePartitionUrl(dstUrl) - srcId, _ := protocol.ParsePartitionUrl(srcUrl) - - // Query the transaction - res, err := netClient.QueryTx(context.Background(), &client.TxnQuery{TxIdUrl: txid}) - if err != nil { - log.Printf("Failed to query %v: %v\n", txid, err) - return - } - - // Check if there are already enough transactions - if uint64(len(res.Status.AnchorSigners)) >= g.ValidatorThreshold(srcId) { - return // Already have enough signers - } - - fmt.Printf("Healing anchor %v\n", txid) - - // Mark which nodes have signed - signed := map[[32]byte]bool{} - for _, s := range res.Status.AnchorSigners { - signed[*(*[32]byte)(s)] = true - } - - // // Make a client for the destination - // dstClient := nodes[strings.ToLower(dstId)][0].AccumulateAPIForUrl(dstUrl) - - // Get a signature from each node 
that hasn't signed - for _, node := range nodes[strings.ToLower(srcId)] { - if signed[*(*[32]byte)(node.Info.PublicKey)] { - continue - } - - // Make a client for the source - srcClient := node.AccumulateAPIForUrl(srcUrl) - - // Query and execute the anchor - querySynthAndExecute(srcClient, netClient, srcUrl, dstUrl, res.Status.SequenceNumber, false) - } -} - -func getAccount[T protocol.Account](C api.Querier, ctx context.Context, u *url.URL) T { - var v T - _, err := api.Querier2{Querier: C}.QueryAccountAs(ctx, u, nil, &v) - checkf(err, "get %v", u) - return v -} - -func healAnchor(C *message.Client, C2 *jsonrpc.Client, ctx context.Context, srcUrl, dstUrl *url.URL, txid *url.TxID, seqNum uint64, peers map[peer.ID]*PeerInfo) { - fmt.Printf("Healing anchor %v\n", txid) - - dstId, ok := protocol.ParsePartitionUrl(dstUrl) - if !ok { - panic("not a partition: " + dstUrl.String()) - } - - // Query the transaction - res, err := api.Querier2{Querier: C2}.QueryTransaction(ctx, txid, nil) - checkf(err, "get %v", txid) - - // Mark which nodes have signed - signed := map[[32]byte]bool{} - for _, sigs := range res.Signatures.Records { - for _, sig := range sigs.Signatures.Records { - msg, ok := sig.Message.(*messaging.BlockAnchor) - if !ok { - continue - } - signed[*(*[32]byte)(msg.Signature.GetPublicKey())] = true - } - } - - theAnchorTxn := res.Message.Transaction - env := new(messaging.Envelope) - env.Transaction = []*protocol.Transaction{theAnchorTxn} - - // Get a signature from each node that hasn't signed - var bad []peer.ID - var gotPartSig bool - for peer, info := range peers { - if signed[info.Key] { - continue - } - - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - fmt.Printf("Querying %v for %v\n", peer, txid) - res, err := C.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, seqNum) - if err != nil { - fmt.Printf("%+v\n", err) - bad = append(bad, peer) - continue - } - - myTxn, ok := res.Message.(*messaging.TransactionMessage) - if !ok { - err := fmt.Errorf("expected %v, got %v", messaging.MessageTypeTransaction, res.Message.Type()) - warnf(err, "%v gave us an anchor that is not a transaction", info) - continue - } - if !myTxn.Transaction.Equal(theAnchorTxn) { - err := fmt.Errorf("expected %x, got %x", theAnchorTxn.GetHash(), myTxn.Transaction.GetHash()) - warnf(err, "%v gave us an anchor that doesn't match what we expect", info) - if b, err := json.Marshal(theAnchorTxn); err != nil { - check(err) - } else { - fmt.Fprintf(os.Stderr, "Want: %s\n", b) - } - if b, err := json.Marshal(myTxn.Transaction); err != nil { - check(err) - } else { - fmt.Fprintf(os.Stderr, "Got: %s\n", b) - } - continue - } - - for _, sigs := range res.Signatures.Records { - for _, sig := range sigs.Signatures.Records { - msg, ok := sig.Message.(*messaging.SignatureMessage) - if !ok { - err := fmt.Errorf("expected %v, got %v", messaging.MessageTypeSignature, sig.Message.Type()) - warnf(err, "%v gave us a signature that is not a signature", info) - continue - } - - switch sig := msg.Signature.(type) { - case *protocol.PartitionSignature: - // We only want one partition signature - if gotPartSig { - continue - } - gotPartSig = true - - case protocol.UserSignature: - // Filter out bad signatures - if !sig.Verify(nil, theAnchorTxn.GetHash()) { - err := fmt.Errorf("invalid signature") - warnf(err, "%v gave us an invalid signature", info) - continue - } - - default: - err := fmt.Errorf("expected user signature, got %v", sig.Type()) - warnf(err, "%v gave us a 
signature that is not a signature", info) - continue - } - - env.Signatures = append(env.Signatures, msg.Signature) - } - } - } - - for _, peer := range bad { - fmt.Printf("Removing bad peer %v from the list of candidates\n", peer) - delete(peers, peer) - } - - // We should always have a partition signature, so there's only something to - // sent if we have more than 1 signature - if len(env.Signatures) == 1 { - fmt.Println("Nothing to send") - return - } - - fmt.Printf("Submitting %d signatures\n", len(env.Signatures)) - addr := api.ServiceTypeSubmit.AddressFor(dstId).Multiaddr() - sub, err := C.ForAddress(addr).Submit(ctx, env, api.SubmitOptions{}) - if err != nil { - fmt.Println(err) - return - } - for _, sub := range sub { - if !sub.Success { - fmt.Println(sub.Message) - } - } -}