Skip to content

Commit

Permalink
Add proper structured traces around internal mempool management + fix…
Browse files Browse the repository at this point in the history
… initial race condition.

  For some reason, the first 'acquire' on a mempool is forever blocking
  until at least one change occurs in the mempool. The call to
  `MsgAcquire` is simply never answered.

  This is odd since the server implementation only performs a read on a
  `TVar` (which in principle can not be blocking).
  • Loading branch information
KtorZ committed Mar 21, 2024
1 parent 9acb398 commit e3b27fa
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 69 deletions.
184 changes: 124 additions & 60 deletions server/src/Ogmios/App/Protocol/TxSubmission.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

{-# LANGUAGE RecordWildCards #-}

{-# OPTIONS_GHC -fno-warn-partial-fields #-}

-- | Transaction submission is pretty simple & works by submitting an already
-- serialized and signed transaction as one single message.
--
Expand All @@ -30,6 +32,7 @@ module Ogmios.App.Protocol.TxSubmission
( ExecutionUnitsEvaluator(..)
, mkTxSubmissionClient
, newExecutionUnitsEvaluator
, TraceTxSubmission (..)
) where

import Ogmios.Prelude
Expand All @@ -41,7 +44,7 @@ import Cardano.Ledger.Babbage.TxBody
( BabbageEraTxBody (..)
)
import Cardano.Ledger.BaseTypes
( SlotNo
( SlotNo (..)
)
import Cardano.Ledger.Core
( EraTx (..)
Expand All @@ -50,13 +53,26 @@ import Cardano.Ledger.Core
import Control.Monad.Trans.Except
( Except
)
import Data.Aeson
( genericToEncoding
, toEncoding
)
import Data.SOP.Strict
( NS (..)
)
import Data.Type.Equality
( testEquality
, (:~:) (..)
)
import Ogmios.Control.MonadClock
( MonadClock (..)
)
import Ogmios.Control.MonadLog
( HasSeverityAnnotation (..)
, Logger
, MonadLog (..)
, Severity (..)
)
import Ogmios.Control.MonadSTM
( MonadSTM (..)
, TQueue
Expand All @@ -71,6 +87,7 @@ import Ogmios.Data.EraTranslation
import Ogmios.Data.Json
( Json
, MultiEraDecoder (..)
, ToJSON
)
import Ogmios.Data.Ledger.ScriptFailure
( EvaluateTransactionError (..)
Expand Down Expand Up @@ -100,6 +117,7 @@ import Ogmios.Data.Protocol.TxSubmission
, nodeTipTooOld
, unsupportedEra
, utxoFromMempool
, utxoReferences
)
import Ouroboros.Consensus.Cardano.Block
( BlockQuery (..)
Expand Down Expand Up @@ -152,10 +170,8 @@ import Type.Reflection
)

import qualified Codec.Json.Rpc as Rpc
import qualified Data.Aeson as Json
import qualified Data.Map.Strict as Map
import GHC.Clock
( getMonotonicTimeNSec
)
import qualified Ouroboros.Consensus.HardFork.Combinator as HF
import qualified Ouroboros.Consensus.Ledger.Query as Ledger
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as LSQ
Expand All @@ -164,12 +180,14 @@ import qualified Ouroboros.Network.Protocol.LocalTxMonitor.Client as LMM
mkTxSubmissionClient
:: forall m block crypto.
( MonadSTM m
, MonadIO m
, MonadLog m
, HasTxId (SerializedTransaction block)
, Crypto crypto
, block ~ CardanoBlock crypto
)
=> (forall a r. m a -> (Json -> m ()) -> Rpc.ToResponse r -> m a -> m a)
=> Logger TraceTxSubmission
-- ^ A logger for the submission client
-> (forall a r. m a -> (Json -> m ()) -> Rpc.ToResponse r -> m a -> m a)
-- ^ A default response handler to catch errors.
-> TxSubmissionCodecs block
-- ^ For encoding Haskell types to JSON
Expand All @@ -180,7 +198,7 @@ mkTxSubmissionClient
-> (Json -> m ())
-- ^ An emitter for yielding JSON objects
-> LocalTxSubmissionClient (SerializedTransaction block) (SubmitTransactionError block) m ()
mkTxSubmissionClient defaultWithInternalError TxSubmissionCodecs{..} ExecutionUnitsEvaluator{..} queue yield = do
mkTxSubmissionClient tr defaultWithInternalError TxSubmissionCodecs{..} ExecutionUnitsEvaluator{..} queue yield = do
LocalTxSubmissionClient clientStIdle
where
await :: m (TxSubmissionMessage block)
Expand Down Expand Up @@ -215,10 +233,9 @@ mkTxSubmissionClient defaultWithInternalError TxSubmissionCodecs{..} ExecutionUn
defaultWithInternalError clientStIdle yield toResponse $ case request of
MultiEraDecoderSuccess transaction -> do
mempoolUtxo <- utxoFromMempool <$> readMempoolM
putStrLn $ "extra utxo provided by user: " <> show additionalUtxo
putStrLn $ "extra utxo inferred from mempool: " <> show mempoolUtxo
logWith tr $ TxSubmissionEvaluateUtxoProvidedByUser { utxoRefs = utxoReferences additionalUtxo }
logWith tr $ TxSubmissionEvaluateUtxoInferredFromMempool { utxoRefs = utxoReferences mempoolUtxo }
result <- evaluateExecutionUnitsM (mergeUtxo mempoolUtxo additionalUtxo, transaction)
putStrLn $ "evaluation result: " <> show result
result
& toResponse
& encodeEvaluateTransactionResponse
Expand Down Expand Up @@ -249,22 +266,24 @@ data ExecutionUnitsEvaluator m block = ExecutionUnitsEvaluator
newExecutionUnitsEvaluator
:: forall m block crypto.
( MonadSTM m
, MonadIO m
, MonadLog m
, MonadClock m
, block ~ HardForkBlock (CardanoEras crypto)
, crypto ~ StandardCrypto
, CanEvaluateScriptsInEra (BabbageEra crypto)
, CanEvaluateScriptsInEra (ConwayEra crypto)
)
=> m ( ExecutionUnitsEvaluator m block
=> Logger TraceTxSubmission
-> m ( ExecutionUnitsEvaluator m block
, LocalStateQueryClient block (Point block) (Query block) m ()
, LocalTxMonitorClient (GenTxId block) (GenTx block) SlotNo m ()
)
newExecutionUnitsEvaluator = do
newExecutionUnitsEvaluator tr = do
evaluateExecutionUnitsRequest <- newEmptyTMVarIO
evaluateExecutionUnitsResponse <- newEmptyTMVarIO

mempool <- newEmptyTMVarIO
mempoolSnapshot <- newEmptyTMVarIO
mempool <- newTMVarIO []
mempoolSnapshot <- newTVarIO Nothing
mempoolSnapshotOnLastSubmit <- newTVarIO Nothing

let runEvaluation = either return $ \(!eval) -> do
Expand All @@ -273,50 +292,53 @@ newExecutionUnitsEvaluator = do

return
( ExecutionUnitsEvaluator
{ evaluateExecutionUnitsM = runEvaluation . \case
(_, GenTxByron{}) ->
Left (incompatibleEra "byron")
{ evaluateExecutionUnitsM = runEvaluation <=< \case
(_, GenTxByron{}) -> do
return $ Left (incompatibleEra "byron")

(_, GenTxShelley{}) ->
Left (incompatibleEra "shelley")
return $ Left (incompatibleEra "shelley")

(_, GenTxAllegra{}) ->
Left (incompatibleEra "allegra")
return $ Left (incompatibleEra "allegra")

(_, GenTxMary{}) ->
Left (incompatibleEra "mary")
return $ Left (incompatibleEra "mary")

(_, GenTxAlonzo{}) ->
Left (unsupportedEra "alonzo")
return $ Left (unsupportedEra "alonzo")

(UTxOInBabbageEra utxo, GenTxBabbage (ShelleyTx _id tx)) -> do
Right (SomeEvaluationInAnyEra utxo tx)
logWith tr $ TxSubmissionEvaluateArguments { utxoEra = "babbage", transactionEra = "babbage" }
return $ Right (SomeEvaluationInAnyEra utxo tx)

(UTxOInBabbageEra utxo, GenTxConway (ShelleyTx _id tx)) -> do
Right (SomeEvaluationInAnyEra (upgrade utxo) tx)
logWith tr $ TxSubmissionEvaluateArguments { utxoEra = "babbage", transactionEra = "conway" }
return $ Right (SomeEvaluationInAnyEra (upgrade utxo) tx)

(UTxOInConwayEra utxo, GenTxBabbage (ShelleyTx _id tx)) -> do
Right (SomeEvaluationInAnyEra utxo (upgrade tx))
logWith tr $ TxSubmissionEvaluateArguments { utxoEra = "conway", transactionEra = "babbage" }
return $ Right (SomeEvaluationInAnyEra utxo (upgrade tx))

(UTxOInConwayEra utxo, GenTxConway (ShelleyTx _id tx)) -> do
Right (SomeEvaluationInAnyEra utxo tx)
logWith tr $ TxSubmissionEvaluateArguments { utxoEra = "conway", transactionEra = "conway" }
return $ Right (SomeEvaluationInAnyEra utxo tx)

, readMempoolM = do
putStrLn "reading local mempool..."
logWith tr TxSubmissionLocalMempoolTryRead
txs <- atomically $ do
snapshot <- readTMVar mempoolSnapshot
snapshot <- readTVar mempoolSnapshot
readTVar mempoolSnapshotOnLastSubmit >>= \case
Just snapshot' | snapshot == snapshot' -> retry
Just snapshot' | snapshot == Just snapshot' -> retry
_ -> readTMVar mempool
putStrLn $ "found " <> show (length txs) <> " transaction(s)"
return txs
txs <$ logWith tr TxSubmissionLocalMempoolCurrent { numberOfTransactions = length txs }

, clearMempoolM = do
snapshot <- atomically $ do
snapshot <- readTMVar mempoolSnapshot
writeTVar mempoolSnapshotOnLastSubmit (Just snapshot)
pure snapshot
putStrLn $ "successfully submitted transaction at " <> show snapshot
snapshot <- readTVar mempoolSnapshot
snapshot <$ writeTVar mempoolSnapshotOnLastSubmit snapshot
whenJust snapshot $ \(unSlotNo -> atSlot, ns) ->
logWith tr TxSubmissionLocalMempoolCleared { atSlot, ns }
}

, localStateQueryClient
Expand All @@ -328,37 +350,30 @@ newExecutionUnitsEvaluator = do
where
mempoolMonitoringClient
:: TMVar m [GenTx block]
-> TMVar m (SlotNo, Word64)
-> TVar m (Maybe (SlotNo, Word64))
-> LocalTxMonitorClient (GenTxId block) (GenTx block) SlotNo m ()
mempoolMonitoringClient mempool mempoolSnapshot =
LocalTxMonitorClient $ pure $ LMM.SendMsgAcquire $ \slot -> do
now <- liftIO getMonotonicTimeNSec
putStrLn $ "received first snapshot at " <> show (slot, now)
atomically $ putTMVar mempoolSnapshot (slot, now)
clientStDrain now []
LocalTxMonitorClient $ pure $ LMM.SendMsgAcquire clientStAwaitAcquire
where
clientStDrain :: Word64 -> [GenTx block] -> m (LMM.ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ())
clientStDrain !start !txs =
clientStAwaitAcquire :: SlotNo -> m (LMM.ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ())
clientStAwaitAcquire slot = do
now <- getMonotonicTimeNSec
logWith tr $ TxSubmissionLocalMempoolNewSnapshot { atSlot = unSlotNo slot, ns = now }
atomically $ void $ takeTMVar mempool
clientStDrain slot now []

clientStDrain :: SlotNo -> Word64 -> [GenTx block] -> m (LMM.ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ())
clientStDrain !slot !start !txs =
pure $ LMM.SendMsgNextTx $ \case
Nothing -> do
end <- liftIO getMonotonicTimeNSec
atomically $ putTMVar mempool $ reverse txs
putStrLn $ "done draining mempool in " <> show ((end - start) `div` 1000) <> "μs"
clientStAwaitChange

end <- getMonotonicTimeNSec
atomically $ do
writeTVar mempoolSnapshot (Just (slot, start))
putTMVar mempool $ reverse txs
logWith tr $ TxSubmissionLocalMempoolSynchronized { took = Microseconds $ (end - start) `div` 1000 }
pure $ LMM.SendMsgAwaitAcquire clientStAwaitAcquire
Just tx -> do
clientStDrain start (tx : txs)

clientStAwaitChange :: m (LMM.ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ())
clientStAwaitChange = do
putStrLn "awaiting next mempool snapshot"
pure $ LMM.SendMsgAwaitAcquire $ \slot -> do
now <- liftIO getMonotonicTimeNSec
putStrLn $ "received new snapshot at " <> show slot
atomically $ do
void $ swapTMVar mempoolSnapshot (slot, now)
void $ takeTMVar mempool
clientStDrain now []
clientStDrain slot start (tx : txs)

localStateQueryClient
:: m (SomeEvaluationInAnyEra crypto)
Expand Down Expand Up @@ -636,3 +651,52 @@ translateToNetworkEra (SomeEvaluationInAnyEra utxoOrig txOrig) =
Nothing
in
sameEra <|> babbageToConway

--
-- Logs
--

data TraceTxSubmission where
TxSubmissionEvaluateArguments
:: { utxoEra :: Text, transactionEra :: Text }
-> TraceTxSubmission
TxSubmissionLocalMempoolTryRead
:: TraceTxSubmission
TxSubmissionLocalMempoolCurrent
:: { numberOfTransactions :: Int }
-> TraceTxSubmission
TxSubmissionLocalMempoolCleared
:: { atSlot :: Word64, ns :: Word64 }
-> TraceTxSubmission
TxSubmissionLocalMempoolNewSnapshot
:: { atSlot :: Word64, ns :: Word64 }
-> TraceTxSubmission
TxSubmissionLocalMempoolSynchronized
:: { took :: Microseconds }
-> TraceTxSubmission
TxSubmissionEvaluateUtxoProvidedByUser
:: { utxoRefs :: [Text] }
-> TraceTxSubmission
TxSubmissionEvaluateUtxoInferredFromMempool
:: { utxoRefs :: [Text] }
-> TraceTxSubmission
deriving (Show, Generic)

instance ToJSON TraceTxSubmission where
toEncoding = genericToEncoding Json.defaultOptions

instance HasSeverityAnnotation TraceTxSubmission where
getSeverityAnnotation = \case
TxSubmissionEvaluateArguments{} -> Debug
TxSubmissionLocalMempoolTryRead{} -> Debug
TxSubmissionLocalMempoolCurrent{} -> Debug
TxSubmissionLocalMempoolCleared{} -> Info
TxSubmissionLocalMempoolNewSnapshot{} -> Info
TxSubmissionLocalMempoolSynchronized{ took } ->
if took > threshold then Warning else Info
TxSubmissionEvaluateUtxoProvidedByUser{ utxoRefs } ->
if null utxoRefs then Debug else Info
TxSubmissionEvaluateUtxoInferredFromMempool{ utxoRefs } ->
if null utxoRefs then Debug else Info
where
threshold = Microseconds (1_000_000 :: Integer)
Loading

0 comments on commit e3b27fa

Please sign in to comment.