From 488309b5a932372878d5f10e51dabc9ea69edb43 Mon Sep 17 00:00:00 2001 From: i-norden Date: Wed, 29 Dec 2021 01:05:24 -0600 Subject: [PATCH] port retry on deadlock detection feature --- statediff/service.go | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 5334b4b3113f..c8c7649fd426 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -20,6 +20,7 @@ import ( "bytes" "math/big" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -46,8 +47,12 @@ import ( types2 "github.com/ethereum/go-ethereum/statediff/types" ) -const chainEventChanSize = 20000 -const genesisBlockNumber = 0 +const ( + chainEventChanSize = 20000 + genesisBlockNumber = 0 + defaultRetryLimit = 3 // default retry limit once deadlock is detected. + deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html +) var writeLoopParams = Params{ IntermediateStateNodes: true, @@ -122,6 +127,8 @@ type Service struct { enableWriteLoop bool // Size of the worker pool numWorkers uint + // Number of retry for aborted transactions due to deadlock. + maxRetry uint } // BlockCache caches the last block for safe access from different service loops @@ -174,6 +181,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params indexer: indexer, enableWriteLoop: params.EnableWriteLoop, numWorkers: workers, + maxRetry: defaultRetryLimit, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -266,7 +274,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) { // For genesis block we need to return the entire state trie hence we diff it with an empty trie. log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId) - err := sds.writeStateDiff(currBlock, common.Hash{}, writeLoopParams) + err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams) if err != nil { log.Error("statediff.Service.WriteLoop: processing error", "block height", genesisBlockNumber, "error", err.Error(), "worker", workerId) @@ -295,7 +303,7 @@ func (sds *Service) writeLoopWorker(params workerParams) { } log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) - err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams) + err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams) if err != nil { log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) continue @@ -632,7 +640,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()) parentRoot = parentBlock.Root() } - return sds.writeStateDiff(currentBlock, parentRoot, params) + return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) } // WriteStateDiffFor writes a state diff for the specific blockhash directly to the database @@ -645,7 +653,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()) parentRoot = parentBlock.Root() } - return sds.writeStateDiff(currentBlock, parentRoot, params) + return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) } // Writes a state diff from the current block, parent state root, and provided params @@ -689,3 +697,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p } return nil } + +// Wrapper function on writeStateDiff to retry when the deadlock is detected. +func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error { + var err error + for i := uint(0); i < sds.maxRetry; i++ { + err = sds.writeStateDiff(block, parentRoot, params) + if err != nil && strings.Contains(err.Error(), deadlockDetected) { + // Retry only when the deadlock is detected. + continue + } + break + } + return err +}