From f5efc2b50277304fc404d8ff540660a326d256d2 Mon Sep 17 00:00:00 2001 From: Tangui Clairet <181825613+Tangui-Bitfly@users.noreply.github.com> Date: Wed, 27 Nov 2024 11:11:45 +0100 Subject: [PATCH] feat(data store): update bigtable index for better querying feat(data store): add with filter --- backend/internal/e2e/data_test.go | 96 +++ backend/pkg/commons/db2/data/data.go | 185 ++---- .../commons/db2/data/data_external_test.go | 8 +- backend/pkg/commons/db2/data/data_test.go | 127 ++-- backend/pkg/commons/db2/data/filter.go | 267 ++------ backend/pkg/commons/db2/data/filter_test.go | 169 +++-- backend/pkg/commons/db2/data/keys.go | 312 ++++------ backend/pkg/commons/db2/data/option.go | 119 ++-- backend/pkg/commons/db2/data/parser.go | 58 ++ backend/pkg/commons/db2/database/bigtable.go | 102 ++- .../pkg/commons/db2/database/bigtable_test.go | 38 +- backend/pkg/commons/db2/database/option.go | 36 +- backend/pkg/commons/db2/database/remote.go | 2 +- backend/pkg/commons/db2/database/store.go | 2 +- .../pkg/commons/db2/databasetest/bigtable.go | 1 + backend/pkg/commons/db2/metadata/key.go | 54 ++ backend/pkg/commons/indexer/cache.go | 15 + backend/pkg/commons/indexer/indexer.go | 67 ++ backend/pkg/commons/indexer/indexer_test.go | 71 +++ backend/pkg/commons/indexer/tranformer.go | 163 +++++ backend/pkg/commons/rpc/erigon.go | 12 +- backend/pkg/commons/types/eth1.pb.go | 582 ++++++++++-------- backend/pkg/commons/types/eth1.proto | 17 + 23 files changed, 1500 insertions(+), 1003 deletions(-) create mode 100644 backend/internal/e2e/data_test.go create mode 100644 backend/pkg/commons/db2/data/parser.go create mode 100644 backend/pkg/commons/db2/metadata/key.go create mode 100644 backend/pkg/commons/indexer/cache.go create mode 100644 backend/pkg/commons/indexer/indexer.go create mode 100644 backend/pkg/commons/indexer/indexer_test.go create mode 100644 backend/pkg/commons/indexer/tranformer.go diff --git a/backend/internal/e2e/data_test.go b/backend/internal/e2e/data_test.go new file mode 100644 index 000000000..8acb6c715 --- /dev/null +++ b/backend/internal/e2e/data_test.go @@ -0,0 +1,96 @@ +package e2e + +import ( + "context" + "encoding/hex" + "fmt" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + + "github.com/gobitfly/beaconchain/internal/th" + "github.com/gobitfly/beaconchain/pkg/commons/db2/data" + "github.com/gobitfly/beaconchain/pkg/commons/db2/database" + "github.com/gobitfly/beaconchain/pkg/commons/db2/databasetest" + "github.com/gobitfly/beaconchain/pkg/commons/indexer" + "github.com/gobitfly/beaconchain/pkg/commons/rpc" + "github.com/gobitfly/beaconchain/pkg/commons/types" +) + +func TestStoreWithBackend(t *testing.T) { + clientBT, adminBT := databasetest.NewBigTable(t) + bigtable, err := database.NewBigTableWithClient(context.Background(), clientBT, adminBT, data.Schema) + if err != nil { + t.Fatal(err) + } + + store := data.NewStore(database.Wrap(bigtable, data.Table)) + backend := th.NewBackend(t) + _, usdt := backend.DeployToken(t, "usdt", "usdt", backend.BankAccount.From) + + transform := indexer.NewTransformer(indexer.NoopCache{}) + indexer := indexer.New(store, transform.Tx, transform.ERC20) + + client, err := rpc.NewErigonClient(backend.Endpoint) + if err != nil { + t.Fatal(err) + } + + var addresses []common.Address + for i := 0; i < 10; i++ { + temp := th.CreateEOA(t) + addresses = append(addresses, temp.From) + for j := 0; j < 25; j++ { + if err := backend.Client().SendTransaction(context.Background(), backend.MakeTx(t, backend.BankAccount, &temp.From, big.NewInt(1), nil)); err != nil { + t.Fatal(err) + } + if _, err := usdt.Mint(backend.BankAccount.TransactOpts, temp.From, big.NewInt(1)); err != nil { + t.Fatal(i, j, err) + } + backend.Commit() + lastBlock, err := backend.Client().BlockNumber(context.Background()) + if err != nil { + t.Fatal(err) + } + block, _, err := client.GetBlock(int64(lastBlock), "geth") + if err != nil { + t.Fatal(err) + } + if err := indexer.IndexBlocksWithTransformers(fmt.Sprintf("%d", backend.ChainID), []*types.Eth1Block{block}); err != nil { + t.Fatal(err) + } + } + } + + t.Run("get interactions", func(t *testing.T) { + efficiencies := make(map[string]int64) + interactions, _, err := store.Get(addresses, nil, 50, data.WithDatabaseStats(func(msg string, args ...any) { + var efficiency int64 + var rowRange string + for i := 0; i < len(args); i = i + 2 { + if args[i].(string) == database.KeyStatEfficiency { + efficiency = args[i+1].(int64) + } + if args[i].(string) == database.KeyStatRange { + rowRange = args[i+1].(string) + } + } + efficiencies[rowRange] = efficiency + })) + if err != nil { + t.Fatal(err) + } + for _, interaction := range interactions { + t.Log(interaction.Type, interaction.ChainID, "0x"+interaction.From, "0x"+interaction.To, "0x"+hex.EncodeToString(interaction.Hash), interaction.Time) + } + if got, want := len(efficiencies), len(addresses); got != want { + t.Errorf("got %d want %d", got, want) + } + for rowRange, efficiency := range efficiencies { + if got, want := efficiency, int64(1); got != want { + t.Errorf("efficiency for %s: got %d, want %d", rowRange, got, want) + } + } + }) +} diff --git a/backend/pkg/commons/db2/data/data.go b/backend/pkg/commons/db2/data/data.go index 24879dc01..82690245e 100644 --- a/backend/pkg/commons/db2/data/data.go +++ b/backend/pkg/commons/db2/data/data.go @@ -26,112 +26,45 @@ func NewStore(store database.Database) Store { } } -type TransferWithIndexes struct { - Indexed *types.Eth1ERC20Indexed - TxIndex int - LogIndex int -} - -func (store Store) BlockERC20TransfersToItems(chainID string, transfers []TransferWithIndexes) (map[string][]database.Item, error) { - items := make(map[string][]database.Item) - for _, transfer := range transfers { - b, err := proto.Marshal(transfer.Indexed) - if err != nil { - return nil, err - } - key := keyERC20(chainID, transfer.Indexed.ParentHash, transfer.LogIndex) - item := []database.Item{{Family: defaultFamily, Column: key}} - items[key] = []database.Item{{Family: defaultFamily, Column: dataColumn, Data: b}} - - items[keyERC20Time(chainID, transfer.Indexed, transfer.Indexed.From, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20Time(chainID, transfer.Indexed, transfer.Indexed.To, transfer.TxIndex, transfer.LogIndex)] = item - - items[keyERC20ContractAllTime(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20ContractTime(chainID, transfer.Indexed, transfer.Indexed.From, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20ContractTime(chainID, transfer.Indexed, transfer.Indexed.To, transfer.TxIndex, transfer.LogIndex)] = item - - items[keyERC20To(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20From(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20Sent(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - items[keyERC20Received(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item - } - return items, nil +func (store Store) AddItems(items map[string][]database.Item) error { + return store.db.BulkAdd(items) } func (store Store) AddBlockERC20Transfers(chainID string, transactions []TransferWithIndexes) error { - items, err := store.BlockERC20TransfersToItems(chainID, transactions) + items, err := BlockERC20TransfersToItemsV2(chainID, transactions) if err != nil { return err } return store.db.BulkAdd(items) } -func (store Store) BlockTransactionsToItems(chainID string, transactions []*types.Eth1TransactionIndexed) (map[string][]database.Item, error) { - items := make(map[string][]database.Item) - for i, transaction := range transactions { - b, err := proto.Marshal(transaction) - if err != nil { - return nil, err - } - key := keyTx(chainID, transaction.GetHash()) - item := []database.Item{{Family: defaultFamily, Column: key}} - items[key] = []database.Item{{Family: defaultFamily, Column: dataColumn, Data: b}} - items[keyTxSent(chainID, transaction, i)] = item - items[keyTxReceived(chainID, transaction, i)] = item - - items[keyTxTime(chainID, transaction, transaction.To, i)] = item - items[keyTxBlock(chainID, transaction, transaction.To, i)] = item - items[keyTxMethod(chainID, transaction, transaction.To, i)] = item - - items[keyTxTime(chainID, transaction, transaction.From, i)] = item - items[keyTxBlock(chainID, transaction, transaction.From, i)] = item - items[keyTxMethod(chainID, transaction, transaction.From, i)] = item - - if transaction.ErrorMsg != "" { - items[keyTxError(chainID, transaction, transaction.To, i)] = item - items[keyTxError(chainID, transaction, transaction.From, i)] = item - } - - if transaction.IsContractCreation { - items[keyTxContractCreation(chainID, transaction, transaction.To, i)] = item - items[keyTxContractCreation(chainID, transaction, transaction.From, i)] = item - } - } - return items, nil -} - func (store Store) AddBlockTransactions(chainID string, transactions []*types.Eth1TransactionIndexed) error { - items, err := store.BlockTransactionsToItems(chainID, transactions) + items, err := BlockTransactionsToItemsV2(chainID, transactions) if err != nil { return err } return store.db.BulkAdd(items) } -func (store Store) Get(chainIDs []string, addresses []common.Address, prefixes map[string]map[string]string, limit int64, opts ...Option) ([]*Interaction, map[string]map[string]string, error) { - sources := map[formatType]unMarshalInteraction{ - typeTx: unMarshalTx, - typeTransfer: unMarshalTransfer, - } +func (store Store) Get(addresses []common.Address, prefixes map[string]string, limit int64, opts ...Option) ([]*Interaction, map[string]string, error) { options := apply(opts) - if options.ignoreTxs { - delete(sources, typeTx) + + filter, err := newQueryFilter(options) + if err != nil { + return nil, nil, err } - if options.ignoreTransfers { - delete(sources, typeTransfer) + databaseOptions := []database.Option{ + database.WithLimit(limit), + database.WithOpenRange(true), } - var interactions []*interactionWithInfo - for interactionType, unMarshalFunc := range sources { - filter, err := makeFilters(options, interactionType) - if err != nil { - return nil, nil, err - } - temp, err := store.getBy(unMarshalFunc, chainIDs, addresses, prefixes, limit, filter) - if err != nil { - return nil, nil, err - } - interactions = append(interactions, temp...) + if options.statsReporter != nil { + databaseOptions = append(databaseOptions, database.WithStats(options.statsReporter)) + } + interactions, err := store.getBy(addresses, prefixes, filter, databaseOptions) + if err != nil { + return nil, nil, err } + sort.Sort(byTimeDesc(interactions)) if int64(len(interactions)) > limit { interactions = interactions[:limit] @@ -139,59 +72,59 @@ func (store Store) Get(chainIDs []string, addresses []common.Address, prefixes m var res []*Interaction if prefixes == nil { - prefixes = make(map[string]map[string]string) + prefixes = make(map[string]string) } for i := 0; i < len(interactions); i++ { - if prefixes[interactions[i].chainID] == nil { - prefixes[interactions[i].chainID] = make(map[string]string) - } - prefixes[interactions[i].chainID][interactions[i].root] = interactions[i].key + prefixes[interactions[i].root] = interactions[i].key res = append(res, interactions[i].Interaction) } return res, prefixes, nil } -func (store Store) getBy(unMarshal unMarshalInteraction, chainIDs []string, addresses []common.Address, prefixes map[string]map[string]string, limit int64, condition filter) ([]*interactionWithInfo, error) { +func (store Store) getBy(addresses []common.Address, prefixes map[string]string, condition filter, databaseOptions []database.Option) ([]*interactionWithInfo, error) { var interactions []*interactionWithInfo - for _, chainID := range chainIDs { - for _, address := range addresses { - root := condition.get(chainID, address) - prefix := root - if prefixes != nil && prefixes[chainID] != nil && prefixes[chainID][root] != "" { - prefix = prefixes[chainID][root] + for _, address := range addresses { + root := condition.get(address) + prefix := root + if prefixes != nil && prefixes[root] != "" { + prefix = prefixes[root] + } + upper := condition.limit(root) + indexRows, err := store.db.GetRowsRange(upper, prefix, databaseOptions...) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + continue } - upper := condition.limit(root) - indexRows, err := store.db.GetRowsRange(upper, prefix, database.WithLimit(limit), database.WithOpenRange(true)) - if err != nil { - if errors.Is(err, database.ErrNotFound) { - continue - } - return nil, err + return nil, err + } + txKeys := make(map[string]string) + for _, row := range indexRows { + for key := range row.Values { + txKey := strings.TrimPrefix(key, fmt.Sprintf("%s:", defaultFamily)) + txKeys[txKey] = row.Key } - txKeys := make(map[string]string) - for _, row := range indexRows { - for key := range row.Values { - txKey := strings.TrimPrefix(key, fmt.Sprintf("%s:", defaultFamily)) - txKeys[txKey] = row.Key - } + } + txRows, err := store.db.GetRowsWithKeys(maps.Keys(txKeys)) + if err != nil { + return nil, err + } + for _, row := range txRows { + parts := strings.Split(row.Key, ":") + unMarshal := unMarshalTx + if parts[0] == "ERC20" { + unMarshal = unMarshalTransfer } - txRows, err := store.db.GetRowsWithKeys(maps.Keys(txKeys)) + interaction, err := unMarshal(row.Values[fmt.Sprintf("%s:%s", defaultFamily, dataColumn)]) if err != nil { return nil, err } - for _, row := range txRows { - interaction, err := unMarshal(row.Values[fmt.Sprintf("%s:%s", defaultFamily, dataColumn)]) - if err != nil { - return nil, err - } - interaction.ChainID = chainID - interactions = append(interactions, &interactionWithInfo{ - Interaction: interaction, - chainID: chainID, - root: root, - key: txKeys[row.Key], - }) - } + interaction.ChainID = parts[1] + interactions = append(interactions, &interactionWithInfo{ + Interaction: interaction, + chainID: parts[1], + root: root, + key: txKeys[row.Key], + }) } } return interactions, nil @@ -231,8 +164,6 @@ type Interaction struct { var erc20Transfer, _ = hex.DecodeString("a9059cbb") -type unMarshalInteraction func(b []byte) (*Interaction, error) - func unMarshalTx(b []byte) (*Interaction, error) { tx := &types.Eth1TransactionIndexed{} if err := proto.Unmarshal(b, tx); err != nil { diff --git a/backend/pkg/commons/db2/data/data_external_test.go b/backend/pkg/commons/db2/data/data_external_test.go index 04afd2bd3..efdaa9907 100644 --- a/backend/pkg/commons/db2/data/data_external_test.go +++ b/backend/pkg/commons/db2/data/data_external_test.go @@ -77,7 +77,7 @@ func TestStoreExternal(t *testing.T) { chainIDs: chainIDs, addresses: addresses, opts: []data.Option{ - data.IgnoreTransfers(), + data.OnlyTransactions(), data.ByMethod("a9059cbb"), }, }, @@ -87,15 +87,15 @@ func TestStoreExternal(t *testing.T) { chainIDs: chainIDs, addresses: addresses, opts: []data.Option{ - data.IgnoreTransactions(), + data.OnlyTransactions(), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var lastPrefixes map[string]map[string]string + var lastPrefixes map[string]string for i := 0; i < tt.scroll+1; i++ { - interactions, prefixes, err := store.Get(chainIDs, addresses, lastPrefixes, tt.limit, tt.opts...) + interactions, prefixes, err := store.Get(addresses, lastPrefixes, tt.limit, tt.opts...) if err != nil { t.Fatal(err) } diff --git a/backend/pkg/commons/db2/data/data_test.go b/backend/pkg/commons/db2/data/data_test.go index 0942331a6..376c34be1 100644 --- a/backend/pkg/commons/db2/data/data_test.go +++ b/backend/pkg/commons/db2/data/data_test.go @@ -3,13 +3,11 @@ package data import ( "context" "encoding/hex" - "slices" - "sort" + "fmt" "testing" "time" "github.com/ethereum/go-ethereum/common" - "golang.org/x/exp/maps" "google.golang.org/protobuf/types/known/timestamppb" "github.com/gobitfly/beaconchain/pkg/commons/db2/database" @@ -22,6 +20,7 @@ var ( bob = common.HexToAddress("0x000000000000000000000000000000000000beef") carl = common.HexToAddress("0x000000000000000000000000000000000000cafe") usdc = common.HexToAddress("0x000000000000000000000000000000000000dead") + dai = common.HexToAddress("0x000000000000000000000000000000000000eeee") ) func TestStore(t *testing.T) { @@ -31,15 +30,13 @@ func TestStore(t *testing.T) { if err != nil { t.Fatal(err) } - store := Store{ - db: database.Wrap(s, Table), - } + + store := NewStore(database.Wrap(s, Table)) tests := []struct { name string txs map[string][][]*types.Eth1TransactionIndexed // map[chainID][block][txPosition]*types.Eth1TransactionIndexed transfers map[string][][]TransferWithIndexes - limit int64 opts []Option addresses []common.Address expectedHashes []string @@ -53,7 +50,6 @@ func TestStore(t *testing.T) { {newTx("hash3", alice, bob, "", 2)}, }, }, - limit: 1, addresses: []common.Address{alice}, expectedHashes: []string{"hash3", "hash2", "hash1"}, }, @@ -65,7 +61,6 @@ func TestStore(t *testing.T) { {newTx("hash2", carl, bob, "", 1)}, }, }, - limit: 2, addresses: []common.Address{alice, carl}, expectedHashes: []string{"hash2", "hash1"}, }, @@ -79,7 +74,6 @@ func TestStore(t *testing.T) { {newTx("hash4", carl, bob, "", 3)}, }, }, - limit: 2, addresses: []common.Address{alice, carl}, expectedHashes: []string{"hash4", "hash3", "hash2", "hash1"}, }, @@ -95,7 +89,6 @@ func TestStore(t *testing.T) { {newTx("hash4", carl, bob, "", 3)}, }, }, - limit: 2, addresses: []common.Address{alice, carl}, expectedHashes: []string{"hash4", "hash3", "hash2", "hash1"}, }, @@ -111,7 +104,6 @@ func TestStore(t *testing.T) { {newTx("hash4", alice, bob, "", 3)}, }, }, - limit: 2, addresses: []common.Address{alice, carl}, expectedHashes: []string{"hash4", "hash3", "hash2", "hash1"}, }, @@ -124,8 +116,7 @@ func TestStore(t *testing.T) { {newTx("hash3", carl, bob, "foo", 2)}, }, }, - limit: 1, - opts: []Option{IgnoreTransfers(), ByMethod(hex.EncodeToString([]byte("foo")))}, + opts: []Option{OnlyTransactions(), ByMethod(hex.EncodeToString([]byte("foo")))}, addresses: []common.Address{alice}, expectedHashes: []string{"hash1"}, }, @@ -138,8 +129,7 @@ func TestStore(t *testing.T) { {newTx("hash3", alice, bob, "", 2)}, }, }, - limit: 1, - opts: []Option{WithTimeRange(timestamppb.New(t0), timestamppb.New(t0.Add(1*time.Second)))}, + opts: []Option{WithTimeRange(timestamppb.New(t0), timestamppb.New(t1))}, addresses: []common.Address{alice}, expectedHashes: []string{"hash2", "hash1"}, }, @@ -152,7 +142,6 @@ func TestStore(t *testing.T) { {newTx("hash3", alice, bob, "", 2)}, }, }, - limit: 1, opts: []Option{OnlySent()}, addresses: []common.Address{alice}, expectedHashes: []string{"hash3", "hash1"}, @@ -166,11 +155,11 @@ func TestStore(t *testing.T) { {newTx("hash3", alice, bob, "", 2)}, }, }, - limit: 1, opts: []Option{OnlyReceived()}, addresses: []common.Address{bob}, expectedHashes: []string{"hash3", "hash1"}, - }, { + }, + { name: "only transfers", txs: map[string][][]*types.Eth1TransactionIndexed{ "1": { @@ -183,8 +172,7 @@ func TestStore(t *testing.T) { {newTransfer("hash3", alice, bob, common.Address{}, 2)}, }, }, - limit: 1, - opts: []Option{IgnoreTransactions()}, + opts: []Option{OnlyTransfers()}, addresses: []common.Address{alice}, expectedHashes: []string{"hash3", "hash1"}, }, @@ -201,8 +189,7 @@ func TestStore(t *testing.T) { {newTransfer("hash2", alice, bob, common.Address{}, 1)}, }, }, - limit: 1, - opts: []Option{IgnoreTransfers()}, + opts: []Option{OnlyTransactions()}, addresses: []common.Address{alice}, expectedHashes: []string{"hash3", "hash1"}, }, @@ -226,7 +213,6 @@ func TestStore(t *testing.T) { {newTransfer("hash10", alice, bob, common.Address{}, 9)}, }, }, - limit: 2, addresses: []common.Address{alice}, expectedHashes: []string{"hash10", "hash9", "hash8", "hash7", "hash6", "hash5", "hash4", "hash3", "hash2", "hash1"}, }, @@ -236,31 +222,28 @@ func TestStore(t *testing.T) { "1": { {newTransfer("hash1", alice, bob, usdc, 0)}, {newTransfer("hash2", alice, bob, usdc, 1)}, - {newTransfer("hash3", alice, bob, usdc, 2)}, + {newTransfer("hash3", alice, bob, dai, 2)}, {newTransfer("hash4", alice, bob, usdc, 3)}, {newTransfer("hash5", alice, bob, usdc, 4)}, }, }, - limit: 2, - opts: []Option{IgnoreTransactions(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + opts: []Option{OnlyTransfers(), ByAsset(usdc), WithTimeRange(timestamppb.New(t1), timestamppb.New(t0.Add(3*time.Second)))}, addresses: []common.Address{alice}, - expectedHashes: []string{"hash4", "hash3", "hash2"}, + expectedHashes: []string{"hash4", "hash2"}, }, { - name: "by asset and sender with time range", + name: "by asset and sender", transfers: map[string][][]TransferWithIndexes{ "1": { - {newTransfer("hash1", alice, bob, usdc, 0)}, - {newTransfer("hash2", bob, alice, usdc, 1)}, - {newTransfer("hash3", alice, bob, usdc, 2)}, - {newTransfer("hash4", bob, alice, usdc, 3)}, - {newTransfer("hash5", alice, bob, usdc, 4)}, + {newTransfer("hash1", bob, alice, usdc, 0)}, + {newTransfer("hash2", bob, alice, dai, 1)}, + {newTransfer("hash3", alice, bob, dai, 2)}, + {newTransfer("hash4", alice, bob, usdc, 3)}, }, }, - limit: 2, - opts: []Option{IgnoreTransactions(), OnlySent(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + opts: []Option{OnlyTransfers(), ByAsset(usdc), OnlySent()}, addresses: []common.Address{alice}, - expectedHashes: []string{"hash3"}, + expectedHashes: []string{"hash4"}, }, { name: "by asset and receiver with time range", @@ -273,8 +256,7 @@ func TestStore(t *testing.T) { {newTransfer("hash5", bob, alice, usdc, 4)}, }, }, - limit: 2, - opts: []Option{IgnoreTransactions(), OnlyReceived(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + opts: []Option{OnlyTransfers(), OnlyReceived(), ByAsset(usdc), WithTimeRange(timestamppb.New(t1), timestamppb.New(t0.Add(3*time.Second)))}, addresses: []common.Address{alice}, expectedHashes: []string{"hash3"}, }, @@ -296,33 +278,60 @@ func TestStore(t *testing.T) { } } } - chainIDs := append(maps.Keys(tt.txs), maps.Keys(tt.transfers)...) - sort.Strings(chainIDs) - chainIDs = slices.Compact(chainIDs) - var suffix map[string]map[string]string - for i := int64(0); i < int64(len(tt.expectedHashes))/tt.limit; i++ { - txs, newSuffix, err := store.Get(chainIDs, tt.addresses, suffix, tt.limit, tt.opts...) - if err != nil { - t.Fatalf("tx %d: %v", i, err) - } - if len(txs) == 0 { - t.Fatalf("tx %d: no transactions found", i) - } - if got, want := int64(len(txs)), tt.limit; got != want { + var suffix map[string]string + txs, _, err := store.Get(tt.addresses, suffix, 25, tt.opts...) + if err != nil { + t.Fatal(err) + } + if len(txs) == 0 { + t.Fatalf("no transactions found") + } + if got, want := len(txs), len(tt.expectedHashes); got != want { + t.Errorf("got %v, want %v", got, want) + } + for i := int64(0); i < int64(len(tt.expectedHashes)); i++ { + if got, want := string(txs[i].Hash), tt.expectedHashes[i]; got != want { t.Errorf("got %v, want %v", got, want) } - for j := int64(0); j < tt.limit; j++ { - if got, want := string(txs[j].Hash), tt.expectedHashes[i*tt.limit+j]; got != want { - t.Errorf("got %v, want %v", got, want) - } - } - suffix = newSuffix } }) } } -var t0 = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) +func TestStoreLimitAndPagination(t *testing.T) { + client, admin := databasetest.NewBigTable(t) + + s, err := database.NewBigTableWithClient(context.Background(), client, admin, Schema) + if err != nil { + t.Fatal(err) + } + + store := NewStore(database.Wrap(s, Table)) + + for i := 0; i < 10; i++ { + err := store.AddBlockTransactions("1", []*types.Eth1TransactionIndexed{newTx(fmt.Sprintf("%d", i), common.Address{}, common.Address{}, "", int64(i))}) + if err != nil { + t.Fatal(err) + } + } + + var prefix map[string]string + var txs []*Interaction + for i := 9; i >= 0; i-- { + txs, prefix, err = store.Get([]common.Address{{}}, prefix, 1) + if len(txs) != 1 { + t.Errorf("got %v, want 1", len(txs)) + } + if got, want := string(txs[0].Hash), fmt.Sprintf("%d", i); got != want { + t.Errorf("got %v, want %v", got, want) + } + } +} + +var ( + t0 = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + t1 = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).Add(1 * time.Second) +) func newTx(hash string, from, to common.Address, method string, delta int64) *types.Eth1TransactionIndexed { return &types.Eth1TransactionIndexed{ diff --git a/backend/pkg/commons/db2/data/filter.go b/backend/pkg/commons/db2/data/filter.go index aea5b19f6..648085f59 100644 --- a/backend/pkg/commons/db2/data/filter.go +++ b/backend/pkg/commons/db2/data/filter.go @@ -8,236 +8,95 @@ import ( "github.com/golang/protobuf/ptypes/timestamp" ) -type formatType string - -const ( - typeTx = formatType("tx") - typeTransfer = formatType("transfer") -) - -type filterType string - -const ( - byMethod = filterType("byMethod") - bySent = filterType("bySent") - byReceived = filterType("byReceived") - byAsset = filterType("byAsset") - byAssetSent = filterType("byAssetSent") - byAssetReceived = filterType("byAssetReceived") -) - type filter interface { - get(chainID string, address common.Address) string + get(address common.Address) string limit(prefix string) string } -type chainFilter interface { - addByMethod(method string) error - addBySent() error - addByReceived() error - addByAsset(asset common.Address) error - addTimeRange(from *timestamp.Timestamp, to *timestamp.Timestamp) error - valid() error - filterType() filterType - filter +func toHex(b []byte) string { + return fmt.Sprintf("%x", b) } -type chainFilterTx struct { - base string - filtered filterType - - method *string - from *timestamp.Timestamp - to *timestamp.Timestamp +type queryFilter struct { + query string + timeFrom *timestamp.Timestamp + timeTo *timestamp.Timestamp } -func newChainFilterTx() *chainFilterTx { - return &chainFilterTx{ - base: ":I:TX:
:TIME", +func newQueryFilter(options options) (*queryFilter, error) { + query := []string{"all"} + params := []string{"
"} + if options.onlySent && options.onlyReceived { + options.onlySent = false + options.onlyReceived = false } -} - -func (c *chainFilterTx) addByMethod(method string) error { - if c.filtered != "" { - return fmt.Errorf("filter tx already filtered by %s", c.filtered) - } - c.base = ":I:TX:
:METHOD:" - c.method = &method - c.filtered = byMethod - return nil -} - -func (c *chainFilterTx) addBySent() error { - if c.filtered != "" { - return fmt.Errorf("filter tx already filtered by %s", c.filtered) + if options.asset != nil && options.method != nil { + return nil, fmt.Errorf("cannot filter by method and by asset together") } - c.base = ":I:TX:
:TO" - c.filtered = bySent - return nil -} - -func (c *chainFilterTx) addByReceived() error { - if c.filtered != "" { - return fmt.Errorf("filter tx already filtered by %s", c.filtered) + if options.method != nil { + options.onlyTxs = true } - c.base = ":I:TX:
:FROM" - c.filtered = byReceived - return nil -} - -func (c *chainFilterTx) addByAsset(common.Address) error { - return fmt.Errorf("cannot filter tx by asset") -} - -func (c *chainFilterTx) addTimeRange(from *timestamp.Timestamp, to *timestamp.Timestamp) error { - if from == nil || to == nil { - return fmt.Errorf("invalid time range: empty border") + if options.asset != nil { + options.onlyTransfers = true } - c.from = from - c.to = to - return nil -} - -func (c *chainFilterTx) filterType() filterType { - return c.filtered -} - -func (c *chainFilterTx) valid() error { - return nil -} - -func (c *chainFilterTx) get(chainID string, address common.Address) string { - query := strings.Replace(c.base, "", chainID, 1) - query = strings.Replace(query, "
", fmt.Sprintf("%x", address.Bytes()), 1) - if c.method != nil { - query = strings.Replace(query, "", *c.method, 1) + if options.onlyTxs && options.onlyTransfers { + options.onlyTxs = false + options.onlyTransfers = false } - if c.to != nil { - query = fmt.Sprintf("%s:%s", query, reversePaddedTimestamp(c.to)) + if options.onlyReceived { + query[0] = "in" + params[0] = "
" } - return query -} - -func (c *chainFilterTx) limit(prefix string) string { - if c.from != nil { - index := strings.LastIndex(prefix, ":") - return toSuccessor(fmt.Sprintf("%s:%s", prefix[:index], reversePaddedTimestamp(c.from))) + if options.onlySent { + query[0] = "out" + params[0] = "
" } - return toSuccessor(prefix) -} - -type chainFilterTransfer struct { - base string - filtered filterType - - asset *common.Address - from *timestamp.Timestamp - to *timestamp.Timestamp -} - -func newChainFilterTransfer() *chainFilterTransfer { - return &chainFilterTransfer{ - base: ":I:ERC20:
:TIME", + if options.with != nil { + query = append(query, "with") + params = append(params, toHex(options.with.Bytes())) } -} - -func (c *chainFilterTransfer) addByMethod(string) error { - return fmt.Errorf("cannot filter transfer by method") -} - -func (c *chainFilterTransfer) addBySent() error { - if c.filtered != "" { - if c.filtered != byAsset { - return fmt.Errorf("filter transfer already filtered by %s", c.filtered) - } - return c.addByAssetSent() - } - c.base = ":I:ERC20:
:TOKEN_SENT" - c.filtered = bySent - return nil -} - -func (c *chainFilterTransfer) addByReceived() error { - if c.filtered != "" { - if c.filtered != byAsset { - return fmt.Errorf("filter transfer already filtered by %s", c.filtered) - } - return c.addByAssetReceived() - } - c.base = ":I:ERC20:
:TOKEN_RECEIVED" - c.filtered = byReceived - return nil -} - -func (c *chainFilterTransfer) addByAssetReceived() error { - c.base = ":I:ERC20:
:TOKEN_RECEIVED:" - c.filtered = byAssetReceived - return nil -} - -func (c *chainFilterTransfer) addByAssetSent() error { - c.base = ":I:ERC20:
:TOKEN_SENT:" - c.filtered = byAssetSent - return nil -} - -func (c *chainFilterTransfer) addByAsset(asset common.Address) error { - if c.filtered != "" { - if c.filtered != byReceived && c.filtered != bySent { - return fmt.Errorf("filter transfer already filtered by %s", c.filtered) - } - } - c.asset = &asset - if c.filtered == byReceived { - return c.addByAssetReceived() - } - if c.filtered == bySent { - return c.addByAssetSent() - } - c.base = ":I:ERC20::
:TIME" - c.filtered = byAsset - return nil -} - -func (c *chainFilterTransfer) addTimeRange(from, to *timestamp.Timestamp) error { - if from == nil || to == nil { - return fmt.Errorf("invalid time range: empty border") + if options.chainID != nil { + query = append(query, "chainID") + params = append(params, *options.chainID) } - if c.filtered == byReceived || c.filtered == bySent { - return fmt.Errorf("cannot apply range over filter by %s", c.filtered) + if options.onlyTxs { + query = append(query, "TX") } - c.from = from - c.to = to - return nil -} - -func (c *chainFilterTransfer) filterType() filterType { - return c.filtered -} - -func (c *chainFilterTransfer) valid() error { - if (c.from != nil || c.to != nil) && (c.filtered == bySent || c.filtered == byReceived) { - return fmt.Errorf("cannot apply range over filter by %s", c.filtered) + if options.onlyTransfers { + query = append(query, "ERC20") + } + if options.method != nil { + query = append(query, "method") + params = append(params, *options.method) + } + if options.asset != nil { + query = append(query, "asset") + params = append(params, toHex(options.asset.Bytes())) } - return nil + if options.from == nil || options.to == nil { + options.from = nil + options.to = nil + } + + return &queryFilter{ + query: strings.Join(append(query, params...), ":"), + timeFrom: options.from, + timeTo: options.to, + }, nil } -func (c *chainFilterTransfer) get(chainID string, address common.Address) string { - query := strings.Replace(c.base, "", chainID, 1) - query = strings.Replace(query, "
", fmt.Sprintf("%x", address.Bytes()), 1) - if c.asset != nil { - query = strings.Replace(query, "", fmt.Sprintf("%x", c.asset.Bytes()), 1) - } - if c.to != nil { - query = fmt.Sprintf("%s:%s", query, reversePaddedTimestamp(c.to)) +func (f queryFilter) get(address common.Address) string { + query := strings.Replace(f.query, "
", toHex(address.Bytes()), 1) + if f.timeTo != nil { + query = fmt.Sprintf("%s:%s", query, reversePaddedTimestamp(f.timeTo)) } return query } -func (c *chainFilterTransfer) limit(prefix string) string { - if c.from != nil { +func (f queryFilter) limit(prefix string) string { + if f.timeFrom != nil { index := strings.LastIndex(prefix, ":") - return toSuccessor(fmt.Sprintf("%s:%s", prefix[:index], reversePaddedTimestamp(c.from))) + return toSuccessor(fmt.Sprintf("%s:%s", prefix[:index], reversePaddedTimestamp(f.timeFrom))) } return toSuccessor(prefix) } diff --git a/backend/pkg/commons/db2/data/filter_test.go b/backend/pkg/commons/db2/data/filter_test.go index 8bf258cd4..06485a0aa 100644 --- a/backend/pkg/commons/db2/data/filter_test.go +++ b/backend/pkg/commons/db2/data/filter_test.go @@ -1,139 +1,132 @@ package data import ( + "strings" "testing" "github.com/ethereum/go-ethereum/common" "google.golang.org/protobuf/types/known/timestamppb" ) -func TestFilter(t *testing.T) { +func TestQueryFilter(t *testing.T) { tests := []struct { - name string - filter chainFilter - add func(chainFilter) error - expectErr bool - expectType filterType + name string + options []Option + want string + err string }{ { - name: "tx by asset should err", - filter: newChainFilterTx(), - add: func(c chainFilter) error { - return c.addByAsset(common.Address{}) + name: "all", + want: "all:
", + }, + { + name: "err for ByMethod and ByAsset", + options: []Option{ + ByMethod(""), + ByAsset(common.Address{}), }, - expectErr: true, + err: "cannot filter by method and by asset together", }, { - name: "tx invalid time range", - filter: newChainFilterTx(), - add: func(c chainFilter) error { - return c.addTimeRange(nil, nil) + name: "only sent", + options: []Option{ + OnlySent(), }, - expectErr: true, + want: "out:
", }, { - name: "transfer by method should err", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - return c.addByMethod("") + name: "received", + options: []Option{ + OnlyReceived(), }, - expectErr: true, + want: "in:
", }, { - name: "transfer by asset sent", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addByAsset(common.Address{}); err != nil { - return err - } - return c.addBySent() + name: "sent", + options: []Option{ + OnlySent(), }, - expectType: byAssetSent, + want: "out:
", }, { - name: "transfer by asset received", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addByAsset(common.Address{}); err != nil { - return err - } - return c.addByReceived() + name: "sent to", + options: []Option{ + OnlySent(), + With(common.Address{}), }, - expectType: byAssetReceived, + want: "out:with:
:0000000000000000000000000000000000000000", }, { - name: "transfer by sent asset", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addBySent(); err != nil { - return err - } - return c.addByAsset(common.Address{}) + name: "on a chain ID", + options: []Option{ + ByChainID("1234"), }, - expectType: byAssetSent, + want: "all:chainID:
:1234", }, { - name: "transfer by received asset", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addByReceived(); err != nil { - return err - } - return c.addByAsset(common.Address{}) + name: "sent on a chain ID", + options: []Option{ + OnlySent(), + ByChainID("1234"), }, - expectType: byAssetReceived, + want: "out:chainID:
:1234", }, { - name: "transfer invalid time range", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - return c.addTimeRange(nil, nil) + name: "received on a chain ID", + options: []Option{ + OnlyReceived(), + ByChainID("1234"), }, - expectErr: true, + want: "in:chainID:
:1234", }, { - name: "transfer time range over bySent should err", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addTimeRange(timestamppb.New(t0), timestamppb.New(t0)); err != nil { - return err - } - if err := c.addBySent(); err != nil { - return err - } - return c.valid() + name: "received TX on a chain ID", + options: []Option{ + OnlyTransactions(), + OnlyReceived(), + ByChainID("1234"), }, - expectErr: true, + want: "in:chainID:TX:
:1234", }, { - name: "transfer time range over byReceived should err", - filter: newChainFilterTransfer(), - add: func(c chainFilter) error { - if err := c.addTimeRange(timestamppb.New(t0), timestamppb.New(t0)); err != nil { - return err - } - if err := c.addByReceived(); err != nil { - return err - } - return c.valid() + name: "received method on a chain ID", + options: []Option{ + ByMethod("bar"), + OnlyReceived(), + ByChainID("1234"), + }, + want: "in:chainID:TX:method:
:1234:bar", + }, + { + name: "received method on a chain ID", + options: []Option{ + ByAsset(common.Address{}), + OnlyReceived(), + ByChainID("1234"), }, - expectErr: true, + want: "in:chainID:ERC20:asset:
:1234:0000000000000000000000000000000000000000", + }, + { + name: "with time range", + options: []Option{ + WithTimeRange(timestamppb.New(t0), timestamppb.New(t1)), + }, + want: "all:
:" + reversePaddedTimestamp(timestamppb.New(t1)), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := tt.add(tt.filter) + filter, err := newQueryFilter(apply(tt.options)) if err != nil { - if !tt.expectErr { - t.Errorf("unexpected err: %s", err) + if got, want := err.Error(), tt.err; got != want { + t.Errorf("got error %v, want %v", got, want) } return } - if tt.expectErr { - t.Error("expected err but got nil") - } - if got, want := tt.filter.filterType(), tt.expectType; got != want { - t.Errorf("got %v, want %v", got, want) + addr := common.Address{} + query := filter.get(addr) + if got, want := query, strings.ReplaceAll(tt.want, "
", toHex(addr.Bytes())); got != want { + t.Errorf("get() = %v, want %v", got, want) } }) } diff --git a/backend/pkg/commons/db2/data/keys.go b/backend/pkg/commons/db2/data/keys.go index a039db47c..f86722e77 100644 --- a/backend/pkg/commons/db2/data/keys.go +++ b/backend/pkg/commons/db2/data/keys.go @@ -11,8 +11,8 @@ import ( ) const ( - maxInt = 9223372036854775807 - maxExecutionLayerBlockNumber = 1000000000 + maxInt = 9223372036854775807 + // maxExecutionLayerBlockNumber = 1000000000 txPerBlockLimit = 10_000 logPerTxLimit = 100_000 @@ -38,191 +38,151 @@ func reversePaddedTimestamp(timestamp *timestamppb.Timestamp) string { return fmt.Sprintf("%019d", maxInt-timestamp.Seconds) } -func reversedPaddedBlockNumber(blockNumber uint64) string { - return fmt.Sprintf("%09d", maxExecutionLayerBlockNumber-blockNumber) -} - -func keyTx(chainID string, hash []byte) string { - format := ":TX:" - replacer := strings.NewReplacer("", chainID, "", fmt.Sprintf("%x", hash)) - return replacer.Replace(format) -} - -func keyTxSent(chainID string, tx *types.Eth1TransactionIndexed, index int) string { - format := ":I:TX::TO::