From a175c2279886dcab99b6f02edbb76d44753815ee Mon Sep 17 00:00:00 2001
From: prathamesh0
Date: Fri, 17 Jun 2022 19:11:48 +0530
Subject: [PATCH] Disassociate block number from the indexer object

---
 statediff/indexer/database/dump/batch_tx.go |  17 +--
 statediff/indexer/database/dump/indexer.go  | 126 +++++++++++---------
 statediff/indexer/database/file/indexer.go  |  62 +++++-----
 statediff/indexer/database/sql/indexer.go   |  36 +++---
 4 files changed, 129 insertions(+), 112 deletions(-)

diff --git a/statediff/indexer/database/dump/batch_tx.go b/statediff/indexer/database/dump/batch_tx.go
index f1754b9072e6..ee195a558be2 100644
--- a/statediff/indexer/database/dump/batch_tx.go
+++ b/statediff/indexer/database/dump/batch_tx.go
@@ -30,7 +30,7 @@ import (
 
 // BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
 type BatchTx struct {
-	BlockNumber uint64
+	BlockNumber string
 	dump        io.Writer
 	quit        chan struct{}
 	iplds       chan models.IPLDModel
@@ -68,15 +68,17 @@ func (tx *BatchTx) cache() {
 
 func (tx *BatchTx) cacheDirect(key string, value []byte) {
 	tx.iplds <- models.IPLDModel{
-		Key:  key,
-		Data: value,
+		BlockNumber: tx.BlockNumber,
+		Key:         key,
+		Data:        value,
 	}
 }
 
 func (tx *BatchTx) cacheIPLD(i node.Node) {
 	tx.iplds <- models.IPLDModel{
-		Key:  blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
-		Data: i.RawData(),
+		BlockNumber: tx.BlockNumber,
+		Key:         blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
+		Data:        i.RawData(),
 	}
 }
 
@@ -87,8 +89,9 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error) {
 	}
 	prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
 	tx.iplds <- models.IPLDModel{
-		Key:  prefixedKey,
-		Data: raw,
+		BlockNumber: tx.BlockNumber,
+		Key:         prefixedKey,
+		Data:        raw,
 	}
 	return c.String(), prefixedKey, err
 }
diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go
index 42fc8488a6f1..0b7a5ffb076e 100644
--- a/statediff/indexer/database/dump/indexer.go
+++ b/statediff/indexer/database/dump/indexer.go
@@ -102,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 
 	t = time.Now()
 	blockTx := &BatchTx{
-		BlockNumber: height,
+		BlockNumber: block.Number().String(),
 		dump:        sdi.dump,
 		iplds:       make(chan models.IPLDModel),
 		quit:        make(chan struct{}),
@@ -146,7 +146,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
 	t = time.Now()
 	// Publish and index uncles
-	err = sdi.processUncles(blockTx, headerID, height, uncleNodes)
+	err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
 	if err != nil {
 		return nil, err
 	}
@@ -206,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
 }
 
 // processUncles publishes and indexes uncle IPLDs in Postgres
-func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
+func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
 	// publish and index uncles
 	for _, uncleNode := range uncleNodes {
 		tx.cacheIPLD(uncleNode)
@@ -215,15 +215,16 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
 		if sdi.chainConfig.Clique != nil {
 			uncleReward = big.NewInt(0)
 		} else {
-			uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
+			uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
 		}
 		uncle := models.UncleModel{
-			HeaderID:   headerID,
-			CID:        uncleNode.Cid().String(),
-			MhKey:      shared.MultihashKeyFromCID(uncleNode.Cid()),
-			ParentHash: uncleNode.ParentHash.String(),
-			BlockHash:  uncleNode.Hash().String(),
-			Reward:     uncleReward.String(),
+			BlockNumber: blockNumber.String(),
+			HeaderID:    headerID,
+			CID:         uncleNode.Cid().String(),
+			MhKey:       shared.MultihashKeyFromCID(uncleNode.Cid()),
+			ParentHash:  uncleNode.ParentHash.String(),
+			BlockHash:   uncleNode.Hash().String(),
+			Reward:      uncleReward.String(),
 		}
 		if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", uncle); err != nil {
 			return err
 		}
@@ -274,16 +275,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 			return fmt.Errorf("error deriving tx sender: %v", err)
 		}
 		txModel := models.TxModel{
-			HeaderID: args.headerID,
-			Dst:      shared.HandleZeroAddrPointer(trx.To()),
-			Src:      shared.HandleZeroAddr(from),
-			TxHash:   trxID,
-			Index:    int64(i),
-			Data:     trx.Data(),
-			CID:      txNode.Cid().String(),
-			MhKey:    shared.MultihashKeyFromCID(txNode.Cid()),
-			Type:     trx.Type(),
-			Value:    val,
+			BlockNumber: args.blockNumber.String(),
+			HeaderID:    args.headerID,
+			Dst:         shared.HandleZeroAddrPointer(trx.To()),
+			Src:         shared.HandleZeroAddr(from),
+			TxHash:      trxID,
+			Index:       int64(i),
+			Data:        trx.Data(),
+			CID:         txNode.Cid().String(),
+			MhKey:       shared.MultihashKeyFromCID(txNode.Cid()),
+			Type:        trx.Type(),
+			Value:       val,
 		}
 		if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
 			return err
@@ -296,6 +298,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 				storageKeys[k] = storageKey.Hex()
 			}
 			accessListElementModel := models.AccessListElementModel{
+				BlockNumber: args.blockNumber.String(),
 				TxID:        trxID,
 				Index:       int64(j),
 				Address:     accessListElement.Address.Hex(),
@@ -319,6 +322,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 		}
 
 		rctModel := &models.ReceiptModel{
+			BlockNumber:  args.blockNumber.String(),
 			TxID:         trxID,
 			Contract:     contract,
 			ContractHash: contractHash,
@@ -348,16 +352,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 			}
 
 			logDataSet[idx] = &models.LogsModel{
-				ReceiptID: trxID,
-				Address:   l.Address.String(),
-				Index:     int64(l.Index),
-				Data:      l.Data,
-				LeafCID:   args.logLeafNodeCIDs[i][idx].String(),
-				LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
-				Topic0:    topicSet[0],
-				Topic1:    topicSet[1],
-				Topic2:    topicSet[2],
-				Topic3:    topicSet[3],
+				BlockNumber: args.blockNumber.String(),
+				ReceiptID:   trxID,
+				Address:     l.Address.String(),
+				Index:       int64(l.Index),
+				Data:        l.Data,
+				LeafCID:     args.logLeafNodeCIDs[i][idx].String(),
+				LeafMhKey:   shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
+				Topic0:      topicSet[0],
+				Topic1:      topicSet[1],
+				Topic2:      topicSet[2],
+				Topic3:      topicSet[3],
 			}
 		}
 
@@ -379,7 +384,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
 	tx, ok := batch.(*BatchTx)
 	if !ok {
-		return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
+		return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
 	}
 	// publish the state node
 	var stateModel models.StateNodeModel
@@ -387,12 +392,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 		// short circuit if it is a Removed node
 		// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
 		stateModel = models.StateNodeModel{
-			HeaderID: headerID,
-			Path:     stateNode.Path,
-			StateKey: common.BytesToHash(stateNode.LeafKey).String(),
-			CID:      shared.RemovedNodeStateCID,
-			MhKey:    shared.RemovedNodeMhKey,
-			NodeType: stateNode.NodeType.Int(),
+			BlockNumber: tx.BlockNumber,
+			HeaderID:    headerID,
+			Path:        stateNode.Path,
+			StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
+			CID:         shared.RemovedNodeStateCID,
+			MhKey:       shared.RemovedNodeMhKey,
+			NodeType:    stateNode.NodeType.Int(),
 		}
 	} else {
 		stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
@@ -400,12 +406,13 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 			return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
 		}
 		stateModel = models.StateNodeModel{
-			HeaderID: headerID,
-			Path:     stateNode.Path,
-			StateKey: common.BytesToHash(stateNode.LeafKey).String(),
-			CID:      stateCIDStr,
-			MhKey:    stateMhKey,
-			NodeType: stateNode.NodeType.Int(),
+			BlockNumber: tx.BlockNumber,
+			HeaderID:    headerID,
+			Path:        stateNode.Path,
+			StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
+			CID:         stateCIDStr,
+			MhKey:       stateMhKey,
+			NodeType:    stateNode.NodeType.Int(),
 		}
 	}
 
@@ -428,6 +435,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 		return fmt.Errorf("error decoding state account rlp: %s", err.Error())
 	}
 	accountModel := models.StateAccountModel{
+		BlockNumber: tx.BlockNumber,
 		HeaderID:    headerID,
 		StatePath:   stateNode.Path,
 		Balance:     account.Balance.String(),
@@ -446,13 +454,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 			// short circuit if it is a Removed node
 			// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
 			storageModel := models.StorageNodeModel{
-				HeaderID:   headerID,
-				StatePath:  stateNode.Path,
-				Path:       storageNode.Path,
-				StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
-				CID:        shared.RemovedNodeStorageCID,
-				MhKey:      shared.RemovedNodeMhKey,
-				NodeType:   storageNode.NodeType.Int(),
+				BlockNumber: tx.BlockNumber,
+				HeaderID:    headerID,
+				StatePath:   stateNode.Path,
+				Path:        storageNode.Path,
+				StorageKey:  common.BytesToHash(storageNode.LeafKey).String(),
+				CID:         shared.RemovedNodeStorageCID,
+				MhKey:       shared.RemovedNodeMhKey,
+				NodeType:    storageNode.NodeType.Int(),
 			}
 			if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
 				return err
@@ -464,13 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 			return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
 		}
 		storageModel := models.StorageNodeModel{
-			HeaderID:   headerID,
-			StatePath:  stateNode.Path,
-			Path:       storageNode.Path,
-			StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
-			CID:        storageCIDStr,
-			MhKey:      storageMhKey,
-			NodeType:   storageNode.NodeType.Int(),
+			BlockNumber: tx.BlockNumber,
+			HeaderID:    headerID,
+			StatePath:   stateNode.Path,
+			Path:        storageNode.Path,
+			StorageKey:  common.BytesToHash(storageNode.LeafKey).String(),
+			CID:         storageCIDStr,
+			MhKey:       storageMhKey,
+			NodeType:    storageNode.NodeType.Int(),
 		}
 		if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
 			return err
@@ -484,7 +494,7 @@
 func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
 	tx, ok := batch.(*BatchTx)
 	if !ok {
-		return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
+		return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
 	}
 	// codec doesn't matter since db key is multihash-based
 	mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go
index 5ddb3f5b94c5..62da6d8c3627 100644
--- a/statediff/indexer/database/file/indexer.go
+++ b/statediff/indexer/database/file/indexer.go
@@ -64,7 +64,6 @@ type StateDiffIndexer struct {
 	chainConfig      *params.ChainConfig
 	nodeID           string
 	wg               *sync.WaitGroup
-	blockNumber      string
 	removedCacheFlag *uint32
 
 	watchedAddressesFilePath string
@@ -111,7 +110,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
 // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
 func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
 	sdi.removedCacheFlag = new(uint32)
-	sdi.blockNumber = block.Number().String()
 	start, t := time.Now(), time.Now()
 	blockHash := block.Hash()
 	blockHashStr := blockHash.String()
@@ -147,7 +145,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 
 	t = time.Now()
 	blockTx := &BatchTx{
-		BlockNumber: sdi.blockNumber,
+		BlockNumber: block.Number().String(),
 		submit: func(self *BatchTx, err error) error {
 			tDiff := time.Since(t)
 			indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
@@ -175,7 +173,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 
 	t = time.Now()
 	// write uncles
-	sdi.processUncles(headerID, height, uncleNodes)
+	sdi.processUncles(headerID, block.Number(), uncleNodes)
 	tDiff = time.Since(t)
 	indexerMetrics.tUncleProcessing.Update(tDiff)
 	traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
@@ -209,7 +207,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 // processHeader write a header IPLD insert SQL stmt to a file
 // it returns the headerID
 func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
-	sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, headerNode)
+	sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode)
 
 	var baseFee *string
 	if header.BaseFee != nil {
@@ -222,7 +220,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
 		CID:             headerNode.Cid().String(),
 		MhKey:           shared.MultihashKeyFromCID(headerNode.Cid()),
 		ParentHash:      header.ParentHash.String(),
-		BlockNumber:     sdi.blockNumber,
+		BlockNumber:     header.Number.String(),
 		BlockHash:       headerID,
 		TotalDifficulty: td.String(),
 		Reward:          reward.String(),
@@ -238,19 +236,19 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
 }
 
 // processUncles writes uncle IPLD insert SQL stmts to a file
-func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
+func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) {
 	// publish and index uncles
 	for _, uncleNode := range uncleNodes {
-		sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, uncleNode)
+		sdi.fileWriter.upsertIPLDNode(blockNumber.String(), uncleNode)
 		var uncleReward *big.Int
 		// in PoA networks uncle reward is 0
 		if sdi.chainConfig.Clique != nil {
 			uncleReward = big.NewInt(0)
 		} else {
-			uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
+			uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
 		}
 		sdi.fileWriter.upsertUncleCID(models.UncleModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: blockNumber.String(),
 			HeaderID:    headerID,
 			CID:         uncleNode.Cid().String(),
 			MhKey:       shared.MultihashKeyFromCID(uncleNode.Cid()),
@@ -282,10 +280,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
 	signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
 	for i, receipt := range args.receipts {
 		for _, logTrieNode := range args.logTrieNodes[i] {
-			sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, logTrieNode)
+			sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), logTrieNode)
 		}
 		txNode := args.txNodes[i]
-		sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, txNode)
+		sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), txNode)
 
 		// index tx
 		trx := args.txs[i]
@@ -302,7 +300,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
 			return fmt.Errorf("error deriving tx sender: %v", err)
 		}
 		txModel := models.TxModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: args.blockNumber.String(),
 			HeaderID:    args.headerID,
 			Dst:         shared.HandleZeroAddrPointer(trx.To()),
 			Src:         shared.HandleZeroAddr(from),
@@ -323,7 +321,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
 				storageKeys[k] = storageKey.Hex()
 			}
 			accessListElementModel := models.AccessListElementModel{
-				BlockNumber: sdi.blockNumber,
+				BlockNumber: args.blockNumber.String(),
 				TxID:        txID,
 				Index:       int64(j),
 				Address:     accessListElement.Address.Hex(),
@@ -345,7 +343,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
 		}
 
 		rctModel := &models.ReceiptModel{
-			BlockNumber:  sdi.blockNumber,
+			BlockNumber:  args.blockNumber.String(),
 			TxID:         txID,
 			Contract:     contract,
 			ContractHash: contractHash,
@@ -373,7 +371,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
 			}
 
 			logDataSet[idx] = &models.LogsModel{
-				BlockNumber: sdi.blockNumber,
+				BlockNumber: args.blockNumber.String(),
 				ReceiptID:   txID,
 				Address:     l.Address.String(),
 				Index:       int64(l.Index),
@@ -391,8 +389,8 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
 
 	// publish trie nodes, these aren't indexed directly
 	for i, n := range args.txTrieNodes {
-		sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, n)
-		sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, args.rctTrieNodes[i])
+		sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), n)
+		sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), args.rctTrieNodes[i])
 	}
 
 	return nil
@@ -400,15 +398,19 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
 
 // PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
 func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
+	tx, ok := batch.(*BatchTx)
+	if !ok {
+		return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
+	}
 	// publish the state node
 	var stateModel models.StateNodeModel
 	if stateNode.NodeType == sdtypes.Removed {
 		if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
 			atomic.StoreUint32(sdi.removedCacheFlag, 1)
-			sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{})
+			sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeMhKey, []byte{})
 		}
 		stateModel = models.StateNodeModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
 			Path:        stateNode.Path,
 			StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
@@ -417,12 +419,12 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 			NodeType:    stateNode.NodeType.Int(),
 		}
 	} else {
-		stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
+		stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(tx.BlockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
 		if err != nil {
 			return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
 		}
 		stateModel = models.StateNodeModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
 			Path:        stateNode.Path,
 			StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
@@ -449,7 +451,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 		return fmt.Errorf("error decoding state account rlp: %s", err.Error())
 	}
 	accountModel := models.StateAccountModel{
-		BlockNumber: sdi.blockNumber,
+		BlockNumber: tx.BlockNumber,
 		HeaderID:    headerID,
 		StatePath:   stateNode.Path,
 		Balance:     account.Balance.String(),
@@ -465,10 +467,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 		if storageNode.NodeType == sdtypes.Removed {
 			if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
 				atomic.StoreUint32(sdi.removedCacheFlag, 1)
-				sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{})
+				sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeMhKey, []byte{})
 			}
 			storageModel := models.StorageNodeModel{
-				BlockNumber: sdi.blockNumber,
+				BlockNumber: tx.BlockNumber,
 				HeaderID:    headerID,
 				StatePath:   stateNode.Path,
 				Path:        storageNode.Path,
@@ -480,12 +482,12 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 			sdi.fileWriter.upsertStorageCID(storageModel)
 			continue
 		}
-		storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
+		storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(tx.BlockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
 		if err != nil {
 			return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
 		}
 		storageModel := models.StorageNodeModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
 			StatePath:   stateNode.Path,
 			Path:        storageNode.Path,
@@ -502,12 +504,16 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 
 // PushCodeAndCodeHash writes code and codehash pairs insert SQL stmts to a file
 func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
+	tx, ok := batch.(*BatchTx)
+	if !ok {
+		return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
+	}
 	// codec doesn't matter since db key is multihash-based
 	mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
 	if err != nil {
 		return fmt.Errorf("error deriving multihash key from codehash: %v", err)
 	}
-	sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, mhKey, codeAndCodeHash.Code)
+	sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, mhKey, codeAndCodeHash.Code)
 
 	return nil
 }
diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go
index 74ce06745f31..d39f2a0e5631 100644
--- a/statediff/indexer/database/sql/indexer.go
+++ b/statediff/indexer/database/sql/indexer.go
@@ -55,7 +55,6 @@ type StateDiffIndexer struct {
 	ctx         context.Context
 	chainConfig *params.ChainConfig
 	dbWriter    *Writer
-	blockNumber string
 }
 
 // NewStateDiffIndexer creates a sql implementation of interfaces.StateDiffIndexer
@@ -90,7 +89,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
 // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
 func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
 	start, t := time.Now(), time.Now()
-	sdi.blockNumber = block.Number().String()
 	blockHash := block.Hash()
 	blockHashStr := blockHash.String()
 	height := block.NumberU64()
@@ -140,7 +138,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	blockTx := &BatchTx{
 		removedCacheFlag: new(uint32),
 		ctx:              sdi.ctx,
-		BlockNumber:      sdi.blockNumber,
+		BlockNumber:      block.Number().String(),
 		stm:              sdi.dbWriter.db.InsertIPLDsStm(),
 		iplds:            make(chan models.IPLDModel),
 		quit:             make(chan struct{}),
@@ -203,7 +201,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
 	t = time.Now()
 	// Publish and index uncles
-	err = sdi.processUncles(blockTx, headerID, block.NumberU64(), uncleNodes)
+	err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
 	if err != nil {
 		return nil, err
 	}
@@ -252,7 +250,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
 		CID:             headerNode.Cid().String(),
 		MhKey:           shared.MultihashKeyFromCID(headerNode.Cid()),
 		ParentHash:      header.ParentHash.String(),
-		BlockNumber:     sdi.blockNumber,
+		BlockNumber:     header.Number.String(),
 		BlockHash:       headerID,
 		TotalDifficulty: td.String(),
 		Reward:          reward.String(),
@@ -267,7 +265,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
 }
 
 // processUncles publishes and indexes uncle IPLDs in Postgres
-func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
+func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
 	// publish and index uncles
 	for _, uncleNode := range uncleNodes {
 		tx.cacheIPLD(uncleNode)
@@ -276,10 +274,10 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
 		if sdi.chainConfig.Clique != nil {
 			uncleReward = big.NewInt(0)
 		} else {
-			uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
+			uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
 		}
 		uncle := models.UncleModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: blockNumber.String(),
 			HeaderID:    headerID,
 			CID:         uncleNode.Cid().String(),
 			MhKey:       shared.MultihashKeyFromCID(uncleNode.Cid()),
@@ -335,7 +333,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 			return fmt.Errorf("error deriving tx sender: %v", err)
 		}
 		txModel := models.TxModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: args.blockNumber.String(),
 			HeaderID:    args.headerID,
 			Dst:         shared.HandleZeroAddrPointer(trx.To()),
 			Src:         shared.HandleZeroAddr(from),
@@ -358,7 +356,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 				storageKeys[k] = storageKey.Hex()
 			}
 			accessListElementModel := models.AccessListElementModel{
-				BlockNumber: sdi.blockNumber,
+				BlockNumber: args.blockNumber.String(),
 				TxID:        txID,
 				Index:       int64(j),
 				Address:     accessListElement.Address.Hex(),
@@ -382,7 +380,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 		}
 
 		rctModel := &models.ReceiptModel{
-			BlockNumber:  sdi.blockNumber,
+			BlockNumber:  args.blockNumber.String(),
 			TxID:         txID,
 			Contract:     contract,
 			ContractHash: contractHash,
@@ -413,7 +411,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 			}
 
 			logDataSet[idx] = &models.LogsModel{
-				BlockNumber: sdi.blockNumber,
+				BlockNumber: args.blockNumber.String(),
 				ReceiptID:   txID,
 				Address:     l.Address.String(),
 				Index:       int64(l.Index),
@@ -445,14 +443,14 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
 func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
 	tx, ok := batch.(*BatchTx)
 	if !ok {
-		return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
+		return fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
 	}
 	// publish the state node
 	var stateModel models.StateNodeModel
 	if stateNode.NodeType == sdtypes.Removed {
 		tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{})
 		stateModel = models.StateNodeModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
 			Path:        stateNode.Path,
 			StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
@@ -466,7 +464,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 			return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
 		}
 		stateModel = models.StateNodeModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
 			Path:        stateNode.Path,
 			StateKey:    common.BytesToHash(stateNode.LeafKey).String(),
@@ -495,7 +493,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 		return fmt.Errorf("error decoding state account rlp: %s", err.Error())
 	}
 	accountModel := models.StateAccountModel{
-		BlockNumber: sdi.blockNumber,
+		BlockNumber: tx.BlockNumber,
 		HeaderID:    headerID,
 		StatePath:   stateNode.Path,
 		Balance:     account.Balance.String(),
@@ -513,7 +511,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 		if storageNode.NodeType == sdtypes.Removed {
 			tx.cacheRemoved(shared.RemovedNodeMhKey, []byte{})
 			storageModel := models.StorageNodeModel{
-				BlockNumber: sdi.blockNumber,
+				BlockNumber: tx.BlockNumber,
 				HeaderID:    headerID,
 				StatePath:   stateNode.Path,
 				Path:        storageNode.Path,
@@ -532,7 +530,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 			return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
 		}
 		storageModel := models.StorageNodeModel{
-			BlockNumber: sdi.blockNumber,
+			BlockNumber: tx.BlockNumber,
 			HeaderID:    headerID,
 			StatePath:   stateNode.Path,
 			Path:        storageNode.Path,
@@ -553,7 +551,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
 	tx, ok := batch.(*BatchTx)
 	if !ok {
-		return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
+		return fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
 	}
 	// codec doesn't matter since db key is multihash-based
 	mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)