Skip to content

Commit

Permalink
Backport substrate chain from relayer
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrun5 committed Oct 2, 2023
1 parent f9be6f9 commit 14105b6
Show file tree
Hide file tree
Showing 14 changed files with 842 additions and 310 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Fork the repository, make changes and open a PR to the `main` branch of the repo
A great way to contribute to the project is to send a detailed report when you encounter an issue. We always appreciate a well-written, thorough bug report, and will thank you for it!

When reporting issues, always include:
- chainbridge-core version
- sygma-core version
- modules used
- logs (don't forget to remove sensitive data)
- tx hashes related to issue (if applicable)
Expand Down
62 changes: 23 additions & 39 deletions chains/evm/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,73 +5,57 @@ package evm

import (
"context"
"fmt"
"math/big"

"github.com/ChainSafe/sygma-core/store"
"github.com/ChainSafe/sygma-core/types"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

type EventListener interface {
ListenToEvents(ctx context.Context, startBlock *big.Int, errChan chan<- error)
ListenToEvents(ctx context.Context, startBlock *big.Int)
}

type ProposalExecutor interface {
Execute(message *types.Message) error
Execute(messages []*types.Message) error
}

// EVMChain is struct that aggregates all data required for
type EVMChain struct {
listener EventListener
writer ProposalExecutor
executor ProposalExecutor
blockstore *store.BlockStore

domainID uint8
startBlock *big.Int
freshStart bool
latestBlock bool
domainID uint8
startBlock *big.Int

logger zerolog.Logger
}

func NewEVMChain(listener EventListener, writer ProposalExecutor, blockstore *store.BlockStore, domainID uint8, startBlock *big.Int, latestBlock bool, freshStart bool) *EVMChain {
func NewEVMChain(listener EventListener, executor ProposalExecutor, blockstore *store.BlockStore, domainID uint8, startBlock *big.Int) *EVMChain {
return &EVMChain{
listener: listener,
writer: writer,
blockstore: blockstore,
domainID: domainID,
startBlock: startBlock,
latestBlock: latestBlock,
freshStart: freshStart,
listener: listener,
executor: executor,
blockstore: blockstore,
domainID: domainID,
startBlock: startBlock,
logger: log.With().Uint8("domainID", domainID).Logger(),
}
}

// PollEvents is the goroutine that polls blocks and searches Deposit events in them.
// Events are then sent to eventsChan.
func (c *EVMChain) PollEvents(ctx context.Context, sysErr chan<- error) {
log.Info().Msg("Polling Blocks...")

startBlock, err := c.blockstore.GetStartBlock(
c.domainID,
c.startBlock,
c.latestBlock,
c.freshStart,
)
if err != nil {
sysErr <- fmt.Errorf("error %w on getting last stored block", err)
return
}

go c.listener.ListenToEvents(ctx, startBlock, sysErr)
func (c *EVMChain) PollEvents(ctx context.Context) {
c.logger.Info().Str("startBlock", c.startBlock.String()).Msg("Polling Blocks...")
go c.listener.ListenToEvents(ctx, c.startBlock)
}

func (c *EVMChain) Write(msg []*types.Message) error {
for _, msg := range msg {
go func(msg *types.Message) {
err := c.writer.Execute(msg)
if err != nil {
log.Err(err).Msgf("Failed writing message %v", msg)
}
}(msg)
func (c *EVMChain) Write(msgs []*types.Message) error {
err := c.executor.Execute(msgs)
if err != nil {
c.logger.Err(err).Msgf("error writing messages %+v on network %d", msgs, c.DomainID())
return err
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion chains/evm/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewEVMListener(

// ListenToEvents goes block by block of a network and executes event handlers that are
// configured for the listener.
func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int, errChn chan<- error) {
func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int) {
endBlock := big.NewInt(0)
for {
select {
Expand Down
64 changes: 64 additions & 0 deletions chains/substrate/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package substrate

import (
"context"
"math/big"

"github.com/ChainSafe/sygma-core/chains/substrate/client"
"github.com/ChainSafe/sygma-core/store"
"github.com/ChainSafe/sygma-core/types"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

type BatchProposalExecutor interface {
Execute(msgs []*types.Message) error
}

type SubstrateChain struct {
client *client.SubstrateClient

listener EventListener
executor BatchProposalExecutor

blockstore *store.BlockStore

domainID uint8
startBlock *big.Int

logger zerolog.Logger
}

type EventListener interface {
ListenToEvents(ctx context.Context, startBlock *big.Int)
}

func NewSubstrateChain(client *client.SubstrateClient, listener EventListener, blockstore *store.BlockStore, executor BatchProposalExecutor, domainID uint8, startBlock *big.Int) *SubstrateChain {
return &SubstrateChain{
client: client,
listener: listener,
blockstore: blockstore,
executor: executor,
logger: log.With().Uint8("domainID", domainID).Logger()}
}

// PollEvents is the goroutine that polls blocks and searches Deposit events in them.
// Events are then sent to eventsChan.
func (c *SubstrateChain) PollEvents(ctx context.Context) {
c.logger.Info().Str("startBlock", c.startBlock.String()).Msg("Polling Blocks...")
go c.listener.ListenToEvents(ctx, c.startBlock)
}

func (c *SubstrateChain) Write(msgs []*types.Message) error {
err := c.executor.Execute(msgs)
if err != nil {
c.logger.Err(err).Msgf("error writing messages %+v on network %d", msgs, c.DomainID())
return err
}

return nil
}

func (c *SubstrateChain) DomainID() uint8 {
return c.domainID
}
213 changes: 213 additions & 0 deletions chains/substrate/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// The Licensed Work is (c) 2022 Sygma
// SPDX-License-Identifier: LGPL-3.0-only

package client

import (
"bytes"
"context"
"fmt"
"math/big"
"sync"
"time"

"github.com/ChainSafe/sygma-core/chains/substrate/connection"

Check failure on line 14 in chains/substrate/client/client.go

View workflow job for this annotation

GitHub Actions / linter-check

could not import github.com/ChainSafe/sygma-core/chains/substrate/connection (-: # github.com/ChainSafe/sygma-core/chains/substrate/connection
"github.com/ChainSafe/sygma-core/chains/substrate/events"
"github.com/centrifuge/go-substrate-rpc-client/v4/rpc/author"
"github.com/centrifuge/go-substrate-rpc-client/v4/scale"
"github.com/centrifuge/go-substrate-rpc-client/v4/signature"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/rs/zerolog/log"
)

type SubstrateClient struct {
key *signature.KeyringPair // Keyring used for signing
nonceLock sync.Mutex // Locks nonce for updates
nonce types.U32 // Latest account nonce
tip uint64
Conn *connection.Connection
ChainID *big.Int
}

func NewSubstrateClient(conn *connection.Connection, key *signature.KeyringPair, chainID *big.Int, tip uint64) *SubstrateClient {
return &SubstrateClient{
key: key,
Conn: conn,
ChainID: chainID,
tip: tip,
}
}

// Transact constructs and submits an extrinsic to call the method with the given arguments.
// All args are passed directly into GSRPC. GSRPC types are recommended to avoid serialization inconsistencies.
func (c *SubstrateClient) Transact(method string, args ...interface{}) (types.Hash, *author.ExtrinsicStatusSubscription, error) {
log.Debug().Msgf("Submitting substrate call... method %s, sender %s", method, c.key.Address)

// Create call and extrinsic
meta := c.Conn.GetMetadata()
call, err := types.NewCall(
&meta,
method,
args...,
)
if err != nil {
return types.Hash{}, nil, fmt.Errorf("failed to construct call: %w", err)
}

ext := types.NewExtrinsic(call)
// Get latest runtime version
rv, err := c.Conn.RPC.State.GetRuntimeVersionLatest()
if err != nil {
return types.Hash{}, nil, err
}

c.nonceLock.Lock()
defer c.nonceLock.Unlock()

nonce, err := c.nextNonce(&meta)
if err != nil {
return types.Hash{}, nil, err
}

// Sign the extrinsic
o := types.SignatureOptions{
BlockHash: c.Conn.GenesisHash,
Era: types.ExtrinsicEra{IsMortalEra: false},
GenesisHash: c.Conn.GenesisHash,
Nonce: types.NewUCompactFromUInt(uint64(nonce)),
SpecVersion: rv.SpecVersion,
Tip: types.NewUCompactFromUInt(c.tip),
TransactionVersion: rv.TransactionVersion,
}
sub, err := c.submitAndWatchExtrinsic(o, &ext)
if err != nil {
return types.Hash{}, nil, fmt.Errorf("submission of extrinsic failed: %w", err)
}

hash, err := ExtrinsicHash(ext)
if err != nil {
return types.Hash{}, nil, err
}

log.Info().Str("extrinsic", hash.Hex()).Msgf("Extrinsic call submitted... method %s, sender %s, nonce %d", method, c.key.Address, nonce)
c.nonce = nonce + 1

return hash, sub, nil
}

func (c *SubstrateClient) TrackExtrinsic(extHash types.Hash, sub *author.ExtrinsicStatusSubscription) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Minute*10))
defer sub.Unsubscribe()
defer cancel()
subChan := sub.Chan()
for {
select {
case status := <-subChan:
{
if status.IsInBlock {
log.Debug().Str("extrinsic", extHash.Hex()).Msgf("Extrinsic in block with hash: %#x", status.AsInBlock)
}
if status.IsFinalized {
log.Info().Str("extrinsic", extHash.Hex()).Msgf("Extrinsic is finalized in block with hash: %#x", status.AsFinalized)
return c.checkExtrinsicSuccess(extHash, status.AsFinalized)
}
}
case <-ctx.Done():
return fmt.Errorf("extrinsic has timed out")
}
}
}

func (c *SubstrateClient) nextNonce(meta *types.Metadata) (types.U32, error) {
key, err := types.CreateStorageKey(meta, "System", "Account", c.key.PublicKey, nil)
if err != nil {
return 0, err
}

var latestNonce types.U32
var acct types.AccountInfo
exists, err := c.Conn.RPC.State.GetStorageLatest(key, &acct)
if err != nil {
return 0, err
}

if !exists {
latestNonce = 0
} else {
latestNonce = acct.Nonce
}

if latestNonce < c.nonce {
return c.nonce, nil
}

return latestNonce, nil
}

func (c *SubstrateClient) submitAndWatchExtrinsic(opts types.SignatureOptions, ext *types.Extrinsic) (*author.ExtrinsicStatusSubscription, error) {
err := ext.Sign(*c.key, opts)
if err != nil {
return nil, err
}

sub, err := c.Conn.RPC.Author.SubmitAndWatchExtrinsic(*ext)
if err != nil {
return nil, err
}

return sub, nil
}

func (c *SubstrateClient) checkExtrinsicSuccess(extHash types.Hash, blockHash types.Hash) error {
block, err := c.Conn.Chain.GetBlock(blockHash)
if err != nil {
return err
}

evts, err := c.Conn.GetBlockEvents(blockHash)
if err != nil {
return err
}

for _, event := range evts {
index := event.Phase.AsApplyExtrinsic
hash, err := ExtrinsicHash(block.Block.Extrinsics[index])
if err != nil {
return err
}

if extHash != hash {
continue
}

if event.Name == events.ExtrinsicFailedEvent {
return fmt.Errorf("extrinsic failed")
}
if event.Name == events.FailedHandlerExecutionEvent {
return fmt.Errorf("extrinsic failed with failed handler execution")
}
if event.Name == events.ExtrinsicSuccessEvent {
return nil
}
}

return fmt.Errorf("no event found")
}

func (c *SubstrateClient) LatestBlock() (*big.Int, error) {
block, err := c.Conn.Chain.GetBlockLatest()
if err != nil {
return nil, err
}
return big.NewInt(int64(block.Block.Header.Number)), nil
}

func ExtrinsicHash(ext types.Extrinsic) (types.Hash, error) {
extHash := bytes.NewBuffer([]byte{})
encoder := scale.NewEncoder(extHash)
err := ext.Encode(*encoder)
if err != nil {
return types.Hash{}, err
}
return types.NewHash(extHash.Bytes()), nil
}
Loading

0 comments on commit 14105b6

Please sign in to comment.