Skip to content

Commit

Permalink
Merge branch 'sender_lazy' into custom_trace_upper_bound
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Dec 12, 2024
2 parents ad33f7a + 43956ca commit 291ca48
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 28 deletions.
11 changes: 0 additions & 11 deletions cmd/state/exec3/historical_trace_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,17 +469,6 @@ func CustomTraceMapReduce(fromBlock, toBlock uint64, consumer TraceConsumer, ctx
if err != nil {
return err
}

if sender, ok := txs[txIndex].GetSender(); ok {
txTask.Sender = &sender
} else {
sender, err := signer.Sender(txTask.Tx)
if err != nil {
return err
}
txTask.Sender = &sender
logger.Warn("[Execution] expensive lazy sender recovery", "blockNum", txTask.BlockNum, "txIdx", txTask.TxIndex)
}
}
if workersExited.Load() {
return workers.Wait()
Expand Down
4 changes: 2 additions & 2 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ func (rs *StateV3) RegisterSender(txTask *TxTask) bool {
}()
rs.triggerLock.Lock()
defer rs.triggerLock.Unlock()
lastTxNum, deferral := rs.senderTxNums[*txTask.Sender]
lastTxNum, deferral := rs.senderTxNums[*txTask.Sender()]
if deferral {
// Transactions with the same sender have obvious data dependency, no point running it before lastTxNum
// So we add this data dependency as a trigger
//fmt.Printf("trigger[%d] sender [%x]<=%x\n", lastTxNum, *txTask.Sender, txTask.Tx.Hash())
rs.triggers[lastTxNum] = txTask
}
//fmt.Printf("senderTxNums[%x]=%d\n", *txTask.Sender, txTask.TxNum)
rs.senderTxNums[*txTask.Sender] = txTask.TxNum
rs.senderTxNums[*txTask.Sender()] = txTask.TxNum
return !deferral
}

Expand Down
24 changes: 22 additions & 2 deletions core/state/txtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"container/heap"
"context"
"fmt"
"github.com/erigontech/erigon-lib/common/dbg"
"sync"
"time"

"github.com/erigontech/erigon-lib/common/dbg"
"github.com/erigontech/erigon-lib/log/v3"

"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon/core/rawdb/rawtemporaldb"
"github.com/holiman/uint256"
Expand All @@ -49,7 +51,7 @@ type TxTask struct {
Coinbase libcommon.Address
Withdrawals types.Withdrawals
BlockHash libcommon.Hash
Sender *libcommon.Address
sender *libcommon.Address
SkipAnalysis bool
PruneNonEssentials bool
TxIndex int // -1 for block initialisation
Expand Down Expand Up @@ -86,6 +88,24 @@ type TxTask struct {
Config *chain.Config
}

func (t *TxTask) Sender() *libcommon.Address {
if t.sender != nil {
return t.sender
}
if sender, ok := t.Tx.GetSender(); ok {
t.sender = &sender
return t.sender
}
signer := *types.MakeSigner(t.Config, t.BlockNum, t.Header.Time)
sender, err := signer.Sender(t.Tx)
if err != nil {
panic(err)
}
t.sender = &sender
log.Warn("[Execution] expensive lazy sender recovery", "blockNum", t.BlockNum, "txIdx", t.TxIndex)
return t.sender
}

func (t *TxTask) CreateReceipt(tx kv.Tx) {
if t.TxIndex < 0 || t.Final {
return
Expand Down
11 changes: 0 additions & 11 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,17 +569,6 @@ Loop:
if err != nil {
return err
}

if sender, ok := txs[txIndex].GetSender(); ok {
txTask.Sender = &sender
} else {
sender, err := signer.Sender(txTask.Tx)
if err != nil {
return err
}
txTask.Sender = &sender
logger.Warn("[Execution] expensive lazy sender recovery", "blockNum", txTask.BlockNum, "txIdx", txTask.TxIndex)
}
}

txTasks = append(txTasks, txTask)
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/exec3_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func (pe *parallelExecutor) processResultQueue(ctx context.Context, inputTxNum u
// return outputTxNum, conflicts, triggers, processedBlockNum, false, fmt.Errorf("block hashk mismatch: %x != %x bn =%d, txn= %d", rh, txTask.BlockRoot[:], txTask.BlockNum, txTask.TxNum)
//}
}
triggers += pe.rs.CommitTxNum(txTask.Sender, txTask.TxNum, pe.in)
triggers += pe.rs.CommitTxNum(txTask.Sender(), txTask.TxNum, pe.in)
outputTxNum++
if backPressure != nil {
select {
Expand Down Expand Up @@ -533,7 +533,7 @@ func (pe *parallelExecutor) wait() error {

func (pe *parallelExecutor) execute(ctx context.Context, tasks []*state.TxTask) (bool, error) {
for _, txTask := range tasks {
if txTask.Sender != nil {
if txTask.Sender() != nil {
if ok := pe.rs.RegisterSender(txTask); ok {
pe.rs.AddWork(ctx, txTask, pe.in)
}
Expand Down

0 comments on commit 291ca48

Please sign in to comment.