Skip to content

Commit

Permalink
Remove fetching events from substrate listener
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrun5 committed Oct 16, 2023
1 parent 8910d2e commit 9b84fb4
Showing 1 changed file with 11 additions and 36 deletions.
47 changes: 11 additions & 36 deletions chains/substrate/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (
"math/big"
"time"

"github.com/ChainSafe/sygma-core/store"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

type EventHandler interface {
HandleEvents(evts []*parser.Event) error
HandleEvents(startBlock *big.Int, endBlock *big.Int) error
}

type ChainConnection interface {
UpdateMetatdata() error
GetHeaderLatest() (*types.Header, error)
Expand All @@ -27,11 +27,13 @@ type ChainConnection interface {
GetBlock(blockHash types.Hash) (*types.SignedBlock, error)
}

type SubstrateListener struct {
conn ChainConnection

blockstore store.BlockStore
type BlockStorer interface {
StoreBlock(block *big.Int, domainID uint8) error
}

type SubstrateListener struct {
conn ChainConnection
blockstore BlockStorer
eventHandlers []EventHandler

blockRetryInterval time.Duration
Expand All @@ -41,7 +43,7 @@ type SubstrateListener struct {
log zerolog.Logger
}

func NewSubstrateListener(connection ChainConnection, blockstore store.BlockStore, eventHandlers []EventHandler, domainID uint8, blockRetryInterval time.Duration, blockInterval *big.Int) *SubstrateListener {
func NewSubstrateListener(connection ChainConnection, blockstore BlockStorer, eventHandlers []EventHandler, domainID uint8, blockRetryInterval time.Duration, blockInterval *big.Int) *SubstrateListener {
return &SubstrateListener{
log: log.With().Uint8("domainID", domainID).Logger(),
domainID: domainID,
Expand Down Expand Up @@ -86,20 +88,14 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.
continue
}

evts, err := l.fetchEvents(startBlock, endBlock)
if err != nil {
l.log.Err(err).Msgf("Failed fetching events for block range %s-%s", startBlock, endBlock)
time.Sleep(l.blockRetryInterval)
continue
}

for _, handler := range l.eventHandlers {
err := handler.HandleEvents(evts)
err := handler.HandleEvents(startBlock, new(big.Int).Sub(endBlock, big.NewInt(1)))
if err != nil {
l.log.Warn().Err(err).Msg("Error handling substrate events")
continue
}
}

err = l.blockstore.StoreBlock(startBlock, l.domainID)
if err != nil {
l.log.Error().Str("block", startBlock.String()).Err(err).Msg("Failed to write latest block to blockstore")
Expand All @@ -109,24 +105,3 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.
}
}()
}

func (l *SubstrateListener) fetchEvents(startBlock *big.Int, endBlock *big.Int) ([]*parser.Event, error) {
l.log.Debug().Msgf("Fetching substrate events for block range %s-%s", startBlock, endBlock)

evts := make([]*parser.Event, 0)
for i := new(big.Int).Set(startBlock); i.Cmp(endBlock) == -1; i.Add(i, big.NewInt(1)) {
hash, err := l.conn.GetBlockHash(i.Uint64())
if err != nil {
return nil, err
}

evt, err := l.conn.GetBlockEvents(hash)
if err != nil {
return nil, err
}
evts = append(evts, evt...)

}

return evts, nil
}

0 comments on commit 9b84fb4

Please sign in to comment.