diff --git a/internal/core/healing/anchors.go b/internal/core/healing/anchors.go index 04d6d92d1..ff14b66a2 100644 --- a/internal/core/healing/anchors.go +++ b/internal/core/healing/anchors.go @@ -8,7 +8,6 @@ package healing import ( "context" - "crypto/sha256" "encoding/hex" "fmt" "strings" @@ -16,7 +15,6 @@ import ( "github.com/multiformats/go-multiaddr" "gitlab.com/accumulatenetwork/accumulate/internal/api/private" - "gitlab.com/accumulatenetwork/accumulate/internal/logging" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/errors" @@ -27,13 +25,13 @@ import ( ) type HealAnchorArgs struct { - Client message.AddressedClient - Querier api.Querier - Submitter api.Submitter - NetInfo *NetworkInfo - Known map[[32]byte]*protocol.Transaction - Pretend bool - Wait bool + Client message.AddressedClient + Querier api.Querier + Submit func(...messaging.Message) error + NetInfo *NetworkInfo + Known map[[32]byte]*protocol.Transaction + Pretend bool + Wait bool } func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error { @@ -43,9 +41,6 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro if args.Querier == nil { args.Querier = args.Client } - if args.Submitter == nil { - args.Submitter = args.Client - } // If the message ID is not known, resolve it var theAnchorTxn *protocol.Transaction @@ -247,39 +242,25 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro } else { slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures)) - submit := func(env *messaging.Envelope) { - // addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr() - sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{}) - if err != nil { - b, e := env.MarshalBinary() - if e == nil { - h := sha256.Sum256(b) - b = h[:] - } - slog.ErrorCtx(ctx, "Submission failed", "error", err, "id", env.Messages[0].ID(), "hash", logging.AsHex(b)) - } - for _, sub := range sub { - if sub.Success { - slog.InfoCtx(ctx, "Submission succeeded", "id", sub.Status.TxID) - } else { - slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status) - } - } - } - if args.NetInfo.Status.ExecutorVersion.V2Enabled() { for _, sig := range signatures { blk := &messaging.BlockAnchor{ Signature: sig.(protocol.KeySignature), Anchor: seq, } - submit(&messaging.Envelope{Messages: []messaging.Message{blk}}) + err = args.Submit(blk) } } else { - env := new(messaging.Envelope) - env.Transaction = []*protocol.Transaction{theAnchorTxn} - env.Signatures = signatures - submit(env) + msg := []messaging.Message{ + &messaging.TransactionMessage{Transaction: theAnchorTxn}, + } + for _, sig := range signatures { + msg = append(msg, &messaging.SignatureMessage{Signature: sig}) + } + err = args.Submit(msg...) + } + if err != nil { + return err } } diff --git a/tools/cmd/debug/heal_anchor.go b/tools/cmd/debug/heal_anchor.go index da63dc121..4c29922fd 100644 --- a/tools/cmd/debug/heal_anchor.go +++ b/tools/cmd/debug/heal_anchor.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/cobra" "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" "gitlab.com/accumulatenetwork/accumulate/pkg/url" "gitlab.com/accumulatenetwork/accumulate/protocol" "golang.org/x/exp/slog" @@ -72,13 +73,20 @@ func (h *healer) healSingleAnchor(srcId, dstId string, seqNum uint64, txid *url. var count int retry: err := healing.HealAnchor(h.ctx, healing.HealAnchorArgs{ - Client: h.C2.ForAddress(nil), - Querier: h.tryEach(), - Submitter: h.C2, - NetInfo: h.net, - Known: txns, - Pretend: pretend, - Wait: waitForTxn, + Client: h.C2.ForAddress(nil), + Querier: h.tryEach(), + NetInfo: h.net, + Known: txns, + Pretend: pretend, + Wait: waitForTxn, + Submit: func(m ...messaging.Message) error { + select { + case h.submit <- m: + return nil + case <-h.ctx.Done(): + return errors.NotReady.With("canceled") + } + }, }, healing.SequencedInfo{ Source: srcId, Destination: dstId, diff --git a/tools/cmd/debug/heal_common.go b/tools/cmd/debug/heal_common.go index 966f2925b..127fce047 100644 --- a/tools/cmd/debug/heal_common.go +++ b/tools/cmd/debug/heal_common.go @@ -17,6 +17,7 @@ import ( "reflect" "strconv" "strings" + "sync" "time" "github.com/spf13/cobra" @@ -34,6 +35,7 @@ import ( client "gitlab.com/accumulatenetwork/accumulate/pkg/client/api/v2" "gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue/bolt" "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" "gitlab.com/accumulatenetwork/accumulate/pkg/url" "gitlab.com/accumulatenetwork/accumulate/protocol" "golang.org/x/exp/slog" @@ -86,6 +88,8 @@ type healer struct { light *light.Client router routing.Router + submit chan []messaging.Message + accounts map[[32]byte]protocol.Account } @@ -171,6 +175,13 @@ func (h *healer) heal(args []string) { }, } + wg := new(sync.WaitGroup) + defer wg.Wait() + + h.submit = make(chan []messaging.Message) + wg.Add(1) + go h.submitLoop(wg) + if lightDb != "" { cv2, err := client.New(accumulate.ResolveWellKnownEndpoint(args[0], "v2")) check(err) @@ -258,6 +269,44 @@ func (h *healer) heal(args []string) { h.healSingle(h, parts[strings.ToLower(srcId)], parts[strings.ToLower(dstId)], seqNo, nil) } +func (h *healer) submitLoop(wg *sync.WaitGroup) { + defer wg.Done() + t := time.NewTicker(3 * time.Second) + defer t.Stop() + + var messages []messaging.Message + var stop bool + for !stop { + select { + case <-h.ctx.Done(): + stop = true + case msg := <-h.submit: + messages = append(messages, msg...) + if len(messages) < 50 { + continue + } + case <-t.C: + } + if len(messages) == 0 { + continue + } + + env := &messaging.Envelope{Messages: messages} + subs, err := h.C2.Submit(h.ctx, env, api.SubmitOptions{}) + messages = messages[:0] + if err != nil { + slog.ErrorCtx(h.ctx, "Submission failed", "error", err, "id", env.Messages[0].ID()) + } + for _, sub := range subs { + if sub.Success { + slog.InfoCtx(h.ctx, "Submission succeeded", "id", sub.Status.TxID) + } else { + slog.ErrorCtx(h.ctx, "Submission failed", "message", sub, "status", sub.Status) + } + } + } +} + // getAccount fetches the given account. func getAccount[T protocol.Account](ctx context.Context, q api.Querier, u *url.URL) T { r, err := api.Querier2{Querier: q}.QueryAccount(ctx, u, nil)