diff --git a/server/src/Ogmios/App/Protocol/TxSubmission.hs b/server/src/Ogmios/App/Protocol/TxSubmission.hs index c37d85d982..c8edcb24aa 100644 --- a/server/src/Ogmios/App/Protocol/TxSubmission.hs +++ b/server/src/Ogmios/App/Protocol/TxSubmission.hs @@ -4,6 +4,8 @@ {-# LANGUAGE RecordWildCards #-} +{-# OPTIONS_GHC -fno-warn-partial-fields #-} + -- | Transaction submission is pretty simple & works by submitting an already -- serialized and signed transaction as one single message. -- @@ -30,6 +32,7 @@ module Ogmios.App.Protocol.TxSubmission ( ExecutionUnitsEvaluator(..) , mkTxSubmissionClient , newExecutionUnitsEvaluator + , TraceTxSubmission (..) ) where import Ogmios.Prelude @@ -41,7 +44,7 @@ import Cardano.Ledger.Babbage.TxBody ( BabbageEraTxBody (..) ) import Cardano.Ledger.BaseTypes - ( SlotNo + ( SlotNo (..) ) import Cardano.Ledger.Core ( EraTx (..) @@ -50,6 +53,10 @@ import Cardano.Ledger.Core import Control.Monad.Trans.Except ( Except ) +import Data.Aeson + ( genericToEncoding + , toEncoding + ) import Data.SOP.Strict ( NS (..) ) @@ -57,6 +64,15 @@ import Data.Type.Equality ( testEquality , (:~:) (..) ) +import Ogmios.Control.MonadClock + ( MonadClock (..) + ) +import Ogmios.Control.MonadLog + ( HasSeverityAnnotation (..) + , Logger + , MonadLog (..) + , Severity (..) + ) import Ogmios.Control.MonadSTM ( MonadSTM (..) , TQueue @@ -71,6 +87,7 @@ import Ogmios.Data.EraTranslation import Ogmios.Data.Json ( Json , MultiEraDecoder (..) + , ToJSON ) import Ogmios.Data.Ledger.ScriptFailure ( EvaluateTransactionError (..) @@ -100,6 +117,7 @@ import Ogmios.Data.Protocol.TxSubmission , nodeTipTooOld , unsupportedEra , utxoFromMempool + , utxoReferences ) import Ouroboros.Consensus.Cardano.Block ( BlockQuery (..) @@ -152,10 +170,8 @@ import Type.Reflection ) import qualified Codec.Json.Rpc as Rpc +import qualified Data.Aeson as Json import qualified Data.Map.Strict as Map -import GHC.Clock - ( getMonotonicTimeNSec - ) import qualified Ouroboros.Consensus.HardFork.Combinator as HF import qualified Ouroboros.Consensus.Ledger.Query as Ledger import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as LSQ @@ -164,12 +180,14 @@ import qualified Ouroboros.Network.Protocol.LocalTxMonitor.Client as LMM mkTxSubmissionClient :: forall m block crypto. ( MonadSTM m - , MonadIO m + , MonadLog m , HasTxId (SerializedTransaction block) , Crypto crypto , block ~ CardanoBlock crypto ) - => (forall a r. m a -> (Json -> m ()) -> Rpc.ToResponse r -> m a -> m a) + => Logger TraceTxSubmission + -- ^ A logger for the submission client + -> (forall a r. m a -> (Json -> m ()) -> Rpc.ToResponse r -> m a -> m a) -- ^ A default response handler to catch errors. -> TxSubmissionCodecs block -- ^ For encoding Haskell types to JSON @@ -180,7 +198,7 @@ mkTxSubmissionClient -> (Json -> m ()) -- ^ An emitter for yielding JSON objects -> LocalTxSubmissionClient (SerializedTransaction block) (SubmitTransactionError block) m () -mkTxSubmissionClient defaultWithInternalError TxSubmissionCodecs{..} ExecutionUnitsEvaluator{..} queue yield = do +mkTxSubmissionClient tr defaultWithInternalError TxSubmissionCodecs{..} ExecutionUnitsEvaluator{..} queue yield = do LocalTxSubmissionClient clientStIdle where await :: m (TxSubmissionMessage block) @@ -215,10 +233,9 @@ mkTxSubmissionClient defaultWithInternalError TxSubmissionCodecs{..} ExecutionUn defaultWithInternalError clientStIdle yield toResponse $ case request of MultiEraDecoderSuccess transaction -> do mempoolUtxo <- utxoFromMempool <$> readMempoolM - putStrLn $ "extra utxo provided by user: " <> show additionalUtxo - putStrLn $ "extra utxo inferred from mempool: " <> show mempoolUtxo + logWith tr $ TxSubmissionEvaluateUtxoProvidedByUser { utxoRefs = utxoReferences additionalUtxo } + logWith tr $ TxSubmissionEvaluateUtxoInferredFromMempool { utxoRefs = utxoReferences mempoolUtxo } result <- evaluateExecutionUnitsM (mergeUtxo mempoolUtxo additionalUtxo, transaction) - putStrLn $ "evaluation result: " <> show result result & toResponse & encodeEvaluateTransactionResponse @@ -249,22 +266,24 @@ data ExecutionUnitsEvaluator m block = ExecutionUnitsEvaluator newExecutionUnitsEvaluator :: forall m block crypto. ( MonadSTM m - , MonadIO m + , MonadLog m + , MonadClock m , block ~ HardForkBlock (CardanoEras crypto) , crypto ~ StandardCrypto , CanEvaluateScriptsInEra (BabbageEra crypto) , CanEvaluateScriptsInEra (ConwayEra crypto) ) - => m ( ExecutionUnitsEvaluator m block + => Logger TraceTxSubmission + -> m ( ExecutionUnitsEvaluator m block , LocalStateQueryClient block (Point block) (Query block) m () , LocalTxMonitorClient (GenTxId block) (GenTx block) SlotNo m () ) -newExecutionUnitsEvaluator = do +newExecutionUnitsEvaluator tr = do evaluateExecutionUnitsRequest <- newEmptyTMVarIO evaluateExecutionUnitsResponse <- newEmptyTMVarIO - mempool <- newEmptyTMVarIO - mempoolSnapshot <- newEmptyTMVarIO + mempool <- newTMVarIO [] + mempoolSnapshot <- newTVarIO Nothing mempoolSnapshotOnLastSubmit <- newTVarIO Nothing let runEvaluation = either return $ \(!eval) -> do @@ -273,50 +292,53 @@ newExecutionUnitsEvaluator = do return ( ExecutionUnitsEvaluator - { evaluateExecutionUnitsM = runEvaluation . \case - (_, GenTxByron{}) -> - Left (incompatibleEra "byron") + { evaluateExecutionUnitsM = runEvaluation <=< \case + (_, GenTxByron{}) -> do + return $ Left (incompatibleEra "byron") (_, GenTxShelley{}) -> - Left (incompatibleEra "shelley") + return $ Left (incompatibleEra "shelley") (_, GenTxAllegra{}) -> - Left (incompatibleEra "allegra") + return $ Left (incompatibleEra "allegra") (_, GenTxMary{}) -> - Left (incompatibleEra "mary") + return $ Left (incompatibleEra "mary") (_, GenTxAlonzo{}) -> - Left (unsupportedEra "alonzo") + return $ Left (unsupportedEra "alonzo") (UTxOInBabbageEra utxo, GenTxBabbage (ShelleyTx _id tx)) -> do - Right (SomeEvaluationInAnyEra utxo tx) + logWith tr $ TxSubmissionEvaluateArguments { utxoEra = "babbage", transactionEra = "babbage" } + return $ Right (SomeEvaluationInAnyEra utxo tx) (UTxOInBabbageEra utxo, GenTxConway (ShelleyTx _id tx)) -> do - Right (SomeEvaluationInAnyEra (upgrade utxo) tx) + logWith tr $ TxSubmissionEvaluateArguments { utxoEra = "babbage", transactionEra = "conway" } + return $ Right (SomeEvaluationInAnyEra (upgrade utxo) tx) (UTxOInConwayEra utxo, GenTxBabbage (ShelleyTx _id tx)) -> do - Right (SomeEvaluationInAnyEra utxo (upgrade tx)) + logWith tr $ TxSubmissionEvaluateArguments { utxoEra = "conway", transactionEra = "babbage" } + return $ Right (SomeEvaluationInAnyEra utxo (upgrade tx)) (UTxOInConwayEra utxo, GenTxConway (ShelleyTx _id tx)) -> do - Right (SomeEvaluationInAnyEra utxo tx) + logWith tr $ TxSubmissionEvaluateArguments { utxoEra = "conway", transactionEra = "conway" } + return $ Right (SomeEvaluationInAnyEra utxo tx) , readMempoolM = do - putStrLn "reading local mempool..." + logWith tr TxSubmissionLocalMempoolTryRead txs <- atomically $ do - snapshot <- readTMVar mempoolSnapshot + snapshot <- readTVar mempoolSnapshot readTVar mempoolSnapshotOnLastSubmit >>= \case - Just snapshot' | snapshot == snapshot' -> retry + Just snapshot' | snapshot == Just snapshot' -> retry _ -> readTMVar mempool - putStrLn $ "found " <> show (length txs) <> " transaction(s)" - return txs + txs <$ logWith tr TxSubmissionLocalMempoolCurrent { numberOfTransactions = length txs } , clearMempoolM = do snapshot <- atomically $ do - snapshot <- readTMVar mempoolSnapshot - writeTVar mempoolSnapshotOnLastSubmit (Just snapshot) - pure snapshot - putStrLn $ "successfully submitted transaction at " <> show snapshot + snapshot <- readTVar mempoolSnapshot + snapshot <$ writeTVar mempoolSnapshotOnLastSubmit snapshot + whenJust snapshot $ \(unSlotNo -> atSlot, ns) -> + logWith tr TxSubmissionLocalMempoolCleared { atSlot, ns } } , localStateQueryClient @@ -328,37 +350,30 @@ newExecutionUnitsEvaluator = do where mempoolMonitoringClient :: TMVar m [GenTx block] - -> TMVar m (SlotNo, Word64) + -> TVar m (Maybe (SlotNo, Word64)) -> LocalTxMonitorClient (GenTxId block) (GenTx block) SlotNo m () mempoolMonitoringClient mempool mempoolSnapshot = - LocalTxMonitorClient $ pure $ LMM.SendMsgAcquire $ \slot -> do - now <- liftIO getMonotonicTimeNSec - putStrLn $ "received first snapshot at " <> show (slot, now) - atomically $ putTMVar mempoolSnapshot (slot, now) - clientStDrain now [] + LocalTxMonitorClient $ pure $ LMM.SendMsgAcquire clientStAwaitAcquire where - clientStDrain :: Word64 -> [GenTx block] -> m (LMM.ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ()) - clientStDrain !start !txs = + clientStAwaitAcquire :: SlotNo -> m (LMM.ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ()) + clientStAwaitAcquire slot = do + now <- getMonotonicTimeNSec + logWith tr $ TxSubmissionLocalMempoolNewSnapshot { atSlot = unSlotNo slot, ns = now } + atomically $ void $ takeTMVar mempool + clientStDrain slot now [] + + clientStDrain :: SlotNo -> Word64 -> [GenTx block] -> m (LMM.ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ()) + clientStDrain !slot !start !txs = pure $ LMM.SendMsgNextTx $ \case Nothing -> do - end <- liftIO getMonotonicTimeNSec - atomically $ putTMVar mempool $ reverse txs - putStrLn $ "done draining mempool in " <> show ((end - start) `div` 1000) <> "μs" - clientStAwaitChange - + end <- getMonotonicTimeNSec + atomically $ do + writeTVar mempoolSnapshot (Just (slot, start)) + putTMVar mempool $ reverse txs + logWith tr $ TxSubmissionLocalMempoolSynchronized { took = Microseconds $ (end - start) `div` 1000 } + pure $ LMM.SendMsgAwaitAcquire clientStAwaitAcquire Just tx -> do - clientStDrain start (tx : txs) - - clientStAwaitChange :: m (LMM.ClientStAcquired (GenTxId block) (GenTx block) SlotNo m ()) - clientStAwaitChange = do - putStrLn "awaiting next mempool snapshot" - pure $ LMM.SendMsgAwaitAcquire $ \slot -> do - now <- liftIO getMonotonicTimeNSec - putStrLn $ "received new snapshot at " <> show slot - atomically $ do - void $ swapTMVar mempoolSnapshot (slot, now) - void $ takeTMVar mempool - clientStDrain now [] + clientStDrain slot start (tx : txs) localStateQueryClient :: m (SomeEvaluationInAnyEra crypto) @@ -636,3 +651,52 @@ translateToNetworkEra (SomeEvaluationInAnyEra utxoOrig txOrig) = Nothing in sameEra <|> babbageToConway + +-- +-- Logs +-- + +data TraceTxSubmission where + TxSubmissionEvaluateArguments + :: { utxoEra :: Text, transactionEra :: Text } + -> TraceTxSubmission + TxSubmissionLocalMempoolTryRead + :: TraceTxSubmission + TxSubmissionLocalMempoolCurrent + :: { numberOfTransactions :: Int } + -> TraceTxSubmission + TxSubmissionLocalMempoolCleared + :: { atSlot :: Word64, ns :: Word64 } + -> TraceTxSubmission + TxSubmissionLocalMempoolNewSnapshot + :: { atSlot :: Word64, ns :: Word64 } + -> TraceTxSubmission + TxSubmissionLocalMempoolSynchronized + :: { took :: Microseconds } + -> TraceTxSubmission + TxSubmissionEvaluateUtxoProvidedByUser + :: { utxoRefs :: [Text] } + -> TraceTxSubmission + TxSubmissionEvaluateUtxoInferredFromMempool + :: { utxoRefs :: [Text] } + -> TraceTxSubmission + deriving (Show, Generic) + +instance ToJSON TraceTxSubmission where + toEncoding = genericToEncoding Json.defaultOptions + +instance HasSeverityAnnotation TraceTxSubmission where + getSeverityAnnotation = \case + TxSubmissionEvaluateArguments{} -> Debug + TxSubmissionLocalMempoolTryRead{} -> Debug + TxSubmissionLocalMempoolCurrent{} -> Debug + TxSubmissionLocalMempoolCleared{} -> Info + TxSubmissionLocalMempoolNewSnapshot{} -> Info + TxSubmissionLocalMempoolSynchronized{ took } -> + if took > threshold then Warning else Info + TxSubmissionEvaluateUtxoProvidedByUser{ utxoRefs } -> + if null utxoRefs then Debug else Info + TxSubmissionEvaluateUtxoInferredFromMempool{ utxoRefs } -> + if null utxoRefs then Debug else Info + where + threshold = Microseconds (1_000_000 :: Integer) diff --git a/server/src/Ogmios/App/Server/WebSocket.hs b/server/src/Ogmios/App/Server/WebSocket.hs index acde8902b2..7e737d15e0 100644 --- a/server/src/Ogmios/App/Server/WebSocket.hs +++ b/server/src/Ogmios/App/Server/WebSocket.hs @@ -62,6 +62,7 @@ import Ogmios.App.Protocol.TxMonitor ) import Ogmios.App.Protocol.TxSubmission ( ExecutionUnitsEvaluator (..) + , TraceTxSubmission , mkTxSubmissionClient , newExecutionUnitsEvaluator ) @@ -235,7 +236,7 @@ newWebSocketApp tr unliftIO = do logWith tr $ WebSocketConnectionAccepted (userAgent pending) recordSession sensors $ onExceptions $ acceptRequest pending $ \conn -> do let trClient = contramap WebSocketClient tr - withExecutionUnitsEvaluator $ \exUnitsEvaluator exUnitsClients -> do + withExecutionUnitsEvaluator tr $ \exUnitsEvaluator exUnitsClients -> do withOuroborosClients tr opts codecs maxInFlight sensors exUnitsEvaluator getGenesisConfig conn $ \protocolsClients -> do let clientA = mkClient unliftIO (natTracer liftIO trClient) slotsPerEpoch protocolsClients let clientB = mkClient unliftIO (natTracer liftIO trClient) slotsPerEpoch exUnitsClients @@ -291,12 +292,13 @@ withExecutionUnitsEvaluator :: forall m a. ( MonadClock m , MonadSTM m - , MonadIO m + , MonadLog m ) - => (ExecutionUnitsEvaluator m Block -> Clients m Block -> m a) + => Logger TraceWebSocket + -> (ExecutionUnitsEvaluator m Block -> Clients m Block -> m a) -> m a -withExecutionUnitsEvaluator action = do - ( exUnitsEvaluator, stateQueryClient, txMonitorClient ) <- newExecutionUnitsEvaluator +withExecutionUnitsEvaluator tr action = do + ( exUnitsEvaluator, stateQueryClient, txMonitorClient ) <- newExecutionUnitsEvaluator (contramap WebSocketTxSubmission tr) action exUnitsEvaluator $ Clients { chainSyncClient = ChainSyncClientPipelined idle @@ -315,7 +317,6 @@ withOuroborosClients , MonadLog m , MonadMetrics m , MonadWebSocket m - , MonadIO m ) => Logger TraceWebSocket -> Rpc.Options @@ -343,7 +344,7 @@ withOuroborosClients tr opts codecs maxInFlight sensors exUnitsEvaluator getGene , stateQueryClient = mkStateQueryClient (contramap WebSocketStateQuery tr) catchError stateQueryCodecs getGenesisConfig stateQueryQ yield , txSubmissionClient = - mkTxSubmissionClient catchError txSubmissionCodecs exUnitsEvaluator txSubmissionQ yield + mkTxSubmissionClient (contramap WebSocketTxSubmission tr) catchError txSubmissionCodecs exUnitsEvaluator txSubmissionQ yield , txMonitorClient = mkTxMonitorClient catchError txMonitorCodecs txMonitorQ yield } @@ -430,6 +431,10 @@ data TraceWebSocket where :: TraceStateQuery Block -> TraceWebSocket + WebSocketTxSubmission + :: TraceTxSubmission + -> TraceWebSocket + WebSocketWorkerExited :: { exception :: Text } -> TraceWebSocket @@ -456,6 +461,7 @@ instance HasSeverityAnnotation TraceWebSocket where getSeverityAnnotation = \case WebSocketClient msg -> getSeverityAnnotation' msg WebSocketStateQuery msg -> getSeverityAnnotation msg + WebSocketTxSubmission msg -> getSeverityAnnotation msg WebSocketWorkerExited{} -> Debug WebSocketConnectionAccepted{} -> Info WebSocketConnectionEnded{} -> Info diff --git a/server/src/Ogmios/Control/MonadClock.hs b/server/src/Ogmios/Control/MonadClock.hs index 29880061a4..95e416b76d 100644 --- a/server/src/Ogmios/Control/MonadClock.hs +++ b/server/src/Ogmios/Control/MonadClock.hs @@ -28,7 +28,6 @@ import Ogmios.Control.MonadAsync ) import Ogmios.Control.MonadSTM ( MonadSTM (..) - , newTMVar , putTMVar , tryTakeTMVar ) @@ -48,12 +47,16 @@ import Data.Time.Clock import qualified Control.Concurrent as IO import qualified Data.Time.Clock as IO +import qualified GHC.Clock as IO -- | A 'Monad' to make time effects explicit. class Monad m => MonadClock (m :: Type -> Type) where getCurrentTime :: m UTCTime -- ^ Get the time of the current clock. + getMonotonicTimeNSec :: m Word64 + -- ^ Return monotonic time in nanoseconds, since some unspecified starting point + threadDelay :: DiffTime -> m () -- ^ Mark a pause in the current thread @@ -66,6 +69,7 @@ idle = forever $ threadDelay 43200 instance MonadClock IO where getCurrentTime = IO.getCurrentTime + getMonotonicTimeNSec = IO.getMonotonicTimeNSec threadDelay = IO.threadDelay . diffTimeToMicroseconds timed action = do start <- IO.getCurrentTime @@ -78,6 +82,7 @@ instance MonadClock IO where instance MonadClock m => MonadClock (ReaderT env m) where getCurrentTime = lift getCurrentTime + getMonotonicTimeNSec = lift getMonotonicTimeNSec threadDelay = lift . threadDelay timed = mapReaderT timed @@ -99,7 +104,7 @@ withDebouncer -> (Debouncer m -> m a) -> m a withDebouncer delay action = do - lock <- atomically $ newTMVar () + lock <- newTMVarIO () let debouncer = Debouncer $ \io -> atomically (tryTakeTMVar lock) >>= \case Nothing -> return () diff --git a/server/src/Ogmios/Data/Protocol/TxSubmission.hs b/server/src/Ogmios/Data/Protocol/TxSubmission.hs index e10892533f..a7ee0d60f9 100644 --- a/server/src/Ogmios/Data/Protocol/TxSubmission.hs +++ b/server/src/Ogmios/Data/Protocol/TxSubmission.hs @@ -47,6 +47,7 @@ module Ogmios.Data.Protocol.TxSubmission -- ** Mempool / UTxO reconstruction , utxoFromMempool , mergeUtxo + , utxoReferences -- ** Re-exports , AlonzoEra @@ -155,6 +156,10 @@ import qualified Cardano.Ledger.Binary as Binary import qualified Cardano.Ledger.Core as Core import qualified Ouroboros.Consensus.Shelley.Ledger.Mempool as Consensus +import qualified Cardano.Crypto.Hash.Class as CC +import qualified Cardano.Ledger.BaseTypes as Ledger +import qualified Cardano.Ledger.SafeHash as Ledger +import qualified Cardano.Ledger.TxIn as Ledger import qualified Codec.Json.Rpc as Rpc import qualified Data.Aeson.Types as Json import qualified Data.Map as Map @@ -561,3 +566,12 @@ mergeUtxo a b = case (a, b) of UTxOInConwayEra $ UTxO (Map.union l (upgrade <$> r)) (UTxOInConwayEra (unUTxO -> l), UTxOInConwayEra (unUTxO -> r)) -> UTxOInConwayEra $ UTxO (Map.union l r) + +utxoReferences :: Crypto crypto => MultiEraUTxO (CardanoBlock crypto) -> [Text] +utxoReferences = fmap txInToText . \case + UTxOInBabbageEra (unUTxO -> u) -> Map.keys u + UTxOInConwayEra (unUTxO -> u) -> Map.keys u + where + txInToText (Ledger.TxIn txid (Ledger.TxIx ix)) = + let (CC.UnsafeHash h) = Ledger.extractHash (Ledger.unTxId txid) + in encodeBase16 (fromShort h) <> "#" <> show ix diff --git a/server/src/Ogmios/Prelude.hs b/server/src/Ogmios/Prelude.hs index 1e223feb4c..6d59f48f98 100644 --- a/server/src/Ogmios/Prelude.hs +++ b/server/src/Ogmios/Prelude.hs @@ -44,6 +44,9 @@ module Ogmios.Prelude -- * Set , traverset + -- * Time units + , Microseconds (..) + -- * Ledger & consensus common , AllegraEra , AlonzoEra @@ -219,10 +222,12 @@ import Relude hiding import qualified Cardano.Ledger.Binary.Decoding as Binary import qualified Cardano.Ledger.Core as Ledger +import qualified Data.Aeson.Encoding as Json import qualified Data.ByteString.Base16 as B16 import qualified Data.Map as Map import qualified Data.Set as Set import qualified Data.Text.Lazy.Builder as TL +import qualified Text.Show mapToArray :: Ix k => Map k v -> Array k v mapToArray m = @@ -442,3 +447,22 @@ decodeBase16 = decodeBase16Untyped unsafeDecodeBase16 :: HasCallStack => Text -> ByteString unsafeDecodeBase16 = either error identity . decodeBase16 . encodeUtf8 + +-- +-- TimeUnits +-- + +data Microseconds = forall a. (Num a, Integral a, Show a) => Microseconds a + +instance Eq Microseconds where + Microseconds a == Microseconds b = toInteger a == toInteger b + +instance Ord Microseconds where + Microseconds a <= Microseconds b = toInteger a <= toInteger b + +instance Show Microseconds where + show (Microseconds x) = show x ++ "µs" + +instance ToJSON Microseconds where + toJSON = toJSON . show @Text + toEncoding = Json.text . show @Text