Skip to content

Commit

Permalink
Save blockchain message
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Aug 29, 2024
1 parent 650bf24 commit f7b21e2
Show file tree
Hide file tree
Showing 15 changed files with 332 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-xmtpd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ jobs:
file: ./dev/docker/Dockerfile
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
labels: ${{ steps.meta.outputs.labels }}
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ jobs:
with:
go-version-file: go.mod
- run: dev/docker/up
- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1
- run: dev/contracts/deploy-local
- name: Run Tests
run: |
export GOPATH="${HOME}/go/"
Expand Down
3 changes: 3 additions & 0 deletions dev/contracts/deploy-local
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

source dev/contracts/.env

# Make sure the build directory exists
mkdir -p ./build

cd ./contracts

# Deploy a contract and save the output (which includes the contract address) to a JSON file to be used in tests
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ContractsOptions struct {
RpcUrl string `long:"rpc-url" description:"Blockchain RPC URL"`
NodesContractAddress string `long:"nodes-address" description:"Node contract address"`
MessagesContractAddress string `long:"messages-address" description:"Message contract address"`
RefreshInterval time.Duration `long:"refresh-interval" description:"Refresh interval" default:"60s"`
RefreshInterval time.Duration `long:"refresh-interval" description:"Refresh interval for the nodes registry" default:"60s"`
}

type DbOptions struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/queries/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions pkg/indexer/blockchain/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package blockchain

import (
"context"

"github.com/ethereum/go-ethereum/ethclient"
)

func NewClient(ctx context.Context, rpcUrl string) (*ethclient.Client, error) {
return ethclient.DialContext(ctx, rpcUrl)
}
12 changes: 4 additions & 8 deletions pkg/indexer/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type RpcLogStreamBuilder struct {
// All the listeners
contractConfigs []contractConfig
logger *zap.Logger
rpcUrl string
ethclient *ethclient.Client
}

func NewRpcLogStreamBuilder(rpcUrl string, logger *zap.Logger) *RpcLogStreamBuilder {
return &RpcLogStreamBuilder{rpcUrl: rpcUrl, logger: logger}
func NewRpcLogStreamBuilder(client *ethclient.Client, logger *zap.Logger) *RpcLogStreamBuilder {
return &RpcLogStreamBuilder{ethclient: client, logger: logger}
}

func (c *RpcLogStreamBuilder) ListenForContractEvent(
Expand All @@ -47,11 +47,7 @@ func (c *RpcLogStreamBuilder) ListenForContractEvent(
}

func (c *RpcLogStreamBuilder) Build() (*RpcLogStreamer, error) {
client, err := ethclient.Dial(c.rpcUrl)
if err != nil {
return nil, err
}
return NewRpcLogStreamer(client, c.logger, c.contractConfigs), nil
return NewRpcLogStreamer(c.ethclient, c.logger, c.contractConfigs), nil
}

// Struct defining all the information required to filter events from logs
Expand Down
9 changes: 4 additions & 5 deletions pkg/indexer/blockchain/rpcLogStreamer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blockchain

import (
"context"
big "math/big"
"testing"

Expand All @@ -15,10 +16,6 @@ import (
"go.uber.org/zap"
)

// Using a free RPC url so that the dial function works.
// May be unwise or flaky and we may need to reconsider
const RPC_URL = "https://nodes.mewapi.io/rpc/eth"

func buildStreamer(
t *testing.T,
client ChainClient,
Expand All @@ -39,7 +36,9 @@ func buildStreamer(
}

func TestBuilder(t *testing.T) {
builder := NewRpcLogStreamBuilder(RPC_URL, testutils.NewLog(t))
testclient, err := NewClient(context.Background(), testutils.GetContractsOptions(t).RpcUrl)
require.NoError(t, err)
builder := NewRpcLogStreamBuilder(testclient, testutils.NewLog(t))

listenerChannel := builder.ListenForContractEvent(
1,
Expand Down
24 changes: 22 additions & 2 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/db/queries"
Expand All @@ -22,7 +23,11 @@ func StartIndexer(
queries *queries.Queries,
cfg config.ContractsOptions,
) error {
builder := blockchain.NewRpcLogStreamBuilder(cfg.RpcUrl, logger)
client, err := blockchain.NewClient(ctx, cfg.RpcUrl)
if err != nil {
return err
}
builder := blockchain.NewRpcLogStreamBuilder(client, logger)

messagesTopic, err := buildMessagesTopic()
if err != nil {
Expand All @@ -35,11 +40,16 @@ func StartIndexer(
[]common.Hash{messagesTopic},
)

messagesContract, err := messagesContract(cfg, client)
if err != nil {
return err
}

indexLogs(
ctx,
messagesChannel,
logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)),
storer.NewGroupMessageStorer(queries, logger),
storer.NewGroupMessageStorer(queries, logger, messagesContract),
)

streamer, err := builder.Build()
Expand Down Expand Up @@ -92,3 +102,13 @@ func buildMessagesTopic() (common.Hash, error) {
}
return utils.GetEventTopic(abi, "MessageSent")
}

func messagesContract(
cfg config.ContractsOptions,
client *ethclient.Client,
) (*abis.GroupMessages, error) {
return abis.NewGroupMessages(
common.HexToAddress(cfg.MessagesContractAddress),
client,
)
}
44 changes: 38 additions & 6 deletions pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,55 @@ package storer

import (
"context"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/core/types"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/db/queries"
"go.uber.org/zap"
)

type GroupMessageStorer struct {
queries *queries.Queries
logger *zap.Logger
contract *abis.GroupMessages
queries *queries.Queries
logger *zap.Logger
}

func NewGroupMessageStorer(queries *queries.Queries, logger *zap.Logger) *GroupMessageStorer {
return &GroupMessageStorer{queries: queries, logger: logger}
func NewGroupMessageStorer(
queries *queries.Queries,
logger *zap.Logger,
contract *abis.GroupMessages,
) *GroupMessageStorer {
return &GroupMessageStorer{queries: queries, logger: logger, contract: contract}
}

// Validate and store a group message log event
func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError {
return NewLogStorageError(errors.New("not implemented"), true)
msgSent, err := s.contract.ParseMessageSent(event)
if err != nil {
return NewLogStorageError(err, false)
}

// TODO:nm figure out topic structure
topic := buildTopic(msgSent.GroupId)

s.logger.Debug("Inserting message from contract", zap.String("topic", topic))

if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
// We may not want to hardcode this to 0 and have an originator ID for each smart contract?
OriginatorID: 0,
OriginatorSequenceID: int64(msgSent.SequenceId),
Topic: []byte(topic),
OriginatorEnvelope: msgSent.Message, // TODO:nm parse originator envelope and do some validation
}); err != nil {
s.logger.Error("Error inserting envelope from smart contract", zap.Error(err))
return NewLogStorageError(err, true)
}

return nil
}

func buildTopic(groupId [32]byte) string {
// We should think about simplifying the topics, since backwards compatibility shouldn't really matter here
return fmt.Sprintf("/xmtp/1/g-%x/proto", groupId)
}
123 changes: 123 additions & 0 deletions pkg/indexer/storer/groupMessage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package storer

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/indexer/blockchain"
testutils "github.com/xmtp/xmtpd/pkg/testing"
"github.com/xmtp/xmtpd/pkg/utils"
)

func buildGroupMessageStorer(t *testing.T) (*GroupMessageStorer, func()) {
ctx, cancel := context.WithCancel(context.Background())
db, _, cleanup := testutils.NewDB(t, ctx)
queryImpl := queries.New(db)
config := testutils.GetContractsOptions(t)
contractAddress := config.MessagesContractAddress

client, err := blockchain.NewClient(ctx, config.RpcUrl)
require.NoError(t, err)
contract, err := abis.NewGroupMessages(
common.HexToAddress(contractAddress),
client,
)

require.NoError(t, err)
storer := NewGroupMessageStorer(queryImpl, testutils.NewLog(t), contract)

return storer, func() {
cancel()
cleanup()
}
}

func TestStoreGroupMessages(t *testing.T) {
ctx := context.Background()
storer, cleanup := buildGroupMessageStorer(t)
defer cleanup()

var groupID [32]byte
copy(groupID[:], testutils.RandomBytes(32))
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

logMessage := testutils.BuildMessageSentLog(t, groupID, message, sequenceID)

err := storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)

envelopes, queryErr := storer.queries.SelectGatewayEnvelopes(
ctx,
queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)},
)
require.NoError(t, queryErr)

require.Equal(t, len(envelopes), 1)

firstEnvelope := envelopes[0]
require.Equal(t, firstEnvelope.OriginatorEnvelope, message)
}

func TestStoreGroupMessageDuplicate(t *testing.T) {
ctx := context.Background()
storer, cleanup := buildGroupMessageStorer(t)
defer cleanup()

var groupID [32]byte
copy(groupID[:], testutils.RandomBytes(32))
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

logMessage := testutils.BuildMessageSentLog(t, groupID, message, sequenceID)

err := storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)
// Store the log a second time
err = storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)

envelopes, queryErr := storer.queries.SelectGatewayEnvelopes(
ctx,
queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)},
)
require.NoError(t, queryErr)

require.Equal(t, len(envelopes), 1)
}

func TestStoreGroupMessageMalformed(t *testing.T) {
ctx := context.Background()
storer, cleanup := buildGroupMessageStorer(t)
defer cleanup()

abi, err := abis.GroupMessagesMetaData.GetAbi()
require.NoError(t, err)

topic, err := utils.GetEventTopic(abi, "MessageSent")
require.NoError(t, err)

logMessage := types.Log{
Topics: []common.Hash{topic},
Data: []byte("foo"),
}

storageErr := storer.StoreLog(ctx, logMessage)
require.Error(t, storageErr)
require.False(t, storageErr.ShouldRetry())
}
Loading

0 comments on commit f7b21e2

Please sign in to comment.