Skip to content

Commit

Permalink
Improve anchor healing [#3379]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 4, 2023
1 parent 632f986 commit 2501d94
Show file tree
Hide file tree
Showing 20 changed files with 1,338 additions and 547 deletions.
2 changes: 1 addition & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
*.exe
.git
*/Dockerfile
.test
.test
29 changes: 25 additions & 4 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@
"mode": "auto",
"program": "${workspaceFolder}/cmd/accumulated-http",
"args": [
"Kermit",
"MainNet",
"--debug",
"--http-listen=/ip4/0.0.0.0/tcp/26660",
"--peer=/dns/bootstrap.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWGJTh4aeF7bFnwo9sAYRujCkuVU1Cq8wNeTNGpFgZgXdg",
Expand Down Expand Up @@ -540,11 +540,14 @@
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/tools/cmd/resend-anchor",
"program": "${workspaceFolder}/tools/cmd/debug",
"cwd": "${workspaceFolder}",
"args": [
"heal",
"https://mainnet.accumulatenetwork.io/v3",
"anchor",
"MainNet",
"--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json",
"Directory→Apollo", "70911"
]
},
{
Expand All @@ -558,9 +561,27 @@
"program": "${workspaceFolder}/tools/cmd/debug",
"cwd": "${workspaceFolder}",
"args": [
"heal-synth",
"heal",
"synth",
"mainnet",
"--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json",
]
},
{
"name": "Heal single anchor",
"presentation": { "group": "99-Miscellaneous", },
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/tools/cmd/debug",
"cwd": "${workspaceFolder}",
"args": [
"heal",
"anchor",
"mainnet",
"Chandrayaan → Apollo",
// "acc://f18b9ddd70654849ce063581b7bcbc6947d4b763d7a2fa4668e6fd9129da0e23@dn.acme/anchors",
"--cached-scan=${env:HOME}/.accumulate/cache/mainnet.json",
]
},
{
Expand Down
3 changes: 3 additions & 0 deletions internal/api/routing/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func routeMessage(routeAccount func(*url.URL) (string, error), route *string, ms
case *messaging.SequencedMessage:
r, err = routeAccount(msg.Destination)

case *messaging.SyntheticMessage:
return routeMessage(routeAccount, route, msg.Message)

case *messaging.BlockAnchor:
return routeMessage(routeAccount, route, msg.Anchor)

Expand Down
309 changes: 309 additions & 0 deletions internal/core/healing/anchors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
// Copyright 2023 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package healing

import (
"context"
"encoding/hex"
"fmt"
"strings"
"time"

"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/network"
"gitlab.com/accumulatenetwork/accumulate/protocol"
"golang.org/x/exp/slog"
)

type HealAnchorArgs struct {
Client message.AddressedClient
Querier api.Querier
Submitter api.Submitter
NetInfo *NetworkInfo
Known map[[32]byte]*protocol.Transaction
Pretend bool
Wait bool
}

func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error {
srcUrl := protocol.PartitionUrl(si.Source)
dstUrl := protocol.PartitionUrl(si.Destination)

if args.Querier == nil {
args.Querier = args.Client
}
if args.Submitter == nil {
args.Submitter = args.Client
}

// If the message ID is not known, resolve it
var theAnchorTxn *protocol.Transaction
if si.ID == nil {
r, err := ResolveSequenced[*messaging.TransactionMessage](ctx, args.Client, args.NetInfo, si.Source, si.Destination, si.Number, true)
if err != nil {
return err
}
si.ID = r.ID
theAnchorTxn = r.Message.Transaction
}

// Fetch the transaction and signatures
var sigSets []*api.SignatureSetRecord
Q := api.Querier2{Querier: args.Querier}
res, err := Q.QueryMessage(ctx, si.ID, nil)
switch {
case err == nil:
switch msg := res.Message.(type) {
case *messaging.SequencedMessage:
txn, ok := msg.Message.(*messaging.TransactionMessage)
if !ok {
return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeTransaction, msg.Message.Type())
}
theAnchorTxn = txn.Transaction
case *messaging.TransactionMessage:
theAnchorTxn = msg.Transaction
default:
return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeSequenced, res.Message.Type())
}

sigSets = res.Signatures.Records

case !errors.Is(err, errors.NotFound):
return err

case theAnchorTxn == nil:
var ok bool
theAnchorTxn, ok = args.Known[si.ID.Hash()]
if !ok {
return err
}
}

// Mark which validators have signed
signed := map[[32]byte]bool{}
for _, sigs := range sigSets {
for _, sig := range sigs.Signatures.Records {
msg, ok := sig.Message.(*messaging.BlockAnchor)
if !ok {
continue
}
k := msg.Signature.GetPublicKey()
slog.DebugCtx(ctx, "Anchor has been signed by", "validator", hex.EncodeToString(k[:4]))
signed[*(*[32]byte)(k)] = true
}
}

g := &network.GlobalValues{
Oracle: args.NetInfo.Status.Oracle,
Globals: args.NetInfo.Status.Globals,
Network: args.NetInfo.Status.Network,
Routing: args.NetInfo.Status.Routing,
ExecutorVersion: args.NetInfo.Status.ExecutorVersion,
}
threshold := g.ValidatorThreshold(si.Source)

lkv := []any{
"source", si.Source,
"destination", si.Destination,
"sequence-number", si.Number,
"want", threshold,
"have", len(signed),
}
if theAnchorTxn != nil {
lkv = append(lkv,
"txid", theAnchorTxn.ID(),
)
}
slog.InfoCtx(ctx, "Healing anchor", lkv...)

if len(signed) >= int(threshold) {
slog.InfoCtx(ctx, "Sufficient signatures have been received")
return nil
}

seq := &messaging.SequencedMessage{
Source: srcUrl,
Destination: dstUrl,
Number: si.Number,
}
if theAnchorTxn != nil {
seq.Message = &messaging.TransactionMessage{
Transaction: theAnchorTxn,
}
}

// Get a signature from each node that hasn't signed
var gotPartSig bool
var signatures []protocol.Signature
for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Source)] {
if signed[info.Key] {
continue
}

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

slog.InfoCtx(ctx, "Querying node for its signature", "id", peer)
res, err := args.Client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, si.Number)
if err != nil {
slog.ErrorCtx(ctx, "Query failed", "error", err)
continue
}

myTxn, ok := res.Message.(*messaging.TransactionMessage)
if !ok {
slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "id", info, "type", res.Message.Type())
continue
}
if theAnchorTxn == nil {
theAnchorTxn = myTxn.Transaction
seq.Message = &messaging.TransactionMessage{
Transaction: theAnchorTxn,
}
} else if !myTxn.Transaction.Equal(theAnchorTxn) {
slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "id", info,
"expected", hex.EncodeToString(theAnchorTxn.GetHash()),
"got", hex.EncodeToString(myTxn.Transaction.GetHash()))
continue
}

for _, sigs := range res.Signatures.Records {
for _, sig := range sigs.Signatures.Records {
msg, ok := sig.Message.(*messaging.SignatureMessage)
if !ok {
slog.ErrorCtx(ctx, "Node gave us a signature that is not a signature", "id", info, "type", sig.Message.Type())
continue
}

if args.NetInfo.Status.ExecutorVersion.V2Enabled() {
sig, ok := msg.Signature.(protocol.KeySignature)
if !ok {
slog.ErrorCtx(ctx, "Node gave us a signature that is not a key signature", "id", info, "type", sig.Type())
continue
}

// Filter out bad signatures
h := seq.Hash()
if !sig.Verify(nil, h[:]) {
slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info)
continue
}

} else {
switch sig := msg.Signature.(type) {
case *protocol.PartitionSignature:
// We only want one partition signature
if gotPartSig {
continue
}
gotPartSig = true

case protocol.UserSignature:
// Filter out bad signatures
if !sig.Verify(nil, theAnchorTxn.GetHash()) {
slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info)
continue
}

default:
slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "id", info, "type", sig.Type())
continue
}
}

signatures = append(signatures, msg.Signature)
}
}
}

if args.Pretend {
b, err := theAnchorTxn.MarshalBinary()
if err != nil {
panic(err)
}
slog.InfoCtx(ctx, "Would have submitted anchor", "signatures", len(signatures), "source", si.Source, "destination", si.Destination, "number", si.Number, "txn-size", len(b))
return nil
}

// We should always have a partition signature, so there's only something to
// sent if we have more than 1 signature
if gotPartSig && len(signatures) <= 1 || !gotPartSig && len(signatures) == 0 {
slog.InfoCtx(ctx, "Nothing to send")

} else {
slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures))

submit := func(env *messaging.Envelope) {
// addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr()
sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{})
if err != nil {
slog.ErrorCtx(ctx, "Submission failed", "error", err)
}
for _, sub := range sub {
if sub.Success {
slog.InfoCtx(ctx, "Submission succeeded", "id", sub.Status.TxID)
} else {
slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status)
}
}
}

if args.NetInfo.Status.ExecutorVersion.V2Enabled() {
for _, sig := range signatures {
blk := &messaging.BlockAnchor{
Signature: sig.(protocol.KeySignature),
Anchor: seq,
}
submit(&messaging.Envelope{Messages: []messaging.Message{blk}})
}
} else {
env := new(messaging.Envelope)
env.Transaction = []*protocol.Transaction{theAnchorTxn}
env.Signatures = signatures
submit(env)
}
}

if !args.Wait {
return nil
}

slog.InfoCtx(ctx, "Waiting", "for", si.ID)
for i := 0; i < 10; i++ {
r, err := Q.QueryMessage(ctx, si.ID, nil)
switch {
case errors.Is(err, errors.NotFound):
// Not found, wait
slog.Info("Status", "id", si.ID, "code", errors.NotFound)
break

case err != nil:
// Unknown error
return err

case !r.Status.Delivered():
// Pending, wait
slog.Info("Status", "id", si.ID, "code", r.Status)
break

case r.Error != nil:
slog.Error("Failed", "id", si.ID, "error", r.Error)
return r.AsError()

default:
slog.Info("Delivered", "id", si.ID)
return nil
}
time.Sleep(time.Second / 2)
}
return ErrRetry
}

var ErrRetry = fmt.Errorf("retry")
Loading

0 comments on commit 2501d94

Please sign in to comment.