From 960033eb930ef91b6de8c015ea0011f3b482ffce Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 18 Nov 2021 14:56:26 -0600 Subject: [PATCH 1/2] misc fixes/adjustments --- statediff/builder.go | 2 +- statediff/indexer/constructor.go | 4 + statediff/indexer/database/file/indexer.go | 9 +- statediff/indexer/database/file/writer.go | 107 ++++++++++-------- .../indexer/database/sql/batch_writer.go | 4 +- .../indexer/database/sql/pgx_indexer_test.go | 2 +- .../indexer/database/sql/postgres/config.go | 4 +- .../indexer/database/sql/postgres/database.go | 2 +- .../indexer/database/sql/sqlx_indexer_test.go | 2 +- .../indexer/database/sql/test_helpers.go | 2 +- statediff/indexer/database/sql/writer.go | 2 +- statediff/service.go | 1 - 12 files changed, 75 insertions(+), 66 deletions(-) diff --git a/statediff/builder.go b/statediff/builder.go index 8dc3cece80bc..7811c3e829c9 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -167,7 +167,7 @@ func (sdb *builder) BuildStateDiffObject(args Args, params Params) (types2.State }, nil } -// Writes a statediff object to output callback +// WriteStateDiffObject writes a statediff object to output callback func (sdb *builder) WriteStateDiffObject(args types2.StateRoots, params Params, output types2.StateNodeSink, codeOutput types2.CodeSink) error { if !params.IntermediateStateNodes || len(params.WatchedAddresses) > 0 { // if we are watching only specific accounts then we are only diffing leaf nodes diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go index bfb746080570..9a66dba8966d 100644 --- a/statediff/indexer/constructor.go +++ b/statediff/indexer/constructor.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/statediff/indexer/database/dump" "github.com/ethereum/go-ethereum/statediff/indexer/database/file" @@ -34,6 +35,7 @@ import ( func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) { switch config.Type() { case shared.FILE: + log.Info("Starting statediff service in SQL file writing mode") fc, ok := config.(file.Config) if !ok { return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{}) @@ -41,6 +43,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n fc.NodeInfo = nodeInfo return file.NewStateDiffIndexer(ctx, chainConfig, fc) case shared.POSTGRES: + log.Info("Starting statediff service in Postgres writing mode") pgc, ok := config.(postgres.Config) if !ok { return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{}) @@ -63,6 +66,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n } return sql.NewStateDiffIndexer(ctx, chainConfig, postgres.NewPostgresDB(driver)) case shared.DUMP: + log.Info("Starting statediff service in data dump mode") dumpc, ok := config.(dump.Config) if !ok { return nil, fmt.Errorf("dump config is not the correct type: got %T, expected %T", config, dump.Config{}) diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 1cc19480af7a..454d1e3d1e06 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -72,9 +72,12 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c if err != nil { return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err) } + log.Info("Writing statediff SQL statements to file", "file", filePath) w := NewSQLWriter(file) wg := new(sync.WaitGroup) w.Loop() + w.upsertNode(config.NodeInfo) + w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{}) return &StateDiffIndexer{ writer: w, chainConfig: chainConfig, @@ -130,11 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() - if err := sdi.writer.flush(); err != nil { - traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) - log.Debug(traceMsg) - return err - } + sdi.writer.Flush() tDiff = time.Since(t) indexerMetrics.tPostgresCommit.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go index 5ee1692292cd..fdfa87b08c8b 100644 --- a/statediff/indexer/database/file/writer.go +++ b/statediff/indexer/database/file/writer.go @@ -25,7 +25,6 @@ import ( node "github.com/ipfs/go-ipld-format" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" @@ -33,7 +32,8 @@ import ( var ( nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") - collatedStmtSize = 65336 // min(linuxPipeSize, macOSPipeSize) + pipeSize = 65336 // min(linuxPipeSize, macOSPipeSize) + collatedStmtSize = pipeSize * 16 ) // SQLWriter writes sql statements to a file @@ -43,18 +43,22 @@ type SQLWriter struct { collatedStmt []byte collationIndex int - quitChan chan struct{} - doneChan chan struct{} + flushChan chan struct{} + flushFinished chan struct{} + quitChan chan struct{} + doneChan chan struct{} } // NewSQLWriter creates a new pointer to a Writer func NewSQLWriter(file *os.File) *SQLWriter { return &SQLWriter{ - file: file, - stmts: make(chan []byte), - collatedStmt: make([]byte, collatedStmtSize), - quitChan: make(chan struct{}), - doneChan: make(chan struct{}), + file: file, + stmts: make(chan []byte), + collatedStmt: make([]byte, collatedStmtSize), + flushChan: make(chan struct{}), + flushFinished: make(chan struct{}), + quitChan: make(chan struct{}), + doneChan: make(chan struct{}), } } @@ -72,16 +76,21 @@ func (sqw *SQLWriter) Loop() { l = len(stmt) if l+sqw.collationIndex+1 > collatedStmtSize { if err := sqw.flush(); err != nil { - log.Error("error writing cached sql stmts to file", "err", err) + panic(fmt.Sprintf("error writing sql stmts buffer to file: %v", err)) } } - copy(sqw.collatedStmt[sqw.collationIndex:sqw.collationIndex+l-1], stmt) + copy(sqw.collatedStmt[sqw.collationIndex:sqw.collationIndex+l], stmt) sqw.collationIndex += l case <-sqw.quitChan: if err := sqw.flush(); err != nil { - log.Error("error writing cached sql stmts to file", "err", err) + panic(fmt.Sprintf("error writing sql stmts buffer to file: %v", err)) } return + case <-sqw.flushChan: + if err := sqw.flush(); err != nil { + panic(fmt.Sprintf("error writing sql stmts buffer to file: %v", err)) + } + sqw.flushFinished <- struct{}{} } } }() @@ -94,8 +103,14 @@ func (sqw *SQLWriter) Close() error { return nil } +// Flush sends a flush signal to the looping process +func (sqw *SQLWriter) Flush() { + sqw.flushChan <- struct{}{} + <-sqw.flushFinished +} + func (sqw *SQLWriter) flush() error { - if _, err := sqw.file.Write(sqw.collatedStmt[0 : sqw.collationIndex-1]); err != nil { + if _, err := sqw.file.Write(sqw.collatedStmt[0:sqw.collationIndex]); err != nil { return err } sqw.collationIndex = 0 @@ -103,46 +118,43 @@ func (sqw *SQLWriter) flush() error { } const ( - nodeInsert = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES (%s, %s, %s, %s, %d) - ON CONFLICT (node_id) DO NOTHING;\n` + nodeInsert = "INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " + + "('%s', '%s', '%s', '%s', %d);\n" - ipldInsert = `INSERT INTO public.blocks (key, data) VALUES (%s, %x) ON CONFLICT (key) DO NOTHING;\n` + ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '%x');\n" - headerInsert = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) -VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d) -ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);\n` + headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " + + "state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x', %d, '%s', %d, %d);\n" - headerInsertWithoutBaseFee = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) -VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL) -ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);\n` + headerInsertWithoutBaseFee = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " + + "reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x', %d, '%s', %d, NULL);\n" - uncleInsert = `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (%s, %s, %s, %s, %s, %s) -ON CONFLICT (block_hash) DO NOTHING;\n` + uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s');\n" - txInsert = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %d) -ON CONFLICT (tx_hash) DO NOTHING;\n` + txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " + + "VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%x', %d);\n" - alInsert = `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES (%s, %d, %s, %s) -ON CONFLICT (tx_id, index) DO NOTHING;\n` + alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n" - rctInsert = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (%s, %s, %s, %s, %s, %s, %d, %s) -ON CONFLICT (tx_id) DO NOTHING;\n` + rctInsert = "INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, " + + "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" - logInsert = `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES (%s, %s, %s, %s, %d, %s, %s, %s, %s, %s) -ON CONFLICT (rct_id, index) DO NOTHING;\n` + logInsert = "INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + + "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '%x');\n" - stateInsert = `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %d, %t, %s) -ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n` + stateInsert = "INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " + + "VALUES ('%s', '%s', '%s', '%x', %d, %t, '%s');\n" - accountInsert = `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES (%s, %s, %s, %d, %s, %s) -ON CONFLICT (header_id, state_path) DO NOTHING;\n` + accountInsert = "INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " + + "VALUES ('%s', '%x', '%s', %d, '%x', '%s');\n" - storageInsert = `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %s, %d, %t, %s) -ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n` + storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " + + "node_type, diff, mh_key) VALUES ('%s', '%x', '%s', '%s', '%x', %d, %t, '%s');\n" ) -// ON CONFLICT (node_id) DO UPDATE SET genesis_block = %s, network_id = %s, client_name = %s, chain_id = %s;\n` - func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)) } @@ -183,15 +195,11 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { if header.BaseFee == nil { stmt = fmt.Sprintf(headerInsertWithoutBaseFee, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, - header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, - header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey) + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1) } else { stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee, - header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, - header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, header.BaseFee) + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee) } sqw.stmts <- []byte(stmt) indexerMetrics.blocks.Inc(1) @@ -228,8 +236,8 @@ func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) { if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, - true, stateNode.MhKey, stateKey, stateNode.CID, stateNode.NodeType, true, stateNode.MhKey)) + sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, + stateNode.NodeType, true, stateNode.MhKey)) } func (sqw *SQLWriter) upsertStateAccount(stateAccount models.StateAccountModel) { @@ -243,6 +251,5 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) { storageKey = storageCID.StorageKey } sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, - storageCID.Path, storageCID.NodeType, true, storageCID.MhKey, storageKey, storageCID.CID, storageCID.NodeType, - true, storageCID.MhKey)) + storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)) } diff --git a/statediff/indexer/database/sql/batch_writer.go b/statediff/indexer/database/sql/batch_writer.go index 05c8822593be..f186d8052cec 100644 --- a/statediff/indexer/database/sql/batch_writer.go +++ b/statediff/indexer/database/sql/batch_writer.go @@ -43,7 +43,7 @@ const ( txCIDsPgStr string = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8), unnest($9)) ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = (excluded.cid, excluded.dst, excluded.src, excluded.index, excluded.mh_key, excluded.tx_data, excluded.tx_type) RETURNING id` - accessListPgStr string = `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES (unnest($1), unnest($2), unnest($3), unnest($4)) + accessListPgStr string = `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES (unnest($1), unnest($2), unnest($3), unnest($4)) ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = (excluded.address, excluded.storage_keys)` rctCIDsPgStr string = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8)) ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = (excluded.leaf_cid, excluded.contract, excluded.contract_hash, excluded.leaf_mh_key, excluded.post_state, excluded.post_status, excluded.log_root) @@ -138,7 +138,7 @@ func (pbw *PostgresBatchWriter) upsertTransactionCID(tx *sqlx.Tx, transaction mo } func (pbw *PostgresBatchWriter) upsertAccessListElement(tx *sqlx.Tx, accessListElement models.AccessListElementModel, txID int64) error { - _, err := tx.Exec(`INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) + _, err := tx.Exec(`INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = ($3, $4)`, txID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) if err != nil { diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index a86927341c87..710ad23d91df 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -264,7 +264,7 @@ func TestPGXIndexer(t *testing.T) { t.Fatalf("expected AccessListTxType (1), got %d", txType) } accessListElementModels := make([]models.AccessListElementModel, 0) - pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC` + pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC` err = db.Select(context.Background(), &accessListElementModels, pgStr, c) if err != nil { t.Fatal(err) diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 5794bd0afeb0..a7c7cc9b4492 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) { var DefaultConfig = Config{ Hostname: "localhost", Port: 5432, - DatabaseName: "vulcanize_testing", + DatabaseName: "vulcanize_test", Username: "postgres", - Password: "password", + Password: "", } // Config holds params for a Postgres db diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 2136380177f3..99fae1c0273b 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -56,7 +56,7 @@ func (db *DB) InsertTxStm() string { // InsertAccessListElementStm satisfies the sql.Statements interface func (db *DB) InsertAccessListElementStm() string { - return `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) + return `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) ON CONFLICT (tx_id, index) DO NOTHING` } diff --git a/statediff/indexer/database/sql/sqlx_indexer_test.go b/statediff/indexer/database/sql/sqlx_indexer_test.go index 09ee62fa3e1b..e0b5f2967bd5 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_test.go @@ -286,7 +286,7 @@ func TestSQLXIndexer(t *testing.T) { t.Fatalf("expected AccessListTxType (1), got %d", txType) } accessListElementModels := make([]models.AccessListElementModel, 0) - pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC` + pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC` err = db.Select(context.Background(), &accessListElementModels, pgStr, c) if err != nil { t.Fatal(err) diff --git a/statediff/indexer/database/sql/test_helpers.go b/statediff/indexer/database/sql/test_helpers.go index 7de7beec0100..46f9b766b514 100644 --- a/statediff/indexer/database/sql/test_helpers.go +++ b/statediff/indexer/database/sql/test_helpers.go @@ -53,7 +53,7 @@ func TearDownDB(t *testing.T, db Database) { if err != nil { t.Fatal(err) } - _, err = tx.Exec(ctx, `DELETE FROM eth.access_list_element`) + _, err = tx.Exec(ctx, `DELETE FROM eth.access_list_elements`) if err != nil { t.Fatal(err) } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 94b38c7e1e5f..3089b6d509f3 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -83,7 +83,7 @@ func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error } /* -INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) +INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) ON CONFLICT (tx_id, index) DO NOTHING */ func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error { diff --git a/statediff/service.go b/statediff/service.go index 04aaac4585aa..5334b4b3113f 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -239,7 +239,6 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) chainEventFwd <- chainEvent case err := <-errCh: - println("here") log.Error("Error from chain event subscription", "error", err) close(sds.QuitChan) log.Info("Quitting the statediffing writing loop") From 7fed13e725a0653680885869d16bd90c20341ea4 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 18 Nov 2021 18:10:50 -0600 Subject: [PATCH 2/2] update README --- statediff/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/statediff/README.md b/statediff/README.md index 7170363ae0cf..92c8ef3870b5 100644 --- a/statediff/README.md +++ b/statediff/README.md @@ -104,6 +104,13 @@ e.g. ./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db.type=postgres --statediff.db.driver=sqlx --statediff.db.host=localhost --statediff.db.port=5432 --statediff.db.name=vulcanize_test --statediff.db.user=postgres --statediff.db.nodeid=nodeid --statediff.db.clientname=clientname ` +When operating in `--statediff.db.type=file` mode, the service will write SQL statements out to the file designated by +`--statediff.file.path`. Please note that it writes out SQL statements with all `ON CONFLICT` constraint checks dropped. +This is done so that we can scale out the production of the SQL statements horizontally, merge the separate SQL files produced, +de-duplicate using unix tools (`sort statediff.sql | uniq` or `sort -u statediff.sql`), bulk load using psql +(`psql db_name --set ON_ERROR_STOP=on -f statediff.sql`), and then add our primary and foreign key constraints and indexes +back afterwards. + ### RPC endpoints The state diffing service exposes both a WS subscription endpoint, and a number of HTTP unary endpoints.