Merge pull request #248 from deep-stack/pm-disassociate-blocknumber
Disassociate block number from the indexer object
ashwinphatak authored Jun 23, 2022
2 parents d629c99 + e8c0769 commit 5e3de63
Showing 5 changed files with 130 additions and 113 deletions.
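In brief: the batch no longer carries the block height as a uint64 on behalf of the whole indexer run. It now holds the height as a string, and every row model the indexer emits records its own copy, so a row can be read without reference to the indexer or batch object. A rough sketch of the resulting shapes (field sets abbreviated; the real structs live in statediff/indexer/models, are not part of this diff, and carry additional fields and db tags):

```go
// Illustrative sketch only; not the repo's actual models file.
package models

// Before this commit the batch held the height for everyone:
//
//	type BatchTx struct {
//	    BlockNumber uint64
//	    ...
//	}
//
// After it, the height travels as a string and each row model repeats it.

type IPLDModel struct {
    BlockNumber string // added by this commit
    Key         string
    Data        []byte
}

type UncleModel struct {
    BlockNumber string // added by this commit
    HeaderID    string
    CID         string
    MhKey       string
    ParentHash  string
    BlockHash   string
    Reward      string
}
```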
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -5,7 +5,7 @@ on:

env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '382aca8e42bc5e33f301f77cdd2e09cc80602fc3'}}
- ipld-eth-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || '48eb594ea95763bda8e51590f105f7a2657ac6d4' }}
+ ipld-eth-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || '65b7bee7a6757c1fc527c8bfdc4f99ab915fcf36' }}
GOPATH: /tmp/go

jobs:
17 changes: 10 additions & 7 deletions statediff/indexer/database/dump/batch_tx.go
@@ -30,7 +30,7 @@ import (

// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
- BlockNumber uint64
+ BlockNumber string
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
@@ -68,15 +68,17 @@ func (tx *BatchTx) cache() {

func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{
- Key: key,
- Data: value,
+ BlockNumber: tx.BlockNumber,
+ Key: key,
+ Data: value,
}
}

func (tx *BatchTx) cacheIPLD(i node.Node) {
tx.iplds <- models.IPLDModel{
- Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
- Data: i.RawData(),
+ BlockNumber: tx.BlockNumber,
+ Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
+ Data: i.RawData(),
}
}

@@ -87,8 +89,9 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
}
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
tx.iplds <- models.IPLDModel{
- Key: prefixedKey,
- Data: raw,
+ BlockNumber: tx.BlockNumber,
+ Key: prefixedKey,
+ Data: raw,
}
return c.String(), prefixedKey, err
}
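Each of the cache helpers above now stamps tx.BlockNumber onto the IPLDModel it sends down the channel, so the consumer writing rows needs no out-of-band height. A minimal, self-contained sketch of that pattern, using trimmed stand-ins for the real structs (the actual BatchTx also carries the dump writer, a quit channel, and a background cache() loop):

```go
package main

import "fmt"

// Trimmed stand-ins for the structs in the diff; illustrative only.
type IPLDModel struct {
    BlockNumber string
    Key         string
    Data        []byte
}

type BatchTx struct {
    BlockNumber string
    iplds       chan IPLDModel
}

// cacheDirect mirrors the updated helper: every cached blob is tagged with the
// block number the batch was opened for.
func (tx *BatchTx) cacheDirect(key string, value []byte) {
    tx.iplds <- IPLDModel{
        BlockNumber: tx.BlockNumber,
        Key:         key,
        Data:        value,
    }
}

func main() {
    tx := &BatchTx{BlockNumber: "15000000", iplds: make(chan IPLDModel, 1)}
    tx.cacheDirect("example-key", []byte{0x01, 0x02})
    fmt.Printf("%+v\n", <-tx.iplds) // {BlockNumber:15000000 Key:example-key Data:[1 2]}
}
```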
126 changes: 68 additions & 58 deletions statediff/indexer/database/dump/indexer.go
@@ -102,7 +102,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()

blockTx := &BatchTx{
- BlockNumber: height,
+ BlockNumber: block.Number().String(),
dump: sdi.dump,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
@@ -146,7 +146,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
- err = sdi.processUncles(blockTx, headerID, height, uncleNodes)
+ err = sdi.processUncles(blockTx, headerID, block.Number(), uncleNodes)
if err != nil {
return nil, err
}
@@ -206,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
}

// processUncles publishes and indexes uncle IPLDs in Postgres
- func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
+ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber *big.Int, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode)
@@ -215,15 +215,16 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNu
if sdi.chainConfig.Clique != nil {
uncleReward = big.NewInt(0)
} else {
- uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
+ uncleReward = shared.CalcUncleMinerReward(blockNumber.Uint64(), uncleNode.Number.Uint64())
}
uncle := models.UncleModel{
- HeaderID: headerID,
- CID: uncleNode.Cid().String(),
- MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
- ParentHash: uncleNode.ParentHash.String(),
- BlockHash: uncleNode.Hash().String(),
- Reward: uncleReward.String(),
+ BlockNumber: blockNumber.String(),
+ HeaderID: headerID,
+ CID: uncleNode.Cid().String(),
+ MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
+ ParentHash: uncleNode.ParentHash.String(),
+ BlockHash: uncleNode.Hash().String(),
+ Reward: uncleReward.String(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", uncle); err != nil {
return err
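processUncles now receives the height as *big.Int rather than uint64 because the same value is needed in two forms: a numeric string for the UncleModel row and a uint64 for the reward calculation. A small, self-contained illustration of that dual use (the height and variable names here are hypothetical):

```go
package main

import (
    "fmt"
    "math/big"
)

func main() {
    blockNumber := big.NewInt(15000000) // hypothetical height

    rowValue := blockNumber.String()  // what models.UncleModel.BlockNumber stores
    rewardArg := blockNumber.Uint64() // what shared.CalcUncleMinerReward expects

    fmt.Println(rowValue, rewardArg)
}
```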
@@ -274,16 +275,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
- HeaderID: args.headerID,
- Dst: shared.HandleZeroAddrPointer(trx.To()),
- Src: shared.HandleZeroAddr(from),
- TxHash: trxID,
- Index: int64(i),
- Data: trx.Data(),
- CID: txNode.Cid().String(),
- MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
- Type: trx.Type(),
- Value: val,
+ BlockNumber: args.blockNumber.String(),
+ HeaderID: args.headerID,
+ Dst: shared.HandleZeroAddrPointer(trx.To()),
+ Src: shared.HandleZeroAddr(from),
+ TxHash: trxID,
+ Index: int64(i),
+ Data: trx.Data(),
+ CID: txNode.Cid().String(),
+ MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
+ Type: trx.Type(),
+ Value: val,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
return err
@@ -296,6 +298,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex()
}
accessListElementModel := models.AccessListElementModel{
+ BlockNumber: args.blockNumber.String(),
TxID: trxID,
Index: int64(j),
Address: accessListElement.Address.Hex(),
@@ -319,6 +322,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}

rctModel := &models.ReceiptModel{
+ BlockNumber: args.blockNumber.String(),
TxID: trxID,
Contract: contract,
ContractHash: contractHash,
@@ -348,16 +352,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}

logDataSet[idx] = &models.LogsModel{
- ReceiptID: trxID,
- Address: l.Address.String(),
- Index: int64(l.Index),
- Data: l.Data,
- LeafCID: args.logLeafNodeCIDs[i][idx].String(),
- LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
- Topic0: topicSet[0],
- Topic1: topicSet[1],
- Topic2: topicSet[2],
- Topic3: topicSet[3],
+ BlockNumber: args.blockNumber.String(),
+ ReceiptID: trxID,
+ Address: l.Address.String(),
+ Index: int64(l.Index),
+ Data: l.Data,
+ LeafCID: args.logLeafNodeCIDs[i][idx].String(),
+ LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
+ Topic0: topicSet[0],
+ Topic1: topicSet[1],
+ Topic2: topicSet[2],
+ Topic3: topicSet[3],
}
}

@@ -379,33 +384,35 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
// publish the state node
var stateModel models.StateNodeModel
if stateNode.NodeType == sdtypes.Removed {
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel = models.StateNodeModel{
- HeaderID: headerID,
- Path: stateNode.Path,
- StateKey: common.BytesToHash(stateNode.LeafKey).String(),
- CID: shared.RemovedNodeStateCID,
- MhKey: shared.RemovedNodeMhKey,
- NodeType: stateNode.NodeType.Int(),
+ BlockNumber: tx.BlockNumber,
+ HeaderID: headerID,
+ Path: stateNode.Path,
+ StateKey: common.BytesToHash(stateNode.LeafKey).String(),
+ CID: shared.RemovedNodeStateCID,
+ MhKey: shared.RemovedNodeMhKey,
+ NodeType: stateNode.NodeType.Int(),
}
} else {
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel = models.StateNodeModel{
- HeaderID: headerID,
- Path: stateNode.Path,
- StateKey: common.BytesToHash(stateNode.LeafKey).String(),
- CID: stateCIDStr,
- MhKey: stateMhKey,
- NodeType: stateNode.NodeType.Int(),
+ BlockNumber: tx.BlockNumber,
+ HeaderID: headerID,
+ Path: stateNode.Path,
+ StateKey: common.BytesToHash(stateNode.LeafKey).String(),
+ CID: stateCIDStr,
+ MhKey: stateMhKey,
+ NodeType: stateNode.NodeType.Int(),
}
}

@@ -428,6 +435,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
}
accountModel := models.StateAccountModel{
+ BlockNumber: tx.BlockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Balance: account.Balance.String(),
@@ -446,13 +454,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
- HeaderID: headerID,
- StatePath: stateNode.Path,
- Path: storageNode.Path,
- StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
- CID: shared.RemovedNodeStorageCID,
- MhKey: shared.RemovedNodeMhKey,
- NodeType: storageNode.NodeType.Int(),
+ BlockNumber: tx.BlockNumber,
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
+ Path: storageNode.Path,
+ StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
+ CID: shared.RemovedNodeStorageCID,
+ MhKey: shared.RemovedNodeMhKey,
+ NodeType: storageNode.NodeType.Int(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err
@@ -464,13 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{
- HeaderID: headerID,
- StatePath: stateNode.Path,
- Path: storageNode.Path,
- StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
- CID: storageCIDStr,
- MhKey: storageMhKey,
- NodeType: storageNode.NodeType.Int(),
+ BlockNumber: tx.BlockNumber,
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
+ Path: storageNode.Path,
+ StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
+ CID: storageCIDStr,
+ MhKey: storageMhKey,
+ NodeType: storageNode.NodeType.Int(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err
@@ -484,7 +494,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
return fmt.Errorf("dump: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
// codec doesn't matter since db key is multihash-based
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)