Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save blockchain message #120

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-xmtpd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ jobs:
file: ./dev/docker/Dockerfile
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
labels: ${{ steps.meta.outputs.labels }}
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
submodules: recursive
- uses: actions/setup-go@v3
with:
go-version-file: go.mod
- run: dev/docker/up
- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1
- run: dev/contracts/deploy-local
- name: Run Tests
run: |
export GOPATH="${HOME}/go/"
Expand Down
3 changes: 3 additions & 0 deletions dev/contracts/deploy-local
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

source dev/contracts/.env

# Make sure the build directory exists
mkdir -p ./build

cd ./contracts

# Deploy a contract and save the output (which includes the contract address) to a JSON file to be used in tests
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ContractsOptions struct {
RpcUrl string `long:"rpc-url" description:"Blockchain RPC URL"`
NodesContractAddress string `long:"nodes-address" description:"Node contract address"`
MessagesContractAddress string `long:"messages-address" description:"Message contract address"`
RefreshInterval time.Duration `long:"refresh-interval" description:"Refresh interval" default:"60s"`
RefreshInterval time.Duration `long:"refresh-interval" description:"Refresh interval for the nodes registry" default:"60s"`
}

type DbOptions struct {
Expand Down
11 changes: 11 additions & 0 deletions pkg/indexer/blockchain/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package blockchain

import (
"context"

"github.com/ethereum/go-ethereum/ethclient"
)

func NewClient(ctx context.Context, rpcUrl string) (*ethclient.Client, error) {
return ethclient.DialContext(ctx, rpcUrl)
}
12 changes: 4 additions & 8 deletions pkg/indexer/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type RpcLogStreamBuilder struct {
// All the listeners
contractConfigs []contractConfig
logger *zap.Logger
rpcUrl string
ethclient *ethclient.Client
}

func NewRpcLogStreamBuilder(rpcUrl string, logger *zap.Logger) *RpcLogStreamBuilder {
return &RpcLogStreamBuilder{rpcUrl: rpcUrl, logger: logger}
func NewRpcLogStreamBuilder(client *ethclient.Client, logger *zap.Logger) *RpcLogStreamBuilder {
return &RpcLogStreamBuilder{ethclient: client, logger: logger}
}

func (c *RpcLogStreamBuilder) ListenForContractEvent(
Expand All @@ -47,11 +47,7 @@ func (c *RpcLogStreamBuilder) ListenForContractEvent(
}

func (c *RpcLogStreamBuilder) Build() (*RpcLogStreamer, error) {
client, err := ethclient.Dial(c.rpcUrl)
if err != nil {
return nil, err
}
return NewRpcLogStreamer(client, c.logger, c.contractConfigs), nil
return NewRpcLogStreamer(c.ethclient, c.logger, c.contractConfigs), nil
}

// Struct defining all the information required to filter events from logs
Expand Down
9 changes: 4 additions & 5 deletions pkg/indexer/blockchain/rpcLogStreamer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blockchain

import (
"context"
big "math/big"
"testing"

Expand All @@ -15,10 +16,6 @@ import (
"go.uber.org/zap"
)

// Using a free RPC url so that the dial function works.
// May be unwise or flaky and we may need to reconsider
const RPC_URL = "https://nodes.mewapi.io/rpc/eth"

func buildStreamer(
t *testing.T,
client ChainClient,
Expand All @@ -39,7 +36,9 @@ func buildStreamer(
}

func TestBuilder(t *testing.T) {
builder := NewRpcLogStreamBuilder(RPC_URL, testutils.NewLog(t))
testclient, err := NewClient(context.Background(), testutils.GetContractsOptions(t).RpcUrl)
require.NoError(t, err)
builder := NewRpcLogStreamBuilder(testclient, testutils.NewLog(t))

listenerChannel := builder.ListenForContractEvent(
1,
Expand Down
24 changes: 22 additions & 2 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/db/queries"
Expand All @@ -22,7 +23,11 @@ func StartIndexer(
queries *queries.Queries,
cfg config.ContractsOptions,
) error {
builder := blockchain.NewRpcLogStreamBuilder(cfg.RpcUrl, logger)
client, err := blockchain.NewClient(ctx, cfg.RpcUrl)
if err != nil {
return err
}
builder := blockchain.NewRpcLogStreamBuilder(client, logger)

messagesTopic, err := buildMessagesTopic()
if err != nil {
Expand All @@ -35,11 +40,16 @@ func StartIndexer(
[]common.Hash{messagesTopic},
)

messagesContract, err := messagesContract(cfg, client)
if err != nil {
return err
}

indexLogs(
ctx,
messagesChannel,
logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)),
storer.NewGroupMessageStorer(queries, logger),
storer.NewGroupMessageStorer(queries, logger, messagesContract),
)

streamer, err := builder.Build()
Expand Down Expand Up @@ -92,3 +102,13 @@ func buildMessagesTopic() (common.Hash, error) {
}
return utils.GetEventTopic(abi, "MessageSent")
}

func messagesContract(
cfg config.ContractsOptions,
client *ethclient.Client,
) (*abis.GroupMessages, error) {
return abis.NewGroupMessages(
common.HexToAddress(cfg.MessagesContractAddress),
client,
)
}
44 changes: 38 additions & 6 deletions pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,55 @@ package storer

import (
"context"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/core/types"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/db/queries"
"go.uber.org/zap"
)

type GroupMessageStorer struct {
neekolas marked this conversation as resolved.
Show resolved Hide resolved
queries *queries.Queries
logger *zap.Logger
contract *abis.GroupMessages
queries *queries.Queries
logger *zap.Logger
}

func NewGroupMessageStorer(queries *queries.Queries, logger *zap.Logger) *GroupMessageStorer {
return &GroupMessageStorer{queries: queries, logger: logger}
func NewGroupMessageStorer(
queries *queries.Queries,
logger *zap.Logger,
contract *abis.GroupMessages,
) *GroupMessageStorer {
return &GroupMessageStorer{queries: queries, logger: logger, contract: contract}
}

// Validate and store a group message log event
func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError {
return NewLogStorageError(errors.New("not implemented"), true)
msgSent, err := s.contract.ParseMessageSent(event)
if err != nil {
return NewLogStorageError(err, false)
}

// TODO:nm figure out topic structure
topic := buildTopic(msgSent.GroupId)

s.logger.Debug("Inserting message from contract", zap.String("topic", topic))

if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it right that we don't ever expect to process the same log twice? If so, is it problematic if we run into a unique constraint error here (which would mean the (originator ID, sequence ID) tuple already exists)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could if we have two instances of the replication node running, and both are indexing the chain.

If a node gets shut down mid-block, it'll also restart at the top of that block.

The only hard constraint is that they are both processing messages in ascending order.

// We may not want to hardcode this to 0 and have an originator ID for each smart contract?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understanding this a little bit better - what other smart contracts are we envisioning here? Is this related to sharding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And maybe upgradeability. At some point we are going to replace the smart contract and we have some more optionality if we've designed around it having its own originator ID where we can handle payloads differently.

Smart contract cutover would be a complicated migration to synchronize and I don't have all the answers on how to do it. But more options feels better.

OriginatorID: 0,
OriginatorSequenceID: int64(msgSent.SequenceId),
Topic: []byte(topic),
OriginatorEnvelope: msgSent.Message, // TODO:nm parse originator envelope and do some validation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably share this validation code, if you don't want to deal with it

}); err != nil {
s.logger.Error("Error inserting envelope from smart contract", zap.Error(err))
return NewLogStorageError(err, true)
}

return nil
}

func buildTopic(groupId [32]byte) string {
// We should think about simplifying the topics, since backwards compatibility shouldn't really matter here
return fmt.Sprintf("/xmtp/1/g-%x/proto", groupId)
}
123 changes: 123 additions & 0 deletions pkg/indexer/storer/groupMessage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package storer

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/indexer/blockchain"
testutils "github.com/xmtp/xmtpd/pkg/testing"
"github.com/xmtp/xmtpd/pkg/utils"
)

func buildGroupMessageStorer(t *testing.T) (*GroupMessageStorer, func()) {
ctx, cancel := context.WithCancel(context.Background())
db, _, cleanup := testutils.NewDB(t, ctx)
queryImpl := queries.New(db)
config := testutils.GetContractsOptions(t)
contractAddress := config.MessagesContractAddress

client, err := blockchain.NewClient(ctx, config.RpcUrl)
require.NoError(t, err)
contract, err := abis.NewGroupMessages(
common.HexToAddress(contractAddress),
client,
)

require.NoError(t, err)
storer := NewGroupMessageStorer(queryImpl, testutils.NewLog(t), contract)

return storer, func() {
cancel()
cleanup()
}
}

func TestStoreGroupMessages(t *testing.T) {
ctx := context.Background()
storer, cleanup := buildGroupMessageStorer(t)
defer cleanup()

var groupID [32]byte
copy(groupID[:], testutils.RandomBytes(32))
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

logMessage := testutils.BuildMessageSentLog(t, groupID, message, sequenceID)

err := storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)

envelopes, queryErr := storer.queries.SelectGatewayEnvelopes(
ctx,
queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)},
)
require.NoError(t, queryErr)

require.Equal(t, len(envelopes), 1)

firstEnvelope := envelopes[0]
require.Equal(t, firstEnvelope.OriginatorEnvelope, message)
}

func TestStoreGroupMessageDuplicate(t *testing.T) {
ctx := context.Background()
storer, cleanup := buildGroupMessageStorer(t)
defer cleanup()

var groupID [32]byte
copy(groupID[:], testutils.RandomBytes(32))
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

logMessage := testutils.BuildMessageSentLog(t, groupID, message, sequenceID)

err := storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)
// Store the log a second time
err = storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)

envelopes, queryErr := storer.queries.SelectGatewayEnvelopes(
ctx,
queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)},
)
require.NoError(t, queryErr)

require.Equal(t, len(envelopes), 1)
}

func TestStoreGroupMessageMalformed(t *testing.T) {
ctx := context.Background()
storer, cleanup := buildGroupMessageStorer(t)
defer cleanup()

abi, err := abis.GroupMessagesMetaData.GetAbi()
require.NoError(t, err)

topic, err := utils.GetEventTopic(abi, "MessageSent")
require.NoError(t, err)

logMessage := types.Log{
Topics: []common.Hash{topic},
Data: []byte("foo"),
}

storageErr := storer.StoreLog(ctx, logMessage)
require.Error(t, storageErr)
require.False(t, storageErr.ShouldRetry())
}
2 changes: 1 addition & 1 deletion pkg/mocks/mock_ChainClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/mock_LogStorer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/mock_NodeRegistry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mocks/mock_NodesContract.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading