Skip to content

Commit

Permalink
Actually use blockTracker as part of indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Nov 20, 2024
1 parent b7988bc commit 756e259
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ packages:
interfaces:
NodesContract:
NodeRegistry:
github.com/xmtp/xmtpd/pkg/indexer:
interfaces:
IBlockTracker:
github.com/xmtp/xmtpd/pkg/blockchain:
interfaces:
ChainClient:
Expand Down
34 changes: 23 additions & 11 deletions pkg/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package blockchain
import (
"context"
"fmt"
"github.com/xmtp/xmtpd/pkg/tracing"
"math/big"
"sync"
"time"

"github.com/xmtp/xmtpd/pkg/tracing"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -17,10 +18,10 @@ import (
)

const (
BACKFILL_BLOCKS = 1000
BACKFILL_BLOCKS = uint64(1000)
// Don't index very new blocks to account for reorgs
// Setting to 0 since we are talking about L2s with low reorg risk
LAG_FROM_HIGHEST_BLOCK = 0
LAG_FROM_HIGHEST_BLOCK = uint64(0)
ERROR_SLEEP_TIME = 100 * time.Millisecond
NO_LOGS_SLEEP_TIME = 1 * time.Second
)
Expand All @@ -43,7 +44,7 @@ func NewRpcLogStreamBuilder(
}

func (c *RpcLogStreamBuilder) ListenForContractEvent(
fromBlock int,
fromBlock uint64,
contractAddress common.Address,
topics []common.Hash,
maxDisconnectTime time.Duration,
Expand All @@ -62,7 +63,7 @@ func (c *RpcLogStreamBuilder) Build() (*RpcLogStreamer, error) {

// Struct defining all the information required to filter events from logs
type contractConfig struct {
fromBlock int
fromBlock uint64
contractAddress common.Address
topics []common.Hash
channel chan<- types.Log
Expand Down Expand Up @@ -129,7 +130,7 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {
if err != nil {
logger.Error(
"Error getting next page",
zap.Int("fromBlock", fromBlock),
zap.Uint64("fromBlock", fromBlock),
zap.Error(err),
)
time.Sleep(ERROR_SLEEP_TIME)
Expand All @@ -147,7 +148,11 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {
// reset self-termination timer
startTime = time.Now()

logger.Debug("Got logs", zap.Int("numLogs", len(logs)), zap.Int("fromBlock", fromBlock))
logger.Debug(
"Got logs",
zap.Int("numLogs", len(logs)),
zap.Uint64("fromBlock", fromBlock),
)
if len(logs) == 0 {
time.Sleep(NO_LOGS_SLEEP_TIME)
}
Expand All @@ -163,19 +168,26 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {

func (r *RpcLogStreamer) getNextPage(
config contractConfig,
fromBlock int,
) (logs []types.Log, nextBlock *int, err error) {
fromBlock uint64,
) (logs []types.Log, nextBlock *uint64, err error) {
contractAddress := config.contractAddress.Hex()
highestBlock, err := r.client.BlockNumber(r.ctx)
if err != nil {
return nil, nil, err
}
metrics.EmitCurrentBlock(contractAddress, int(highestBlock))

highestBlockCanProcess := int(highestBlock) - LAG_FROM_HIGHEST_BLOCK
highestBlockCanProcess := highestBlock - LAG_FROM_HIGHEST_BLOCK
if fromBlock > highestBlockCanProcess {
return nil, nil, fmt.Errorf(
"fromBlock (%d) is higher than highest processable block (%d)",
fromBlock,
highestBlockCanProcess,
)
}
numOfBlocksToProcess := highestBlockCanProcess - fromBlock + 1

var to int
var to uint64
// Make sure we stay within a reasonable page size
if numOfBlocksToProcess > BACKFILL_BLOCKS {
// quick mode
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockchain/rpcLogStreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
func buildStreamer(
t *testing.T,
client ChainClient,
fromBlock int,
fromBlock uint64,
address common.Address,
topic common.Hash,
) (*RpcLogStreamer, chan types.Log) {
Expand Down Expand Up @@ -56,8 +56,8 @@ func TestBuilder(t *testing.T) {
func TestRpcLogStreamer(t *testing.T) {
address := testutils.RandomAddress()
topic := testutils.RandomLogTopic()
fromBlock := 1
lastBlock := 10
fromBlock := uint64(1)
lastBlock := uint64(10)
logMessage := types.Log{
Address: address,
Topics: []common.Hash{topic},
Expand Down
4 changes: 2 additions & 2 deletions pkg/indexer/blockTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewBlockTracker(
queries: queries,
}

latestBlock, err := getLatestBlock(ctx, contractAddress, queries)
latestBlock, err := loadLatestBlock(ctx, contractAddress, queries)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func (bt *BlockTracker) updateDB(ctx context.Context, block uint64) error {
})
}

func getLatestBlock(
func loadLatestBlock(
ctx context.Context,
contractAddress string,
querier *queries.Queries,
Expand Down
42 changes: 32 additions & 10 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package indexer
import (
"context"
"database/sql"
"github.com/xmtp/xmtpd/pkg/tracing"
"sync"
"time"

"github.com/xmtp/xmtpd/pkg/tracing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (i *Indexer) StartIndexer(
builder := blockchain.NewRpcLogStreamBuilder(i.ctx, client, i.log)
querier := queries.New(db)

streamer, err := configureLogStream(i.ctx, builder, cfg)
streamer, err := configureLogStream(i.ctx, builder, cfg, querier)
if err != nil {
return err
}
Expand All @@ -87,6 +88,7 @@ func (i *Indexer) StartIndexer(
streamer.messagesChannel,
indexingLogger,
storer.NewGroupMessageStorer(querier, indexingLogger, messagesContract),
streamer.messagesBlockTracker,
)
})

Expand All @@ -111,6 +113,7 @@ func (i *Indexer) StartIndexer(
identityUpdatesContract,
validationService,
),
streamer.identityUpdatesBlockTracker,
)
})

Expand All @@ -119,23 +122,31 @@ func (i *Indexer) StartIndexer(
}

type builtStreamer struct {
streamer *blockchain.RpcLogStreamer
messagesChannel <-chan types.Log
identityUpdatesChannel <-chan types.Log
streamer *blockchain.RpcLogStreamer
messagesChannel <-chan types.Log
identityUpdatesChannel <-chan types.Log
identityUpdatesBlockTracker *BlockTracker
messagesBlockTracker *BlockTracker
}

func configureLogStream(
ctx context.Context,
builder *blockchain.RpcLogStreamBuilder,
cfg config.ContractsOptions,
querier *queries.Queries,
) (*builtStreamer, error) {
messagesTopic, err := buildMessagesTopic()
if err != nil {
return nil, err
}

messagesTracker, err := NewBlockTracker(ctx, cfg.MessagesContractAddress, querier)
if err != nil {
return nil, err
}

messagesChannel := builder.ListenForContractEvent(
0,
messagesTracker.GetLatestBlock(),
common.HexToAddress(cfg.MessagesContractAddress),
[]common.Hash{messagesTopic},
cfg.MaxChainDisconnectTime,
Expand All @@ -146,8 +157,13 @@ func configureLogStream(
return nil, err
}

identityUpdatesTracker, err := NewBlockTracker(ctx, cfg.IdentityUpdatesContractAddress, querier)
if err != nil {
return nil, err
}

identityUpdatesChannel := builder.ListenForContractEvent(
0,
identityUpdatesTracker.GetLatestBlock(),
common.HexToAddress(cfg.IdentityUpdatesContractAddress),
[]common.Hash{identityUpdatesTopic},
cfg.MaxChainDisconnectTime,
Expand All @@ -159,9 +175,11 @@ func configureLogStream(
}

return &builtStreamer{
streamer: streamer,
messagesChannel: messagesChannel,
identityUpdatesChannel: identityUpdatesChannel,
streamer: streamer,
messagesChannel: messagesChannel,
identityUpdatesChannel: identityUpdatesChannel,
identityUpdatesBlockTracker: identityUpdatesTracker,
messagesBlockTracker: messagesTracker,
}, nil
}

Expand All @@ -177,6 +195,7 @@ func indexLogs(
eventChannel <-chan types.Log,
logger *zap.Logger,
logStorer storer.LogStorer,
blockTracker IBlockTracker,
) {
var err storer.LogStorageError
// We don't need to listen for the ctx.Done() here, since the eventChannel will be closed when the parent context is canceled
Expand All @@ -192,6 +211,9 @@ func indexLogs(
}
} else {
logger.Info("Stored log", zap.Uint64("blockNumber", event.BlockNumber))
if trackerErr := blockTracker.UpdateLatestBlock(ctx, event.BlockNumber); trackerErr != nil {
logger.Error("error updating block tracker", zap.Error(trackerErr))
}
}
break Retry

Expand Down
15 changes: 10 additions & 5 deletions pkg/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,36 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/mock"
"github.com/xmtp/xmtpd/pkg/indexer/storer"
mocks "github.com/xmtp/xmtpd/pkg/mocks/storer"
indexerMocks "github.com/xmtp/xmtpd/pkg/mocks/indexer"
storerMocks "github.com/xmtp/xmtpd/pkg/mocks/storer"
"github.com/xmtp/xmtpd/pkg/testutils"
)

func TestIndexLogsSuccess(t *testing.T) {
channel := make(chan types.Log, 10)
defer close(channel)

logStorer := mocks.NewMockLogStorer(t)
logStorer := storerMocks.NewMockLogStorer(t)
blockTracker := indexerMocks.NewMockIBlockTracker(t)
blockTracker.EXPECT().UpdateLatestBlock(mock.Anything, mock.Anything).Return(nil)

event := types.Log{
Address: common.HexToAddress("0x123"),
}
logStorer.EXPECT().StoreLog(mock.Anything, event).Times(1).Return(nil)
channel <- event

go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer)
go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker)
time.Sleep(100 * time.Millisecond)
}

func TestIndexLogsRetryableError(t *testing.T) {
channel := make(chan types.Log, 10)
defer close(channel)

logStorer := mocks.NewMockLogStorer(t)
logStorer := storerMocks.NewMockLogStorer(t)
blockTracker := indexerMocks.NewMockIBlockTracker(t)
blockTracker.EXPECT().UpdateLatestBlock(mock.Anything, mock.Anything).Return(nil)

event := types.Log{
Address: common.HexToAddress("0x123"),
Expand All @@ -51,7 +56,7 @@ func TestIndexLogsRetryableError(t *testing.T) {
})
channel <- event

go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer)
go indexLogs(context.Background(), channel, testutils.NewLog(t), logStorer, blockTracker)
time.Sleep(200 * time.Millisecond)

logStorer.AssertNumberOfCalls(t, "StoreLog", 2)
Expand Down
8 changes: 8 additions & 0 deletions pkg/indexer/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package indexer

import "context"

type IBlockTracker interface {
GetLatestBlock() uint64
UpdateLatestBlock(ctx context.Context, block uint64) error
}
Loading

0 comments on commit 756e259

Please sign in to comment.