Skip to content

Commit

Permalink
Merge pull request #166 from vulcanize/retry-on-deadlock
Browse files Browse the repository at this point in the history
Retry aborted transaction when the deadlock is detected.
  • Loading branch information
i-norden authored Dec 14, 2021
2 parents a21abdd + 8a3fba7 commit ef3f270
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"math/big"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -46,8 +47,12 @@ import (
. "github.com/ethereum/go-ethereum/statediff/types"
)

const chainEventChanSize = 20000
const genesisBlockNumber = 0
const (
chainEventChanSize = 20000
genesisBlockNumber = 0
defaultRetryLimit = 3 // default retry limit once deadlock is detected.
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
)

var writeLoopParams = Params{
IntermediateStateNodes: true,
Expand Down Expand Up @@ -131,6 +136,8 @@ type Service struct {
enableWriteLoop bool
// Size of the worker pool
numWorkers uint
// Number of retry for aborted transactions due to deadlock.
maxRetry uint
}

// Wrap the cached last block for safe access from different service loops
Expand Down Expand Up @@ -189,6 +196,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
indexer: indexer,
enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers,
maxRetry: defaultRetryLimit,
}
stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs())
Expand Down Expand Up @@ -270,7 +278,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
err := sds.writeStateDiff(currBlock, common.Hash{}, writeLoopParams)
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams)
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height",
genesisBlockNumber, "error", err.Error(), "worker", workerId)
Expand Down Expand Up @@ -299,7 +307,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
}

log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
continue
Expand Down Expand Up @@ -638,7 +646,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root()
}
return sds.writeStateDiff(currentBlock, parentRoot, params)
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}

// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
Expand All @@ -651,7 +659,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root()
}
return sds.writeStateDiff(currentBlock, parentRoot, params)
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}

// Writes a state diff from the current block, parent state root, and provided params
Expand Down Expand Up @@ -691,3 +699,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
}
return nil
}

// Wrapper function on writeStateDiff to retry when the deadlock is detected.
func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error {
var err error
for i := uint(0); i < sds.maxRetry; i++ {
err = sds.writeStateDiff(block, parentRoot, params)
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
// Retry only when the deadlock is detected.
continue
}
break
}
return err
}

0 comments on commit ef3f270

Please sign in to comment.