diff --git a/internal/core/healing/anchors.go b/internal/core/healing/anchors.go index 9292aaab1..51c358b8c 100644 --- a/internal/core/healing/anchors.go +++ b/internal/core/healing/anchors.go @@ -24,6 +24,8 @@ import ( "gitlab.com/accumulatenetwork/accumulate/protocol" ) +var ErrRetry = fmt.Errorf("retry") + type HealAnchorArgs struct { Client message.AddressedClient Querier api.Querier @@ -35,6 +37,17 @@ type HealAnchorArgs struct { } func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error { + if args.NetInfo.Status.ExecutorVersion.V2VandenbergEnabled() { + return healAnchorV2(ctx, args, si) + } + return healAnchorV1(ctx, args, si) +} + +func healAnchorV2(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error { + panic("TODO") +} + +func healAnchorV1(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error { srcUrl := protocol.PartitionUrl(si.Source) dstUrl := protocol.PartitionUrl(si.Destination) @@ -268,5 +281,3 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro } return nil } - -var ErrRetry = fmt.Errorf("retry") diff --git a/internal/core/healing/synthetic.go b/internal/core/healing/synthetic.go index 45ac3139d..418e49ed3 100644 --- a/internal/core/healing/synthetic.go +++ b/internal/core/healing/synthetic.go @@ -66,11 +66,12 @@ func (h *Healer) HealSynthetic(ctx context.Context, args HealSyntheticArgs, si S slog.InfoContext(ctx, "Resubmitting", "source", si.Source, "destination", si.Destination, "number", si.Number, "id", r.Message.ID()) - // Build the receipt - receipt, err := h.buildSynthReceipt(ctx, args, si) - if err != nil { - return err - } + receipt := r.SourceReceipt + // // Build the receipt + // receipt, err := h.buildSynthReceipt(ctx, args, si) + // if err != nil { + // return err + // } // Submit the synthetic transaction directly to the destination partition msg := &messaging.SyntheticMessage{ @@ -130,43 +131,59 @@ func (h *Healer) HealSynthetic(ctx context.Context, args HealSyntheticArgs, si S return nil } - // Submit directly to an appropriate node - if c, ok := args.Submitter.(message.AddressedClient); ok && c.Address == nil { - for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Destination)] { - if len(info.Addresses) > 0 { - args.Submitter = c.ForAddress(info.Addresses[0]).ForPeer(peer) - } else { - args.Submitter = c.ForPeer(peer) - } - break + submit := func(s api.Submitter) (error, bool) { + sub, err := s.Submit(ctx, env, api.SubmitOptions{}) + if err != nil { + slog.ErrorContext(ctx, "Submission failed", "error", err, "id", env.Messages[0].ID()) + return nil, false } - } + for _, sub := range sub { + if !sub.Success { + slog.ErrorContext(ctx, "Submission failed", "message", sub, "status", sub.Status, "id", sub.Status.TxID) + continue + } - sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{}) - if err != nil { - slog.ErrorContext(ctx, "Submission failed", "error", err, "id", env.Messages[0].ID()) - } - for _, sub := range sub { - if !sub.Success { - slog.ErrorContext(ctx, "Submission failed", "message", sub, "status", sub.Status, "id", sub.Status.TxID) - continue - } + slog.InfoContext(ctx, "Submission succeeded", "id", sub.Status.TxID) + if !args.Wait || dontWait[sub.Status.TxID.Hash()] { + continue + } - slog.InfoContext(ctx, "Submission succeeded", "id", sub.Status.TxID) - if !args.Wait || dontWait[sub.Status.TxID.Hash()] { - continue + err := waitFor(ctx, Q, sub.Status.TxID) + if err != nil && strings.HasSuffix(err.Error(), " is not a known directory anchor") { + return ErrRetry, false + } } - err := waitFor(ctx, Q, sub.Status.TxID) - if err != nil && strings.HasSuffix(err.Error(), " is not a known directory anchor") { - return ErrRetry + if args.Wait { + return waitFor(ctx, Q, si.ID), true } + return nil, true } - if args.Wait { - return waitFor(ctx, Q, si.ID) + // Submit directly to an appropriate node + switch submitter := args.Submitter.(type) { + // case message.AddressedClient: + + case *message.Client: + for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Destination)] { + var s api.Submitter + if len(info.Addresses) > 0 { + s = submitter.ForAddress(info.Addresses[0]).ForPeer(peer) + } else { + s = submitter.ForPeer(peer) + } + slog.Info("Submitting to", "peer", peer) + err, ok := submit(s) + if ok || err != nil { + return err + } + } + return errors.UnknownError.With("failed to submit") + + default: + err, _ := submit(args.Submitter) + return err } - return nil } func waitFor(ctx context.Context, Q api.Querier, id *url.TxID) error { diff --git a/pkg/api/v3/message/transport.go b/pkg/api/v3/message/transport.go index 2d95b7adf..3df660b66 100644 --- a/pkg/api/v3/message/transport.go +++ b/pkg/api/v3/message/transport.go @@ -322,8 +322,12 @@ func (c *RoutedTransport) dial(ctx context.Context, addr multiaddr.Multiaddr, st // Return the error if it's a client error (e.g. misdial) return nil, errors.UnknownError.Wrap(err) - case errors.EncodingError.ErrorAs(err, &err2), - errors.StreamAborted.ErrorAs(err, &err2): + case errors.StreamAborted.ErrorAs(err, &err2): + // The other side hung up, try again. Historically this has been + // caused by encoding errors, which indicates a mismatch between the + // client and server, so it's possible retrying is a bad idea. + + case errors.EncodingError.ErrorAs(err, &err2): // If the error is an encoding issue, log it and return "internal error" if isMulti { multi.BadDial(ctx, addr, s, err) diff --git a/pkg/api/v3/p2p/dial/dialer.go b/pkg/api/v3/p2p/dial/dialer.go index aaa4b51fb..32c1ca027 100644 --- a/pkg/api/v3/p2p/dial/dialer.go +++ b/pkg/api/v3/p2p/dial/dialer.go @@ -106,7 +106,7 @@ func (d *dialer) BadDial(ctx context.Context, addr multiaddr.Multiaddr, s messag slog.InfoContext(ctx, "Bad dial", "peer", ss.peer, "address", addr, "error", err) - if errors.Is(err, errors.EncodingError) || errors.Is(err, errors.StreamAborted) { + if errors.Is(err, errors.EncodingError) /*|| errors.Is(err, errors.StreamAborted) */ { // Don't mark a peer bad if there's an encoding failure. Is this a good // idea? return false diff --git a/tools/cmd/debug/heal_common.go b/tools/cmd/debug/heal_common.go index 82910ba1f..fce946224 100644 --- a/tools/cmd/debug/heal_common.go +++ b/tools/cmd/debug/heal_common.go @@ -312,26 +312,30 @@ func (h *healer) submitLoop(wg *sync.WaitGroup) { // getAccount fetches the given account. func getAccount[T protocol.Account](ctx context.Context, q api.Querier, u *url.URL) T { - r, err := api.Querier2{Querier: q}.QueryAccount(ctx, u, nil) - checkf(err, "get %v", u) + for try := 0; try < 3; try++ { + r, err := api.Querier2{Querier: q}.QueryAccount(ctx, u, nil) + checkf(err, "get %v", u) - if r.LastBlockTime == nil { - fatalf("response for %v does not include a last block time", u) - } + if r.LastBlockTime == nil { + cmdutil.Warnf("response for %v does not include a last block time", u) + } - age := time.Since(*r.LastBlockTime) - if flagMaxResponseAge > 0 && age > flagMaxResponseAge { - fatalf("response for %v is too old (%v)", u, age) - } + age := time.Since(*r.LastBlockTime) + if flagMaxResponseAge > 0 && age > flagMaxResponseAge { + cmdutil.Warnf("response for %v is too old (%v)", u, age) + } - slog.InfoContext(ctx, "Got account", "url", u, "lastBlockAge", age.Round(time.Second)) + slog.InfoContext(ctx, "Got account", "url", u, "lastBlockAge", age.Round(time.Second)) - a := r.Account - b, ok := a.(T) - if !ok { - fatalf("%v is a %T not a %v", u, a, reflect.TypeOf(new(T)).Elem()) + a := r.Account + b, ok := a.(T) + if !ok { + cmdutil.Warnf("%v is a %T not a %v", u, a, reflect.TypeOf(new(T)).Elem()) + } + return b } - return b + fatalf("unable to fetch %v", u) + panic("unreachable") } func (h *healer) tryEach() api.Querier2 { diff --git a/tools/cmd/debug/heal_synth.go b/tools/cmd/debug/heal_synth.go index 2132f7460..d58cbc45c 100644 --- a/tools/cmd/debug/heal_synth.go +++ b/tools/cmd/debug/heal_synth.go @@ -37,17 +37,17 @@ func healSynth(cmd *cobra.Command, args []string) { h := &healer{ healSingle: func(h *healer, src, dst *protocol.PartitionInfo, num uint64, txid *url.TxID) { - srcUrl := protocol.PartitionUrl(src.ID) - dstUrl := protocol.PartitionUrl(dst.ID) + // srcUrl := protocol.PartitionUrl(src.ID) + // dstUrl := protocol.PartitionUrl(dst.ID) - // Pull chains - pullSynthDirChains(h) - pullSynthSrcChains(h, srcUrl) - pullSynthDstChains(h, dstUrl) + // // Pull chains + // pullSynthDirChains(h) + // pullSynthSrcChains(h, srcUrl) + // pullSynthDstChains(h, dstUrl) - // Pull accounts - pullSynthLedger(h, srcUrl) - pullSynthLedger(h, dstUrl) + // // Pull accounts + // pullSynthLedger(h, srcUrl) + // pullSynthLedger(h, dstUrl) // Heal healSingleSynth(h, src.ID, dst.ID, num, txid) @@ -56,10 +56,10 @@ func healSynth(cmd *cobra.Command, args []string) { srcUrl := protocol.PartitionUrl(src.ID) dstUrl := protocol.PartitionUrl(dst.ID) - // Pull chains - pullSynthDirChains(h) - pullSynthSrcChains(h, srcUrl) - pullSynthDstChains(h, dstUrl) + // // Pull chains + // pullSynthDirChains(h) + // pullSynthSrcChains(h, srcUrl) + // pullSynthDstChains(h, dstUrl) // Pull accounts pullAgain: @@ -169,7 +169,7 @@ func pullSynthDstChains(h *healer, part *url.URL) { } func pullSynthLedger(h *healer, part *url.URL) *protocol.SyntheticLedger { - check(h.light.PullAccountWithChains(h.ctx, part.JoinPath(protocol.Synthetic), skipSigChain)) + check(h.light.PullAccountWithChains(h.ctx, part.JoinPath(protocol.Synthetic), skipAllChains)) batch := h.light.OpenDB(false) defer batch.Discard() @@ -182,3 +182,7 @@ func pullSynthLedger(h *healer, part *url.URL) *protocol.SyntheticLedger { func skipSigChain(c *api.ChainRecord) bool { return c.Name != "signature" } + +func skipAllChains(*api.ChainRecord) bool { + return false +}