Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resumable newblock + notify miners with refreshed blocks #1907

Merged
merged 7 commits into from
Jun 17, 2024
1 change: 0 additions & 1 deletion bench/Chainweb/Pact/Backend/Bench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import Chainweb.MerkleLogHash
import Chainweb.Pact.Backend.RelationalCheckpointer
import Chainweb.Pact.Backend.Types
import Chainweb.Pact.Backend.Utils
import Chainweb.Pact.Service.Types
import Chainweb.Pact.Types
import Chainweb.Test.TestVersions
import Chainweb.Utils.Bench
Expand Down
21 changes: 13 additions & 8 deletions bench/Chainweb/Pact/Backend/ForkingBench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ bench rdb = C.bgroup "PactService" $
oneBlock :: BenchConfig -> Int -> C.Benchmark
oneBlock cfg txCount = withResources rdb cfg.numPriorBlocks Error cfg.compact cfg.persistIntraBlockWrites go
where
go _mainLineBlocks _pdb _bhdb _nonceCounter pactQueue txsPerBlock = do
go mainLineBlocks _pdb _bhdb _nonceCounter pactQueue txsPerBlock = do
C.bench name $ C.whnfIO $ do
writeIORef txsPerBlock txCount
createBlock cfg.validate (Nonce 1234) pactQueue
let (T3 _ join1 _) = last mainLineBlocks
createBlock cfg.validate (ParentHeader join1) (Nonce 1234) pactQueue
name = "block-new ["
++ List.intercalate ","
[ "txCount=" ++ show txCount
Expand Down Expand Up @@ -209,35 +210,39 @@ playLine pdb bhdb trunkLength startingBlock pactQueue counter = do
startHeight = fromIntegral $ _blockHeight start
go = do
r <- ask
pblock <- gets ParentHeader
n <- liftIO $ Nonce <$> readIORef ncounter
ret@(T3 _ newblock _) <- liftIO $ mineBlock n pdb bhdb r
ret@(T3 _ newblock _) <- liftIO $ mineBlock pblock n pdb bhdb r
liftIO $ modifyIORef' ncounter succ
put newblock
return ret

mineBlock
:: Nonce
:: ParentHeader
-> Nonce
-> PayloadDb HashMapTable
-> BlockHeaderDb
-> PactQueue
-> IO (T3 ParentHeader BlockHeader PayloadWithOutputs)
mineBlock nonce pdb bhdb pact = do
r@(T3 parent newHeader payload) <- createBlock DoValidate nonce pact
mineBlock parent nonce pdb bhdb pact = do
r@(T3 _ newHeader payload) <- createBlock DoValidate parent nonce pact
addNewPayload pdb (succ (_blockHeight (_parentHeader parent))) payload
-- NOTE: this doesn't validate the block header, which is fine in this test case
unsafeInsertBlockHeaderDb bhdb newHeader
return r

createBlock
:: Validate
-> ParentHeader
-> Nonce
-> PactQueue
-> IO (T3 ParentHeader BlockHeader PayloadWithOutputs)
createBlock validate nonce pact = do
createBlock validate parent nonce pact = do

-- assemble block without nonce and timestamp

T2 parent payload <- newBlock noMiner pact
bip <- throwIfNoHistory =<< newBlock noMiner NewBlockFill parent pact
let payload = blockInProgressToPayloadWithOutputs bip

let creationTime = add second $ _blockCreationTime $ _parentHeader parent
let bh = newBlockHeader
Expand Down
3 changes: 2 additions & 1 deletion src/Chainweb/Chainweb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ import Chainweb.Mempool.P2pConfig
import Chainweb.Miner.Config
import qualified Chainweb.OpenAPIValidation as OpenAPIValidation
import Chainweb.Pact.RestAPI.Server (PactServerData(..))
import Chainweb.Pact.Service.Types (PactServiceConfig(..), IntraBlockPersistence(..))
import Chainweb.Pact.Service.Types (PactServiceConfig(..))
import Chainweb.Pact.Backend.Types (IntraBlockPersistence(..))
import Chainweb.Pact.Validations
import Chainweb.Payload.PayloadStore
import Chainweb.Payload.PayloadStore.RocksDB
Expand Down
106 changes: 83 additions & 23 deletions src/Chainweb/Chainweb/MinerResources.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

-- |
Expand Down Expand Up @@ -55,26 +57,26 @@ import Chainweb.ChainId
import Chainweb.Chainweb.ChainResources
import Chainweb.Cut (_cutMap)
import Chainweb.CutDB (CutDb, awaitNewBlock, cutDbPactService, _cut)
import Chainweb.Logger (Logger, logFunction)
import Chainweb.Logger
import Chainweb.Miner.Config
import Chainweb.Miner.Coordinator
import Chainweb.Miner.Miners
import Chainweb.Miner.Pact (Miner(..), minerId)
import Chainweb.Pact.Service.Types
import Chainweb.Pact.Utils
import Chainweb.Payload
import Chainweb.Payload.PayloadStore
import Chainweb.Sync.WebBlockHeaderStore
import Chainweb.Time (Micros, Time, minute, getCurrentTimeIntegral, scaleTimeSpan)
import Chainweb.Utils (fromJuste, runForever, thd, T2(..), T3(..))
import Chainweb.Time
import Chainweb.Utils
import Chainweb.Version
import Chainweb.WebPactExecutionService (_webPactExecutionService)
import Chainweb.WebPactExecutionService
import Utils.Logging.Trace

import Data.LogMessage (JsonLog(..), LogFunction)

import Numeric.AffineSpace

import Utils.Logging.Trace (trace)

-- -------------------------------------------------------------------------- --
-- Miner

Expand All @@ -96,8 +98,9 @@ withMiningCoordination logger conf cdb inner
in fmap ((mid,) . HM.fromList) $
forM cids $ \cid -> do
let bh = fromMaybe (genesisBlockHeader v cid) (HM.lookup cid (_cutMap cut))
fmap ((cid,) . over _2 Just) $
getPayload (ParentHeader bh) cid miner
newBlock <- throwIfNoHistory =<< getPayload cid miner (ParentHeader bh)
return (cid, WorkReady newBlock)

m <- newTVarIO initialPw
c503 <- newIORef 0
c403 <- newIORef 0
Expand Down Expand Up @@ -127,44 +130,101 @@ withMiningCoordination logger conf cdb inner
!miners = S.toList (_coordinationMiners coordConf)
<> [ _nodeMiner inNodeConf | _nodeMiningEnabled inNodeConf ]

chainLogger cid = addLabel ("chain", toText cid)

-- we assume that this path always exists in PrimedWork and never delete it.
workForMiner :: Miner -> ChainId -> Traversal' PrimedWork WorkState
workForMiner miner cid = _Wrapped' . ix (view minerId miner) . ix cid

periodicallyRefreshPayload :: TVar PrimedWork -> ChainId -> Miner -> IO a
periodicallyRefreshPayload tpw cid ourMiner = forever $ do
let delay =
timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf
threadDelay (fromIntegral @Micros @Int delay)
when (not $ v ^. versionCheats . disablePact) $ do
-- "stale" in the sense of not having all of the transactions
-- that it could. it still has the latest possible parent
mContinuableBlockInProgress <- atomically $ do
primed <- readTVar tpw <&> (^?! workForMiner ourMiner cid)
case primed of
WorkReady (NewBlockInProgress bip) -> return (Just bip)
WorkReady (NewBlockPayload {}) ->
error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed"
WorkAlreadyMined {} -> return Nothing
WorkStale -> return Nothing

forM_ mContinuableBlockInProgress $ \continuableBlockInProgress -> do
maybeNewBlock <- _pactContinueBlock pact cid continuableBlockInProgress
-- if continuing returns NoHistory then the parent header
-- isn't available in the checkpointer right now.
-- in that case we just mark the payload as not stale.
let newBlock = case maybeNewBlock of
NoHistory -> continuableBlockInProgress
Historical b -> b

logFunctionText (chainLogger cid logger) Debug
$ "refreshed block, old and new tx count: "
<> sshow (V.length $ _transactionPairs $ _blockInProgressTransactions continuableBlockInProgress, V.length $ _transactionPairs $ _blockInProgressTransactions newBlock)

atomically $ modifyTVar' tpw $
workForMiner ourMiner cid .~ WorkReady (NewBlockInProgress newBlock)

-- | THREAD: Keep a live-updated cache of Payloads for specific miners, such
-- that when they request new work, the block can be instantly constructed
-- without interacting with the Pact Queue.
--
primeWork :: TVar PrimedWork -> ChainId -> IO ()
primeWork tpw cid =
forConcurrently_ miners $ \miner ->
runForever (logFunction logger) "primeWork" (go miner)
runForever (logFunction (chainLogger cid logger)) "primeWork" (go miner)
where
go :: Miner -> IO ()
go miner = do
pw <- readTVarIO tpw
let
-- we assume that this path always exists in PrimedWork and never delete it.
ourMiner :: Traversal' PrimedWork (T2 ParentHeader (Maybe PayloadWithOutputs))
ourMiner = _Wrapped' . at (view minerId miner) . _Just . at cid . _Just
let !(T2 ph _) = fromJuste $ pw ^? ourMiner
-- wait for a block different from what we've got primed work for
new <- awaitNewBlock cdb cid (_parentHeader ph)
ourMiner :: Traversal' PrimedWork WorkState
ourMiner = workForMiner miner cid
let !outdatedPayload = fromJuste $ pw ^? ourMiner
let outdatedParentHash = case outdatedPayload of
WorkReady outdatedBlock -> _blockHash (_parentHeader (newBlockParentHeader outdatedBlock))
WorkAlreadyMined outdatedBlockHash -> outdatedBlockHash
WorkStale -> error "primeWork loop: Invariant Violation: Stale work should be an impossibility"

newParent <- either ParentHeader id <$> race
-- wait for a block different from what we've got primed work for
(awaitNewBlock cdb cid outdatedParentHash)
-- in the meantime, periodically refresh the payload to make sure
-- it has all of the transactions it can have
(periodicallyRefreshPayload tpw cid miner)

-- Temporarily block this chain from being considered for queries
atomically $ modifyTVar' tpw (ourMiner . _2 .~ Nothing)
-- Generate new payload for this miner
newParentAndPayload <- getPayload (ParentHeader new) cid miner
atomically $ modifyTVar' tpw (ourMiner .~ over _2 Just newParentAndPayload)
atomically $ modifyTVar' tpw (ourMiner .~ WorkStale)

-- Get a payload for the new block
getPayload cid miner newParent >>= \case
NoHistory -> do
logFunctionText (addLabel ("chain", toText cid) logger) Warn
"current block is not in the checkpointer; halting primed work loop temporarily"
approximateThreadDelay 1_000_000
atomically $ modifyTVar' tpw (ourMiner .~ outdatedPayload)
Historical newBlock ->
atomically $ modifyTVar' tpw (ourMiner .~ WorkReady newBlock)

getPayload :: ParentHeader -> ChainId -> Miner -> IO (T2 ParentHeader PayloadWithOutputs)
getPayload new cid m =
getPayload :: ChainId -> Miner -> ParentHeader -> IO (Historical NewBlock)
getPayload cid m ph =
if v ^. versionCheats . disablePact
-- if pact is disabled, we must keep track of the latest header
-- ourselves. otherwise we use the header we get from newBlock as the
-- real parent. newBlock may return a header in the past due to a race
-- with rocksdb though that shouldn't cause a problem, just wasted work,
-- see docs for
-- Chainweb.Pact.PactService.Checkpointer.findLatestValidBlockHeader'
then return $ T2 new emptyPayload
else trace (logFunction logger)
then return $ Historical $
NewBlockPayload ph emptyPayload
else trace (logFunction (chainLogger cid logger))
"Chainweb.Chainweb.MinerResources.withMiningCoordination.newBlock"
() 1 (_pactNewBlock pact cid m)
() 1 (_pactNewBlock pact cid m NewBlockFill ph)

pact :: PactExecutionService
pact = _webPactExecutionService $ view cutDbPactService cdb
Expand Down
2 changes: 1 addition & 1 deletion src/Chainweb/Cut.hs
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ isMonotonicCutExtension c h = do
validBraidingCid cid a
| Just b <- c ^? ixg cid = _blockHash b == a || _blockParent b == a
| _blockHeight h == genesisHeight v cid = a == genesisParentBlockHash v cid
| otherwise = error $ T.unpack $ "isMonotonicCutExtension.validBraiding: missing adjacent parent on chain " <> sshow cid <> " in cut. " <> encodeToText h
| otherwise = error $ T.unpack $ "isMonotonicCutExtension.validBraiding: missing adjacent parent on chain " <> toText cid <> " in cut. " <> encodeToText h

v = _chainwebVersion c

Expand Down
10 changes: 5 additions & 5 deletions src/Chainweb/CutDB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -349,16 +349,16 @@ awaitNewCutByChainId cdb cid c = atomically $ awaitNewCutByChainIdStm cdb cid c

-- | As in `awaitNewCut`, but only updates when the header at the specified
-- `ChainId` has changed, and only returns that new header.
awaitNewBlock :: CutDb tbl -> ChainId -> BlockHeader -> IO BlockHeader
awaitNewBlock cdb cid bh = atomically $ awaitNewBlockStm cdb cid bh
awaitNewBlock :: CutDb tbl -> ChainId -> BlockHash -> IO BlockHeader
awaitNewBlock cdb cid bHash = atomically $ awaitNewBlockStm cdb cid bHash

-- | As in `awaitNewCut`, but only updates when the header at the specified
-- `ChainId` has changed, and only returns that new header.
awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHeader -> STM BlockHeader
awaitNewBlockStm cdb cid bh = do
awaitNewBlockStm :: CutDb tbl -> ChainId -> BlockHash -> STM BlockHeader
awaitNewBlockStm cdb cid bHash = do
c <- _cutStm cdb
case HM.lookup cid (_cutMap c) of
Just bh' | _blockHash bh' /= _blockHash bh -> return bh'
Just bh' | _blockHash bh' /= bHash -> return bh'
_ -> retry

-- | As in `awaitNewCut`, but only updates when the specified `ChainId` has
Expand Down
15 changes: 14 additions & 1 deletion src/Chainweb/Miner/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}

-- |
Expand Down Expand Up @@ -55,7 +56,7 @@ import Pact.Types.Term (mkKeySet, PublicKeyText(..))
-- internal modules

import Chainweb.Miner.Pact (Miner(..), MinerKeys(..), MinerId(..), minerId)
import Chainweb.Time (Seconds)
import Chainweb.Time

---

Expand Down Expand Up @@ -138,6 +139,8 @@ data CoordinationConfig = CoordinationConfig
-- ^ the maximum number of concurrent update streams that is supported
, _coordinationUpdateStreamTimeout :: !Seconds
-- ^ the duration that an update stream is kept open in seconds
, _coordinationPayloadRefreshDelay :: !(TimeSpan Micros)
-- ^ the duration between payload refreshes in microseconds
} deriving stock (Eq, Show, Generic)

coordinationEnabled :: Lens' CoordinationConfig Bool
Expand All @@ -157,13 +160,18 @@ coordinationUpdateStreamTimeout :: Lens' CoordinationConfig Seconds
coordinationUpdateStreamTimeout =
lens _coordinationUpdateStreamTimeout (\m c -> m { _coordinationUpdateStreamTimeout = c })

coordinationPayloadRefreshDelay :: Lens' CoordinationConfig (TimeSpan Micros)
coordinationPayloadRefreshDelay =
lens _coordinationPayloadRefreshDelay (\m c -> m { _coordinationPayloadRefreshDelay = c })

instance ToJSON CoordinationConfig where
toJSON o = object
[ "enabled" .= _coordinationEnabled o
, "limit" .= _coordinationReqLimit o
, "miners" .= (J.toJsonViaEncode <$> S.toList (_coordinationMiners o))
, "updateStreamLimit" .= _coordinationUpdateStreamLimit o
, "updateStreamTimeout" .= _coordinationUpdateStreamTimeout o
, "payloadRefreshDelay" .= _coordinationPayloadRefreshDelay o
]

instance FromJSON (CoordinationConfig -> CoordinationConfig) where
Expand All @@ -173,6 +181,7 @@ instance FromJSON (CoordinationConfig -> CoordinationConfig) where
<*< coordinationMiners .fromLeftMonoidalUpdate %.: "miners" % o
<*< coordinationUpdateStreamLimit ..: "updateStreamLimit" % o
<*< coordinationUpdateStreamTimeout ..: "updateStreamTimeout" % o
<*< coordinationPayloadRefreshDelay ..: "payloadRefreshDelay" % o

defaultCoordination :: CoordinationConfig
defaultCoordination = CoordinationConfig
Expand All @@ -181,6 +190,7 @@ defaultCoordination = CoordinationConfig
, _coordinationReqLimit = 1200
, _coordinationUpdateStreamLimit = 2000
, _coordinationUpdateStreamTimeout = 240
, _coordinationPayloadRefreshDelay = TimeSpan (Micros 15_000_000)
}

pCoordinationConfig :: MParser CoordinationConfig
Expand All @@ -198,6 +208,9 @@ pCoordinationConfig = id
<*< coordinationUpdateStreamTimeout .:: jsonOption
% long "mining-update-stream-timeout"
<> help "duration that an update stream is kept open in seconds"
<*< coordinationPayloadRefreshDelay .:: jsonOption
% long "mining-payload-refresh-delay"
<> help "frequency that the mining payload is refreshed"

pMiner :: String -> Parser Miner
pMiner prefix = pkToMiner <$> pPk
Expand Down
Loading
Loading