From 9b84fb46b99ecd7f5bebf88d8803b5a8785041ca Mon Sep 17 00:00:00 2001 From: mpetrun5 Date: Mon, 16 Oct 2023 17:43:36 +0200 Subject: [PATCH] Remove fetching events from substrate listener --- chains/substrate/listener/listener.go | 47 +++++++-------------------- 1 file changed, 11 insertions(+), 36 deletions(-) diff --git a/chains/substrate/listener/listener.go b/chains/substrate/listener/listener.go index a1b47aec..d379408e 100644 --- a/chains/substrate/listener/listener.go +++ b/chains/substrate/listener/listener.go @@ -8,7 +8,6 @@ import ( "math/big" "time" - "github.com/ChainSafe/sygma-core/store" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser" "github.com/centrifuge/go-substrate-rpc-client/v4/types" "github.com/rs/zerolog" @@ -16,8 +15,9 @@ import ( ) type EventHandler interface { - HandleEvents(evts []*parser.Event) error + HandleEvents(startBlock *big.Int, endBlock *big.Int) error } + type ChainConnection interface { UpdateMetatdata() error GetHeaderLatest() (*types.Header, error) @@ -27,11 +27,13 @@ type ChainConnection interface { GetBlock(blockHash types.Hash) (*types.SignedBlock, error) } -type SubstrateListener struct { - conn ChainConnection - - blockstore store.BlockStore +type BlockStorer interface { + StoreBlock(block *big.Int, domainID uint8) error +} +type SubstrateListener struct { + conn ChainConnection + blockstore BlockStorer eventHandlers []EventHandler blockRetryInterval time.Duration @@ -41,7 +43,7 @@ type SubstrateListener struct { log zerolog.Logger } -func NewSubstrateListener(connection ChainConnection, blockstore store.BlockStore, eventHandlers []EventHandler, domainID uint8, blockRetryInterval time.Duration, blockInterval *big.Int) *SubstrateListener { +func NewSubstrateListener(connection ChainConnection, blockstore BlockStorer, eventHandlers []EventHandler, domainID uint8, blockRetryInterval time.Duration, blockInterval *big.Int) *SubstrateListener { return &SubstrateListener{ log: log.With().Uint8("domainID", domainID).Logger(), domainID: domainID, @@ -86,20 +88,14 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big. continue } - evts, err := l.fetchEvents(startBlock, endBlock) - if err != nil { - l.log.Err(err).Msgf("Failed fetching events for block range %s-%s", startBlock, endBlock) - time.Sleep(l.blockRetryInterval) - continue - } - for _, handler := range l.eventHandlers { - err := handler.HandleEvents(evts) + err := handler.HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1))) if err != nil { l.log.Warn().Err(err).Msg("Error handling substrate events") continue } } + err = l.blockstore.StoreBlock(startBlock, l.domainID) if err != nil { l.log.Error().Str("block", startBlock.String()).Err(err).Msg("Failed to write latest block to blockstore") @@ -109,24 +105,3 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big. } }() } - -func (l *SubstrateListener) fetchEvents(startBlock *big.Int, endBlock *big.Int) ([]*parser.Event, error) { - l.log.Debug().Msgf("Fetching substrate events for block range %s-%s", startBlock, endBlock) - - evts := make([]*parser.Event, 0) - for i := new(big.Int).Set(startBlock); i.Cmp(endBlock) == -1; i.Add(i, big.NewInt(1)) { - hash, err := l.conn.GetBlockHash(i.Uint64()) - if err != nil { - return nil, err - } - - evt, err := l.conn.GetBlockEvents(hash) - if err != nil { - return nil, err - } - evts = append(evts, evt...) - - } - - return evts, nil -}