Skip to content

Commit

Permalink
Discard reverting megabundle blocks and head change interrupted blocks (
Browse files Browse the repository at this point in the history
#123)

* Discard reverting megabundle blocks and head change interrupted blocks

* Discard all blocks with incomplete bundles

* Run reverting megabundles regression test separately from bundle tests
  • Loading branch information
Ruteri authored Mar 30, 2022
1 parent ec93a8f commit c8d2698
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 21 deletions.
23 changes: 22 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,31 @@ jobs:
path: e2e

- run: cd e2e && yarn install
- run: |
- name: Run single node e2e
run: |
cd e2e
GETH=`pwd`/../build/bin/geth ./run.sh &
sleep 15
yarn run demo-simple
yarn run e2e-reverting-bundles
yarn run demo-contract
pkill -9 geth || true
- name: Run private tx with two nodes
run: |
cd e2e
GETH=`pwd`/../build/bin/geth ./run.sh &
# Second node, not mining
P2P_PORT=30302 DATADIR=datadir2 HTTP_PORT=8546 MINER_ARGS='--nodiscover' GETH=`pwd`/../build/bin/geth ./run.sh &
sleep 15
DATADIR1=datadir DATADIR2=datadir2 GETH=`pwd`/../build/bin/geth ./peer_nodes.sh
sleep 15
yarn run demo-private-tx
pkill -9 geth || true
- name: Run megabundle-only node checking for reverts
run: |
cd e2e
# Disable bundle workers
MINER_ARGS='--miner.etherbase=0xd912aecb07e9f4e1ea8e6b4779e7fb6aa1c3e4d8 --miner.trustedrelays=0xfb11e78C4DaFec86237c2862441817701fdf197F --mine --miner.threads=2 --miner.maxmergedbundles=0' GETH=`pwd`/../build/bin/geth ./run.sh &
sleep 15
yarn run e2e-reverting-megabundle
pkill -9 geth || true
52 changes: 32 additions & 20 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
return receipt.Logs, nil
}

// Returns whether the block should be discarded
func (w *worker) commitBundle(env *environment, txs types.Transactions, interrupt *int32) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
Expand All @@ -938,8 +939,7 @@ func (w *worker) commitBundle(env *environment, txs types.Transactions, interrup
// (1) new head block event arrival, the interrupt signal is 1
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
// Discard the interrupted work, since it is incomplete and contains partial bundles
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
Expand All @@ -953,12 +953,13 @@ func (w *worker) commitBundle(env *environment, txs types.Transactions, interrup
}
}

return atomic.LoadInt32(interrupt) == commitInterruptNewHead
return true
}
// If we don't have enough gas for any further transactions then we're done
// If we don't have enough gas for any further transactions discard the block
// since not all bundles of the were applied
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
return true
}

// Error may be ignored here. The error has already been checked
Expand Down Expand Up @@ -1245,7 +1246,8 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment) {
// Returns whether the block should be discarded.
func (w *worker) fillTransactions(interrupt *int32, env *environment) bool {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true)
Expand All @@ -1260,36 +1262,36 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) {
bundles, err := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time)
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
return true
}

bundleTxs, bundle, numBundles, err := w.generateFlashbotsBundle(env, bundles, pending)
if err != nil {
log.Error("Failed to generate flashbots bundle", "err", err)
return
return true
}
log.Info("Flashbots bundle", "ethToCoinbase", ethIntToFloat(bundle.totalEth), "gasUsed", bundle.totalGasUsed, "bundleScore", bundle.mevGasPrice, "bundleLength", len(bundleTxs), "numBundles", numBundles, "worker", w.flashbots.maxMergedBundles)
if len(bundleTxs) == 0 {
return
return true
}
if w.commitBundle(env, bundleTxs, interrupt) {
return
return true
}
env.profit.Add(env.profit, bundle.ethSentToCoinbase)
}
if w.flashbots.isMegabundleWorker {
megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, env.header.Number, env.header.Time)
log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err)
if err != nil {
return // no valid megabundle for this relay, nothing to do
return true // no valid megabundle for this relay, nothing to do
}

// Flashbots bundle merging duplicates work by simulating TXes and then committing them once more.
// Megabundles API focuses on speed and runs everything in one cycle.
coinbaseBalanceBefore := env.state.GetBalance(env.coinbase)
if w.commitBundle(env, megabundle.Txs, interrupt) {
log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle)
return
return true
}
var txStatuses = map[common.Hash]bool{}
for _, receipt := range env.receipts {
Expand All @@ -1299,11 +1301,11 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) {
status, ok := txStatuses[tx.Hash()]
if !ok {
log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash())
return
return true
}
if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) {
log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash())
return
return true
}
}
coinbaseBalanceAfter := env.state.GetBalance(env.coinbase)
Expand All @@ -1315,15 +1317,17 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) {
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
return
return true
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
return
return true
}
}

return false
}

// generateWork generates a sealing block based on the given parameters.
Expand All @@ -1334,7 +1338,11 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
}
defer work.discard()

w.fillTransactions(nil, work)
shouldDiscard := w.fillTransactions(nil, work)
if shouldDiscard {
return nil, errors.New("could not generate valid block")
}

return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}

Expand Down Expand Up @@ -1365,7 +1373,11 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
w.commit(work.copy(), nil, false, start)
}
// Fill pending transactions from the txpool
w.fillTransactions(interrupt, work)
shouldDiscard := w.fillTransactions(interrupt, work)
if shouldDiscard {
return
}

w.commit(work.copy(), w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
Expand Down Expand Up @@ -1525,11 +1537,11 @@ func (w *worker) simulateBundles(env *environment, bundles []types.MevBundle, pe
simulatedBundles := []simulatedBundle{}

for _, bundle := range bundles {
state := env.state.Copy()
gasPool := new(core.GasPool).AddGas(env.header.GasLimit)
if len(bundle.Txs) == 0 {
continue
}
state := env.state.Copy()
gasPool := new(core.GasPool).AddGas(env.header.GasLimit)
simmed, err := w.computeBundleGas(env, bundle, state, gasPool, pendingTxs, 0)

if err != nil {
Expand Down

0 comments on commit c8d2698

Please sign in to comment.