Skip to content

Commit

Permalink
chore: listener tests (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrun5 authored Oct 19, 2023
1 parent 1d184c8 commit 256532b
Show file tree
Hide file tree
Showing 7 changed files with 639 additions and 51 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@ genmocks:
mockgen -source=chains/evm/transactor/signAndSend/signAndSend.go -destination=./mock/signAndSend.go -package mock
mockgen -source=./store/store.go -destination=./mock/store.go -package mock
mockgen -source=./relayer/message/handler.go -destination=./mock/message.go -package mock
mockgen -source=./chains/evm/listener/listener.go -destination=./mock/evmListener.go -package mock
mockgen -destination=./mock/substrateListener.go -package mock github.com/sygmaprotocol/sygma-core/chains/substrate/listener ChainConnection
17 changes: 10 additions & 7 deletions chains/evm/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"math/big"
"time"

"github.com/sygmaprotocol/sygma-core/store"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand All @@ -26,13 +24,17 @@ type BlockDeltaMeter interface {
TrackBlockDelta(domainID uint8, head *big.Int, current *big.Int)
}

type BlockStorer interface {
StoreBlock(block *big.Int, domainID uint8) error
}

type EVMListener struct {
client ChainClient
eventHandlers []EventHandler
metrics BlockDeltaMeter
blockstore BlockStorer

domainID uint8
blockstore *store.BlockStore
blockRetryInterval time.Duration
blockConfirmations *big.Int
blockInterval *big.Int
Expand All @@ -45,7 +47,7 @@ type EVMListener struct {
func NewEVMListener(
client ChainClient,
eventHandlers []EventHandler,
blockstore *store.BlockStore,
blockstore BlockStorer,
metrics BlockDeltaMeter,
domainID uint8,
blockRetryInterval time.Duration,
Expand All @@ -69,14 +71,15 @@ func NewEVMListener(
// configured for the listener.
func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int) {
endBlock := big.NewInt(0)
loop:
for {
select {
case <-ctx.Done():
return
default:
head, err := l.client.LatestBlock()
if err != nil {
l.log.Error().Err(err).Msg("Unable to get latest block")
l.log.Warn().Err(err).Msg("Unable to get latest block")
time.Sleep(l.blockRetryInterval)
continue
}
Expand All @@ -97,8 +100,8 @@ func (l *EVMListener) ListenToEvents(ctx context.Context, startBlock *big.Int) {
for _, handler := range l.eventHandlers {
err := handler.HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1)))
if err != nil {
l.log.Error().Err(err).Msgf("Unable to handle events")
continue
l.log.Warn().Err(err).Msgf("Unable to handle events")
continue loop
}
}

Expand Down
159 changes: 159 additions & 0 deletions chains/evm/listener/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package listener_test

import (
"context"
"fmt"
"math/big"
"testing"
"time"

"github.com/stretchr/testify/suite"
"github.com/sygmaprotocol/sygma-core/chains/evm/listener"
"github.com/sygmaprotocol/sygma-core/mock"
"go.uber.org/mock/gomock"
)

type ListenerTestSuite struct {
suite.Suite
listener *listener.EVMListener
mockClient *mock.MockChainClient
mockEventHandler *mock.MockEventHandler
mockBlockStorer *mock.MockBlockStorer
mockBlockDeltaMeter *mock.MockBlockDeltaMeter
domainID uint8
}

func TestRunTestSuite(t *testing.T) {
suite.Run(t, new(ListenerTestSuite))
}

func (s *ListenerTestSuite) SetupTest() {
ctrl := gomock.NewController(s.T())
s.domainID = 1
s.mockClient = mock.NewMockChainClient(ctrl)
s.mockEventHandler = mock.NewMockEventHandler(ctrl)
s.mockBlockStorer = mock.NewMockBlockStorer(ctrl)
s.mockBlockDeltaMeter = mock.NewMockBlockDeltaMeter(ctrl)
s.listener = listener.NewEVMListener(
s.mockClient,
[]listener.EventHandler{s.mockEventHandler, s.mockEventHandler},
s.mockBlockStorer,
s.mockBlockDeltaMeter,
s.domainID,
time.Millisecond*75,
big.NewInt(5),
big.NewInt(5))
}

func (s *ListenerTestSuite) Test_ListenToEvents_RetriesIfBlockUnavailable() {
s.mockClient.EXPECT().LatestBlock().Return(big.NewInt(0), fmt.Errorf("error"))

ctx, cancel := context.WithCancel(context.Background())
go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_SleepsIfBlockTooNew() {
s.mockClient.EXPECT().LatestBlock().Return(big.NewInt(109), nil)

ctx, cancel := context.WithCancel(context.Background())
go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_RetriesInCaseOfHandlerFailure() {
startBlock := big.NewInt(100)
endBlock := big.NewInt(105)
head := big.NewInt(110)

// First pass
s.mockClient.EXPECT().LatestBlock().Return(head, nil)
s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), head, endBlock)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(fmt.Errorf("error"))
// Second pass
s.mockClient.EXPECT().LatestBlock().Return(head, nil)
s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), head, endBlock)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockBlockStorer.EXPECT().StoreBlock(endBlock, s.domainID).Return(nil)
// third pass
s.mockClient.EXPECT().LatestBlock().Return(head, nil)

ctx, cancel := context.WithCancel(context.Background())

go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_StoresBlockIfEventHandlingSuccessful() {
startBlock := big.NewInt(100)
endBlock := big.NewInt(105)
head := big.NewInt(110)

s.mockClient.EXPECT().LatestBlock().Return(head, nil)
// prevent infinite runs
s.mockClient.EXPECT().LatestBlock().Return(big.NewInt(95), nil)
s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), head, endBlock)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockBlockStorer.EXPECT().StoreBlock(endBlock, s.domainID).Return(nil)

ctx, cancel := context.WithCancel(context.Background())

go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_IgnoresBlockStorerError() {
startBlock := big.NewInt(100)
endBlock := big.NewInt(105)
head := big.NewInt(110)

s.mockClient.EXPECT().LatestBlock().Return(head, nil)
s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), head, endBlock)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockBlockStorer.EXPECT().StoreBlock(endBlock, s.domainID).Return(fmt.Errorf("error"))

// prevent infinite runs
s.mockClient.EXPECT().LatestBlock().Return(big.NewInt(95), nil)

ctx, cancel := context.WithCancel(context.Background())

go s.listener.ListenToEvents(ctx, big.NewInt(100))

time.Sleep(time.Millisecond * 50)
cancel()
}

func (s *ListenerTestSuite) Test_ListenToEvents_UsesHeadAsStartBlockIfNilPassed() {
startBlock := big.NewInt(110)
endBlock := big.NewInt(115)
oldHead := big.NewInt(110)
newHead := big.NewInt(120)

s.mockClient.EXPECT().LatestBlock().Return(oldHead, nil)
s.mockClient.EXPECT().LatestBlock().Return(newHead, nil)
s.mockClient.EXPECT().LatestBlock().Return(big.NewInt(65), nil)

s.mockBlockDeltaMeter.EXPECT().TrackBlockDelta(uint8(1), big.NewInt(120), endBlock)

s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockEventHandler.EXPECT().HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))).Return(nil)
s.mockBlockStorer.EXPECT().StoreBlock(endBlock, s.domainID).Return(nil)

ctx, cancel := context.WithCancel(context.Background())

go s.listener.ListenToEvents(ctx, nil)

time.Sleep(time.Millisecond * 100)
cancel()
}
68 changes: 24 additions & 44 deletions chains/substrate/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,33 @@ import (
"math/big"
"time"

"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/sygmaprotocol/sygma-core/store"
)

type EventHandler interface {
HandleEvents(evts []*parser.Event) error
HandleEvents(startBlock *big.Int, endBlock *big.Int) error
}

type ChainConnection interface {
UpdateMetatdata() error
GetHeaderLatest() (*types.Header, error)
GetBlockHash(blockNumber uint64) (types.Hash, error)
GetBlockEvents(hash types.Hash) ([]*parser.Event, error)
GetFinalizedHead() (types.Hash, error)
GetBlock(blockHash types.Hash) (*types.SignedBlock, error)
}

type SubstrateListener struct {
conn ChainConnection
type BlockStorer interface {
StoreBlock(block *big.Int, domainID uint8) error
}

blockstore store.BlockStore
type BlockDeltaMeter interface {
TrackBlockDelta(domainID uint8, head *big.Int, current *big.Int)
}

type SubstrateListener struct {
conn ChainConnection
blockstore BlockStorer
eventHandlers []EventHandler
metrics BlockDeltaMeter

blockRetryInterval time.Duration
blockInterval *big.Int
Expand All @@ -41,7 +43,7 @@ type SubstrateListener struct {
log zerolog.Logger
}

func NewSubstrateListener(connection ChainConnection, blockstore store.BlockStore, eventHandlers []EventHandler, domainID uint8, blockRetryInterval time.Duration, blockInterval *big.Int) *SubstrateListener {
func NewSubstrateListener(connection ChainConnection, eventHandlers []EventHandler, blockstore BlockStorer, metrics BlockDeltaMeter, domainID uint8, blockRetryInterval time.Duration, blockInterval *big.Int) *SubstrateListener {
return &SubstrateListener{
log: log.With().Uint8("domainID", domainID).Logger(),
domainID: domainID,
Expand All @@ -50,27 +52,29 @@ func NewSubstrateListener(connection ChainConnection, blockstore store.BlockStor
eventHandlers: eventHandlers,
blockRetryInterval: blockRetryInterval,
blockInterval: blockInterval,
metrics: metrics,
}
}

func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.Int) {
endBlock := big.NewInt(0)

go func() {
loop:
for {
select {
case <-ctx.Done():
return
default:
hash, err := l.conn.GetFinalizedHead()
if err != nil {
l.log.Error().Err(err).Msg("Failed to fetch finalized header")
l.log.Warn().Err(err).Msg("Failed to fetch finalized header")
time.Sleep(l.blockRetryInterval)
continue
}
head, err := l.conn.GetBlock(hash)
if err != nil {
l.log.Error().Err(err).Msg("Failed to fetch block")
l.log.Warn().Err(err).Msg("Failed to fetch block")
time.Sleep(l.blockRetryInterval)
continue
}
Expand All @@ -86,21 +90,18 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.
continue
}

evts, err := l.fetchEvents(startBlock, endBlock)
if err != nil {
l.log.Err(err).Msgf("Failed fetching events for block range %s-%s", startBlock, endBlock)
time.Sleep(l.blockRetryInterval)
continue
}
l.metrics.TrackBlockDelta(l.domainID, big.NewInt(int64(head.Block.Header.Number)), endBlock)
l.log.Debug().Msgf("Fetching substrate events for block range %s-%s", startBlock, endBlock)

for _, handler := range l.eventHandlers {
err := handler.HandleEvents(evts)
err := handler.HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1)))
if err != nil {
l.log.Error().Err(err).Msg("Error handling substrate events")
continue
l.log.Warn().Err(err).Msg("Error handling substrate events")
continue loop
}
}
err = l.blockstore.StoreBlock(startBlock, l.domainID)

err = l.blockstore.StoreBlock(endBlock, l.domainID)
if err != nil {
l.log.Error().Str("block", startBlock.String()).Err(err).Msg("Failed to write latest block to blockstore")
}
Expand All @@ -109,24 +110,3 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.
}
}()
}

func (l *SubstrateListener) fetchEvents(startBlock *big.Int, endBlock *big.Int) ([]*parser.Event, error) {
l.log.Debug().Msgf("Fetching substrate events for block range %s-%s", startBlock, endBlock)

evts := make([]*parser.Event, 0)
for i := new(big.Int).Set(startBlock); i.Cmp(endBlock) == -1; i.Add(i, big.NewInt(1)) {
hash, err := l.conn.GetBlockHash(i.Uint64())
if err != nil {
return nil, err
}

evt, err := l.conn.GetBlockEvents(hash)
if err != nil {
return nil, err
}
evts = append(evts, evt...)

}

return evts, nil
}
Loading

0 comments on commit 256532b

Please sign in to comment.