Skip to content

Commit

Permalink
Merge pull request #159 from vulcanize/schema_updates
Browse files Browse the repository at this point in the history
adjust for schema updates
  • Loading branch information
i-norden authored Nov 26, 2021
2 parents ac6ef33 + a3e9d54 commit 1dd37be
Show file tree
Hide file tree
Showing 15 changed files with 229 additions and 151 deletions.
15 changes: 8 additions & 7 deletions statediff/indexer/database/dump/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,6 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode)

var baseFee *string
if header.BaseFee != nil {
baseFee = new(string)
*baseFee = header.BaseFee.String()
}

headerID := header.Hash().String()
mod := models.HeaderModel{
CID: headerNode.Cid().String(),
Expand All @@ -205,7 +199,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
}
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err
Expand Down Expand Up @@ -268,6 +262,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
// index tx
trx := args.txs[i]
trxID := trx.Hash().String()

var val string
if trx.Value() != nil {
val = trx.Value().String()
}

// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
Expand All @@ -283,6 +283,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
return err
Expand Down
9 changes: 8 additions & 1 deletion statediff/indexer/database/file/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
})
return headerID
}
Expand Down Expand Up @@ -269,6 +269,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
// index tx
trx := args.txs[i]
txID := trx.Hash().String()

var val string
if trx.Value() != nil {
val = trx.Value().String()
}

// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
Expand All @@ -284,6 +290,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
sdi.fileWriter.upsertTransactionCID(txModel)

Expand Down
8 changes: 4 additions & 4 deletions statediff/indexer/database/file/indexer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,16 @@ func TestFileIndexerLegacy(t *testing.T) {
setupLegacy(t)
dumpData(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
Expand All @@ -126,7 +126,7 @@ func TestFileIndexerLegacy(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockBlock.Coinbase().String())
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}
73 changes: 47 additions & 26 deletions statediff/indexer/database/file/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var (
ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1`
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
txs types.Transactions
rcts types.Receipts
mockBlock *types.Block
headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid
rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid
Expand All @@ -65,7 +67,7 @@ func init() {
}

mockBlock = mocks.MockBlock
txs, rcts := mocks.MockBlock.Transactions(), mocks.MockReceipts
txs, rcts = mocks.MockBlock.Transactions(), mocks.MockReceipts

buf := new(bytes.Buffer)
txs.EncodeIndex(0, buf)
Expand Down Expand Up @@ -177,16 +179,16 @@ func TestFileIndexer(t *testing.T) {
setup(t)
dumpData(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = sqlxdb.QueryRowx(pgStr, mocks.BlockNumber.Uint64()).StructScan(header)
Expand All @@ -197,7 +199,7 @@ func TestFileIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String())
dc, err := cid.Decode(header.CID)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -231,6 +233,10 @@ func TestFileIndexer(t *testing.T) {
expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String()))
expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String()))
// and published
type txResult struct {
TxType uint8 `db:"tx_type"`
Value string
}
for _, c := range trxs {
dc, err := cid.Decode(c)
if err != nil {
Expand All @@ -243,47 +249,59 @@ func TestFileIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
txTypePgStr := `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
txTypeAndValueStr := `SELECT tx_type, value FROM eth.transaction_cids WHERE cid = $1`
switch c {
case trx1CID.String():
test_helpers.ExpectEqual(t, data, tx1)
var txType uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != txs[0].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[0].Value().String(), txRes.Value)
}
case trx2CID.String():
test_helpers.ExpectEqual(t, data, tx2)
var txType uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != txs[1].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[1].Value().String(), txRes.Value)
}
case trx3CID.String():
test_helpers.ExpectEqual(t, data, tx3)
var txType uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != txs[2].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[2].Value().String(), txRes.Value)
}
case trx4CID.String():
test_helpers.ExpectEqual(t, data, tx4)
var txType uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txType)
if txRes.TxType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txRes.TxType)
}
if txRes.Value != txs[3].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[3].Value().String(), txRes.Value)
}
accessListElementModels := make([]models.AccessListElementModel, 0)
pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC`
Expand All @@ -307,13 +325,16 @@ func TestFileIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model)
case trx5CID.String():
test_helpers.ExpectEqual(t, data, tx5)
var txType *uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if *txType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", *txType)
if txRes.TxType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", txRes.TxType)
}
if txRes.Value != txs[4].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[4].Value().String(), txRes.Value)
}
}
}
Expand Down
40 changes: 17 additions & 23 deletions statediff/indexer/database/file/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,14 @@ const (
ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '\\x%x');\n"

headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, %s);\n"

headerInsertWithoutBaseFee = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " +
"reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, NULL);\n"
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, '%s');\n"

uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s');\n"

txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " +
"VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d);\n"
txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " +
"value) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n"

alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n"

Expand Down Expand Up @@ -191,42 +187,40 @@ func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, strin
}

func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
var stmt string
if header.BaseFee == nil {
stmt = fmt.Sprintf(headerInsertWithoutBaseFee, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1)
} else {
stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, *header.BaseFee)
}
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase)
sqw.stmts <- []byte(stmt)
indexerMetrics.blocks.Inc(1)
}

func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey))
sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.MhKey))
}

func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type))
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value))
indexerMetrics.transactions.Inc(1)
}

func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys)))
sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
formatPostgresStringArray(accessListElement.StorageKeys)))
indexerMetrics.accessListEntries.Inc(1)
}

func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot))
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot))
indexerMetrics.receipts.Inc(1)
}

func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data))
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data))
indexerMetrics.logs.Inc(1)
}
}
Expand Down
9 changes: 8 additions & 1 deletion statediff/indexer/database/sql/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
})
}

Expand Down Expand Up @@ -316,6 +316,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
// index tx
trx := args.txs[i]
txID := trx.Hash().String()

var val string
if trx.Value() != nil {
val = trx.Value().String()
}

// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
Expand All @@ -331,6 +337,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions statediff/indexer/database/sql/pgx_indexer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPGXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacyPGX(t)
defer tearDown(t)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
Expand All @@ -72,18 +72,18 @@ func TestPGXIndexerLegacy(t *testing.T) {
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *int64 `db:"base_fee"`
Coinbase string `db:"coinbase"`
}
header := new(res)

err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).Scan(
&header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.BaseFee)
&header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.Coinbase)
require.NoError(t, err)

test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockHeader.Coinbase.String())
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}
Loading

0 comments on commit 1dd37be

Please sign in to comment.