Skip to content

Commit

Permalink
Saving work
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Dec 6, 2024
1 parent e2f39cf commit 87da3e7
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 67 deletions.
15 changes: 13 additions & 2 deletions internal/core/healing/anchors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"gitlab.com/accumulatenetwork/accumulate/protocol"
)

var ErrRetry = fmt.Errorf("retry")

type HealAnchorArgs struct {
Client message.AddressedClient
Querier api.Querier
Expand All @@ -35,6 +37,17 @@ type HealAnchorArgs struct {
}

func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error {
if args.NetInfo.Status.ExecutorVersion.V2VandenbergEnabled() {
return healAnchorV2(ctx, args, si)
}
return healAnchorV1(ctx, args, si)
}

func healAnchorV2(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error {
panic("TODO")
}

func healAnchorV1(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error {
srcUrl := protocol.PartitionUrl(si.Source)
dstUrl := protocol.PartitionUrl(si.Destination)

Expand Down Expand Up @@ -268,5 +281,3 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro
}
return nil
}

var ErrRetry = fmt.Errorf("retry")
83 changes: 50 additions & 33 deletions internal/core/healing/synthetic.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ func (h *Healer) HealSynthetic(ctx context.Context, args HealSyntheticArgs, si S

slog.InfoContext(ctx, "Resubmitting", "source", si.Source, "destination", si.Destination, "number", si.Number, "id", r.Message.ID())

// Build the receipt
receipt, err := h.buildSynthReceipt(ctx, args, si)
if err != nil {
return err
}
receipt := r.SourceReceipt
// // Build the receipt
// receipt, err := h.buildSynthReceipt(ctx, args, si)
// if err != nil {
// return err
// }

// Submit the synthetic transaction directly to the destination partition
msg := &messaging.SyntheticMessage{
Expand Down Expand Up @@ -130,43 +131,59 @@ func (h *Healer) HealSynthetic(ctx context.Context, args HealSyntheticArgs, si S
return nil
}

// Submit directly to an appropriate node
if c, ok := args.Submitter.(message.AddressedClient); ok && c.Address == nil {
for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Destination)] {
if len(info.Addresses) > 0 {
args.Submitter = c.ForAddress(info.Addresses[0]).ForPeer(peer)
} else {
args.Submitter = c.ForPeer(peer)
}
break
submit := func(s api.Submitter) (error, bool) {
sub, err := s.Submit(ctx, env, api.SubmitOptions{})
if err != nil {
slog.ErrorContext(ctx, "Submission failed", "error", err, "id", env.Messages[0].ID())
return nil, false
}
}
for _, sub := range sub {
if !sub.Success {
slog.ErrorContext(ctx, "Submission failed", "message", sub, "status", sub.Status, "id", sub.Status.TxID)
continue
}

sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{})
if err != nil {
slog.ErrorContext(ctx, "Submission failed", "error", err, "id", env.Messages[0].ID())
}
for _, sub := range sub {
if !sub.Success {
slog.ErrorContext(ctx, "Submission failed", "message", sub, "status", sub.Status, "id", sub.Status.TxID)
continue
}
slog.InfoContext(ctx, "Submission succeeded", "id", sub.Status.TxID)
if !args.Wait || dontWait[sub.Status.TxID.Hash()] {
continue
}

slog.InfoContext(ctx, "Submission succeeded", "id", sub.Status.TxID)
if !args.Wait || dontWait[sub.Status.TxID.Hash()] {
continue
err := waitFor(ctx, Q, sub.Status.TxID)
if err != nil && strings.HasSuffix(err.Error(), " is not a known directory anchor") {
return ErrRetry, false
}
}

err := waitFor(ctx, Q, sub.Status.TxID)
if err != nil && strings.HasSuffix(err.Error(), " is not a known directory anchor") {
return ErrRetry
if args.Wait {
return waitFor(ctx, Q, si.ID), true
}
return nil, true
}

if args.Wait {
return waitFor(ctx, Q, si.ID)
// Submit directly to an appropriate node
switch submitter := args.Submitter.(type) {
// case message.AddressedClient:

case *message.Client:
for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Destination)] {
var s api.Submitter
if len(info.Addresses) > 0 {
s = submitter.ForAddress(info.Addresses[0]).ForPeer(peer)
} else {
s = submitter.ForPeer(peer)
}
slog.Info("Submitting to", "peer", peer)
err, ok := submit(s)
if ok || err != nil {
return err
}
}
return errors.UnknownError.With("failed to submit")

default:
err, _ := submit(args.Submitter)
return err
}
return nil
}

func waitFor(ctx context.Context, Q api.Querier, id *url.TxID) error {
Expand Down
8 changes: 6 additions & 2 deletions pkg/api/v3/message/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,12 @@ func (c *RoutedTransport) dial(ctx context.Context, addr multiaddr.Multiaddr, st
// Return the error if it's a client error (e.g. misdial)
return nil, errors.UnknownError.Wrap(err)

case errors.EncodingError.ErrorAs(err, &err2),
errors.StreamAborted.ErrorAs(err, &err2):
case errors.StreamAborted.ErrorAs(err, &err2):
// The other side hung up, try again. Historically this has been
// caused by encoding errors, which indicates a mismatch between the
// client and server, so it's possible retrying is a bad idea.

case errors.EncodingError.ErrorAs(err, &err2):
// If the error is an encoding issue, log it and return "internal error"
if isMulti {
multi.BadDial(ctx, addr, s, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/v3/p2p/dial/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (d *dialer) BadDial(ctx context.Context, addr multiaddr.Multiaddr, s messag

slog.InfoContext(ctx, "Bad dial", "peer", ss.peer, "address", addr, "error", err)

if errors.Is(err, errors.EncodingError) || errors.Is(err, errors.StreamAborted) {
if errors.Is(err, errors.EncodingError) /*|| errors.Is(err, errors.StreamAborted) */ {
// Don't mark a peer bad if there's an encoding failure. Is this a good
// idea?
return false
Expand Down
34 changes: 19 additions & 15 deletions tools/cmd/debug/heal_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,26 +312,30 @@ func (h *healer) submitLoop(wg *sync.WaitGroup) {

// getAccount fetches the given account.
func getAccount[T protocol.Account](ctx context.Context, q api.Querier, u *url.URL) T {
r, err := api.Querier2{Querier: q}.QueryAccount(ctx, u, nil)
checkf(err, "get %v", u)
for try := 0; try < 3; try++ {
r, err := api.Querier2{Querier: q}.QueryAccount(ctx, u, nil)
checkf(err, "get %v", u)

if r.LastBlockTime == nil {
fatalf("response for %v does not include a last block time", u)
}
if r.LastBlockTime == nil {
cmdutil.Warnf("response for %v does not include a last block time", u)
}

age := time.Since(*r.LastBlockTime)
if flagMaxResponseAge > 0 && age > flagMaxResponseAge {
fatalf("response for %v is too old (%v)", u, age)
}
age := time.Since(*r.LastBlockTime)
if flagMaxResponseAge > 0 && age > flagMaxResponseAge {
cmdutil.Warnf("response for %v is too old (%v)", u, age)
}

slog.InfoContext(ctx, "Got account", "url", u, "lastBlockAge", age.Round(time.Second))
slog.InfoContext(ctx, "Got account", "url", u, "lastBlockAge", age.Round(time.Second))

a := r.Account
b, ok := a.(T)
if !ok {
fatalf("%v is a %T not a %v", u, a, reflect.TypeOf(new(T)).Elem())
a := r.Account
b, ok := a.(T)
if !ok {
cmdutil.Warnf("%v is a %T not a %v", u, a, reflect.TypeOf(new(T)).Elem())
}
return b
}
return b
fatalf("unable to fetch %v", u)
panic("unreachable")
}

func (h *healer) tryEach() api.Querier2 {
Expand Down
32 changes: 18 additions & 14 deletions tools/cmd/debug/heal_synth.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ func healSynth(cmd *cobra.Command, args []string) {

h := &healer{
healSingle: func(h *healer, src, dst *protocol.PartitionInfo, num uint64, txid *url.TxID) {
srcUrl := protocol.PartitionUrl(src.ID)
dstUrl := protocol.PartitionUrl(dst.ID)
// srcUrl := protocol.PartitionUrl(src.ID)
// dstUrl := protocol.PartitionUrl(dst.ID)

// Pull chains
pullSynthDirChains(h)
pullSynthSrcChains(h, srcUrl)
pullSynthDstChains(h, dstUrl)
// // Pull chains
// pullSynthDirChains(h)
// pullSynthSrcChains(h, srcUrl)
// pullSynthDstChains(h, dstUrl)

// Pull accounts
pullSynthLedger(h, srcUrl)
pullSynthLedger(h, dstUrl)
// // Pull accounts
// pullSynthLedger(h, srcUrl)
// pullSynthLedger(h, dstUrl)

// Heal
healSingleSynth(h, src.ID, dst.ID, num, txid)
Expand All @@ -56,10 +56,10 @@ func healSynth(cmd *cobra.Command, args []string) {
srcUrl := protocol.PartitionUrl(src.ID)
dstUrl := protocol.PartitionUrl(dst.ID)

// Pull chains
pullSynthDirChains(h)
pullSynthSrcChains(h, srcUrl)
pullSynthDstChains(h, dstUrl)
// // Pull chains
// pullSynthDirChains(h)
// pullSynthSrcChains(h, srcUrl)
// pullSynthDstChains(h, dstUrl)

// Pull accounts
pullAgain:
Expand Down Expand Up @@ -169,7 +169,7 @@ func pullSynthDstChains(h *healer, part *url.URL) {
}

func pullSynthLedger(h *healer, part *url.URL) *protocol.SyntheticLedger {
check(h.light.PullAccountWithChains(h.ctx, part.JoinPath(protocol.Synthetic), skipSigChain))
check(h.light.PullAccountWithChains(h.ctx, part.JoinPath(protocol.Synthetic), skipAllChains))

batch := h.light.OpenDB(false)
defer batch.Discard()
Expand All @@ -182,3 +182,7 @@ func pullSynthLedger(h *healer, part *url.URL) *protocol.SyntheticLedger {
func skipSigChain(c *api.ChainRecord) bool {
return c.Name != "signature"
}

func skipAllChains(*api.ChainRecord) bool {
return false
}

0 comments on commit 87da3e7

Please sign in to comment.