From 2501d94f139ac23160a130a48edcd4ac7904eee5 Mon Sep 17 00:00:00 2001 From: Ethan Reesor Date: Mon, 14 Aug 2023 16:01:19 -0500 Subject: [PATCH] Improve anchor healing [#3379] --- .dockerignore | 2 +- .vscode/launch.json | 29 ++- internal/api/routing/router.go | 3 + internal/core/healing/anchors.go | 309 ++++++++++++++++++++++++ internal/core/healing/sequenced.go | 83 +++++++ internal/core/healing/synthetic.go | 152 ++++++++++++ pkg/api/v3/api.go | 30 +++ pkg/api/v3/api_test.go | 25 ++ pkg/api/v3/message/types_gen.go | 12 + tools/cmd/debug/comet.go | 6 - tools/cmd/debug/heal_anchor.go | 193 +++++++++++++++ tools/cmd/debug/heal_common.go | 52 ++++ tools/cmd/debug/heal_synth.go | 223 +++++++---------- tools/cmd/debug/heal_test.go | 101 ++++++++ tools/cmd/debug/main.go | 16 ++ tools/cmd/debug/network.go | 96 ++++++++ tools/cmd/debug/node.go | 28 +-- tools/cmd/debug/sequence.go | 154 ++++++++++-- tools/cmd/resend-anchor/Dockerfile | 2 +- tools/cmd/resend-anchor/heal.go | 369 ----------------------------- 20 files changed, 1338 insertions(+), 547 deletions(-) create mode 100644 internal/core/healing/anchors.go create mode 100644 internal/core/healing/sequenced.go create mode 100644 internal/core/healing/synthetic.go create mode 100644 pkg/api/v3/api_test.go create mode 100644 tools/cmd/debug/heal_anchor.go create mode 100644 tools/cmd/debug/heal_common.go create mode 100644 tools/cmd/debug/network.go delete mode 100644 tools/cmd/resend-anchor/heal.go diff --git a/.dockerignore b/.dockerignore index 466703db6..04b61b72a 100644 --- a/.dockerignore +++ b/.dockerignore @@ -4,4 +4,4 @@ *.exe .git */Dockerfile -.test \ No newline at end of file +.test diff --git a/.vscode/launch.json b/.vscode/launch.json index 9b0f9456a..057961a43 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -261,7 +261,7 @@ "mode": "auto", "program": "${workspaceFolder}/cmd/accumulated-http", "args": [ - "Kermit", + "MainNet", "--debug", "--http-listen=/ip4/0.0.0.0/tcp/26660", "--peer=/dns/bootstrap.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWGJTh4aeF7bFnwo9sAYRujCkuVU1Cq8wNeTNGpFgZgXdg", @@ -540,11 +540,14 @@ "type": "go", "request": "launch", "mode": "auto", - "program": "${workspaceFolder}/tools/cmd/resend-anchor", + "program": "${workspaceFolder}/tools/cmd/debug", "cwd": "${workspaceFolder}", "args": [ "heal", - "https://mainnet.accumulatenetwork.io/v3", + "anchor", + "MainNet", + "--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json", + "Directory→Apollo", "70911" ] }, { @@ -558,9 +561,27 @@ "program": "${workspaceFolder}/tools/cmd/debug", "cwd": "${workspaceFolder}", "args": [ - "heal-synth", + "heal", + "synth", "mainnet", + "--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json", + ] + }, + { + "name": "Heal single anchor", + "presentation": { "group": "99-Miscellaneous", }, + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/tools/cmd/debug", + "cwd": "${workspaceFolder}", + "args": [ + "heal", + "anchor", "mainnet", + "Chandrayaan → Apollo", + // "acc://f18b9ddd70654849ce063581b7bcbc6947d4b763d7a2fa4668e6fd9129da0e23@dn.acme/anchors", + "--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json", ] }, { diff --git a/internal/api/routing/router.go b/internal/api/routing/router.go index ff74df23a..3a6d37d49 100644 --- a/internal/api/routing/router.go +++ b/internal/api/routing/router.go @@ -143,6 +143,9 @@ func routeMessage(routeAccount func(*url.URL) (string, error), route *string, ms case *messaging.SequencedMessage: r, err = routeAccount(msg.Destination) + 
case *messaging.SyntheticMessage: + return routeMessage(routeAccount, route, msg.Message) + case *messaging.BlockAnchor: return routeMessage(routeAccount, route, msg.Anchor) diff --git a/internal/core/healing/anchors.go b/internal/core/healing/anchors.go new file mode 100644 index 000000000..a64ebb850 --- /dev/null +++ b/internal/core/healing/anchors.go @@ -0,0 +1,309 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package healing + +import ( + "context" + "encoding/hex" + "fmt" + "strings" + "time" + + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/network" + "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" +) + +type HealAnchorArgs struct { + Client message.AddressedClient + Querier api.Querier + Submitter api.Submitter + NetInfo *NetworkInfo + Known map[[32]byte]*protocol.Transaction + Pretend bool + Wait bool +} + +func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error { + srcUrl := protocol.PartitionUrl(si.Source) + dstUrl := protocol.PartitionUrl(si.Destination) + + if args.Querier == nil { + args.Querier = args.Client + } + if args.Submitter == nil { + args.Submitter = args.Client + } + + // If the message ID is not known, resolve it + var theAnchorTxn *protocol.Transaction + if si.ID == nil { + r, err := ResolveSequenced[*messaging.TransactionMessage](ctx, args.Client, args.NetInfo, si.Source, si.Destination, si.Number, true) + if err != nil { + return err + } + si.ID = r.ID + theAnchorTxn = r.Message.Transaction + } + + // Fetch the transaction and signatures + var sigSets []*api.SignatureSetRecord + Q := api.Querier2{Querier: args.Querier} + res, err := Q.QueryMessage(ctx, si.ID, nil) + switch { + case err == nil: + switch msg := res.Message.(type) { + case *messaging.SequencedMessage: + txn, ok := msg.Message.(*messaging.TransactionMessage) + if !ok { + return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeTransaction, msg.Message.Type()) + } + theAnchorTxn = txn.Transaction + case *messaging.TransactionMessage: + theAnchorTxn = msg.Transaction + default: + return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeSequenced, res.Message.Type()) + } + + sigSets = res.Signatures.Records + + case !errors.Is(err, errors.NotFound): + return err + + case theAnchorTxn == nil: + var ok bool + theAnchorTxn, ok = args.Known[si.ID.Hash()] + if !ok { + return err + } + } + + // Mark which validators have signed + signed := map[[32]byte]bool{} + for _, sigs := range sigSets { + for _, sig := range sigs.Signatures.Records { + msg, ok := sig.Message.(*messaging.BlockAnchor) + if !ok { + continue + } + k := msg.Signature.GetPublicKey() + slog.DebugCtx(ctx, "Anchor has been signed by", "validator", hex.EncodeToString(k[:4])) + signed[*(*[32]byte)(k)] = true + } + } + + g := &network.GlobalValues{ + Oracle: args.NetInfo.Status.Oracle, + Globals: args.NetInfo.Status.Globals, + Network: args.NetInfo.Status.Network, + Routing: args.NetInfo.Status.Routing, + ExecutorVersion: args.NetInfo.Status.ExecutorVersion, + } + threshold := g.ValidatorThreshold(si.Source) + + lkv := []any{ + "source", si.Source, + 
"destination", si.Destination, + "sequence-number", si.Number, + "want", threshold, + "have", len(signed), + } + if theAnchorTxn != nil { + lkv = append(lkv, + "txid", theAnchorTxn.ID(), + ) + } + slog.InfoCtx(ctx, "Healing anchor", lkv...) + + if len(signed) >= int(threshold) { + slog.InfoCtx(ctx, "Sufficient signatures have been received") + return nil + } + + seq := &messaging.SequencedMessage{ + Source: srcUrl, + Destination: dstUrl, + Number: si.Number, + } + if theAnchorTxn != nil { + seq.Message = &messaging.TransactionMessage{ + Transaction: theAnchorTxn, + } + } + + // Get a signature from each node that hasn't signed + var gotPartSig bool + var signatures []protocol.Signature + for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Source)] { + if signed[info.Key] { + continue + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + slog.InfoCtx(ctx, "Querying node for its signature", "id", peer) + res, err := args.Client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, si.Number) + if err != nil { + slog.ErrorCtx(ctx, "Query failed", "error", err) + continue + } + + myTxn, ok := res.Message.(*messaging.TransactionMessage) + if !ok { + slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "id", info, "type", res.Message.Type()) + continue + } + if theAnchorTxn == nil { + theAnchorTxn = myTxn.Transaction + seq.Message = &messaging.TransactionMessage{ + Transaction: theAnchorTxn, + } + } else if !myTxn.Transaction.Equal(theAnchorTxn) { + slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "id", info, + "expected", hex.EncodeToString(theAnchorTxn.GetHash()), + "got", hex.EncodeToString(myTxn.Transaction.GetHash())) + continue + } + + for _, sigs := range res.Signatures.Records { + for _, sig := range sigs.Signatures.Records { + msg, ok := sig.Message.(*messaging.SignatureMessage) + if !ok { + slog.ErrorCtx(ctx, "Node gave us a signature that is not a signature", "id", info, "type", sig.Message.Type()) + continue + } + + if args.NetInfo.Status.ExecutorVersion.V2Enabled() { + sig, ok := msg.Signature.(protocol.KeySignature) + if !ok { + slog.ErrorCtx(ctx, "Node gave us a signature that is not a key signature", "id", info, "type", sig.Type()) + continue + } + + // Filter out bad signatures + h := seq.Hash() + if !sig.Verify(nil, h[:]) { + slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info) + continue + } + + } else { + switch sig := msg.Signature.(type) { + case *protocol.PartitionSignature: + // We only want one partition signature + if gotPartSig { + continue + } + gotPartSig = true + + case protocol.UserSignature: + // Filter out bad signatures + if !sig.Verify(nil, theAnchorTxn.GetHash()) { + slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info) + continue + } + + default: + slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "id", info, "type", sig.Type()) + continue + } + } + + signatures = append(signatures, msg.Signature) + } + } + } + + if args.Pretend { + b, err := theAnchorTxn.MarshalBinary() + if err != nil { + panic(err) + } + slog.InfoCtx(ctx, "Would have submitted anchor", "signatures", len(signatures), "source", si.Source, "destination", si.Destination, "number", si.Number, "txn-size", len(b)) + return nil + } + + // We should always have a partition signature, so there's only something to + // sent if we have more than 1 signature + if gotPartSig && len(signatures) <= 1 || !gotPartSig && len(signatures) == 0 { + 
slog.InfoCtx(ctx, "Nothing to send") + + } else { + slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures)) + + submit := func(env *messaging.Envelope) { + // addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr() + sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{}) + if err != nil { + slog.ErrorCtx(ctx, "Submission failed", "error", err) + } + for _, sub := range sub { + if sub.Success { + slog.InfoCtx(ctx, "Submission succeeded", "id", sub.Status.TxID) + } else { + slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status) + } + } + } + + if args.NetInfo.Status.ExecutorVersion.V2Enabled() { + for _, sig := range signatures { + blk := &messaging.BlockAnchor{ + Signature: sig.(protocol.KeySignature), + Anchor: seq, + } + submit(&messaging.Envelope{Messages: []messaging.Message{blk}}) + } + } else { + env := new(messaging.Envelope) + env.Transaction = []*protocol.Transaction{theAnchorTxn} + env.Signatures = signatures + submit(env) + } + } + + if !args.Wait { + return nil + } + + slog.InfoCtx(ctx, "Waiting", "for", si.ID) + for i := 0; i < 10; i++ { + r, err := Q.QueryMessage(ctx, si.ID, nil) + switch { + case errors.Is(err, errors.NotFound): + // Not found, wait + slog.Info("Status", "id", si.ID, "code", errors.NotFound) + break + + case err != nil: + // Unknown error + return err + + case !r.Status.Delivered(): + // Pending, wait + slog.Info("Status", "id", si.ID, "code", r.Status) + break + + case r.Error != nil: + slog.Error("Failed", "id", si.ID, "error", r.Error) + return r.AsError() + + default: + slog.Info("Delivered", "id", si.ID) + return nil + } + time.Sleep(time.Second / 2) + } + return ErrRetry +} + +var ErrRetry = fmt.Errorf("retry") diff --git a/internal/core/healing/sequenced.go b/internal/core/healing/sequenced.go new file mode 100644 index 000000000..a0408c551 --- /dev/null +++ b/internal/core/healing/sequenced.go @@ -0,0 +1,83 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package healing + +import ( + "context" + "strings" + "time" + + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/pkg/url" + "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" +) + +type SequencedInfo struct { + Source string + Destination string + Number uint64 + ID *url.TxID +} + +// ResolveSequenced resolves an anchor or synthetic message (a sequenced +// message). If the client's address is non-nil, the query will be sent to that +// address. Otherwise, all of the source partition's nodes will be queried in +// order until one responds. 
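+//
+// A minimal usage sketch (illustrative; ctx, client, and netInfo stand in for
+// values the caller must construct, and the partition pair and sequence number
+// are arbitrary):
+//
+//	r, err := ResolveSequenced[*messaging.TransactionMessage](
+//		ctx, client, netInfo, "Directory", "Apollo", 70911, true)
+//	if err != nil {
+//		return err
+//	}
+//	slog.InfoCtx(ctx, "Resolved", "id", r.ID, "txn", r.Message.Transaction.ID())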
+func ResolveSequenced[T messaging.Message](ctx context.Context, client message.AddressedClient, net *NetworkInfo, srcId, dstId string, seqNum uint64, anchor bool) (*api.MessageRecord[T], error) { + srcUrl := protocol.PartitionUrl(srcId) + dstUrl := protocol.PartitionUrl(dstId) + + var account string + if anchor { + account = protocol.AnchorPool + } else { + account = protocol.Synthetic + } + + // If the client has an address, use that + if client.Address != nil { + slog.InfoCtx(ctx, "Querying node", "address", client.Address) + res, err := client.Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum) + if err != nil { + return nil, err + } + + r2, err := api.MessageRecordAs[T](res) + if err != nil { + return nil, err + } + return r2, nil + } + + // Otherwise try each node until one succeeds + slog.InfoCtx(ctx, "Resolving the message ID", "source", srcId, "destination", dstId, "number", seqNum) + for peer := range net.Peers[strings.ToLower(srcId)] { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + slog.InfoCtx(ctx, "Querying node", "id", peer) + res, err := client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum) + if err != nil { + slog.ErrorCtx(ctx, "Query failed", "error", err) + continue + } + + r2, err := api.MessageRecordAs[T](res) + if err != nil { + slog.ErrorCtx(ctx, "Query failed", "error", err) + continue + } + + return r2, nil + } + + return nil, errors.UnknownError.WithFormat("cannot resolve %s→%s #%d", srcId, dstId, seqNum) +} diff --git a/internal/core/healing/synthetic.go b/internal/core/healing/synthetic.go new file mode 100644 index 000000000..82b1d282b --- /dev/null +++ b/internal/core/healing/synthetic.go @@ -0,0 +1,152 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package healing + +import ( + "context" + "fmt" + "strings" + "time" + + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" +) + +type HealSyntheticArgs struct { + Client message.AddressedClient + Querier api.Querier + Submitter api.Submitter + NetInfo *NetworkInfo + Pretend bool + Wait bool +} + +func HealSynthetic(ctx context.Context, args HealSyntheticArgs, si SequencedInfo) error { + if args.Querier == nil { + args.Querier = args.Client + } + if args.Submitter == nil { + args.Submitter = args.Client + } + + // Query the synthetic transaction + r, err := ResolveSequenced[messaging.Message](ctx, args.Client, args.NetInfo, si.Source, si.Destination, si.Number, false) + if err != nil { + return err + } + + // Has it already been delivered? 
+ Q := api.Querier2{Querier: args.Client} + if r, err := Q.QueryMessage(ctx, r.ID, nil); err == nil && r.Status.Delivered() { + return nil + } + + slog.InfoCtx(ctx, "Resubmitting", "source", si.Source, "destination", si.Destination, "number", si.Number, "id", r.Message.ID()) + + // Submit the synthetic transaction directly to the destination partition + h := r.Sequence.Hash() + msg := &messaging.SyntheticMessage{ + Message: r.Sequence, + Proof: &protocol.AnnotatedReceipt{ + Receipt: r.SourceReceipt, + Anchor: &protocol.AnchorMetadata{ + Account: protocol.DnUrl(), + }, + }, + } + for _, sigs := range r.Signatures.Records { + for _, sig := range sigs.Signatures.Records { + sig, ok := sig.Message.(*messaging.SignatureMessage) + if !ok { + continue + } + ks, ok := sig.Signature.(protocol.KeySignature) + if !ok { + continue + } + msg.Signature = ks + } + } + if msg.Signature == nil { + return fmt.Errorf("synthetic message is not signed") + } + + h = msg.Message.Hash() + if !msg.Signature.Verify(nil, h[:]) { + return fmt.Errorf("signature is not valid") + } + + env := new(messaging.Envelope) + env.Messages = []messaging.Message{msg} + if msg, ok := r.Message.(messaging.MessageForTransaction); ok { + r, err := Q.QueryTransaction(ctx, msg.GetTxID(), nil) + if err != nil { + return errors.InternalError.WithFormat("query transaction for message: %w", err) + } + env.Messages = append(env.Messages, r.Message) + } + + if args.Pretend { + return nil + } + + // Submit directly to an appropriate node + if args.Client.Address == nil { + for peer := range args.NetInfo.Peers[strings.ToLower(si.Destination)] { + args.Client = args.Client.ForPeer(peer) + break + } + } + + sub, err := args.Client.Submit(ctx, env, api.SubmitOptions{}) + if err != nil { + slog.ErrorCtx(ctx, "Submission failed", "error", err) + } + for _, sub := range sub { + if !sub.Success { + slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status) + } + } + if !args.Wait { + return nil + } + + si.ID = r.ID + slog.InfoCtx(ctx, "Waiting", "for", si.ID) + for i := 0; i < 10; i++ { + r, err := Q.QueryMessage(ctx, si.ID, nil) + switch { + case errors.Is(err, errors.NotFound): + // Not found, wait + slog.Info("Status", "id", si.ID, "code", errors.NotFound) + break + + case err != nil: + // Unknown error + return err + + case !r.Status.Delivered(): + // Pending, wait + slog.Info("Status", "id", si.ID, "code", r.Status) + break + + case r.Error != nil: + slog.Error("Failed", "id", si.ID, "error", r.Error) + return r.AsError() + + default: + slog.Info("Delivered", "id", si.ID) + return nil + } + time.Sleep(time.Second / 2) + } + return ErrRetry +} diff --git a/pkg/api/v3/api.go b/pkg/api/v3/api.go index 2d6db1c85..c4b3f2201 100644 --- a/pkg/api/v3/api.go +++ b/pkg/api/v3/api.go @@ -8,6 +8,8 @@ package api import ( "context" + stdurl "net/url" + "strings" "github.com/multiformats/go-multiaddr" "gitlab.com/accumulatenetwork/accumulate/pkg/types/encoding" @@ -34,6 +36,34 @@ var BootstrapServers = func() []multiaddr.Multiaddr { return addrs }() +var WellKnownNetworks = map[string]string{ + "mainnet": "https://mainnet.accumulatenetwork.io", + "kermit": "https://kermit.accumulatenetwork.io", + "fozzie": "https://fozzie.accumulatenetwork.io", + + "testnet": "https://kermit.accumulatenetwork.io", + "local": "http://127.0.1.1:26660", +} + +func ResolveWellKnownEndpoint(name string) string { + addr, ok := WellKnownNetworks[strings.ToLower(name)] + if !ok { + addr = name + } + + u, err := stdurl.Parse(addr) + if err != nil { + return addr + } 
+ switch u.Path { + case "": + addr += "/v3" + case "/": + addr += "v3" + } + return addr +} + // ServiceType is used to identify services. type ServiceType uint64 diff --git a/pkg/api/v3/api_test.go b/pkg/api/v3/api_test.go new file mode 100644 index 000000000..d634b21b4 --- /dev/null +++ b/pkg/api/v3/api_test.go @@ -0,0 +1,25 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package api + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestResolveWellKnownEndpoint(t *testing.T) { + cases := map[string]string{ + "mainnet": "https://mainnet.accumulatenetwork.io/v3", + "https://mainnet.accumulatenetwork.io": "https://mainnet.accumulatenetwork.io/v3", + } + + for input, expect := range cases { + actual := ResolveWellKnownEndpoint(input) + require.Equal(t, expect, actual) + } +} diff --git a/pkg/api/v3/message/types_gen.go b/pkg/api/v3/message/types_gen.go index ec4cabf83..b4271b84f 100644 --- a/pkg/api/v3/message/types_gen.go +++ b/pkg/api/v3/message/types_gen.go @@ -3169,6 +3169,7 @@ func (v *FindServiceRequest) MarshalJSON() ([]byte, error) { Network string `json:"network,omitempty"` Service *api.ServiceAddress `json:"service,omitempty"` Known bool `json:"known,omitempty"` + Timeout interface{} `json:"timeout,omitempty"` }{} u.Type = v.Type() if !(len(v.FindServiceOptions.Network) == 0) { @@ -3183,6 +3184,10 @@ func (v *FindServiceRequest) MarshalJSON() ([]byte, error) { u.Known = v.FindServiceOptions.Known } + if !(v.FindServiceOptions.Timeout == 0) { + + u.Timeout = encoding.DurationToJSON(v.FindServiceOptions.Timeout) + } return json.Marshal(&u) } @@ -3566,11 +3571,13 @@ func (v *FindServiceRequest) UnmarshalJSON(data []byte) error { Network string `json:"network,omitempty"` Service *api.ServiceAddress `json:"service,omitempty"` Known bool `json:"known,omitempty"` + Timeout interface{} `json:"timeout,omitempty"` }{} u.Type = v.Type() u.Network = v.FindServiceOptions.Network u.Service = v.FindServiceOptions.Service u.Known = v.FindServiceOptions.Known + u.Timeout = encoding.DurationToJSON(v.FindServiceOptions.Timeout) if err := json.Unmarshal(data, &u); err != nil { return err } @@ -3580,6 +3587,11 @@ func (v *FindServiceRequest) UnmarshalJSON(data []byte) error { v.FindServiceOptions.Network = u.Network v.FindServiceOptions.Service = u.Service v.FindServiceOptions.Known = u.Known + if x, err := encoding.DurationFromJSON(u.Timeout); err != nil { + return fmt.Errorf("error decoding Timeout: %w", err) + } else { + v.FindServiceOptions.Timeout = x + } return nil } diff --git a/tools/cmd/debug/comet.go b/tools/cmd/debug/comet.go index 538402fac..4d61222f8 100644 --- a/tools/cmd/debug/comet.go +++ b/tools/cmd/debug/comet.go @@ -124,9 +124,3 @@ func downloadCometGenesis(_ *cobra.Command, args []string) { check(err) } } - -func checkf(err error, format string, otherArgs ...interface{}) { - if err != nil { - fatalf(format+": %v", append(otherArgs, err)...) - } -} diff --git a/tools/cmd/debug/heal_anchor.go b/tools/cmd/debug/heal_anchor.go new file mode 100644 index 000000000..c0412aded --- /dev/null +++ b/tools/cmd/debug/heal_anchor.go @@ -0,0 +1,193 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
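+
+// Example invocations (illustrative; they mirror the VS Code launch
+// configurations added above and assume the tools/cmd/debug binary is
+// installed as "debug"):
+//
+//	debug heal anchor MainNet
+//	debug heal anchor MainNet --cached-scan=$HOME/.accumulate/cache/mainnet.json Directory→Apollo 70911
+//	debug heal anchor MainNet --cached-scan=$HOME/.accumulate/cache/mainnet.json "Chandrayaan → Apollo"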
+ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/spf13/cobra" + "gitlab.com/accumulatenetwork/accumulate/exp/apiutil" + "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" + "gitlab.com/accumulatenetwork/accumulate/pkg/url" + "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" +) + +var cmdHealAnchor = &cobra.Command{ + Use: "anchor [network] [txid or part→part (optional) [sequence number (optional)]]", + Short: "Heal anchoring", + Args: cobra.RangeArgs(1, 3), + Run: healAnchor, +} + +func init() { + cmdHeal.AddCommand(cmdHealAnchor) +} + +func healAnchor(_ *cobra.Command, args []string) { + ctx, cancel, _ := api.ContextWithBatchData(context.Background()) + defer cancel() + + // We should be able to use only the p2p client but it doesn't work well for + // some reason + C1 := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(args[0])) + C1.Client.Timeout = time.Hour + + ni, err := C1.NodeInfo(ctx, api.NodeInfoOptions{}) + check(err) + + node, err := p2p.New(p2p.Options{ + Network: ni.Network, + // BootstrapPeers: api.BootstrapServers, + BootstrapPeers: apiutil.MainnetAddrs, + }) + checkf(err, "start p2p node") + defer func() { _ = node.Close() }() + + fmt.Fprintf(os.Stderr, "We are %v\n", node.ID()) + + fmt.Fprintln(os.Stderr, "Waiting for addresses") + time.Sleep(time.Second) + + var net *healing.NetworkInfo + if cachedScan != "" { + data, err := os.ReadFile(cachedScan) + check(err) + check(json.Unmarshal(data, &net)) + } + + // Use a hack dialer that uses the API for peer discovery + router := new(routing.MessageRouter) + C2 := &message.Client{ + Transport: &message.RoutedTransport{ + Network: ni.Network, + // Dialer: node.DialNetwork(), + Dialer: &apiutil.StaticDialer{ + Scan: net, + Nodes: C1, + Dialer: node.DialNetwork(), + }, + Router: router, + }, + } + + if cachedScan == "" { + net, err = healing.ScanNetwork(ctx, C2) + check(err) + } + + router.Router, err = routing.NewStaticRouter(net.Status.Routing, nil) + check(err) + + if len(args) > 1 { + txid, err := url.ParseTxID(args[1]) + if err == nil { + r, err := api.Querier2{Querier: C2}.QueryTransaction(ctx, txid, nil) + check(err) + if r.Sequence == nil { + fatalf("%v is not sequenced", txid) + } + srcId, _ := protocol.ParsePartitionUrl(r.Sequence.Source) + dstId, _ := protocol.ParsePartitionUrl(r.Sequence.Destination) + healSingleAnchor(ctx, C1, C2, net, srcId, dstId, r.Sequence.Number, txid, nil) + return + } + + parts := strings.Split(args[1], "→") + if len(parts) != 2 { + fatalf("invalid transaction ID or sequence specifier: %q", args[1]) + } + srcId, dstId := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]) + srcUrl := protocol.PartitionUrl(srcId) + dstUrl := protocol.PartitionUrl(dstId) + + var seqNo uint64 + if len(args) > 2 { + seqNo, err = strconv.ParseUint(args[2], 10, 64) + check(err) + } + + if seqNo > 0 { + healSingleAnchor(ctx, C1, C2, net, srcId, dstId, seqNo, nil, map[[32]byte]*protocol.Transaction{}) + return + } + + ledger := getAccount[*protocol.AnchorLedger](C1, ctx, dstUrl.JoinPath(protocol.AnchorPool)) + healAnchorSequence(ctx, C1, C2, net, srcId, dstId, ledger.Anchor(srcUrl)) + return + } 
+ +heal: + for _, dst := range net.Status.Network.Partitions { + dstUrl := protocol.PartitionUrl(dst.ID) + dstLedger := getAccount[*protocol.AnchorLedger](C1, ctx, dstUrl.JoinPath(protocol.AnchorPool)) + + for _, src := range net.Status.Network.Partitions { + // Anchors are always from and/or to the DN + if dst.Type != protocol.PartitionTypeDirectory && src.Type != protocol.PartitionTypeDirectory { + continue + } + + srcUrl := protocol.PartitionUrl(src.ID) + src2dst := dstLedger.Partition(srcUrl) + healAnchorSequence(ctx, C1, C2, net, src.ID, dst.ID, src2dst) + } + } + + // Heal continuously? + if healContinuous { + time.Sleep(time.Second) + goto heal + } +} + +func healAnchorSequence(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Client, net *healing.NetworkInfo, srcId, dstId string, src2dst *protocol.PartitionSyntheticLedger) { + srcUrl := protocol.PartitionUrl(srcId) + dstUrl := protocol.PartitionUrl(dstId) + + ids, txns := findPendingAnchors(ctx, C2, api.Querier2{Querier: C2}, net, srcUrl, dstUrl, true) + src2dst.Pending = append(src2dst.Pending, ids...) + + for i, txid := range src2dst.Pending { + healSingleAnchor(ctx, C1, C2, net, srcId, dstId, src2dst.Delivered+1+uint64(i), txid, txns) + } +} + +func healSingleAnchor(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Client, net *healing.NetworkInfo, srcId, dstId string, seqNum uint64, txid *url.TxID, txns map[[32]byte]*protocol.Transaction) { +again: + err := healing.HealAnchor(ctx, healing.HealAnchorArgs{ + Client: C2.ForAddress(nil), + Querier: C2, + Submitter: C2, + NetInfo: net, + Known: txns, + Pretend: pretend, + Wait: waitForTxn, + }, healing.SequencedInfo{ + Source: srcId, + Destination: dstId, + Number: seqNum, + ID: txid, + }) + if errors.Is(err, healing.ErrRetry) { + slog.Error("Anchor still pending after 10 attempts, retrying") + goto again + } + check(err) +} diff --git a/tools/cmd/debug/heal_common.go b/tools/cmd/debug/heal_common.go new file mode 100644 index 000000000..b6e7b913e --- /dev/null +++ b/tools/cmd/debug/heal_common.go @@ -0,0 +1,52 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package main + +import ( + "context" + + "github.com/spf13/cobra" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/pkg/url" + "gitlab.com/accumulatenetwork/accumulate/protocol" +) + +var cmdHeal = &cobra.Command{ + Use: "heal", +} + +func init() { + cmd.AddCommand(cmdHeal) + + cmdHeal.PersistentFlags().StringVar(&cachedScan, "cached-scan", "", "A cached network scan") + cmdHeal.PersistentFlags().BoolVarP(&pretend, "pretend", "n", false, "Do not submit envelopes, only scan") + cmdHeal.PersistentFlags().BoolVar(&waitForTxn, "wait", true, "Wait for the message to finalize") + cmdHeal.PersistentFlags().BoolVar(&healContinuous, "continuous", false, "Run healing in a loop every second") + + _ = cmdHeal.MarkFlagFilename("cached-scan", ".json") +} + +// resolveSeq resolves an anchor or synthetic message (a sequenced message). If +// the client's address is non-nil, the query will be sent to that address. +// Otherwise, all of the source partition's nodes will be queried in order until +// one responds. 
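+//
+// Unlike healing.ResolveSequenced, which it wraps, this helper does not return
+// an error: any failure is treated as fatal via check.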
+func resolveSeq[T messaging.Message](ctx context.Context, client message.AddressedClient, net *healing.NetworkInfo, srcId, dstId string, seqNum uint64, anchor bool) *api.MessageRecord[T] { + r, err := healing.ResolveSequenced[T](ctx, client, net, srcId, dstId, seqNum, anchor) + check(err) + return r +} + +// getAccount fetches the given account. +func getAccount[T protocol.Account](C api.Querier, ctx context.Context, u *url.URL) T { + var v T + _, err := api.Querier2{Querier: C}.QueryAccountAs(ctx, u, nil, &v) + checkf(err, "get %v", u) + return v +} diff --git a/tools/cmd/debug/heal_synth.go b/tools/cmd/debug/heal_synth.go index d7bc1e031..43cb4c10a 100644 --- a/tools/cmd/debug/heal_synth.go +++ b/tools/cmd/debug/heal_synth.go @@ -8,197 +8,156 @@ package main import ( "context" + "encoding/json" "fmt" - "net/url" "os" - "strconv" - "strings" + "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/spf13/cobra" + "gitlab.com/accumulatenetwork/accumulate/exp/apiutil" "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" - v2 "gitlab.com/accumulatenetwork/accumulate/internal/api/v2" - "gitlab.com/accumulatenetwork/accumulate/internal/node/config" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" - client "gitlab.com/accumulatenetwork/accumulate/pkg/client/api/v2" - "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" "gitlab.com/accumulatenetwork/accumulate/protocol" - "gitlab.com/accumulatenetwork/accumulate/test/testing" + "golang.org/x/exp/slog" ) var cmdHealSynth = &cobra.Command{ - Use: "heal-synth [network] [server]", + Use: "synth [server]", Short: "Fixup synthetic transactions", - Args: cobra.ExactArgs(2), + Args: cobra.ExactArgs(1), Run: healSynth, } -var flagHealSynth = struct { - Peer string -}{} - func init() { - cmd.AddCommand(cmdHealSynth) - cmdHealSynth.Flags().StringVar(&flagHealSynth.Peer, "peer", "", "Query a specific peer") + cmdHeal.AddCommand(cmdHealSynth) + cmdHealSynth.Flags().StringVar(&usePeer, "peer", "", "Query a specific peer") } func healSynth(_ *cobra.Command, args []string) { - testing.EnableDebugFeatures() - c, err := client.New(args[1]) - check(err) + // testing.EnableDebugFeatures() + + ctx, cancel, _ := api.ContextWithBatchData(cmd.Context()) + defer cancel() + + C1 := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(args[0])) + Q := api.Querier2{Querier: C1} + C1.Client.Timeout = time.Hour - desc, err := c.Describe(context.Background()) + ni, err := C1.NodeInfo(ctx, api.NodeInfoOptions{}) check(err) + var net *healing.NetworkInfo + if cachedScan != "" { + data, err := os.ReadFile(cachedScan) + check(err) + check(json.Unmarshal(data, &net)) + } + node, err := p2p.New(p2p.Options{ - Network: args[0], - BootstrapPeers: api.BootstrapServers, + Network: ni.Network, + // BootstrapPeers: api.BootstrapServers, + BootstrapPeers: apiutil.MainnetAddrs, }) check(err) defer func() { _ = node.Close() }() fmt.Printf("We are %v\n", node.ID()) + fmt.Fprintln(os.Stderr, "Waiting for addresses") + time.Sleep(time.Second) + router := new(routing.MessageRouter) - c2 := &message.Client{ + C2 := (&message.Client{ Transport: &message.RoutedTransport{ - Network: args[0], - Dialer: node.DialNetwork(), - Router: router, + Network: ni.Network, + // Dialer: node.DialNetwork(), + Dialer: &apiutil.StaticDialer{ + Scan: 
net, + Nodes: C1, + Dialer: node.DialNetwork(), + }, + Router: router, }, + }).ForAddress(nil) + Q.Querier = C2 + + if usePeer != "" { + pid, err := peer.Decode(usePeer) + check(err) + C2 = C2.ForPeer(pid) } - router.Router, err = routing.NewStaticRouter(desc.Values.Routing, nil) + + if cachedScan == "" { + net, err = healing.ScanNetwork(ctx, C1) + check(err) + } + + router.Router, err = routing.NewStaticRouter(net.Status.Routing, nil) check(err) +heal: synths := map[string]*protocol.SyntheticLedger{} - for _, part := range desc.Values.Network.Partitions { + for _, dst := range net.Status.Network.Partitions { // Get synthetic ledger - req := new(v2.GeneralQuery) - req.Url = protocol.PartitionUrl(part.ID).JoinPath(protocol.Synthetic) - synth := new(protocol.SyntheticLedger) - res := new(v2.ChainQueryResponse) - res.Data = synth - err = c.RequestAPIv2(context.Background(), "query", req, res) + var synth *protocol.SyntheticLedger + u := protocol.PartitionUrl(dst.ID).JoinPath(protocol.Synthetic) + _, err = Q.QueryAccountAs(ctx, u, nil, &synth) check(err) - synths[part.ID] = synth - - for _, src := range synth.Sequence { - for _, txid := range src.Pending { - req.Url = txid.AsUrl() - res := new(v2.TransactionQueryResponse) - err = c.RequestAPIv2(context.Background(), "query", req, res) - check(err) - - xreq := new(v2.ExecuteRequest) - xreq.Envelope = new(messaging.Envelope) - xreq.Envelope.Transaction = []*protocol.Transaction{res.Transaction} - var partSig *protocol.PartitionSignature - for _, sig := range res.Signatures { - sig, ok := sig.(*protocol.PartitionSignature) - if ok { - partSig = sig - xreq.Envelope.Signatures = []protocol.Signature{sig} - } - } - - p := c2.Private() - if flagHealSynth.Peer != "" { - pid, err := peer.Decode(flagHealSynth.Peer) - check(err) - p = c2.ForPeer(pid).Private() - } - - // Get a signature - r, err := p.Sequence(context.Background(), partSig.SourceNetwork.JoinPath(protocol.Synthetic), partSig.DestinationNetwork, partSig.SequenceNumber) - check(err) - var note string - for _, sigs := range r.Signatures.Records { - for _, sig := range sigs.Signatures.Records { - if sig, ok := sig.Message.(*messaging.SignatureMessage); ok { - switch sig := sig.Signature.(type) { - case protocol.KeySignature: - xreq.Envelope.Signatures = append(xreq.Envelope.Signatures, sig) - case *protocol.ReceiptSignature: - if !res.Status.GotDirectoryReceipt { - note = " with DN receipt" - xreq.Envelope.Signatures = append(xreq.Envelope.Signatures, sig) - } - } - } - } - } - - fmt.Printf("Resubmitting %v%s\n", txid, note) - // b, _ := json.Marshal(xreq.Envelope) - // fmt.Printf("%s\n", b) - // return - - xres, err := c.ExecuteDirect(context.Background(), xreq) - check(err) - if xres.Message != "" { - fmt.Fprintf(os.Stderr, "Warning: %s\n", xres.Message) - } + synths[dst.ID] = synth + + for _, src2dst := range synth.Sequence { + srcId, _ := protocol.ParsePartitionUrl(src2dst.Url) + + for i := range src2dst.Pending { + resubmitByNumber(ctx, C2, net, srcId, dst.ID, src2dst.Delivered+1+uint64(i)) } } } // Check produced vs received - for i, a := range desc.Values.Network.Partitions { - for _, b := range desc.Values.Network.Partitions[i:] { + for i, a := range net.Status.Network.Partitions { + for _, b := range net.Status.Network.Partitions[i:] { ab := synths[a.ID].Partition(protocol.PartitionUrl(b.ID)) ba := synths[b.ID].Partition(protocol.PartitionUrl(a.ID)) for i := ba.Received + 1; i <= ab.Produced; i++ { - resubmitByNumber(desc, c, a.ID, b.ID, i, false) + resubmitByNumber(ctx, C2, net, 
a.ID, b.ID, i) } if a == b { continue } for i := ab.Received + 1; i <= ba.Produced; i++ { - resubmitByNumber(desc, c, b.ID, a.ID, i, false) + resubmitByNumber(ctx, C2, net, b.ID, a.ID, i) } } } -} -func resubmitByNumber(desc *v2.DescriptionResponse, c *client.Client, source, destination string, number uint64, anchor bool) { - // Get a client for the destination partition - var d *client.Client - for _, p := range desc.Network.Partitions { - if !strings.EqualFold(p.Id, destination) { - continue - } - if len(p.Nodes) == 0 { - fatalf("no nodes for %v", p.Id) - } - u, err := url.Parse(p.Nodes[0].Address) - check(err) - port, err := strconv.ParseUint(u.Port(), 10, 16) - check(err) - d, err = client.New(fmt.Sprintf("http://%s:%d", u.Hostname(), port+config.PortOffsetAccumulateApi.GetEnumValue())) - check(err) + // Heal continuously? + if healContinuous { + time.Sleep(time.Second) + goto heal } +} - // Query the synthetic transaction - req := new(v2.SyntheticTransactionRequest) - req.Source = protocol.PartitionUrl(source) - req.Destination = protocol.PartitionUrl(destination) - req.SequenceNumber = number - req.Anchor = anchor - res, err := c.QuerySynth(context.Background(), req) - check(err) - - // Submit the synthetic transaction directly to the destination partition - fmt.Printf("Resubmitting %v\n", res.Txid) - xreq := new(v2.ExecuteRequest) - xreq.Envelope = new(messaging.Envelope) - xreq.Envelope.Transaction = []*protocol.Transaction{res.Transaction} - xreq.Envelope.Signatures = res.Signatures - xres, err := d.ExecuteLocal(context.Background(), xreq) - check(err) - if xres.Message != "" { - fmt.Fprintf(os.Stderr, "Warning: %s\n", xres.Message) +func resubmitByNumber(ctx context.Context, C2 message.AddressedClient, net *healing.NetworkInfo, source, destination string, number uint64) { + err := healing.HealSynthetic(ctx, healing.HealSyntheticArgs{ + Client: C2, + Querier: C2, + Submitter: C2, + NetInfo: net, + Pretend: pretend, + Wait: waitForTxn, + }, healing.SequencedInfo{ + Source: source, + Destination: destination, + Number: number, + }) + if err != nil { + slog.Error("Failed to heal", "source", source, "destination", destination, "number", number, "error", err) } } diff --git a/tools/cmd/debug/heal_test.go b/tools/cmd/debug/heal_test.go index fe07b7926..e28984e9c 100644 --- a/tools/cmd/debug/heal_test.go +++ b/tools/cmd/debug/heal_test.go @@ -12,6 +12,7 @@ import ( "encoding/json" "fmt" "testing" + "time" abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/rpc/client/http" @@ -78,6 +79,106 @@ func TestHealSynth(t *testing.T) { } } +func TestHealQueryAnchor(t *testing.T) { + // t.Skip("Manual") + + peer, err := peer.Decode("12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj") + require.NoError(t, err) + + var mainnetAddrs = func() []multiaddr.Multiaddr { + s := []string{ + "/dns/apollo-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj", + "/dns/yutu-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWDqFDwjHEog1bNbxai2dKSaR1aFvq2LAZ2jivSohgoSc7", + "/dns/chandrayaan-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWHzjkoeAqe7L55tAaepCbMbhvNu9v52ayZNVQobdEE1RL", + "/ip4/116.202.214.38/tcp/16593/p2p/12D3KooWBkJQiuvotpMemWBYfAe4ctsVHi7fLvT8RT83oXJ5dsgV", + "/ip4/83.97.19.82/tcp/16593/p2p/12D3KooWHSbqS6K52d4ReauHAg4n8MFbAKkdEAae2fZXnzRYi9ce", + "/ip4/206.189.97.165/tcp/16593/p2p/12D3KooWHyA7zgAVqGvCBBJejgvKzv7DQZ3LabJMWqmCQ9wFbT3o", + 
"/ip4/144.76.105.23/tcp/16593/p2p/12D3KooWS2Adojqun5RV1Xy4k6vKXWpRQ3VdzXnW8SbW7ERzqKie", + "/ip4/18.190.77.236/tcp/16593/p2p/12D3KooWP1d9vUJCzqX5bTv13tCHmVssJrgK3EnJCC2C5Ep2SXbS", + "/ip4/3.28.207.55/tcp/16593/p2p/12D3KooWEzhg3CRvC3xdrUBFsWETF1nG3gyYfEjx4oEJer95y1Rk", + "/ip4/38.135.195.81/tcp/16593/p2p/12D3KooWDWCHGAyeUWdP8yuuSYvMoUfaPoGu4p3gJb51diqNQz6j", + // "/ip4/50.17.246.3/tcp/16593/p2p/12D3KooWKkNsxkHJqvSje2viyqKVxtqvbTpFrbASD3q1uv6td1pW", + "/dns/validator-eu01.acme.sphereon.com/tcp/16593/p2p/12D3KooWKYTWKJ5jeuZmbbwiN7PoinJ2yJLoQtZyfWi2ihjBnSUR", + "/ip4/35.86.120.53/tcp/16593/p2p/12D3KooWKJuspMDC5GXzLYJs9nHwYfqst9QAW4m5FakXNHVMNiq7", + "/ip4/65.109.48.173/tcp/16593/p2p/12D3KooWHkUtGcHY96bNavZMCP2k5ps5mC7GrF1hBC1CsyGJZSPY", + "/dns/accumulate.detroitledger.tech/tcp/16593/p2p/12D3KooWNe1QNh5mKAa8iAEP8vFwvmWFxaCLNcAdE1sH38Bz8sc9", + "/ip4/3.135.9.97/tcp/16593/p2p/12D3KooWEQG3X528Ct2Kd3kxhv6WZDBqaAoEw7AKiPoK1NmWJgx1", + // "/ip4/3.86.85.133/tcp/16593/p2p/12D3KooWJvReA1SuLkppyXKXq6fifVPLqvNtzsvPUqagVjvYe7qe", + "/ip4/193.35.56.176/tcp/16593/p2p/12D3KooWJevZUFLqN7zAamDh2EEYNQZPvxGFwiFVyPXfuXZNjg1J", + "/ip4/35.177.70.195/tcp/16593/p2p/12D3KooWPzpRp1UCu4nvXT9h8jKvmBmCADrMnoF72DrEbUrWrB2G", + "/ip4/3.99.81.122/tcp/16593/p2p/12D3KooWLL5kAbD7nhv6CM9x9L1zjxSnc6hdMVKcsK9wzMGBo99X", + "/ip4/34.219.75.234/tcp/16593/p2p/12D3KooWKHjS5nzG9dipBXn31pYEnfa8g5UzvkSYEsuiukGHzPvt", + "/ip4/3.122.254.53/tcp/16593/p2p/12D3KooWRU8obVzgfw6TsUHjoy2FDD3Vd7swrPNTM7DMFs8JG4dx", + "/ip4/35.92.228.236/tcp/16593/p2p/12D3KooWQqMqbyJ2Zay9KHeEDgDMAxQpKD1ypiBX5ByQAA2XpsZL", + "/ip4/3.135.184.194/tcp/16593/p2p/12D3KooWHcxyiE3AGdPnhtj87tByfLnJZVR6mLefadWccbMByrBa", + "/ip4/18.133.170.113/tcp/16593/p2p/12D3KooWFbWY2NhBEWTLHUCwwPmNHm4BoJXbojnrJJfuDCVoqrFY", + // "/ip4/44.204.224.126/tcp/16593/p2p/12D3KooWAiJJxdgsB39up5h6fz6TSfBz4HsLKTFiBXUrbwA8o54m", + "/ip4/35.92.21.90/tcp/16593/p2p/12D3KooWLTV3pTN2NbKeFeseCGHyMXuAkQv68KfCeK4uqJzJMfhZ", + "/ip4/3.99.166.147/tcp/16593/p2p/12D3KooWGYUf93iYWsUibSvKdxsYUY1p7fC1nQotCpUcDXD1ABvR", + "/ip4/16.171.4.135/tcp/16593/p2p/12D3KooWEMpAxKnXJPkcEXpDmrnjrZ5iFMZvvQtimmTTxuoRGkXV", + "/ip4/54.237.244.42/tcp/16593/p2p/12D3KooWLoMkrgW862Gs152jLt6FiZZs4GkY24Su4QojnvMoSNaQ", + // "/ip4/3.238.124.43/tcp/16593/p2p/12D3KooWJ8CA8pacTnKWVgBSEav4QG1zJpyeSSME47RugpDUrZp8", + "/ip4/13.53.125.115/tcp/16593/p2p/12D3KooWBJk52fQExXHWhFNk692hP7JvTxNTvUMdVne8tbJ3DBf3", + "/ip4/13.59.241.224/tcp/16593/p2p/12D3KooWKjYKqg2TgUSLq8CZAP8G6LhjXUWTcQBd9qYL2JHug9HW", + "/ip4/18.168.202.86/tcp/16593/p2p/12D3KooWDiKGbUZg1rB5EufRCkRPiDCEPMjyvTfTVR9qsKVVkcuC", + "/ip4/35.183.112.161/tcp/16593/p2p/12D3KooWFPKeXzKMd3jtoeG6ts6ADKmVV8rVkXR9k9YkQPgpLzd6", + } + addrs := make([]multiaddr.Multiaddr, len(s)) + for i, s := range s { + addr, err := multiaddr.NewMultiaddr(s) + if err != nil { + panic(err) + } + addrs[i] = addr + } + return addrs + }() + + node, err := p2p.New(p2p.Options{ + Network: "MainNet", + // BootstrapPeers: api.BootstrapServers, + BootstrapPeers: mainnetAddrs, + }) + require.NoError(t, err) + defer func() { _ = node.Close() }() + + fmt.Printf("We are %v\n", node.ID()) + time.Sleep(time.Second) + + // fmt.Println("Waiting for a live network service") + // svcAddr, err := api.ServiceTypeNetwork.AddressFor(protocol.Directory).MultiaddrFor("MainNet") + // require.NoError(t, err) + // require.NoError(t, node.WaitForService(context.Background(), svcAddr)) + + router := new(routing.MessageRouter) + client := &message.Client{ + Transport: &message.RoutedTransport{ + Network: "MainNet", + Dialer: 
node.DialNetwork(), + Router: router, + }, + } + // ns, err := client.NetworkStatus(context.Background(), api.NetworkStatusOptions{}) + // require.NoError(t, err) + // router.Router, err = routing.NewStaticRouter(ns.Routing, nil) + // require.NoError(t, err) + + r, err := client.ForPeer(peer).Private().Sequence(context.Background(), + protocol.PartitionUrl("Apollo").JoinPath(protocol.Synthetic), + protocol.PartitionUrl("Apollo"), + 3044) + require.NoError(t, err) + b, err := json.Marshal(r.Message) + require.NoError(t, err) + fmt.Println(string(b)) + // for _, set := range r.Signatures.Records { + // for _, sig := range set.Signatures.Records { + // _ = sig + // fmt.Println("got signature") + // } + // } +} + func TestHealAnchors(t *testing.T) { t.Skip("Manual") diff --git a/tools/cmd/debug/main.go b/tools/cmd/debug/main.go index f920e690d..94b93f38b 100644 --- a/tools/cmd/debug/main.go +++ b/tools/cmd/debug/main.go @@ -14,6 +14,16 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/errors" ) +var ( + outputJSON bool + healContinuous bool + cachedScan string + verbose bool + pretend bool + waitForTxn bool + usePeer string +) + var cmd = &cobra.Command{ Use: "debug", Short: "Accumulate debug utilities", @@ -34,3 +44,9 @@ func check(err error) { fatalf("%+v", err) } } + +func checkf(err error, format string, otherArgs ...interface{}) { + if err != nil { + fatalf(format+": %v", append(otherArgs, err)...) + } +} diff --git a/tools/cmd/debug/network.go b/tools/cmd/debug/network.go new file mode 100644 index 000000000..135532d03 --- /dev/null +++ b/tools/cmd/debug/network.go @@ -0,0 +1,96 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
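+
+// The "network scan" output is the same healing.NetworkInfo structure that the
+// heal commands accept via --cached-scan, so a scan can be captured once and
+// reused, e.g. (illustrative; assumes the tools/cmd/debug binary is installed
+// as "debug"):
+//
+//	debug network scan MainNet --json > ~/.accumulate/cache/mainnet.json
+//	debug heal anchor MainNet --cached-scan=$HOME/.accumulate/cache/mainnet.json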
+ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "os" + "strconv" + "time" + + "github.com/multiformats/go-multiaddr" + "github.com/spf13/cobra" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" + "gitlab.com/accumulatenetwork/accumulate/internal/node/config" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" +) + +var networkCmd = &cobra.Command{ + Use: "network", +} + +var networkScanCmd = &cobra.Command{ + Use: "scan [network]", + Short: "Scan the network for nodes", + Args: cobra.ExactArgs(1), + Run: scanNetwork, +} + +var networkScanNodeCmd = &cobra.Command{ + Use: "scan-node [address]", + Short: "Scan a node", + Args: cobra.ExactArgs(1), + Run: scanNode, +} + +func init() { + cmd.AddCommand(networkCmd) + networkCmd.AddCommand(networkScanCmd) + networkCmd.AddCommand(networkScanNodeCmd) + + networkCmd.PersistentFlags().BoolVarP(&outputJSON, "json", "j", false, "Output result as JSON") +} + +func scanNetwork(_ *cobra.Command, args []string) { + client := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(args[0])) + client.Client.Timeout = time.Hour + net, err := healing.ScanNetwork(context.Background(), client) + check(err) + + if outputJSON { + check(json.NewEncoder(os.Stdout).Encode(net)) + return + } + + for _, part := range net.Status.Network.Partitions { + fmt.Println(part.ID) + for _, peer := range net.Peers[part.ID] { + fmt.Printf(" %v\n", peer) + } + } +} + +func scanNode(_ *cobra.Command, args []string) { + u, err := url.Parse(args[0]) + checkf(err, "invalid URL") + port, err := strconv.ParseUint(u.Port(), 10, 64) + checkf(err, "invalid port") + + client := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(args[0])) + client.Client.Timeout = time.Hour + peer, err := healing.ScanNode(context.Background(), client) + check(err) + + p2pPort := port - uint64(config.PortOffsetAccumulateApi) + uint64(config.PortOffsetAccumulateP2P) + tcp, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", u.Hostname(), p2pPort)) + check(err) + udp, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/%d/quic", u.Hostname(), p2pPort)) + check(err) + peer.Addresses = []multiaddr.Multiaddr{tcp, udp} + + if outputJSON { + fmt.Fprintln(os.Stderr, peer.ID) + check(json.NewEncoder(os.Stdout).Encode(peer)) + return + } + + fmt.Printf(" %v\n", peer) +} diff --git a/tools/cmd/debug/node.go b/tools/cmd/debug/node.go index 10566dcd7..4be59fccd 100644 --- a/tools/cmd/debug/node.go +++ b/tools/cmd/debug/node.go @@ -16,8 +16,10 @@ import ( "github.com/fatih/color" "github.com/multiformats/go-multiaddr" "github.com/spf13/cobra" + "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" ) @@ -78,28 +80,26 @@ func checkNode(_ *cobra.Command, args []string) { check(err) _, err = pc.DialNetwork().Dial(ctx, svcAddr) if err == nil { - fmt.Println(color.GreenString("✔"), "Direct connection") + fmt.Println(color.GreenString("✔"), "Can connect") } else { - fmt.Println(color.RedString("🗴"), "Direct connection") + fmt.Println(color.RedString("🗴"), "Can connect") fmt.Println(err) } - check(pc.Close()) - // Indirect - pc, err = p2p.New(p2p.Options{ - Network: ni.Network, - Key: key, - BootstrapPeers: api.BootstrapServers, - }) + router, err := 
routing.NewStaticRouter(ns.Routing, nil) check(err) - time.Sleep(time.Second) - - _, err = pc.DialNetwork().Dial(ctx, svcAddr) + mc := &message.Client{Transport: &message.RoutedTransport{ + Network: ni.Network, + Dialer: pc.DialNetwork(), + Router: routing.MessageRouter{Router: router}, + }} + _, err = mc.NodeInfo(ctx, api.NodeInfoOptions{PeerID: ni.PeerID}) if err == nil { - fmt.Println(color.GreenString("✔"), "Indirect connection") + fmt.Println(color.GreenString("✔"), "Can query") } else { - fmt.Println(color.RedString("🗴"), "Indirect connection") + fmt.Println(color.RedString("🗴"), "Can query") fmt.Println(err) } + check(pc.Close()) } diff --git a/tools/cmd/debug/sequence.go b/tools/cmd/debug/sequence.go index 4e3747266..e3a9499be 100644 --- a/tools/cmd/debug/sequence.go +++ b/tools/cmd/debug/sequence.go @@ -8,12 +8,25 @@ package main import ( "context" + "encoding/json" + "fmt" + "os" + "strings" + "time" "github.com/fatih/color" "github.com/spf13/cobra" - "gitlab.com/accumulatenetwork/accumulate/internal/api/v2" - client "gitlab.com/accumulatenetwork/accumulate/pkg/client/api/v2" + "gitlab.com/accumulatenetwork/accumulate/exp/apiutil" + "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/pkg/url" "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" ) var cmdSequence = &cobra.Command{ @@ -24,39 +37,79 @@ var cmdSequence = &cobra.Command{ func init() { cmd.AddCommand(cmdSequence) + cmdSequence.Flags().BoolVarP(&verbose, "verbose", "v", false, "More verbose output") + cmdSequence.PersistentFlags().StringVar(&cachedScan, "cached-scan", "", "A cached network scan") + } -func sequence(_ *cobra.Command, args []string) { - c, err := client.New(args[0]) +func sequence(cmd *cobra.Command, args []string) { + ctx, cancel, _ := api.ContextWithBatchData(cmd.Context()) + defer cancel() + + c := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(args[0])) + c.Client.Timeout = time.Hour + Q := api.Querier2{Querier: c} + + ni, err := c.NodeInfo(ctx, api.NodeInfoOptions{}) check(err) - desc, err := c.Describe(context.Background()) + node, err := p2p.New(p2p.Options{ + Network: ni.Network, + BootstrapPeers: api.BootstrapServers, + }) + check(err) + defer func() { _ = node.Close() }() + + fmt.Printf("We are %v\n", node.ID()) + + var net *healing.NetworkInfo + if cachedScan != "" { + data, err := os.ReadFile(cachedScan) + check(err) + check(json.Unmarshal(data, &net)) + } + + router := new(routing.MessageRouter) + c2 := &message.Client{ + Transport: &message.RoutedTransport{ + Network: ni.Network, + Dialer: &apiutil.StaticDialer{ + Scan: net, + Nodes: c, + Dialer: node.DialNetwork(), + }, + Router: router, + }, + } + Q.Querier = c2 + + ns, err := c.NetworkStatus(ctx, api.NetworkStatusOptions{Partition: protocol.Directory}) + check(err) + router.Router, err = routing.NewStaticRouter(ns.Routing, nil) check(err) anchors := map[string]*protocol.AnchorLedger{} synths := map[string]*protocol.SyntheticLedger{} bad := map[Dir]bool{} - for _, part := range desc.Values.Network.Partitions { + for _, part := range ns.Network.Partitions { // Get anchor ledger - req := 
new(api.GeneralQuery) - req.Url = protocol.PartitionUrl(part.ID).JoinPath(protocol.AnchorPool) - anchor := new(protocol.AnchorLedger) - res := new(api.ChainQueryResponse) - res.Data = anchor - err = c.RequestAPIv2(context.Background(), "query", req, res) + dst := protocol.PartitionUrl(part.ID) + var anchor *protocol.AnchorLedger + _, err = Q.QueryAccountAs(ctx, dst.JoinPath(protocol.AnchorPool), nil, &anchor) check(err) anchors[part.ID] = anchor // Get synthetic ledger - req.Url = protocol.PartitionUrl(part.ID).JoinPath(protocol.Synthetic) - synth := new(protocol.SyntheticLedger) - res.Data = synth - err = c.RequestAPIv2(context.Background(), "query", req, res) + var synth *protocol.SyntheticLedger + _, err = Q.QueryAccountAs(ctx, dst.JoinPath(protocol.Synthetic), nil, &synth) check(err) synths[part.ID] = synth // Check pending and received vs delivered for _, src := range anchor.Sequence { + ids, _ := findPendingAnchors(ctx, c2, Q, net, src.Url, dst, verbose) + src.Pending = append(src.Pending, ids...) + checkSequence1(part, src, bad, "anchors") } @@ -66,8 +119,8 @@ func sequence(_ *cobra.Command, args []string) { } // Check produced vs received - for i, a := range desc.Values.Network.Partitions { - for _, b := range desc.Values.Network.Partitions[i:] { + for i, a := range ns.Network.Partitions { + for _, b := range ns.Network.Partitions[i:] { checkSequence2(a, b, bad, "anchors", anchors[a.ID].Anchor(protocol.PartitionUrl(b.ID)), anchors[b.ID].Anchor(protocol.PartitionUrl(a.ID)), @@ -79,8 +132,8 @@ func sequence(_ *cobra.Command, args []string) { } } - for _, a := range desc.Values.Network.Partitions { - for _, b := range desc.Values.Network.Partitions { + for _, a := range ns.Network.Partitions { + for _, b := range ns.Network.Partitions { if !bad[Dir{From: a.ID, To: b.ID}] { color.Green("✔ %s → %s\n", a.ID, b.ID) } @@ -111,9 +164,70 @@ func checkSequence1(dst *protocol.PartitionInfo, src *protocol.PartitionSyntheti if len(src.Pending) > 0 { color.Red("🗴 %s → %s has %d pending %s\n", id, dst.ID, len(src.Pending), kind) bad[Dir{From: id, To: dst.ID}] = true + if verbose { + for _, id := range src.Pending { + fmt.Printf(" %v\n", id) + } + } } if src.Received > src.Delivered { color.Red("🗴 %s → %s has %d unprocessed %s\n", id, dst.ID, src.Received-src.Delivered, kind) bad[Dir{From: id, To: dst.ID}] = true } } + +func findPendingAnchors(ctx context.Context, C *message.Client, Q api.Querier2, net *healing.NetworkInfo, src, dst *url.URL, resolve bool) ([]*url.TxID, map[[32]byte]*protocol.Transaction) { + srcId, _ := protocol.ParsePartitionUrl(src) + dstId, _ := protocol.ParsePartitionUrl(dst) + + // Check how many have been received + var dstLedger *protocol.AnchorLedger + _, err := Q.QueryAccountAs(ctx, dst.JoinPath(protocol.AnchorPool), nil, &dstLedger) + checkf(err, "query %v → %v anchor ledger", srcId, dstId) + dstSrcLedger := dstLedger.Partition(src) + received := dstSrcLedger.Received + + // Check how many should have been sent + srcDstChain, err := Q.QueryChain(ctx, src.JoinPath(protocol.AnchorPool), &api.ChainQuery{Name: "anchor-sequence"}) + checkf(err, "query %v anchor sequence chain", srcId) + + if received >= srcDstChain.Count-1 { + return nil, nil + } + + // Non-verbose mode doesn't care about the actual IDs + if !resolve { + return make([]*url.TxID, srcDstChain.Count-received-1), nil + } + + var ids []*url.TxID + txns := map[[32]byte]*protocol.Transaction{} + for i := received + 1; i <= srcDstChain.Count; i++ { + var msg *api.MessageRecord[messaging.Message] + if net == nil { + 
slog.Info("Checking anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i) + msg, err = C.Private().Sequence(ctx, src.JoinPath(protocol.AnchorPool), dst, i) + checkf(err, "query %v → %v anchor #%d", srcId, dstId, i) + } else { + for _, peer := range net.Peers[strings.ToLower(srcId)] { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + slog.Info("Checking anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i, "peer", peer.ID) + msg, err = C.ForPeer(peer.ID).Private().Sequence(ctx, src.JoinPath(protocol.AnchorPool), dst, i) + if err == nil { + break + } + slog.Error("Failed to check anchor", "source", src, "destination", dst, "number", i, "remaining", srcDstChain.Count-i, "peer", peer.ID, "error", err) + } + if msg == nil { + fatalf("query %v → %v anchor #%d failed", srcId, dstId, i) + } + } + + ids = append(ids, msg.ID) + + txn := msg.Message.(*messaging.TransactionMessage) + txns[txn.Hash()] = txn.Transaction + } + return ids, txns +} diff --git a/tools/cmd/resend-anchor/Dockerfile b/tools/cmd/resend-anchor/Dockerfile index 8a8b788ba..7180b50c3 100644 --- a/tools/cmd/resend-anchor/Dockerfile +++ b/tools/cmd/resend-anchor/Dockerfile @@ -16,4 +16,4 @@ RUN apk add --no-cache bash curl COPY --from=build /go/bin/resend-anchor /go/bin/dlv /bin/ ENTRYPOINT ["resend-anchor"] -CMD ["heal", "--help"] \ No newline at end of file +CMD ["--help"] \ No newline at end of file diff --git a/tools/cmd/resend-anchor/heal.go b/tools/cmd/resend-anchor/heal.go deleted file mode 100644 index e0890ac8f..000000000 --- a/tools/cmd/resend-anchor/heal.go +++ /dev/null @@ -1,369 +0,0 @@ -// Copyright 2023 The Accumulate Authors -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. 
- -package main - -import ( - "context" - "encoding/hex" - "encoding/json" - "fmt" - "log" - "os" - "strings" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/spf13/cobra" - "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" - "gitlab.com/accumulatenetwork/accumulate/internal/core" - "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" - "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" - "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" - "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" - client "gitlab.com/accumulatenetwork/accumulate/pkg/client/api/v2" - "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" - "gitlab.com/accumulatenetwork/accumulate/pkg/url" - "gitlab.com/accumulatenetwork/accumulate/protocol" -) - -var healCmd = &cobra.Command{ - Use: "heal [network]", - Args: cobra.ExactArgs(1), - Run: heal, -} - -func init() { - cmd.AddCommand(healCmd) - healCmd.Flags().BoolVar(&healFlag.Continuous, "continuous", false, "Run healing in a loop every second") -} - -var healFlag = struct { - Continuous bool -}{} - -func heal(_ *cobra.Command, args []string) { - ctx, cancel, _ := api.ContextWithBatchData(context.Background()) - defer cancel() - - C2 := jsonrpc.NewClient(args[0]) - apiNode, err := C2.NodeInfo(ctx, api.NodeInfoOptions{}) - checkf(err, "query node info") - - status, err := C2.NetworkStatus(ctx, api.NetworkStatusOptions{}) - checkf(err, "query network status") - - node, err := p2p.New(p2p.Options{ - Network: apiNode.Network, - BootstrapPeers: api.BootstrapServers, - }) - checkf(err, "start p2p node") - defer func() { _ = node.Close() }() - - fmt.Printf("We are %v\n", node.ID()) - - router := new(routing.MessageRouter) - C := &message.Client{ - Transport: &message.RoutedTransport{ - Network: apiNode.Network, - Dialer: node.DialNetwork(), - Router: router, - }, - } - router.Router, err = routing.NewStaticRouter(status.Routing, nil) - check(err) - - peers := getPeers(C2, ctx) - -heal: - // Heal BVN -> DN - for _, part := range status.Network.Partitions { - if part.Type != protocol.PartitionTypeBlockValidator { - continue - } - - partUrl := protocol.PartitionUrl(part.ID) - ledger := getAccount[*protocol.AnchorLedger](C2, ctx, partUrl.JoinPath(protocol.AnchorPool)) - partLedger := ledger.Anchor(protocol.DnUrl()) - - for i, txid := range ledger.Anchor(protocol.DnUrl()).Pending { - healAnchor(C, C2, ctx, protocol.DnUrl(), partUrl, txid, partLedger.Delivered+1+uint64(i), peers[protocol.Directory]) - } - } - - // Heal DN -> BVN, DN -> DN - { - ledger := getAccount[*protocol.AnchorLedger](C2, ctx, protocol.DnUrl().JoinPath(protocol.AnchorPool)) - - for _, part := range status.Network.Partitions { - partUrl := protocol.PartitionUrl(part.ID) - partLedger := ledger.Anchor(partUrl) - for i, txid := range ledger.Anchor(partUrl).Pending { - healAnchor(C, C2, ctx, partUrl, protocol.DnUrl(), txid, partLedger.Delivered+1+uint64(i), peers[part.ID]) - } - } - } - - // Heal continuously? 
- if healFlag.Continuous { - time.Sleep(time.Second) - goto heal - } -} - -type PeerInfo struct { - api.ConsensusStatus - Key [32]byte - Operator *url.URL -} - -func (p *PeerInfo) String() string { - if p.Operator != nil { - return fmt.Sprintf("%v (%x)", p.Operator, p.Key) - } - return hex.EncodeToString(p.Key[:]) -} - -func getPeers(C2 *jsonrpc.Client, ctx context.Context) map[string]map[peer.ID]*PeerInfo { - apiNode, err := C2.NodeInfo(ctx, api.NodeInfoOptions{}) - checkf(err, "query node info") - - status, err := C2.NetworkStatus(ctx, api.NetworkStatusOptions{}) - checkf(err, "query network status") - - hash2key := map[[32]byte][32]byte{} - for _, val := range status.Network.Validators { - hash2key[val.PublicKeyHash] = *(*[32]byte)(val.PublicKey) - } - - peers := map[string]map[peer.ID]*PeerInfo{} - for _, part := range status.Network.Partitions { - peers[part.ID] = map[peer.ID]*PeerInfo{} - - fmt.Printf("Getting peers for %s\n", part.ID) - find := api.FindServiceOptions{ - Network: apiNode.Network, - Service: api.ServiceTypeConsensus.AddressFor(part.ID), - } - res, err := C2.FindService(ctx, find) - checkf(err, "find %s on %s", find.Service.String(), find.Network) - - for _, peer := range res { - fmt.Printf("Getting identity of %v\n", peer.PeerID) - info, err := C2.ConsensusStatus(ctx, api.ConsensusStatusOptions{NodeID: peer.PeerID.String(), Partition: part.ID}) - if err != nil { - fmt.Printf("%+v\n", err) - continue - } - - key, ok := hash2key[info.ValidatorKeyHash] - if !ok { - continue // Not a validator - } - pi := &PeerInfo{ - ConsensusStatus: *info, - Key: key, - } - peers[part.ID][peer.PeerID] = pi - - _, val, ok := status.Network.ValidatorByHash(info.ValidatorKeyHash[:]) - if ok { - pi.Operator = val.Operator - } - } - } - return peers -} - -func getLedger(c *client.Client, part *url.URL) *protocol.AnchorLedger { //nolint:unused - ledger := new(protocol.AnchorLedger) - res := new(client.ChainQueryResponse) - res.Data = ledger - req := new(client.GeneralQuery) - req.Url = part.JoinPath(protocol.AnchorPool) - err := c.RequestAPIv2(context.Background(), "query", req, res) - checkf(err, "query %s anchor ledger", part) - return ledger -} - -func healTx(g *core.GlobalValues, nodes map[string][]*NodeData, netClient *client.Client, srcUrl, dstUrl *url.URL, txid *url.TxID) { //nolint:unused - // dstId, _ := protocol.ParsePartitionUrl(dstUrl) - srcId, _ := protocol.ParsePartitionUrl(srcUrl) - - // Query the transaction - res, err := netClient.QueryTx(context.Background(), &client.TxnQuery{TxIdUrl: txid}) - if err != nil { - log.Printf("Failed to query %v: %v\n", txid, err) - return - } - - // Check if there are already enough transactions - if uint64(len(res.Status.AnchorSigners)) >= g.ValidatorThreshold(srcId) { - return // Already have enough signers - } - - fmt.Printf("Healing anchor %v\n", txid) - - // Mark which nodes have signed - signed := map[[32]byte]bool{} - for _, s := range res.Status.AnchorSigners { - signed[*(*[32]byte)(s)] = true - } - - // // Make a client for the destination - // dstClient := nodes[strings.ToLower(dstId)][0].AccumulateAPIForUrl(dstUrl) - - // Get a signature from each node that hasn't signed - for _, node := range nodes[strings.ToLower(srcId)] { - if signed[*(*[32]byte)(node.Info.PublicKey)] { - continue - } - - // Make a client for the source - srcClient := node.AccumulateAPIForUrl(srcUrl) - - // Query and execute the anchor - querySynthAndExecute(srcClient, netClient, srcUrl, dstUrl, res.Status.SequenceNumber, false) - } -} - -func getAccount[T 
protocol.Account](C api.Querier, ctx context.Context, u *url.URL) T { - var v T - _, err := api.Querier2{Querier: C}.QueryAccountAs(ctx, u, nil, &v) - checkf(err, "get %v", u) - return v -} - -func healAnchor(C *message.Client, C2 *jsonrpc.Client, ctx context.Context, srcUrl, dstUrl *url.URL, txid *url.TxID, seqNum uint64, peers map[peer.ID]*PeerInfo) { - fmt.Printf("Healing anchor %v\n", txid) - - dstId, ok := protocol.ParsePartitionUrl(dstUrl) - if !ok { - panic("not a partition: " + dstUrl.String()) - } - - // Query the transaction - res, err := api.Querier2{Querier: C2}.QueryTransaction(ctx, txid, nil) - checkf(err, "get %v", txid) - - // Mark which nodes have signed - signed := map[[32]byte]bool{} - for _, sigs := range res.Signatures.Records { - for _, sig := range sigs.Signatures.Records { - msg, ok := sig.Message.(*messaging.BlockAnchor) - if !ok { - continue - } - signed[*(*[32]byte)(msg.Signature.GetPublicKey())] = true - } - } - - theAnchorTxn := res.Message.Transaction - env := new(messaging.Envelope) - env.Transaction = []*protocol.Transaction{theAnchorTxn} - - // Get a signature from each node that hasn't signed - var bad []peer.ID - var gotPartSig bool - for peer, info := range peers { - if signed[info.Key] { - continue - } - - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - fmt.Printf("Querying %v for %v\n", peer, txid) - res, err := C.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, seqNum) - if err != nil { - fmt.Printf("%+v\n", err) - bad = append(bad, peer) - continue - } - - myTxn, ok := res.Message.(*messaging.TransactionMessage) - if !ok { - err := fmt.Errorf("expected %v, got %v", messaging.MessageTypeTransaction, res.Message.Type()) - warnf(err, "%v gave us an anchor that is not a transaction", info) - continue - } - if !myTxn.Transaction.Equal(theAnchorTxn) { - err := fmt.Errorf("expected %x, got %x", theAnchorTxn.GetHash(), myTxn.Transaction.GetHash()) - warnf(err, "%v gave us an anchor that doesn't match what we expect", info) - if b, err := json.Marshal(theAnchorTxn); err != nil { - check(err) - } else { - fmt.Fprintf(os.Stderr, "Want: %s\n", b) - } - if b, err := json.Marshal(myTxn.Transaction); err != nil { - check(err) - } else { - fmt.Fprintf(os.Stderr, "Got: %s\n", b) - } - continue - } - - for _, sigs := range res.Signatures.Records { - for _, sig := range sigs.Signatures.Records { - msg, ok := sig.Message.(*messaging.SignatureMessage) - if !ok { - err := fmt.Errorf("expected %v, got %v", messaging.MessageTypeSignature, sig.Message.Type()) - warnf(err, "%v gave us a signature that is not a signature", info) - continue - } - - switch sig := msg.Signature.(type) { - case *protocol.PartitionSignature: - // We only want one partition signature - if gotPartSig { - continue - } - gotPartSig = true - - case protocol.UserSignature: - // Filter out bad signatures - if !sig.Verify(nil, theAnchorTxn.GetHash()) { - err := fmt.Errorf("invalid signature") - warnf(err, "%v gave us an invalid signature", info) - continue - } - - default: - err := fmt.Errorf("expected user signature, got %v", sig.Type()) - warnf(err, "%v gave us a signature that is not a signature", info) - continue - } - - env.Signatures = append(env.Signatures, msg.Signature) - } - } - } - - for _, peer := range bad { - fmt.Printf("Removing bad peer %v from the list of candidates\n", peer) - delete(peers, peer) - } - - // We should always have a partition signature, so there's only something to - // sent if we have more than 1 signature - 
if len(env.Signatures) == 1 { - fmt.Println("Nothing to send") - return - } - - fmt.Printf("Submitting %d signatures\n", len(env.Signatures)) - addr := api.ServiceTypeSubmit.AddressFor(dstId).Multiaddr() - sub, err := C.ForAddress(addr).Submit(ctx, env, api.SubmitOptions{}) - if err != nil { - fmt.Println(err) - return - } - for _, sub := range sub { - if !sub.Success { - fmt.Println(sub.Message) - } - } -}
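
A minimal, standalone illustration of the count that the new findPendingAnchors helper reports in non-verbose mode: the destination partition's anchor ledger gives the highest sequence number it has received, and the height of the source's anchor-sequence chain gives how many anchors the source has produced. The function name and example numbers below are hypothetical; the off-by-one convention (Count - received - 1, with the early-return guard) is copied from the patch as written, and the zero-height guard is added only to keep this sketch safe on its own.

package main

import "fmt"

// outstanding mirrors the non-verbose branch of findPendingAnchors: received is
// the destination ledger's highest received sequence number for this source, and
// chainCount is the height of the source's anchor-sequence chain.
func outstanding(received, chainCount uint64) uint64 {
	if chainCount == 0 || received >= chainCount-1 {
		return 0 // nothing appears to be pending (guard as in the patch)
	}
	return chainCount - received - 1
}

func main() {
	fmt.Println(outstanding(121, 125)) // 3 anchors outstanding
	fmt.Println(outstanding(124, 125)) // 0, the guard short-circuits
}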
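
When a cached scan is available, findPendingAnchors asks each recorded peer of the source partition in turn, bounding every attempt with a ten-second timeout and keeping the first successful answer. The helper below is a generic sketch of that retry shape, not the patch's code: it releases each timeout immediately rather than deferring inside the loop, and the query callback is a stand-in for C.ForPeer(peer.ID).Private().Sequence(...).

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// firstSuccess tries query against each peer in order, bounding every attempt to
// the given timeout, and returns the first result that succeeds.
func firstSuccess[T any](ctx context.Context, peers []string, timeout time.Duration,
	query func(context.Context, string) (T, error)) (T, error) {

	var zero T
	err := errors.New("no peers to try")
	for _, p := range peers {
		attempt, cancel := context.WithTimeout(ctx, timeout)
		v, qerr := query(attempt, p)
		cancel()
		if qerr == nil {
			return v, nil
		}
		err = qerr
	}
	return zero, err
}

func main() {
	res, err := firstSuccess(context.Background(), []string{"peer-a", "peer-b"}, 10*time.Second,
		func(ctx context.Context, peer string) (string, error) {
			if peer == "peer-a" {
				return "", errors.New("unreachable") // simulate a peer that does not answer
			}
			return "anchor from " + peer, nil
		})
	fmt.Println(res, err)
}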
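
The healing flow above always ends up with exactly one PartitionSignature in the envelope, so an envelope holding a single signature carries nothing the destination does not already have; that is what the `len(env.Signatures) == 1` check guards. A trivial sketch of that submit-or-skip decision follows; the types are placeholders, not the protocol package's.

package main

import "fmt"

type envelope struct {
	signatures []string // one "partition" entry plus any collected validator signatures
}

// shouldSubmit mirrors the single-signature check: submit only when at least one
// validator signature was collected in addition to the partition signature.
func shouldSubmit(env envelope) bool {
	return len(env.signatures) > 1
}

func main() {
	fmt.Println(shouldSubmit(envelope{signatures: []string{"partition"}}))               // false: nothing to send
	fmt.Println(shouldSubmit(envelope{signatures: []string{"partition", "validator-1"}})) // true: worth submitting
}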