Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split BEEFY relayer into two separate relayers #1216

Merged
merged 11 commits into from
Jun 4, 2024
7 changes: 3 additions & 4 deletions relayer/cmd/scan_beefy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ func ScanBeefyFn(cmd *cobra.Command, _ []string) error {
beefyBlock, _ := cmd.Flags().GetUint64("beefy-block")
validatorSetID, _ := cmd.Flags().GetUint64("validator-set-id")
logrus.WithFields(logrus.Fields{
"polkadot-url": polkadotUrl,
"fast-forward-depth": fastForwardDepth,
"beefy-block": beefyBlock,
"validator-set-id": validatorSetID,
"polkadot-url": polkadotUrl,
"beefy-block": beefyBlock,
"validator-set-id": validatorSetID,
}).Info("Connected to relaychain.")

commitments, err := polkadotListener.Start(ctx, eg, beefyBlock, validatorSetID)
Expand Down
60 changes: 47 additions & 13 deletions relayer/relays/beefy/ethereum-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,24 @@ func (wr *EthereumWriter) Start(ctx context.Context, eg *errgroup.Group, request
return nil
}

err := wr.submit(ctx, task)
state, err := wr.queryBeefyClientState(ctx)
if err != nil {
return fmt.Errorf("query beefy client state: %w", err)
}

if task.SignedCommitment.Commitment.BlockNumber < uint32(state.LatestBeefyBlock) {
log.WithFields(logrus.Fields{
"beefyBlockNumber": task.SignedCommitment.Commitment.BlockNumber,
"latestBeefyBlock": state.LatestBeefyBlock,
}).Info("Commitment already synced")
continue
}

// Mandatory commitments are always signed by the next validator set recorded in
// the beefy light client
task.ValidatorsRoot = state.NextValidatorSetRoot

err = wr.submit(ctx, task)
if err != nil {
return fmt.Errorf("submit request: %w", err)
}
Expand All @@ -79,32 +96,49 @@ func (wr *EthereumWriter) Start(ctx context.Context, eg *errgroup.Group, request
return nil
}

func (wr *EthereumWriter) submit(ctx context.Context, task Request) error {
type FilterMode struct {
MandatoryCommitmentsOnly bool
All bool
DiscardDepth uint64
}
vgeddes marked this conversation as resolved.
Show resolved Hide resolved

type BeefyClientState struct {
LatestBeefyBlock uint64
CurrentValidatorSetID uint64
CurrentValidatorSetRoot [32]byte
NextValidatorSetID uint64
NextValidatorSetRoot [32]byte
}

func (wr *EthereumWriter) queryBeefyClientState(ctx context.Context) (*BeefyClientState, error) {
callOpts := bind.CallOpts{
Context: ctx,
}

latestBeefyBlock, err := wr.contract.LatestBeefyBlock(&callOpts)
if err != nil {
return err
}
if uint32(latestBeefyBlock) >= task.SignedCommitment.Commitment.BlockNumber {
return nil
return nil, err
}

currentValidatorSet, err := wr.contract.CurrentValidatorSet(&callOpts)
if err != nil {
return err
return nil, err
}
nextValidatorSet, err := wr.contract.NextValidatorSet(&callOpts)
if err != nil {
return err
}
task.ValidatorsRoot = currentValidatorSet.Root
if task.IsHandover {
task.ValidatorsRoot = nextValidatorSet.Root
return nil, err
}

return &BeefyClientState{
LatestBeefyBlock: latestBeefyBlock,
CurrentValidatorSetID: currentValidatorSet.Id.Uint64(),
CurrentValidatorSetRoot: currentValidatorSet.Root,
NextValidatorSetID: nextValidatorSet.Id.Uint64(),
NextValidatorSetRoot: nextValidatorSet.Root,
}, nil
}

func (wr *EthereumWriter) submit(ctx context.Context, task Request) error {
// Initial submission
tx, initialBitfield, err := wr.doSubmitInitial(ctx, &task)
if err != nil {
Expand All @@ -131,6 +165,7 @@ func (wr *EthereumWriter) submit(ctx context.Context, task Request) error {
wr.conn.MakeTxOpts(ctx),
*commitmentHash,
)

_, err = wr.conn.WatchTransaction(ctx, tx, 1)
if err != nil {
log.WithError(err).Error("Failed to CommitPrevRandao")
Expand All @@ -153,7 +188,6 @@ func (wr *EthereumWriter) submit(ctx context.Context, task Request) error {
log.WithFields(logrus.Fields{
"tx": tx.Hash().Hex(),
"blockNumber": task.SignedCommitment.Commitment.BlockNumber,
"IsHandover": task.IsHandover,
}).Debug("Transaction SubmitFinal succeeded")

return nil
Expand Down
1 change: 0 additions & 1 deletion relayer/relays/beefy/fixture-data-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (wr *EthereumWriter) makeSubmitFinalLogFields(
"leafProofOrder": params.LeafProofOrder,
},
"commitmentHash": commitmentHash,
"handover": task.IsHandover,
}

return fields, nil
Expand Down
29 changes: 13 additions & 16 deletions relayer/relays/beefy/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,22 +164,19 @@ func (r *Request) MakeSubmitFinalParams(validatorIndices []uint64, initialBitfie
var merkleProofItems [][32]byte

proofOrder := new(big.Int)

if r.IsHandover {
inputLeaf = contracts.BeefyClientMMRLeaf{
Version: uint8(r.Proof.Leaf.Version),
ParentNumber: uint32(r.Proof.Leaf.ParentNumberAndHash.ParentNumber),
ParentHash: r.Proof.Leaf.ParentNumberAndHash.Hash,
ParachainHeadsRoot: r.Proof.Leaf.ParachainHeads,
NextAuthoritySetID: uint64(r.Proof.Leaf.BeefyNextAuthoritySet.ID),
NextAuthoritySetLen: uint32(r.Proof.Leaf.BeefyNextAuthoritySet.Len),
NextAuthoritySetRoot: r.Proof.Leaf.BeefyNextAuthoritySet.Root,
}
for _, mmrProofItem := range r.Proof.MerkleProofItems {
merkleProofItems = append(merkleProofItems, mmrProofItem)
}
proofOrder = proofOrder.SetUint64(r.Proof.MerkleProofOrder)
}
inputLeaf = contracts.BeefyClientMMRLeaf{
Version: uint8(r.Proof.Leaf.Version),
ParentNumber: uint32(r.Proof.Leaf.ParentNumberAndHash.ParentNumber),
ParentHash: r.Proof.Leaf.ParentNumberAndHash.Hash,
ParachainHeadsRoot: r.Proof.Leaf.ParachainHeads,
NextAuthoritySetID: uint64(r.Proof.Leaf.BeefyNextAuthoritySet.ID),
NextAuthoritySetLen: uint32(r.Proof.Leaf.BeefyNextAuthoritySet.Len),
NextAuthoritySetRoot: r.Proof.Leaf.BeefyNextAuthoritySet.Root,
}
for _, mmrProofItem := range r.Proof.MerkleProofItems {
merkleProofItems = append(merkleProofItems, mmrProofItem)
}
proofOrder = proofOrder.SetUint64(r.Proof.MerkleProofOrder)

msg := FinalRequestParams{
Commitment: commitment,
Expand Down
97 changes: 36 additions & 61 deletions relayer/relays/beefy/polkadot-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import (
"context"
"fmt"

log "github.com/sirupsen/logrus"
"github.com/snowfork/go-substrate-rpc-client/v4/types"
"golang.org/x/sync/errgroup"

"github.com/snowfork/snowbridge/relayer/chain/relaychain"
"github.com/snowfork/snowbridge/relayer/substrate"

log "github.com/sirupsen/logrus"
)

type PolkadotListener struct {
Expand Down Expand Up @@ -41,7 +40,7 @@ func (li *PolkadotListener) Start(
}
li.beefyAuthoritiesKey = storageKey

requests := make(chan Request)
requests := make(chan Request, 1)
yrong marked this conversation as resolved.
Show resolved Hide resolved

eg.Go(func() error {
defer close(requests)
Expand All @@ -61,11 +60,10 @@ func (li *PolkadotListener) scanCommitments(
currentValidatorSet uint64,
requests chan<- Request,
) error {
in, err := ScanSafeCommitments(ctx, li.conn.Metadata(), li.conn.API(), currentBeefyBlock+1)
in, err := ScanCommitments(ctx, li.conn.Metadata(), li.conn.API(), currentBeefyBlock+1)
if err != nil {
return fmt.Errorf("scan commitments: %w", err)
return fmt.Errorf("scan provable commitments: %w", err)
}
lastSyncedBeefyBlock := currentBeefyBlock

for {
select {
Expand All @@ -81,69 +79,32 @@ func (li *PolkadotListener) scanCommitments(

committedBeefyBlock := uint64(result.SignedCommitment.Commitment.BlockNumber)
validatorSetID := result.SignedCommitment.Commitment.ValidatorSetID
nextValidatorSetID := uint64(result.MMRProof.Leaf.BeefyNextAuthoritySet.ID)

if validatorSetID != currentValidatorSet && validatorSetID != currentValidatorSet+1 {
return fmt.Errorf("commitment has unexpected validatorSetID: blockNumber=%v validatorSetID=%v expectedValidatorSetID=%v",
committedBeefyBlock,
validatorSetID,
currentValidatorSet,
)
}

logEntry := log.WithFields(log.Fields{
"commitment": log.Fields{
"blockNumber": committedBeefyBlock,
"validatorSetID": validatorSetID,
"nextValidatorSetID": nextValidatorSetID,
},
"validatorSetID": currentValidatorSet,
"IsHandover": validatorSetID == currentValidatorSet+1,
"lastSyncedBeefyBlock": lastSyncedBeefyBlock,
})
nextValidatorSetID := uint64(result.Proof.Leaf.BeefyNextAuthoritySet.ID)

validators, err := li.queryBeefyAuthorities(result.BlockHash)
if err != nil {
return fmt.Errorf("fetch beefy authorities at block %v: %w", result.BlockHash, err)
}

task := Request{
Validators: validators,
SignedCommitment: result.SignedCommitment,
Proof: result.MMRProof,
Proof: result.Proof,
}

if validatorSetID == currentValidatorSet+1 && validatorSetID == nextValidatorSetID-1 {
task.IsHandover = true
select {
case <-ctx.Done():
return ctx.Err()
case requests <- task:
logEntry.Info("New commitment with handover added to channel")
currentValidatorSet++
lastSyncedBeefyBlock = committedBeefyBlock
}
} else if validatorSetID == currentValidatorSet {
if result.Depth > li.config.FastForwardDepth {
logEntry.Warn("Discarded commitment with depth not fast forward")
continue
}
if committedBeefyBlock < lastSyncedBeefyBlock+li.config.UpdatePeriod {
logEntry.Info("Discarded commitment with sampling")
continue
}

// drop task if it can't be processed immediately
select {
case <-ctx.Done():
return ctx.Err()
case requests <- task:
lastSyncedBeefyBlock = committedBeefyBlock
logEntry.Info("New commitment added to channel")
default:
logEntry.Warn("Discarded commitment fail adding to channel")
}
} else {
logEntry.Warn("Discarded invalid commitment")
log.WithFields(log.Fields{
"commitment": log.Fields{
"blockNumber": committedBeefyBlock,
"validatorSetID": validatorSetID,
"nextValidatorSetID": nextValidatorSetID,
},
"validatorSetID": currentValidatorSet,
}).Info("Sending BEEFY commitment to ethereum writer")

select {
case <-ctx.Done():
return ctx.Err()
case requests <- task:
}
}
}
Expand All @@ -162,8 +123,22 @@ func (li *PolkadotListener) queryBeefyAuthorities(blockHash types.Hash) ([]subst
return authorities, nil
}

func (li *PolkadotListener) queryBeefyNextAuthoritySet(blockHash types.Hash) (types.BeefyNextAuthoritySet, error) {
var nextAuthoritySet types.BeefyNextAuthoritySet
func (li *PolkadotListener) queryBeefyAuthoritySet(blockHash types.Hash) (BeefyAuthoritySet, error) {
var authoritySet BeefyAuthoritySet
storageKey, err := types.CreateStorageKey(li.conn.Metadata(), "MmrLeaf", "BeefyAuthorities", nil, nil)
ok, err := li.conn.API().RPC.State.GetStorage(storageKey, &authoritySet, blockHash)
if err != nil {
return authoritySet, err
}
if !ok {
return authoritySet, fmt.Errorf("beefy authoritySet not found")
}

return authoritySet, nil
}

func (li *PolkadotListener) queryBeefyNextAuthoritySet(blockHash types.Hash) (BeefyAuthoritySet, error) {
var nextAuthoritySet BeefyAuthoritySet
storageKey, err := types.CreateStorageKey(li.conn.Metadata(), "MmrLeaf", "BeefyNextAuthorities", nil, nil)
vgeddes marked this conversation as resolved.
Show resolved Hide resolved
ok, err := li.conn.API().RPC.State.GetStorage(storageKey, &nextAuthoritySet, blockHash)
if err != nil {
Expand Down
Loading
Loading