Skip to content

Commit

Permalink
add WorkState sum type that more properly represents PrimedWork state
Browse files Browse the repository at this point in the history
Change-Id: Ib1604f81a9e7f2f8542f28e0c467f417fcd00cc3
  • Loading branch information
chessai authored and edmundnoble committed Jun 14, 2024
2 parents ea53ebd + eaf299e commit 0c49a37
Show file tree
Hide file tree
Showing 17 changed files with 224 additions and 168 deletions.
21 changes: 12 additions & 9 deletions bench/Chainweb/Pact/Backend/ForkingBench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ bench rdb = C.bgroup "PactService" $
oneBlock :: BenchConfig -> Int -> C.Benchmark
oneBlock cfg txCount = withResources rdb cfg.numPriorBlocks Error cfg.compact cfg.persistIntraBlockWrites go
where
go _mainLineBlocks _pdb _bhdb _nonceCounter pactQueue txsPerBlock = do
go mainLineBlocks _pdb _bhdb _nonceCounter pactQueue txsPerBlock = do
C.bench name $ C.whnfIO $ do
writeIORef txsPerBlock txCount
createBlock cfg.validate (Nonce 1234) pactQueue
let (T3 _ join1 _) = last mainLineBlocks
createBlock cfg.validate (ParentHeader join1) (Nonce 1234) pactQueue
name = "block-new ["
++ List.intercalate ","
[ "txCount=" ++ show txCount
Expand Down Expand Up @@ -209,36 +210,38 @@ playLine pdb bhdb trunkLength startingBlock pactQueue counter = do
startHeight = fromIntegral $ _blockHeight start
go = do
r <- ask
pblock <- gets ParentHeader
n <- liftIO $ Nonce <$> readIORef ncounter
ret@(T3 _ newblock _) <- liftIO $ mineBlock n pdb bhdb r
ret@(T3 _ newblock _) <- liftIO $ mineBlock pblock n pdb bhdb r
liftIO $ modifyIORef' ncounter succ
put newblock
return ret

mineBlock
:: Nonce
:: ParentHeader
-> Nonce
-> PayloadDb HashMapTable
-> BlockHeaderDb
-> PactQueue
-> IO (T3 ParentHeader BlockHeader PayloadWithOutputs)
mineBlock nonce pdb bhdb pact = do
r@(T3 parent newHeader payload) <- createBlock DoValidate nonce pact
mineBlock parent nonce pdb bhdb pact = do
r@(T3 _ newHeader payload) <- createBlock DoValidate parent nonce pact
addNewPayload pdb (succ (_blockHeight (_parentHeader parent))) payload
-- NOTE: this doesn't validate the block header, which is fine in this test case
unsafeInsertBlockHeaderDb bhdb newHeader
return r

createBlock
:: Validate
-> ParentHeader
-> Nonce
-> PactQueue
-> IO (T3 ParentHeader BlockHeader PayloadWithOutputs)
createBlock validate nonce pact = do
createBlock validate parent nonce pact = do

-- assemble block without nonce and timestamp

bip <- newBlock noMiner NewBlockFill pact
let parent = _blockInProgressParentHeader bip
bip <- throwIfNoHistory =<< newBlock noMiner NewBlockFill parent pact
let payload = blockInProgressToPayloadWithOutputs bip

let creationTime = add second $ _blockCreationTime $ _parentHeader parent
Expand Down
111 changes: 64 additions & 47 deletions src/Chainweb/Chainweb/MinerResources.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand Down Expand Up @@ -70,13 +71,12 @@ import Chainweb.Time
import Chainweb.Utils
import Chainweb.Version
import Chainweb.WebPactExecutionService
import Utils.Logging.Trace

import Data.LogMessage (JsonLog(..), LogFunction)

import Numeric.AffineSpace

import Utils.Logging.Trace (trace)

-- -------------------------------------------------------------------------- --
-- Miner

Expand All @@ -98,8 +98,8 @@ withMiningCoordination logger conf cdb inner
in fmap ((mid,) . HM.fromList) $
forM cids $ \cid -> do
let bh = fromMaybe (genesisBlockHeader v cid) (HM.lookup cid (_cutMap cut))
newBlock <- getPayload cid miner (ParentHeader bh)
return (cid, Just newBlock)
newBlock <- throwIfNoHistory =<< getPayload cid miner (ParentHeader bh)
return (cid, WorkReady newBlock)

m <- newTVarIO initialPw
c503 <- newIORef 0
Expand Down Expand Up @@ -132,6 +132,43 @@ withMiningCoordination logger conf cdb inner

chainLogger cid = addLabel ("chain", toText cid)

-- | Traversal focusing on the 'WorkState' stored for the given miner on the
-- given chain. We assume that this path always exists in 'PrimedWork' and
-- never delete it.
workForMiner :: Miner -> ChainId -> Traversal' PrimedWork WorkState
workForMiner miner cid = _Wrapped' . ix mid . ix cid
  where
    mid = view minerId miner

-- | Loop forever: after each '_coordinationPayloadRefreshDelay', try to
-- continue the in-progress block primed for the given miner and chain so that
-- it picks up any transactions it could still include.
--
-- Nothing is refreshed when pact is disabled, or when the slot currently holds
-- 'WorkAlreadyMined' or 'WorkStale'.
periodicallyRefreshPayload :: TVar PrimedWork -> ChainId -> Miner -> IO a
periodicallyRefreshPayload tpw cid ourMiner = forever $ do
    let delay =
            timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf
    threadDelay (fromIntegral @Micros @Int delay)
    when (not $ v ^. versionCheats . disablePact) $ do
        -- "stale" in the sense of not having all of the transactions
        -- that it could. it still has the latest possible parent
        mContinuableBlockInProgress <- atomically $ do
            primed <- readTVar tpw <&> (^?! workForMiner ourMiner cid)
            case primed of
                WorkReady (NewBlockInProgress bip) -> return (Just bip)
                WorkReady (NewBlockPayload {}) ->
                    error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed"
                WorkAlreadyMined {} -> return Nothing
                WorkStale -> return Nothing

        forM_ mContinuableBlockInProgress $ \continuableBlockInProgress -> do
            maybeNewBlock <- _pactContinueBlock pact cid continuableBlockInProgress
            -- if continuing returns NoHistory then the parent header
            -- isn't available in the checkpointer right now.
            -- in that case we keep the block we already have.
            let newBlock = case maybeNewBlock of
                    NoHistory -> continuableBlockInProgress
                    Historical b -> b

            logFunctionText (chainLogger cid logger) Debug
                $ "refreshed block, old and new tx count: "
                <> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions continuableBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock)

            -- The slot may have changed while the block was being continued
            -- outside of STM (e.g. it was set to WorkAlreadyMined or
            -- WorkStale). Re-check it in the same transaction as the write so
            -- that such a state is never overwritten with refreshed work for a
            -- parent that should no longer be mined on.
            atomically $ do
                current <- readTVar tpw <&> (^?! workForMiner ourMiner cid)
                case current of
                    WorkReady (NewBlockInProgress _) ->
                        modifyTVar' tpw $
                            workForMiner ourMiner cid .~ WorkReady (NewBlockInProgress newBlock)
                    _ -> return ()

-- | THREAD: Keep a live-updated cache of Payloads for specific miners, such
-- that when they request new work, the block can be instantly constructed
-- without interacting with the Pact Queue.
Expand All @@ -146,55 +183,35 @@ withMiningCoordination logger conf cdb inner
pw <- readTVarIO tpw
let
-- we assume that this path always exists in PrimedWork and never delete it.
ourMiner :: Traversal' PrimedWork (Maybe NewBlock)
ourMiner = _Wrapped' . ix (view minerId miner) . ix cid
let !outdatedPayload = pw ^?! ourMiner . _Just
let ParentHeader outdatedParent = newBlockParentHeader outdatedPayload
let
periodicallyRefreshPayload = do
let delay =
timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf
threadDelay (fromIntegral @Micros @Int delay)
when (not $ v ^. versionCheats . disablePact) $ do
continuableBlockInProgress <- atomically $ do
primed <- readTVar tpw <&> (^?! (ourMiner . _Just))
case primed of
NewBlockInProgress bip -> return bip
NewBlockPayload {} ->
error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed"
maybeNewBlock <- _pactContinueBlock pact cid continuableBlockInProgress
-- if continuing returns Nothing then the parent header
-- isn't available in the checkpointer right now.
-- in that case we just mark the payload as not stale
let newBlock = case maybeNewBlock of
NoHistory -> continuableBlockInProgress
Historical b -> b

logFunctionText (chainLogger cid logger) Debug
$ "refreshed block, old and new tx count: "
<> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions continuableBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock)

atomically $ modifyTVar' tpw $
ourMiner .~ Just (NewBlockInProgress newBlock)
periodicallyRefreshPayload
ourMiner :: Traversal' PrimedWork WorkState
ourMiner = workForMiner miner cid
let !outdatedPayload = fromJuste $ pw ^? ourMiner
let outdatedParentHash = case outdatedPayload of
WorkReady outdatedBlock -> _blockHash (_parentHeader (newBlockParentHeader outdatedBlock))
WorkAlreadyMined outdatedBlockHash -> outdatedBlockHash
WorkStale -> error "primeWork loop: Invariant Violation: Stale work should be an impossibility"

newParent <- either ParentHeader id <$> race
-- wait for a block different from what we've got primed work for
(awaitNewBlock cdb cid outdatedParent)
(awaitNewBlock cdb cid outdatedParentHash)
-- in the meantime, periodically refresh the payload to make sure
-- it has all of the transactions it can have
periodicallyRefreshPayload
(periodicallyRefreshPayload tpw cid miner)

-- Temporarily block this chain from being considered for queries
atomically $ modifyTVar' tpw (ourMiner .~ Nothing)
atomically $ modifyTVar' tpw (ourMiner .~ WorkStale)

-- Get a payload for the new block
newBlock <- getPayload cid miner newParent

atomically $ modifyTVar' tpw (ourMiner .~ Just newBlock)


getPayload :: ChainId -> Miner -> ParentHeader -> IO NewBlock
getPayload cid miner newParent >>= \case
NoHistory -> do
logFunctionText (addLabel ("chain", toText cid) logger) Warn
"current block is not in the checkpointer; halting primed work loop temporarily"
approximateThreadDelay 1_000_000
atomically $ modifyTVar' tpw (ourMiner .~ outdatedPayload)
Historical newBlock ->
atomically $ modifyTVar' tpw (ourMiner .~ WorkReady newBlock)

getPayload :: ChainId -> Miner -> ParentHeader -> IO (Historical NewBlock)
getPayload cid m ph =
if v ^. versionCheats . disablePact
-- if pact is disabled, we must keep track of the latest header
Expand All @@ -203,11 +220,11 @@ withMiningCoordination logger conf cdb inner
-- with rocksdb though that shouldn't cause a problem, just wasted work,
-- see docs for
-- Chainweb.Pact.PactService.Checkpointer.findLatestValidBlockHeader'
then return $
then return $ Historical $
NewBlockPayload ph emptyPayload
else trace (logFunction (chainLogger cid logger))
else trace (logFunction (addLabel ("chain", toText cid) logger))
"Chainweb.Chainweb.MinerResources.withMiningCoordination.newBlock"
() 1 (_pactNewBlock pact cid m NewBlockFill)
() 1 (_pactNewBlock pact cid m NewBlockFill ph)

pact :: PactExecutionService
pact = _webPactExecutionService $ view cutDbPactService cdb
Expand Down
10 changes: 5 additions & 5 deletions src/Chainweb/CutDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -349,16 +349,16 @@ awaitNewCutByChainId cdb cid c = atomically $ awaitNewCutByChainIdStm cdb cid c

-- | As in `awaitNewCut`, but only updates when the header at the specified
-- `ChainId` has changed, and only returns that new header.
awaitNewBlock :: CutDb tbl -> ChainId -> BlockHeader -> IO BlockHeader
awaitNewBlock cdb cid bh = atomically $ awaitNewBlockStm cdb cid bh
awaitNewBlock :: CutDb tbl -> ChainId -> BlockHash -> IO BlockHeader
awaitNewBlock cdb cid bHash = atomically $ awaitNewBlockStm cdb cid bHash

-- | As in `awaitNewCut`, but only updates when the header at the specified
-- `ChainId` has changed, and only returns that new header.
awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHeader -> STM BlockHeader
awaitNewBlockStm cdb cid bh = do
awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHash -> STM BlockHeader
awaitNewBlockStm cdb cid bHash = do
c <- _cutStm cdb
case HM.lookup cid (_cutMap c) of
Just bh' | _blockHash bh' /= _blockHash bh -> return bh'
Just bh' | _blockHash bh' /= bHash -> return bh'
_ -> retry

-- | As in `awaitNewCut`, but only updates when the specified `ChainId` has
Expand Down
38 changes: 28 additions & 10 deletions src/Chainweb/Miner/Coordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ module Chainweb.Miner.Coordinator
, PrevTime(..)
, ChainChoice(..)
, PrimedWork(..)
, WorkState(..)
, MiningCoordination(..)
, NoAsscociatedPayload(..)

Expand All @@ -59,7 +60,6 @@ import qualified Data.HashMap.Strict as HM
import Data.IORef
import Data.List(sort)
import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Vector as V

Expand All @@ -71,6 +71,7 @@ import System.LogLevel (LogLevel(..))
-- internal modules

import Chainweb.BlockCreationTime
import Chainweb.BlockHash (BlockHash)
import Chainweb.BlockHeader
import Chainweb.Cut hiding (join)
import Chainweb.Cut.Create
Expand Down Expand Up @@ -132,14 +133,26 @@ data MiningCoordination logger tbl = MiningCoordination
-- made as often as desired, without clogging the Pact queue.
--
newtype PrimedWork =
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId (Maybe NewBlock)))
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId WorkState))
deriving newtype (Semigroup, Monoid)
deriving stock Generic
deriving anyclass (Wrapped)

resetPrimed :: MinerId -> ChainId -> PrimedWork -> PrimedWork
resetPrimed mid cid (PrimedWork pw) = PrimedWork
$! HM.adjust (HM.adjust (\_ -> Nothing) cid) mid pw
-- | The state of the primed work held in 'PrimedWork' for one miner on one
-- chain.
data WorkState
= WorkReady NewBlock
-- ^ We have work ready for the miner
| WorkAlreadyMined BlockHash
-- ^ A block with this parent has already been mined and submitted to the
-- cut pipeline - we don't want to mine it again.
| WorkStale
-- ^ No work has been produced yet with the latest parent block on this
-- chain.
deriving stock (Show)

-- | 'True' exactly when the state is 'WorkReady'; every other state yields
-- 'False'.
isWorkReady :: WorkState -> Bool
isWorkReady (WorkReady _) = True
isWorkReady _ = False

-- | Data shared between the mining threads represented by `newWork` and
-- `publish`.
Expand Down Expand Up @@ -202,20 +215,23 @@ newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do
mpw <- atomically $ do
PrimedWork pw <- readTVar tpw
mpw <- maybe retry return (HM.lookup mid pw)
guard (any isJust mpw)
guard (any isWorkReady mpw)
return mpw
let mr = T2
<$> HM.lookup cid mpw
<*> getCutExtension c cid

case mr of
Just (T2 Nothing _) -> do
Just (T2 WorkStale _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has stale work"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (WorkAlreadyMined _) _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has a payload that was already mined"
newWork logFun Anything eminer hdb pact tpw c
Nothing -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " not mineable"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (Just newBlock) extension) -> do
Just (T2 (WorkReady newBlock) extension) -> do
let ParentHeader primedParent = newBlockParentHeader newBlock
if _blockHash primedParent == _blockHash (_parentHeader (_cutExtensionParent extension))
then do
Expand Down Expand Up @@ -264,7 +280,9 @@ publish lf cdb pwVar miner pwo s = do
Right (bh, Just ch) -> do

-- reset the primed payload for this cut extension
atomically $ modifyTVar pwVar $ resetPrimed miner (_chainId bh)
atomically $ modifyTVar pwVar $ \(PrimedWork pw) ->
PrimedWork $! HM.adjust (HM.insert (_chainId bh) (WorkAlreadyMined (_blockParent bh))) miner pw

addCutHashes cdb ch

let bytes = sum . fmap (BS.length . _transactionBytes . fst) $
Expand Down Expand Up @@ -338,7 +356,7 @@ work mr mcid m = do
"no chains have primed work"
| otherwise ->
"all chains with primed work may be stalled. chains with primed payloads: "
<> sshow (sort [cid | (cid, Just _) <- HM.toList mpw])
<> sshow (sort [cid | (cid, WorkReady _) <- HM.toList mpw])
)

logDelays n'
Expand Down
Loading

0 comments on commit 0c49a37

Please sign in to comment.