From 7b4ef34de2b9469c3f82972b60e38b34c99c5382 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Tue, 12 Jul 2022 09:32:54 +0530 Subject: [PATCH] Update indexer to include block hash in receipts and logs (#256) * Update indexer to include block hash in receipts and logs * Upgrade ipld-eth-db image in docker-compose to run tests --- docker-compose.yml | 2 +- statediff/indexer/database/dump/indexer.go | 2 ++ statediff/indexer/database/file/csv_writer.go | 4 ++-- statediff/indexer/database/file/indexer.go | 2 ++ statediff/indexer/database/file/sql_writer.go | 12 ++++++------ statediff/indexer/database/file/types/schema.go | 2 ++ statediff/indexer/database/sql/indexer.go | 2 ++ .../indexer/database/sql/postgres/database.go | 10 +++++----- statediff/indexer/database/sql/writer.go | 14 +++++++------- statediff/indexer/models/batch.go | 10 ++++++---- statediff/indexer/models/models.go | 2 ++ 11 files changed, 37 insertions(+), 25 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8c440dd026d5..b3c22e6ef900 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: restart: on-failure depends_on: - ipld-eth-db - image: vulcanize/ipld-eth-db:v4.1.4-alpha + image: vulcanize/ipld-eth-db:v4.2.0-alpha environment: DATABASE_USER: "vdbm" DATABASE_NAME: "vulcanize_testing" diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 0b7a5ffb076e..2cc7e2e0a466 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -323,6 +323,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs rctModel := &models.ReceiptModel{ BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, TxID: trxID, Contract: contract, ContractHash: contractHash, @@ -353,6 +354,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs logDataSet[idx] = &models.LogsModel{ BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, ReceiptID: trxID, Address: l.Address.String(), Index: int64(l.Index), diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index 830dde5ecedf..2d4d997e3e6c 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -281,7 +281,7 @@ func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessLis func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { var values []interface{} - values = append(values, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, + values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) csw.rows <- tableRow{types.TableReceipt, values} indexerMetrics.receipts.Inc(1) @@ -290,7 +290,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { for _, l := range logs { var values []interface{} - values = append(values, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, + values = append(values, l.BlockNumber, l.HeaderID, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data) csw.rows <- tableRow{types.TableLog, values} indexerMetrics.logs.Inc(1) diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 27ed0e0abe5d..8103a68f4323 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -371,6 +371,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { rctModel := &models.ReceiptModel{ BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, TxID: txID, Contract: contract, ContractHash: contractHash, @@ -399,6 +400,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { logDataSet[idx] = &models.LogsModel{ BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, ReceiptID: txID, Address: l.Address.String(), Index: int64(l.Index), diff --git a/statediff/indexer/database/file/sql_writer.go b/statediff/indexer/database/file/sql_writer.go index 934fc4c8544d..b947fada93a4 100644 --- a/statediff/indexer/database/file/sql_writer.go +++ b/statediff/indexer/database/file/sql_writer.go @@ -155,11 +155,11 @@ const ( alInsert = "INSERT INTO eth.access_list_elements (block_number, tx_id, index, address, storage_keys) VALUES " + "('%s', '%s', %d, '%s', '%s');\n" - rctInsert = "INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " + - "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" + rctInsert = "INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " + + "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" - logInsert = "INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + - "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n" + logInsert = "INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + + "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n" stateInsert = "INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " + "VALUES ('%s', '%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n" @@ -235,14 +235,14 @@ func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessLis } func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) { - sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, + sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)) indexerMetrics.receipts.Inc(1) } func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { for _, l := range logs { - sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, + sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.BlockNumber, l.HeaderID, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data)) indexerMetrics.logs.Inc(1) } diff --git a/statediff/indexer/database/file/types/schema.go b/statediff/indexer/database/file/types/schema.go index 19319da33bca..ea0daefd615f 100644 --- a/statediff/indexer/database/file/types/schema.go +++ b/statediff/indexer/database/file/types/schema.go @@ -132,6 +132,7 @@ var TableReceipt = Table{ "eth.receipt_cids", []column{ {name: "block_number", dbType: bigint}, + {name: "header_id", dbType: varchar}, {name: "tx_id", dbType: varchar}, {name: "leaf_cid", dbType: text}, {name: "contract", dbType: varchar}, @@ -147,6 +148,7 @@ var TableLog = Table{ "eth.log_cids", []column{ {name: "block_number", dbType: bigint}, + {name: "header_id", dbType: varchar}, {name: "leaf_cid", dbType: text}, {name: "leaf_mh_key", dbType: text}, {name: "rct_id", dbType: varchar}, diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index d39f2a0e5631..762107ee5e81 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -381,6 +381,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs rctModel := &models.ReceiptModel{ BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, TxID: txID, Contract: contract, ContractHash: contractHash, @@ -412,6 +413,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs logDataSet[idx] = &models.LogsModel{ BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, ReceiptID: txID, Address: l.Address.String(), Index: int64(l.Index), diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 35a9cbc82400..5484ff8d813a 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -52,7 +52,7 @@ func (db *DB) InsertUncleStm() string { // InsertTxStm satisfies the sql.Statements interface func (db *DB) InsertTxStm() string { return `INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - ON CONFLICT (tx_hash, block_number) DO NOTHING` + ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING` } // InsertAccessListElementStm satisfies the sql.Statements interface @@ -63,14 +63,14 @@ func (db *DB) InsertAccessListElementStm() string { // InsertRctStm satisfies the sql.Statements interface func (db *DB) InsertRctStm() string { - return `INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (tx_id, block_number) DO NOTHING` + return `INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (tx_id, header_id, block_number) DO NOTHING` } // InsertLogStm satisfies the sql.Statements interface func (db *DB) InsertLogStm() string { - return `INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - ON CONFLICT (rct_id, index, block_number) DO NOTHING` + return `INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING` } // InsertStateStm satisfies the sql.Statements interface diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index c6c37855653e..70d8ba45f11d 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -76,7 +76,7 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { /* INSERT INTO eth.transaction_cids (block_number, header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) -ON CONFLICT (tx_hash, block_number) DO NOTHING +ON CONFLICT (tx_hash, header_id, block_number) DO NOTHING */ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), @@ -105,12 +105,12 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL } /* -INSERT INTO eth.receipt_cids (block_number, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) -ON CONFLICT (tx_id, block_number) DO NOTHING +INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) +ON CONFLICT (tx_id, header_id, block_number) DO NOTHING */ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), - rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, + rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) if err != nil { return fmt.Errorf("error upserting receipt_cids entry: %w", err) @@ -120,13 +120,13 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { } /* -INSERT INTO eth.log_cids (block_number, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) -ON CONFLICT (rct_id, index, block_number) DO NOTHING +INSERT INTO eth.log_cids (block_number, header_id, leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) +ON CONFLICT (rct_id, index, header_id, block_number) DO NOTHING */ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { for _, log := range logs { _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), - log.BlockNumber, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, + log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) if err != nil { return fmt.Errorf("error upserting logs entry: %w", err) diff --git a/statediff/indexer/models/batch.go b/statediff/indexer/models/batch.go index 94e9b4e96164..76858c96f83b 100644 --- a/statediff/indexer/models/batch.go +++ b/statediff/indexer/models/batch.go @@ -39,7 +39,7 @@ type UncleBatch struct { // TxBatch holds the arguments for a batch insert of tx data type TxBatch struct { BlockNumbers []string - HeaderID string + HeaderIDs []string Indexes []int64 TxHashes []string CIDs []string @@ -62,6 +62,7 @@ type AccessListBatch struct { // ReceiptBatch holds the arguments for a batch insert of receipt data type ReceiptBatch struct { BlockNumbers []string + HeaderIDs []string TxIDs []string LeafCIDs []string LeafMhKeys []string @@ -75,6 +76,7 @@ type ReceiptBatch struct { // LogBatch holds the arguments for a batch insert of log data type LogBatch struct { BlockNumbers []string + HeaderIDs []string LeafCIDs []string LeafMhKeys []string ReceiptIDs []string @@ -90,7 +92,7 @@ type LogBatch struct { // StateBatch holds the arguments for a batch insert of state data type StateBatch struct { BlockNumbers []string - HeaderID string + HeaderIDs []string Paths [][]byte StateKeys []string NodeTypes []int @@ -102,7 +104,7 @@ type StateBatch struct { // AccountBatch holds the arguments for a batch insert of account data type AccountBatch struct { BlockNumbers []string - HeaderID string + HeaderIDs []string StatePaths [][]byte Balances []string Nonces []uint64 @@ -113,7 +115,7 @@ type AccountBatch struct { // StorageBatch holds the arguments for a batch insert of storage data type StorageBatch struct { BlockNumbers []string - HeaderID string + HeaderIDs []string StatePaths [][]string Paths [][]byte StorageKeys []string diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index cbf17d977b0b..be44e37c72fd 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -83,6 +83,7 @@ type AccessListElementModel struct { // ReceiptModel is the db model for eth.receipt_cids type ReceiptModel struct { BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` TxID string `db:"tx_id"` LeafCID string `db:"leaf_cid"` LeafMhKey string `db:"leaf_mh_key"` @@ -146,6 +147,7 @@ type StateAccountModel struct { // LogsModel is the db model for eth.logs type LogsModel struct { BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` ReceiptID string `db:"rct_id"` LeafCID string `db:"leaf_cid"` LeafMhKey string `db:"leaf_mh_key"`