diff --git a/.github/workflows/applications.yml b/.github/workflows/applications.yml index 03f1870ff0..979923c4a9 100644 --- a/.github/workflows/applications.yml +++ b/.github/workflows/applications.yml @@ -338,7 +338,7 @@ jobs: - name: Build chainweb library run: cabal build --ghc-options=-j2 lib:chainweb - name: Build chainweb applications - run: cabal build -j2 --ghc-options=-j2 exe:chainweb-node test:chainweb-tests exe:cwtool chainweb:bench:bench + run: cabal build -j2 --ghc-options=-j2 exe:chainweb-node exe:compact test:chainweb-tests exe:cwtool chainweb:bench:bench # Checks - name: Check that working directory tree is clean @@ -360,6 +360,7 @@ jobs: run: | mkdir -p artifacts/chainweb cp $(cabal list-bin chainweb-node) artifacts/chainweb + cp $(cabal list-bin compact) artifacts/chainweb cp $(cabal list-bin chainweb-tests) artifacts/chainweb cp $(cabal list-bin cwtool) artifacts/chainweb cp $(cabal list-bin bench) artifacts/chainweb diff --git a/.gitignore b/.gitignore index aafa48011d..0259a974f7 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,4 @@ rosetta/logs/* .ghci_history .direnv/ .envrc +massif.out.* diff --git a/bench/Chainweb/Pact/Backend/ForkingBench.hs b/bench/Chainweb/Pact/Backend/ForkingBench.hs index 758192fcca..aa106129f3 100644 --- a/bench/Chainweb/Pact/Backend/ForkingBench.hs +++ b/bench/Chainweb/Pact/Backend/ForkingBench.hs @@ -301,15 +301,32 @@ withResources rdb trunkLength logLevel compact p f = C.envWithCleanup create des coinAccounts <- newMVar mempty nonceCounter <- newIORef 1 txPerBlock <- newIORef 10 - sqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas mp <- testMemPoolAccess txPerBlock coinAccounts - pactService <- - startPact testVer logger blockHeaderDb payloadDb mp sqlEnv - mainTrunkBlocks <- - playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd pactService) nonceCounter - when (compact == DoCompact) $ do - C.withDefaultLogger Error $ \lgr -> do - void $ C.compact (BlockHeight trunkLength) lgr sqlEnv [] + (sqlEnv, pactService, mainTrunkBlocks) <- do + srcSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas + srcPactService <- + startPact testVer logger blockHeaderDb payloadDb mp srcSqlEnv + mainTrunkBlocks <- + playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd srcPactService) nonceCounter + + (sqlEnv, pactService) <- do + if compact == DoCompact + then do + targetSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas + C.withDefaultLogger Error $ \lgr -> do + C.compactPactState lgr C.defaultRetainment (BlockHeight trunkLength) srcSqlEnv targetSqlEnv + targetPactService <- + startPact testVer logger blockHeaderDb payloadDb mp targetSqlEnv + + -- Stop the previous pact service/close the sqlite connection + stopPact srcPactService + stopSqliteDb srcSqlEnv + + pure (targetSqlEnv, targetPactService) + else do + pure (srcSqlEnv, srcPactService) + + pure (sqlEnv, pactService, mainTrunkBlocks) return $ NoopNFData $ Resources {..} diff --git a/cabal.project b/cabal.project index 0a22c1ab57..b730ca73eb 100644 --- a/cabal.project +++ b/cabal.project @@ -87,8 +87,8 @@ source-repository-package source-repository-package type: git location: https://github.com/kadena-io/chainweb-storage.git - tag: 4b45c1ab9c070c6d16a058bcbab0c06ac0fb6d4e - --sha256: 0m6c7kl6x5a3k02q2i7qzfx91kxz19dzav0piqfxra52bq0x3sm6 + tag: a5e06a8aa2da1e7981ff9fad91dfd41f7c39fc85 + --sha256: sha256-3Zqsgrxa7SQUr8XzT2O5PLTELkI92NXZU4j2UUvlL4E= source-repository-package type: git diff --git a/chainweb.cabal b/chainweb.cabal index c62ab0e340..b8f971ebd7 100644 --- a/chainweb.cabal +++ b/chainweb.cabal @@ -379,6 +379,10 @@ library , crypton-connection >= 0.2 && < 0.4 , containers >= 0.5 , crypton >= 0.31 + , crypton-connection >= 0.2 && < 0.4 + , crypton-x509 >=1.7 + , crypton-x509-system >=1.6 + , crypton-x509-validation >=1.6 , cuckoo >= 0.3 , data-default >=0.7 , data-dword >= 0.3 @@ -411,8 +415,8 @@ library , mmorph >= 1.1 , monad-control >= 1.0 , mtl >= 2.3 - , mwc-random >= 0.13 , mwc-probability >= 2.0 && <2.4 + , mwc-random >= 0.13 , network >= 3.1.2 , optparse-applicative >= 0.14 , pact >= 4.2.0.1 @@ -422,6 +426,7 @@ library , pem >=0.2 , primitive >= 0.7.1.0 , random >= 1.2 + , rocksdb-haskell-kadena >= 1.1.0 , rosetta >= 1.0 , safe-exceptions >= 0.1 , scheduler >= 1.4 @@ -436,32 +441,33 @@ library , streaming-commons >= 0.2 , template-haskell >= 2.14 , text >= 2.0 - , trifecta >= 2.1 , these >= 1.0 , time >= 1.12.2 , tls >=1.9 , tls-session-manager >= 0.0 , token-bucket >= 0.1 , transformers >= 0.5 + , trifecta >= 2.1 , unliftio ^>= 0.2 , unordered-containers >= 0.2.16 , uuid >= 1.3.15 + , vector >= 0.12.2 + , vector-algorithms >= 0.7 , wai >= 3.2.2.1 , wai-app-static >= 3.1.6.3 , wai-cors >= 0.2.7 , wai-extra >= 3.0.28 - , wai-middleware-validation - , vector >= 0.12.2 - , vector-algorithms >= 0.7 , wai-middleware-throttle >= 0.3 + , wai-middleware-validation , warp >= 3.3.6 , warp-tls >= 3.4 - , crypton-x509 >=1.7 - , crypton-x509-system >=1.6 - , crypton-x509-validation >=1.6 , yaml >= 0.11 , yet-another-logger >= 0.4.1 + if !os(windows) + build-depends: + unix >= 2.7 + if flag(ed25519) cpp-options: -DWITH_ED25519=1 if flag(ghc-flags) @@ -815,6 +821,40 @@ executable cwtool if flag(ed25519) cpp-options: -DWITH_ED25519=1 +executable compact + import: warning-flags, debugging-flags + default-language: Haskell2010 + ghc-options: + -threaded + -rtsopts + "-with-rtsopts=-N -A4M --disable-delayed-os-memory-return -qn1" + hs-source-dirs: + compact + main-is: Main.hs + build-depends: + -- internal + , chainweb + + -- external + , base >= 4.12 && < 5 + +executable pact-diff + import: warning-flags, debugging-flags + default-language: Haskell2010 + ghc-options: + -threaded + -rtsopts + "-with-rtsopts=-N -A4M --disable-delayed-os-memory-return -qn1" + hs-source-dirs: + pact-diff + main-is: Main.hs + build-depends: + -- internal + , chainweb + + -- external + , base >= 4.12 && < 5 + benchmark bench import: warning-flags, debugging-flags default-language: Haskell2010 diff --git a/compact/Main.hs b/compact/Main.hs new file mode 100644 index 0000000000..bdd8e4bb8b --- /dev/null +++ b/compact/Main.hs @@ -0,0 +1,11 @@ +{-# language + ImportQualifiedPost +#-} + +module Main (main) where + +import Chainweb.Pact.Backend.Compaction qualified as Compact + +main :: IO () +main = do + Compact.main diff --git a/default.nix b/default.nix index 0fcc768a5c..42b4954b95 100644 --- a/default.nix +++ b/default.nix @@ -76,6 +76,7 @@ let haskellSrc = with nix-filter.lib; filter { shell.buildInputs = with pkgs; [ zlib pkg-config + sqlite ]; modules = [ { @@ -102,17 +103,28 @@ let haskellSrc = with nix-filter.lib; filter { }; pact = pactFromCached pkgs pact passthru.cached; }; - default = pkgs.runCommandCC "chainweb" { inherit passthru; } '' - mkdir -pv $out/bin - cp ${flake.packages."chainweb:exe:chainweb-node"}/bin/chainweb-node $out/bin/chainweb-node - cp ${flake.packages."chainweb:exe:cwtool"}/bin/cwtool $out/bin/cwtool - chmod +w $out/bin/{cwtool,chainweb-node} - $STRIP $out/bin/chainweb-node - $STRIP $out/bin/cwtool - ${pkgs.lib.optionalString (pkgs.stdenv.isLinux) '' - patchelf --shrink-rpath $out/bin/{cwtool,chainweb-node} - ''} - ''; + default = + let + exes = [ + "chainweb-node" + "cwtool" + "compact" + "pact-diff" + ]; + + for = xs: f: builtins.map f xs; + in + pkgs.runCommandCC "chainweb" { inherit passthru; } '' + mkdir -pv $out/bin + ${builtins.concatStringsSep "\n" (for exes (exe: '' + cp ${flake.packages."chainweb:exe:${exe}"}/bin/${exe} $out/bin/${exe} + chmod +w $out/bin/${exe} + $STRIP $out/bin/${exe} + ${pkgs.lib.optionalString (pkgs.stdenv.isLinux) '' + patchelf --shrink-rpath $out/bin/${exe} + ''} + ''))} + ''; in { # The Haskell project flake: Used by flake.nix inherit flake; diff --git a/pact-diff/Main.hs b/pact-diff/Main.hs new file mode 100644 index 0000000000..e23ee14be5 --- /dev/null +++ b/pact-diff/Main.hs @@ -0,0 +1,11 @@ +{-# language + ImportQualifiedPost +#-} + +module Main (main) where + +import Chainweb.Pact.Backend.PactState.Diff qualified as PactDiff + +main :: IO () +main = do + PactDiff.main diff --git a/src/Chainweb/Pact/Backend/Compaction.hs b/src/Chainweb/Pact/Backend/Compaction.hs index 13ec9a7de7..bbedd07a6f 100644 --- a/src/Chainweb/Pact/Backend/Compaction.hs +++ b/src/Chainweb/Pact/Backend/Compaction.hs @@ -1,137 +1,89 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE DeriveAnyClass #-} -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE ImportQualifiedPost #-} -{-# LANGUAGE InstanceSigs #-} -{-# LANGUAGE LambdaCase #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE MultiWayIf #-} -{-# LANGUAGE NumericUnderscores #-} -{-# LANGUAGE OverloadedRecordDot #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE PackageImports #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TemplateHaskell #-} -{-# LANGUAGE TupleSections #-} -{-# LANGUAGE TypeApplications #-} -{-# LANGUAGE ViewPatterns #-} - --- | --- Module: Chainweb.Pact.Backend.Compaction --- Copyright: Copyright © 2023 Kadena LLC. --- License: see LICENSE.md --- --- Compact Checkpointer PactDbs by culling old journal rows. --- +{-# language + BangPatterns + , CPP + , DeriveAnyClass + , DeriveGeneric + , DerivingStrategies + , DuplicateRecordFields + , FlexibleContexts + , GeneralizedNewtypeDeriving + , ImportQualifiedPost + , LambdaCase + , NumericUnderscores + , OverloadedRecordDot + , OverloadedStrings + , PackageImports + , ScopedTypeVariables + , TypeApplications +#-} module Chainweb.Pact.Backend.Compaction - ( CompactFlag(..) - , CompactException(..) - , TargetBlockHeight(..) - , compact - , main + ( + -- * Compaction executable implementation + main + + -- * Exported for testing + , compactPactState + , compactRocksDb + , Retainment(..) + , defaultRetainment - -- * Used in various tools/testing + -- * Compaction logging utilities , withDefaultLogger , withPerChainFileLogger - , locateTargets - ) where - -import Chronos qualified -import Control.Concurrent (forkIO, threadDelay) -import Control.Concurrent.MVar (swapMVar, readMVar, newMVar) -import Control.Exception (Exception, SomeException(..)) -import Control.Lens (makeLenses, set, over, view, (^.), _2, (^?!), ix) -import Control.Monad (forM, forM_, unless, void, when) -import Control.Monad.Catch (MonadCatch(catch), MonadThrow(throwM)) -import Control.Monad.IO.Class (MonadIO(liftIO)) -import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, local) -import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp) -import Data.Coerce (coerce) -import Data.Foldable qualified as F -import Data.Function (fix) -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HM -import Data.IORef (IORef, readIORef, newIORef, atomicModifyIORef') -import Data.Int (Int64) -import Data.List qualified as List -import Data.Map (Map) -import Data.Map.Strict qualified as M -import Data.Maybe (fromMaybe) -import Data.Ord (Down(..)) -import Data.Set (Set) -import Data.Set qualified as Set -import Data.String (IsString) -import Data.Text (Text) -import Data.Text qualified as Text -import Data.Text.Encoding qualified as Text -import Data.Vector (Vector) -import Data.Vector qualified as V -import Data.Word (Word64) -import Database.SQLite3.Direct (Utf8(..), Database) -import GHC.Stack (HasCallStack) -import Options.Applicative -import System.Directory (createDirectoryIfMissing) -import System.FilePath (()) -import System.IO (Handle) -import System.IO qualified as IO -import System.IO.Unsafe (unsafePerformIO) -import UnliftIO.Async (pooledMapConcurrentlyN_) + ) + where +import "base" Control.Exception hiding (Handler) +import "base" Control.Monad (forM, forM_, unless, void, when) +import "base" Control.Monad.IO.Class (MonadIO(liftIO)) +import "base" Data.Function ((&)) +import "base" Data.Int (Int64) +import "base" Data.Maybe (fromMaybe) +import "base" Prelude hiding (log) +import "base" System.Exit (exitFailure) +import "base" System.IO (Handle) +import "base" System.IO qualified as IO +import "chainweb-storage" Chainweb.Storage.Table (Iterator(..), Entry(..), withTableIterator, unCasify, tableInsert) +import "chainweb-storage" Chainweb.Storage.Table.RocksDB (RocksDb, withRocksDb, withReadOnlyRocksDb, modernDefaultOptions) +import "direct-sqlite" Database.SQLite3 qualified as Lite +import "direct-sqlite" Database.SQLite3.Direct (Utf8(..), Database) +import "directory" System.Directory (createDirectoryIfMissing, doesDirectoryExist) +import "filepath" System.FilePath (()) +import "lens" Control.Lens (set, over, (^.), _3, view) +import "loglevel" System.LogLevel qualified as LL +import "monad-control" Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp) +import "optparse-applicative" Options.Applicative qualified as O +import "pact" Pact.Types.SQLite (SType(..), RType(..)) +import "pact" Pact.Types.SQLite qualified as Pact +import "rocksdb-haskell-kadena" Database.RocksDB.Types (Options(..), Compression(..)) +import "streaming" Streaming qualified as S +import "streaming" Streaming.Prelude qualified as S +import "text" Data.Text (Text) +import "text" Data.Text qualified as Text +import "unliftio" UnliftIO.Async (pooledForConcurrently_) +import "yet-another-logger" System.Logger hiding (Logger) +import "yet-another-logger" System.Logger qualified as YAL +import "yet-another-logger" System.Logger.Backend.ColorOption (useColor) +import Chainweb.BlockHeader (blockHeight, blockHash, blockPayloadHash) +import Chainweb.BlockHeaderDB.Internal (BlockHeaderDb(..), RankedBlockHash(..), RankedBlockHeader(..)) import Chainweb.BlockHeight (BlockHeight(..)) -import Chainweb.Logger (Logger, l2l, setComponent) +import Chainweb.Cut.CutHashes (cutIdToText) +import Chainweb.CutDB (cutHashesTable) +import Chainweb.Logger (Logger, l2l, setComponent, logFunctionText) +import Chainweb.Pact.Backend.ChainwebPactDb () +import Chainweb.Pact.Backend.PactState +import Chainweb.Pact.Backend.Types (SQLiteEnv) +import Chainweb.Pact.Backend.Utils (fromUtf8, toUtf8) +import Chainweb.Payload.PayloadStore (initializePayloadDb, addNewPayload, lookupPayloadWithHeight) +import Chainweb.Payload.PayloadStore.RocksDB (newPayloadDb) import Chainweb.Utils (sshow, fromText, toText, int) -import Chainweb.Version (ChainId, ChainwebVersion(..), unsafeChainId, chainIdToText) +import Chainweb.Version (ChainId, ChainwebVersion(..), chainIdToText) import Chainweb.Version.Mainnet (mainnet) import Chainweb.Version.Registry (lookupVersionByName) -import Chainweb.Version.Utils (chainIdsAt) -import Chainweb.Pact.Backend.ChainwebPactDb -import Chainweb.Pact.Backend.PactState (getLatestBlockHeight, getLatestCommonBlockHeight, getEarliestCommonBlockHeight, ensureBlockHeightExists, getEndingTxId) -import Chainweb.Pact.Backend.Utils (fromUtf8, withSqliteDb) - -import "yet-another-logger" System.Logger hiding (Logger) -import "yet-another-logger" System.Logger qualified as YAL -import "loglevel" System.LogLevel qualified as LL -import System.Logger.Backend.ColorOption (useColor) -import Data.LogMessage - -import Pact.Types.Persistence (TxId(..)) -import Pact.Types.SQLite (SType(..), RType(..)) -import Pact.Types.SQLite qualified as Pact - -newtype TableName = TableName { getTableName :: Utf8 } - deriving stock (Show) - deriving newtype (Eq, Ord, IsString) - -data CompactException - = CompactExceptionInternal !Text - | CompactExceptionDb !SomeException - | CompactExceptionInvalidBlockHeight !BlockHeight - | CompactExceptionTableVerificationFailure !TableName - | CompactExceptionNoLatestBlockHeight - | CompactExceptionLatestSafeNotEnoughHistory - deriving stock (Show) - deriving anyclass (Exception) - -data CompactFlag - = KeepCompactTables - -- ^ Keep compaction tables post-compaction for inspection. - | NoVacuum - -- ^ Don't VACUUM database - deriving stock (Eq, Show) - -internalError :: MonadThrow m => Text -> m a -internalError = throwM . CompactExceptionInternal - -data CompactEnv = CompactEnv - { _ceLogger :: YAL.Logger SomeLogMessage - , _ceDb :: Database - , _ceFlags :: [CompactFlag] - } -makeLenses ''CompactEnv +import Chainweb.Version.Testnet (testnet) +import Chainweb.WebBlockHeaderDB (getWebBlockHeaderDb, initWebBlockHeaderDb) +import Data.LogMessage (SomeLogMessage, logText) withDefaultLogger :: LL.LogLevel -> (YAL.Logger SomeLogMessage -> IO a) -> IO a withDefaultLogger ll f = withHandleBackend_ logText handleCfg $ \b -> @@ -141,35 +93,34 @@ withDefaultLogger ll f = withHandleBackend_ logText handleCfg $ \b -> { _handleBackendConfigHandle = StdErr } -withPerChainFileLogger :: FilePath -> ChainId -> LL.LogLevel -> (YAL.Logger SomeLogMessage -> IO a) -> IO a -withPerChainFileLogger logDir chainId ll f = do - createDirectoryIfMissing False {- don't create parents -} logDir - let logFile = logDir ("chain-" <> cid <> ".log") - !_ <- writeFile logFile "" +withRocksDbFileLogger :: FilePath -> LL.LogLevel -> (YAL.Logger SomeLogMessage -> IO a) -> IO a +withRocksDbFileLogger ld ll f = do + createDirectoryIfMissing True {- do create parents -} ld + let logFile = ld "rocksDb.log" let handleConfig = defaultHandleBackendConfig { _handleBackendConfigHandle = FileHandle logFile } withHandleBackend_' logText handleConfig $ \h b -> do + IO.hSetBuffering h IO.LineBuffering + withLogger defaultLoggerConfig b $ \l -> do + let logger = setComponent "compaction" + $ set setLoggerLevel (l2l ll) l + f logger - done <- newMVar False - void $ forkIO $ fix $ \go -> do - doneYet <- readMVar done - let flush = do - w <- IO.hIsOpen h - when w (IO.hFlush h) - when (not doneYet) $ do - flush - threadDelay 5_000_000 - go - flush - +withPerChainFileLogger :: FilePath -> ChainId -> LL.LogLevel -> (YAL.Logger SomeLogMessage -> IO a) -> IO a +withPerChainFileLogger ld chainId ll f = do + createDirectoryIfMissing True {- do create parents -} ld + let logFile = ld ("chain-" <> cid <> ".log") + let handleConfig = defaultHandleBackendConfig + { _handleBackendConfigHandle = FileHandle logFile + } + withHandleBackend_' logText handleConfig $ \h b -> do + IO.hSetBuffering h IO.LineBuffering withLogger defaultLoggerConfig b $ \l -> do let logger = setComponent "compaction" $ over setLoggerScope (("chain", chainIdToText chainId) :) $ set setLoggerLevel (l2l ll) l - a <- f logger - void $ swapMVar done True - pure a + f logger where cid = Text.unpack (chainIdToText chainId) @@ -188,468 +139,678 @@ withHandleBackend_' format conf inner = colored <- liftIO $ useColor (conf ^. handleBackendConfigColor) h inner h (handleBackend_ format h colored) -newtype CompactM a = CompactM { unCompactM :: ReaderT CompactEnv IO a } - deriving newtype (Functor,Applicative,Monad,MonadReader CompactEnv,MonadIO,MonadThrow,MonadCatch) - -instance MonadLog Text CompactM where - localScope :: (LogScope -> LogScope) -> CompactM x -> CompactM x - localScope f = local (over (ceLogger . setLoggerScope) f) - - logg :: LogLevel -> Text -> CompactM () - logg ll m = do - l <- view ceLogger - liftIO $ loggerFunIO l ll $ toLogMessage $ TextLog m - - withLevel :: LogLevel -> CompactM x -> CompactM x - withLevel l = local (set (ceLogger . setLoggerLevel) l) - - withPolicy :: LogPolicy -> CompactM x -> CompactM x - withPolicy p = local (set (ceLogger . setLoggerPolicy) p) - --- | Run compaction monad -runCompactM :: CompactEnv -> CompactM a -> IO a -runCompactM e a = runReaderT (unCompactM a) e - -execNoTemplateM_ :: () - => Text -- ^ query name (for logging purposes) - -> Utf8 -- ^ query - -> CompactM () -execNoTemplateM_ msg q = do - db <- view ceDb - queryDebug msg Nothing $ Pact.exec_ db q - --- | Prepare/Execute a "$VTABLE$"-templated, parameterised query. --- The parameters are the results of the 'CompactM' 'SType' computations. -execM' :: () - => Text -- ^ query name (for logging purposes) - -> TableName -- ^ table name - -> Text -- ^ "$VTABLE$"-templated query - -> [SType] -- ^ parameters - -> CompactM () -execM' msg tbl stmt ps = do - db <- view ceDb - stmt' <- templateStmt tbl stmt - queryDebug msg (Just tbl) $ Pact.exec' db stmt' ps - -exec_ :: () - => Text - -> Utf8 - -> CompactM () -exec_ msg q = do - db <- view ceDb - queryDebug msg Nothing $ Pact.exec_ db q - --- | Prepare/Execute a "$VTABLE$"-templated, parameterised query. --- 'RType's are the expected results. -qryM :: () - => Text -- ^ query name (for logging purposes) - -> TableName -- ^ table name - -> Text -- ^ "$VTABLE$"-templated query - -> [SType] -- ^ parameters - -> [RType] -- ^ result types - -> CompactM [[SType]] -qryM msg tbl q ins outs = do - db <- view ceDb - q' <- templateStmt tbl q - queryDebug msg (Just tbl) $ Pact.qry db q' ins outs - -qryNoTemplateM :: () - => Text -- ^ query name (for logging purposes) - -> Utf8 -- ^ query - -> [SType] -- ^ parameters - -> [RType] -- ^ results - -> CompactM [[SType]] -qryNoTemplateM msg q ins outs = do - db <- view ceDb - queryDebug msg Nothing $ Pact.qry db q ins outs - -ioQuery :: () - => Text -- ^ query name (for logging purposes) - -> (Database -> IO a) -- ^ query function to run - -> CompactM a -ioQuery msg f = do - db <- view ceDb - queryDebug msg Nothing $ f db - -queryDebug :: Text -> Maybe TableName -> IO x -> CompactM x -queryDebug qryName mTblName performQuery = do - logg Info $ "Starting query " <> qryName - (ts, r) <- liftIO $ Chronos.stopwatch performQuery - logg Info $ "Completed query " <> qryName <> ". It took " <> Text.pack (show (Chronos.asSeconds ts)) <> "s" - liftIO $ atomicModifyIORef' queryTimes $ \qdbg -> case mTblName of - Nothing -> (addOnce qryName ts qdbg, ()) - Just tblName -> (addTblQuery tblName qryName ts qdbg, ()) - pure r - -data QueryDebug = QueryDebug - { runOnce :: Set (Chronos.Timespan, Text) - , tableQueries :: Map TableName (Set (Chronos.Timespan, Text)) +data Config = Config + { chainwebVersion :: ChainwebVersion + , fromDir :: FilePath + , toDir :: FilePath + , concurrent :: ConcurrentChains + , logDir :: FilePath + , noRocksDb :: Bool + -- ^ Don't produce a new RocksDB at all. + , noPactState :: Bool + -- ^ Don't produce a new Pact State at all. + , keepFullTransactionIndex :: Bool + -- ^ Whether or not to keep the entire TransactionIndex table. Some APIs rely on this table. } -addOnce :: Text -> Chronos.Timespan -> QueryDebug -> QueryDebug -addOnce dbg t qdbg = qdbg { runOnce = Set.insert (t, dbg) qdbg.runOnce } - -addTblQuery :: TableName -> Text -> Chronos.Timespan -> QueryDebug -> QueryDebug -addTblQuery tbl dbg t qdbg = qdbg { tableQueries = M.insertWith Set.union tbl (Set.singleton (t, dbg)) qdbg.tableQueries } - -emptyQueryDebug :: QueryDebug -emptyQueryDebug = QueryDebug Set.empty M.empty - -queryTimes :: IORef QueryDebug -queryTimes = unsafePerformIO (newIORef emptyQueryDebug) -{-# noinline queryTimes #-} - --- | Statements are templated with "$VTABLE$" substituted --- with the currently-focused versioned table. -templateStmt :: TableName -> Text -> CompactM Utf8 -templateStmt (TableName (Utf8 tblName)) s = - pure $ Utf8 $ Text.encodeUtf8 $ - Text.replace "$VTABLE$" ("[" <> Text.decodeUtf8 tblName <> "]") s - --- | Execute a SQLite transaction, rolling back on failure. --- Throws a 'CompactExceptionDb' on failure. -withTx :: HasCallStack => CompactM a -> CompactM a -withTx a = do - exec_ "withTx.0" "SAVEPOINT compact_tx" - catch (a >>= \r -> exec_ "withTx.1" "RELEASE SAVEPOINT compact_tx" >> pure r) $ - \e@SomeException {} -> do - exec_ "withTx.2" "ROLLBACK TRANSACTION TO SAVEPOINT compact_tx" - throwM $ CompactExceptionDb e - -unlessFlagSet :: CompactFlag -> CompactM () -> CompactM () -unlessFlagSet f x = do - yeahItIs <- isFlagSet f - unless yeahItIs x - -isFlagSet :: CompactFlag -> CompactM Bool -isFlagSet f = view ceFlags >>= \fs -> pure (f `elem` fs) - -withTables :: Vector TableName -> (TableName -> CompactM a) -> CompactM () -withTables ts a = do - V.iforM_ ts $ \i u@(TableName (Utf8 t')) -> do - let lbl = Text.decodeUtf8 t' <> " (" <> sshow (i + 1) <> " of " <> sshow (V.length ts) <> ")" - localScope (("table",lbl):) $ a u - --- | Takes a bunch of singleton tablename rows, sorts them, returns them as --- @TableName@ -sortedTableNames :: [[SType]] -> [TableName] -sortedTableNames rows = coerce - $ List.sortOn (Text.toLower . Text.decodeUtf8) - $ flip List.map rows $ \case - [SText (Utf8 s)] -> s - _ -> error "sortedTableNames: expected text" - --- | CompactActiveRow collects all active rows from all tables. -createCompactActiveRow :: CompactM () -createCompactActiveRow = do - execNoTemplateM_ "createTable: CompactActiveRow" - " CREATE TABLE IF NOT EXISTS CompactActiveRow \ - \ ( tablename TEXT NOT NULL \ - \ , rowkey TEXT NOT NULL \ - \ , vrowid INTEGER NOT NULL \ - \ , UNIQUE (tablename,rowkey) ); " - - execNoTemplateM_ "deleteFrom: CompactActiveRow" - "DELETE FROM CompactActiveRow" - -locateTargets :: (Logger logger) - => logger - -> FilePath - -> [ChainId] - -> TargetBlockHeight - -> IO (HashMap ChainId BlockHeight) -locateTargets logger dbDir cids = \case - Target height -> do - forM_ cids $ \cid -> do - withSqliteDb cid logger dbDir False $ \db -> do - catch (ensureBlockHeightExists db height) $ \(_ :: SomeException) -> do - throwM $ CompactExceptionInvalidBlockHeight height - pure $ HM.fromList $ List.map (, height) cids - - LatestUnsafe -> do - fmap HM.fromList $ forM cids $ \cid -> do - withSqliteDb cid logger dbDir False $ \db -> do - catch ((cid, ) <$> getLatestBlockHeight db) $ \(_ :: SomeException) -> do - throwM CompactExceptionNoLatestBlockHeight - - LatestSafe -> do - latestCommon <- getLatestCommonBlockHeight logger dbDir cids - earliestCommon <- getEarliestCommonBlockHeight logger dbDir cids - - let safeDepth = 1_000 - - when (latestCommon - earliestCommon < safeDepth) $ do - throwM CompactExceptionLatestSafeNotEnoughHistory - - pure $ HM.fromList $ List.map (, latestCommon - safeDepth) cids - -getVersionedTables :: CompactM (Vector TableName) -getVersionedTables = do - logg Info "getVersionedTables" - rs <- qryNoTemplateM - "getVersionedTables.0" - "SELECT tablename FROM VersionedTableCreation;" - [] - [RText] - pure (V.fromList (sortedTableNames rs)) - -tableRowCount :: TableName -> Text -> CompactM () -tableRowCount tbl label = - qryM "tableRowCount.0" tbl "SELECT COUNT(*) FROM $VTABLE$" [] [RInt] >>= \case - [[SInt r]] -> logg Info $ label <> ":rowcount=" <> sshow r - _ -> internalError "count(*) failure" - --- | For a given table, collect all active rows into CompactActiveRow -collectTableRows :: TxId -> TableName -> CompactM () -collectTableRows txId tbl = do - tableRowCount tbl "collectTableRows" - let vt = tableNameToSType tbl - let txid = txIdToSType txId - - let collectInsert = Text.concat - [ "INSERT INTO CompactActiveRow " - , "SELECT ?1,rowkey,rowid " - , "FROM $VTABLE$ t1 " - , "WHERE txid=(SELECT MAX(txid) FROM $VTABLE$ t2 " - , "WHERE t2.rowkey=t1.rowkey AND t2.txid CompactM () -compactTable tbl = do - logg Info $ "compactTable: " <> fromUtf8 (getTableName tbl) - - execM' - "compactTable.0" - tbl - " DELETE FROM $VTABLE$ WHERE rowid NOT IN \ - \ (SELECT t.rowid FROM $VTABLE$ t \ - \ LEFT JOIN CompactActiveRow v \ - \ WHERE t.rowid = v.vrowid AND v.tablename=?1); " - [tableNameToSType tbl] - --- | Delete all rows from Checkpointer system tables that are not for the target blockheight. --- -compactSystemTables :: BlockHeight -> CompactM () -compactSystemTables bh = do - -- we don't need past BlockHistory or VersionedTableMutation rows, because - -- those tables only exist to enable rewinds to the past. - let systemTables = ["BlockHistory", "VersionedTableMutation"] - forM_ systemTables $ \tbl -> do - let tblText = fromUtf8 (getTableName tbl) - logg Info $ "Compacting system table " <> tblText - execM' - ("compactSystemTables: " <> tblText) - tbl - "DELETE FROM $VTABLE$ WHERE blockheight != ?1;" - [bhToSType bh] - -dropCompactTables :: CompactM () -dropCompactTables = do - execNoTemplateM_ "dropCompactTables.0" - "DROP TABLE CompactActiveRow" - -compact :: () - => BlockHeight - -> YAL.Logger SomeLogMessage - -> Database - -> [CompactFlag] - -> IO () -compact blockHeight logger db flags = runCompactM (CompactEnv logger db flags) $ do - logg Info "Beginning compaction" - - withTx createCompactActiveRow - - txId <- fmap (TxId . fromIntegral @Int64 @Word64) $ ioQuery "getEndingTxId" $ \_ -> do - catch (getEndingTxId db blockHeight) $ \(_ :: SomeException) -> do - throwM $ CompactExceptionInvalidBlockHeight blockHeight - - logg Info $ "Target blockheight: " <> sshow blockHeight - logg Info $ "Ending TxId: " <> sshow txId - withTx $ - -- first we delete all traces of data newer than the target. - -- this is safe, because subsequent compaction wouldn't see it anyway. - liftIO $ - rewindDbToBlock db blockHeight txId - - -- we get each existing user table at this height. note that all nonexisting - -- tables have been dropped by rewindDbToBlock earlier. - versionedTables <- getVersionedTables - - withTables versionedTables $ \tbl -> collectTableRows txId tbl - - withTx $ do - withTables versionedTables $ \tbl -> do - compactTable tbl - -- then we delete data in system tables which is block height-indexed and - -- is not needed, because we can't rewind to it anymore. - compactSystemTables blockHeight - - unlessFlagSet KeepCompactTables $ do - logg Info "Dropping compact-specific tables" - withTx dropCompactTables - - unlessFlagSet NoVacuum $ do - logg Info "Vacuum" - execNoTemplateM_ "VACUUM" "VACUUM;" - - debugLogs <- liftIO $ readIORef queryTimes - let -- every query that takes >= 1 second - expensiveQueries = List.filter (not . null . snd) - $ List.map - ( over _2 - (List.take 10 - . List.sortOn (Down . fst) - . List.filter ((>= Chronos.second) . fst) - . Set.toList - ) - ) - $ M.toList debugLogs.tableQueries - - forM_ expensiveQueries $ \(tblName, mostWanted) -> do - logg Debug $ "Most expensive queries on table " <> fromUtf8 (getTableName tblName) - forM_ mostWanted $ \(ts, qryMsg) -> do - logg Debug $ "Query " <> qryMsg <> " took " <> Text.pack (show (Chronos.asSeconds ts)) <> "s" - - logg Info "Compaction complete" - -data TargetBlockHeight - = Target !BlockHeight - -- ^ compact to this blockheight across all chains - | LatestUnsafe - -- ^ for each chain, compact to its latest blockheight - -- - -- Unsafe due to the potential for forks. - | LatestSafe - -- ^ Compact to `latestCommonBlockHeight - someSafeConstant` - deriving stock (Eq, Show) - -data CompactConfig = CompactConfig - { ccBlockHeight :: TargetBlockHeight - , ccDbDir :: FilePath - , ccVersion :: ChainwebVersion - , ccFlags :: [CompactFlag] - , ccChains :: Maybe (Set ChainId) - , logDir :: FilePath - , ccThreads :: Int +defaultRetainment :: Retainment +defaultRetainment = Retainment + { keepFullTransactionIndex = False + , compactThese = CompactBoth } - deriving stock (Eq, Show) - -compactAll :: CompactConfig -> IO () -compactAll CompactConfig{..} = do - latestBlockHeightChain0 <- do - let cid = unsafeChainId 0 - withDefaultLogger LL.Error $ \logger -> do - let resetDb = False - withSqliteDb cid logger ccDbDir resetDb $ \db -> do - getLatestBlockHeight db - - let allCids = Set.fromList $ F.toList $ chainIdsAt ccVersion latestBlockHeightChain0 - let targetCids = Set.toList $ maybe allCids (Set.intersection allCids) ccChains - - targets <- withDefaultLogger LL.Error $ \logger -> do - locateTargets logger ccDbDir targetCids ccBlockHeight - - flip (pooledMapConcurrentlyN_ ccThreads) targetCids $ \cid -> do - withPerChainFileLogger logDir cid LL.Debug $ \logger -> do - let resetDb = False - withSqliteDb cid logger ccDbDir resetDb $ \db -> do - let target = targets ^?! ix cid - void $ compact target logger db ccFlags + +data CompactThese = CompactOnlyRocksDb | CompactOnlyPactState | CompactBoth | CompactNeither + deriving stock (Eq) + +data ConcurrentChains = SingleChain | ManyChainsAtOnce + +getConfig :: IO Config +getConfig = do + O.execParser opts + where + opts :: O.ParserInfo Config + opts = O.info (parser O.<**> O.helper) + (O.fullDesc <> O.progDesc "Pact DB Compaction Tool - create a compacted copy of the source database directory Pact DB into the target directory.") + + parser :: O.Parser Config + parser = Config + <$> (parseVersion <$> O.strOption (O.long "chainweb-version" <> O.value "mainnet01")) + <*> O.strOption (O.long "from" <> O.help "Directory containing SQLite Pact state and RocksDB block data to compact (expected to be in $DIR/0/{sqlite,rocksDb}") + <*> O.strOption (O.long "to" <> O.help "Directory where to place the compacted Pact state and block data. It will place them in $DIR/0/{sqlite,rocksDb}, respectively.") + <*> O.flag SingleChain ManyChainsAtOnce (O.long "parallel" <> O.help "Turn on multi-threaded compaction. The threads are per-chain.") + <*> O.strOption (O.long "log-dir" <> O.help "Directory where compaction logs will be placed.") + -- Hidden options + <*> O.switch (O.long "keep-full-rocksdb" <> O.hidden) + <*> O.switch (O.long "no-rocksdb" <> O.hidden) + <*> O.switch (O.long "no-pact" <> O.hidden) + + parseVersion :: Text -> ChainwebVersion + parseVersion = + lookupVersionByName + . fromMaybe (error "ChainwebVersion parse failed") + . fromText main :: IO () main = do - config <- execParser opts - compactAll config - where - opts :: ParserInfo CompactConfig - opts = info (parser <**> helper) - (fullDesc <> progDesc "Pact DB Compaction tool") - - collapseSum :: [Parser [a]] -> Parser [a] - collapseSum = foldr (\x y -> (++) <$> x <*> y) (pure []) - - maybeList :: [a] -> Maybe [a] - maybeList = \case - [] -> Nothing - xs -> Just xs - - parseTarget :: Parser TargetBlockHeight - parseTarget = - let target = fmap (Target . fromIntegral @Word) $ option auto - (long "target-blockheight" - <> metavar "BLOCKHEIGHT" - <> internal + compact =<< getConfig + +compactPactState :: (Logger logger) => logger -> Retainment -> BlockHeight -> SQLiteEnv -> SQLiteEnv -> IO () +compactPactState logger rt targetBlockHeight srcDb targetDb = do + let log = logFunctionText logger + + -- These pragmas are tuned for fast insertion on systems with a wide range + -- of system resources. + -- + -- journal_mode = OFF is terrible for prod but probably OK here + -- since we are just doing a bunch of bulk inserts + -- + -- See SQLite Pragma docs: https://www.sqlite.org/pragma.html + let fastBulkInsertPragmas = + [ "journal_mode = OFF" + , "synchronous = OFF" + , "cache_size = -9766" -- 10 Megabytes + , "temp_store = FILE" + , "shrink_memory" + ] + + -- Establish pragmas for bulk insert performance + -- + -- Note that we can't apply pragmas to the src + -- because we can't guarantee it's not being accessed + -- by another process. + Pact.runPragmas targetDb fastBulkInsertPragmas + + -- Create checkpointer tables on the target + createCheckpointerTables targetDb logger + + -- Compact BlockHistory + -- This is extremely fast and low residency + do + log LL.Info "Compacting BlockHistory" + activeRow <- getBlockHistoryRowAt logger srcDb targetBlockHeight + Pact.exec' targetDb "INSERT INTO BlockHistory VALUES (?1, ?2, ?3)" activeRow + + -- Compact VersionedTableMutation + -- This is extremely fast and low residency + do + log LL.Info "Compacting VersionedTableMutation" + activeRows <- getVersionedTableMutationRowsAt logger srcDb targetBlockHeight + Lite.withStatement targetDb "INSERT INTO VersionedTableMutation VALUES (?1, ?2)" $ \stmt -> do + forM_ activeRows $ \row -> do + Pact.bindParams stmt row + void $ stepThenReset stmt + + -- Copy over VersionedTableCreation. Read-only rewind needs to know + -- when the table existed at that time, so we can't compact this. + -- + -- This is pretty fast and low residency + do + log LL.Info "Copying over VersionedTableCreation" + let wholeTableQuery = "SELECT tablename, createBlockheight FROM VersionedTableCreation" + throwSqlError $ qryStream srcDb wholeTableQuery [] [RText, RInt] $ \tblRows -> do + Lite.withStatement targetDb "INSERT INTO VersionedTableCreation VALUES (?1, ?2)" $ \stmt -> do + flip S.mapM_ tblRows $ \row -> do + Pact.bindParams stmt row + void $ stepThenReset stmt + + -- Copy over TransactionIndex. + -- + -- If the user specifies that they want to keep the entire table, then we do so, otherwise, + -- we compact this based on the RocksDB 'blockHeightKeepDepth'. + -- + -- /poll and SPV rely on having this table synchronised with RocksDB. + -- We need to document APIs which need TransactionIndex. + -- + -- Maybe consider + -- https://tableplus.com/blog/2018/07/sqlite-how-to-copy-table-to-another-database.html + do + (qry, args) <- + if rt.keepFullTransactionIndex + then do + log LL.Info "Copying over entire TransactionIndex table. This could take a while. Only do this if you are sure you need it." + let wholeTableQuery = "SELECT txhash, blockheight FROM TransactionIndex ORDER BY blockheight" + pure (wholeTableQuery, []) + else do + log LL.Info "Copying over TransactionIndex" + let wholeTableQuery = "SELECT txhash, blockheight FROM TransactionIndex WHERE blockheight >= ?1 ORDER BY blockheight" + pure (wholeTableQuery, [SInt (int (targetBlockHeight - blockHeightKeepDepth))]) + + throwSqlError $ qryStream srcDb qry args [RBlob, RInt] $ \tblRows -> do + Lite.withStatement targetDb "INSERT INTO TransactionIndex VALUES (?1, ?2)" $ \stmt -> do + -- I experimented a bunch with chunk sizes, to keep transactions + -- small. As far as I can tell, there isn't really much + -- difference in any of them wrt residency, but there is wrt + -- speed. More experimentation may be needed here, but 10k is + -- fine so far. + S.chunksOf 10_000 tblRows + & S.mapsM_ (\chunk -> do + inTx targetDb $ flip S.mapM_ chunk $ \row -> do + Pact.bindParams stmt row + void (stepThenReset stmt) ) - latestUnsafe = flag' LatestUnsafe - (long "latest-unsafe" <> internal) - latestSafe = flag' LatestSafe - (long "latest-safe" <> internal) - in - target <|> latestUnsafe <|> latestSafe <|> pure LatestSafe - - parser :: Parser CompactConfig - parser = CompactConfig - <$> parseTarget - <*> strOption - (short 'd' - <> long "pact-database-dir" - <> metavar "DBDIR" - <> help "Pact database directory" - ) - <*> (parseChainwebVersion <$> strOption - (short 'v' - <> long "graph-version" - <> metavar "VERSION" - <> help "Chainweb version for graph. Only needed for non-standard graphs." - <> value (toText (_versionName mainnet)) - <> showDefault)) - <*> collapseSum - [ flag [] [KeepCompactTables] - (long "keep-compact-tables" - <> help "Keep compaction tables post-compaction, for inspection." - <> internal - ) - , flag [] [NoVacuum] - (long "no-vacuum" - <> help "Don't VACUUM database." - <> internal - ) - ] - <*> fmap (fmap Set.fromList . maybeList) (many (unsafeChainId <$> option auto - (short 'c' - <> long "chain" - <> metavar "CHAINID" - <> help "Add this chain to the target set of ones to compact." - <> internal - ))) - <*> strOption - (long "log-dir" - <> metavar "DIRECTORY" - <> help "Directory where logs will be placed" - <> value "." - ) - <*> option auto - (short 't' - <> long "threads" - <> metavar "THREADS" - <> value 4 - <> help "Number of threads for compaction processing" - ) - - parseChainwebVersion :: Text -> ChainwebVersion - parseChainwebVersion = lookupVersionByName . fromMaybe (error "ChainwebVersion parse failed") . fromText - -bhToSType :: BlockHeight -> SType -bhToSType bh = SInt (int bh) - -txIdToSType :: TxId -> SType -txIdToSType (TxId txid) = SInt (fromIntegral txid) - -tableNameToSType :: TableName -> SType -tableNameToSType (TableName tbl) = SText tbl + + -- Vacuuming after copying over all of the TransactionIndex data, + -- but before creating its indices, makes a big differences in + -- memory residency (~0.5G), at the expense of speed (~20s increase) + Pact.exec_ targetDb "VACUUM;" + + -- Create the checkpointer table indices after bulk-inserting into them + -- This is faster than creating the indices before + createCheckpointerIndexes targetDb logger + + -- Grab the endingtxid for determining latest state at the + -- target height + endingTxId <- getEndingTxId srcDb targetBlockHeight + log LL.Info $ "Ending TxId is " <> sshow endingTxId + + -- Compact all user tables + log LL.Info "Starting user tables" + getLatestPactTableNamesAt srcDb targetBlockHeight + & S.mapM_ (\tblname -> do + compactTable logger srcDb targetDb (fromUtf8 tblname) endingTxId + ) + + log LL.Info "Compaction done" + +-- We are trying to make sure that we keep around at least 3k blocks. +-- The compaction target is 1k blocks prior to the latest common +-- blockheight (i.e. min (map blockHeight allChains)), so we take +-- the target and add 1k, but the chains can differ at most by the +-- diameter of the chaingraph, so we also add that to make sure that +-- we have full coverage of every chain. +-- +-- To keep around another 2k blocks (to get to ~3k), we subtract 2k +-- from the target. +-- +-- Note that the number 3k was arbitrary but chosen to be a safe +-- amount of data more than what is in SQLite. +blockHeightKeepDepth :: BlockHeight +blockHeightKeepDepth = 2_000 + +compact :: Config -> IO () +compact cfg = do + let cids = allChains cfg.chainwebVersion + + let _compactThese = case (cfg.noRocksDb, cfg.noPactState) of + (True, True) -> CompactNeither + (True, False) -> CompactOnlyPactState + (False, True) -> CompactOnlyRocksDb + (False, False) -> CompactBoth + + -- Get the target blockheight. + targetBlockHeight <- withDefaultLogger LL.Debug $ \logger -> do + -- Locate the latest (safe) blockheight as per the pact state. + -- See 'locateLatestSafeTarget' for the definition of 'safe' here. + targetBlockHeight <- locateLatestSafeTarget logger cfg.chainwebVersion (pactDir cfg.fromDir) cids + logFunctionText logger LL.Debug $ "targetBlockHeight: " <> sshow targetBlockHeight + + let initDir = do + -- Check that the target directory doesn't exist already, + -- then create its entire tree. + toDirExists <- doesDirectoryExist cfg.toDir + when toDirExists $ do + exitLog logger "Compaction \"To\" directory already exists. Aborting." + + case _compactThese of + CompactNeither -> do + exitLog logger "No compaction requested. Exiting." + + CompactOnlyRocksDb -> do + initDir + createDirectoryIfMissing True (rocksDir cfg.toDir) + + CompactOnlyPactState -> do + initDir + createDirectoryIfMissing True (pactDir cfg.toDir) + + CompactBoth -> do + initDir + createDirectoryIfMissing True (rocksDir cfg.toDir) + createDirectoryIfMissing True (pactDir cfg.toDir) + + pure targetBlockHeight + + -- Compact RocksDB. + when (not cfg.noRocksDb) $ do + withRocksDbFileLogger cfg.logDir LL.Debug $ \logger -> do + withReadOnlyRocksDb (rocksDir cfg.fromDir) modernDefaultOptions $ \srcRocksDb -> do + withRocksDb (rocksDir cfg.toDir) (modernDefaultOptions { compression = NoCompression }) $ \targetRocksDb -> do + compactRocksDb (set setLoggerLevel (l2l LL.Info) logger) cfg.chainwebVersion cids (targetBlockHeight - blockHeightKeepDepth) srcRocksDb targetRocksDb + + -- Compact the pact state. + let retainment = Retainment + { keepFullTransactionIndex = cfg.keepFullTransactionIndex + , compactThese = _compactThese + } + when (not cfg.noPactState) $ do + forChains_ cfg.concurrent cids $ \cid -> do + withPerChainFileLogger cfg.logDir cid LL.Debug $ \logger -> do + withChainDb cid logger (pactDir cfg.fromDir) $ \_ srcDb -> do + withChainDb cid logger (pactDir cfg.toDir) $ \_ targetDb -> do + compactPactState logger retainment targetBlockHeight srcDb targetDb + +compactTable :: (Logger logger) + => logger -- ^ logger + -> Database -- ^ source database (where we get the active pact state) + -> Database -- ^ target database (where we put the compacted state, + use as a rowkey cache) + -> Text -- ^ the table we are compacting + -> Int64 -- ^ target blockheight + -> IO () +compactTable logger srcDb targetDb tblname endingTxId = do + let log = logFunctionText logger + let tblnameUtf8 = toUtf8 tblname + + log LL.Info $ "Creating table " <> tblname + createUserTable targetDb tblnameUtf8 + + -- We create the user table indices before inserting into the table. + -- This makes the insertions slower, but it's for good reason. + -- + -- The query that grabs the pact state from the source db groups rowkeys + -- in descending order by txid. We then simply need to keep only the first + -- appearance of each rowkey. A simple in-memory cache does not suffice, + -- because we have strict max residency requirements. So in order to fully + -- stream with minimal residency, we use the target database as a rowkey cache. + -- For each rowkey, we check if it appears in the target, and if it does, we + -- discard that row and move on to the next. This is why we need the indices, + -- because this membership check is extremely slow without it, and it far + -- outweighs the insert slowdowns imposed by the indices. + log LL.Info $ "Creating table indices for " <> tblname + createUserTableIndex targetDb tblnameUtf8 + + -- We create this index so that we can upsert into the target database based on rowkey. + createUserTableUniqueRowKeyIndex targetDb tblnameUtf8 + + -- If the rowkey is in the target database, and the txid is greater than the one in the target database, then update the row. + -- If the rowkey is not in the target database, then insert the row. + let upsertQuery = Text.concat + [ "INSERT INTO ", fromUtf8 (tbl tblnameUtf8), " (rowkey, txid, rowdata) " + , " VALUES (?1, ?2, ?3) " + , " ON CONFLICT(rowkey) DO UPDATE SET " + , " txid=excluded.txid," + , " rowdata=excluded.rowdata" + , " WHERE excluded.txid > ", fromUtf8 (tbl tblnameUtf8), ".txid" + ] + + -- This query gets all rows at or below (older than) the target blockheight. + -- Note that endingtxid is exclusive, so we need to use '<' instead of '<='. + -- + -- We order by rowid descending because rowid order *generally* (but does not always) agrees + -- with txid order. This allows the query to be performed as a linear scan on disk. Post-compaction, + -- the rowid order should always be the same as the txid order, because we set rowid to AUTOINCREMENT, meaning + -- compacted an already-compacted database will be even faster. + let activeStateQryText = "SELECT rowkey, txid, rowdata FROM " + <> "[" <> tblnameUtf8 <> "]" + <> " WHERE txid < ?1" + <> " ORDER BY rowid DESC" + let activeStateQryArgs = [SInt endingTxId] + let activeStateQryRetTypes = [RText, RInt, RBlob] + + e <- qryStream srcDb activeStateQryText activeStateQryArgs activeStateQryRetTypes $ \rs -> do + Lite.withStatement targetDb upsertQuery $ \upsertRow -> do + log LL.Info $ "Inserting compacted rows into table " <> tblname + + rs + & S.chunksOf 10_000 + & S.mapsM_ (\chunk -> do + inTx targetDb $ flip S.mapM_ chunk $ \row -> do + case row of + [SText _, SInt _, SBlob _] -> do + Pact.bindParams upsertRow row + void $ stepThenReset upsertRow + _badRowShape -> do + exitLog logger "Encountered invalid row shape while compacting" + ) + + -- This index only makes sense during construction of the target database, not after. + -- If we were to keep this index around, the node would not be able to operate, since + -- we need to update new rows for the same rowkey. + deleteUserTableUniqueRowKeyIndex targetDb tblnameUtf8 + + case e of + Left sqlErr -> exitLog logger $ "Encountered SQLite error while compacting: " <> sshow sqlErr + Right () -> pure () + + log LL.Info $ "Done compacting table " <> tblname + +-- | Create all the checkpointer tables +createCheckpointerTables :: (Logger logger) + => Database + -> logger + -> IO () +createCheckpointerTables db logger = do + let log = logFunctionText logger LL.Info + + log "Creating Checkpointer table BlockHistory" + inTx db $ Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS BlockHistory " + , "(blockheight UNSIGNED BIGINT NOT NULL" + , ", hash BLOB NOT NULL" + , ", endingtxid UNSIGNED BIGINT NOT NULL" + , ");" + ] + + log "Creating Checkpointer table VersionedTableCreation" + inTx db $ Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS VersionedTableCreation " + , "(tablename TEXT NOT NULL" + , ", createBlockheight UNSIGNED BIGINT NOT NULL" + , ");" + ] + + log "Creating Checkpointer table VersionedTableMutation" + inTx db $ Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS VersionedTableMutation " + , "(tablename TEXT NOT NULL" + , ", blockheight UNSIGNED BIGINT NOT NULL" + , ");" + ] + + log "Creating Checkpointer table TransactionIndex" + inTx db $ Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS TransactionIndex " + , "(txhash BLOB NOT NULL" + , ", blockheight UNSIGNED BIGINT NOT NULL" + , ");" + ] + + forM_ ["BlockHistory", "VersionedTableCreation", "VersionedTableMutation", "TransactionIndex"] $ \tblname -> do + log $ "Deleting from table " <> fromUtf8 tblname + Pact.exec_ db $ "DELETE FROM " <> tbl tblname + +-- | Create all the indexes for the checkpointer tables. +createCheckpointerIndexes :: (Logger logger) => Database -> logger -> IO () +createCheckpointerIndexes db logger = do + let log = logFunctionText logger LL.Info + + log "Creating BlockHistory index" + inTx db $ Pact.exec_ db + "CREATE UNIQUE INDEX IF NOT EXISTS BlockHistory_blockheight_unique_ix ON BlockHistory (blockheight)" + + log "Creating VersionedTableCreation index" + inTx db $ Pact.exec_ db + "CREATE UNIQUE INDEX IF NOT EXISTS VersionedTableCreation_createBlockheight_tablename_unique_ix ON VersionedTableCreation (createBlockheight, tablename)" + + log "Creating VersionedTableMutation index" + inTx db $ Pact.exec_ db + "CREATE UNIQUE INDEX IF NOT EXISTS VersionedTableMutation_blockheight_tablename_unique_ix ON VersionedTableMutation (blockheight, tablename)" + + log "Creating TransactionIndex indexes" + inTx db $ Pact.exec_ db + "CREATE UNIQUE INDEX IF NOT EXISTS TransactionIndex_txhash_unique_ix ON TransactionIndex (txhash)" + inTx db $ Pact.exec_ db + "CREATE INDEX IF NOT EXISTS TransactionIndex_blockheight_ix ON TransactionIndex (blockheight)" + +-- | Create a single user table +createUserTable :: Database -> Utf8 -> IO () +createUserTable db tblname = do + Pact.exec_ db $ mconcat + [ "CREATE TABLE IF NOT EXISTS ", tbl tblname, " " + , "(rowid INTEGER PRIMARY KEY AUTOINCREMENT" + , ", rowkey TEXT" -- This should probably be NOT NULL, but we have no proof of that, so for now this is just kept the same as chainweb-node's implementation. + , ", txid UNSIGNED BIGINT NOT NULL" + , ", rowdata BLOB NOT NULL" + , ");" + ] + + Pact.exec_ db $ "DELETE FROM " <> tbl tblname + +-- | Create the indexes for a single user table +createUserTableIndex :: Database -> Utf8 -> IO () +createUserTableIndex db tblname = do + inTx db $ do + Pact.exec_ db $ mconcat + [ "CREATE UNIQUE INDEX IF NOT EXISTS ", tbl (tblname <> "_rowkey_txid_unique_ix"), " ON " + , tbl tblname, " (rowkey, txid)" + ] + Pact.exec_ db $ mconcat + [ "CREATE INDEX IF NOT EXISTS ", tbl (tblname <> "_txid_ix"), " ON " + , tbl tblname, " (txid DESC)" + ] + +-- | Create a temporary index on 'rowkey' for a user table, so that upserts work correctly. +createUserTableUniqueRowKeyIndex :: Database -> Utf8 -> IO () +createUserTableUniqueRowKeyIndex db tblname = do + inTx db $ do + Pact.exec_ db $ mconcat + [ "CREATE UNIQUE INDEX IF NOT EXISTS ", tbl (tblname <> "_rowkey_unique_ix_TEMP"), " ON " + , tbl tblname, " (rowkey)" + ] + +deleteUserTableUniqueRowKeyIndex :: Database -> Utf8 -> IO () +deleteUserTableUniqueRowKeyIndex db tblname = do + inTx db $ do + Pact.exec_ db $ mconcat + [ "DROP INDEX IF EXISTS ", tbl (tblname <> "_rowkey_unique_ix_TEMP") + ] + +-- | Returns the active @(blockheight, hash, endingtxid)@ from BlockHistory +getBlockHistoryRowAt :: (Logger logger) + => logger + -> Database + -> BlockHeight + -> IO [SType] +getBlockHistoryRowAt logger db target = do + r <- Pact.qry db "SELECT blockheight, hash, endingtxid FROM BlockHistory WHERE blockheight = ?1" [SInt (int target)] [RInt, RBlob, RInt] + case r of + [row@[SInt bh, SBlob _hash, SInt _endingTxId]] -> do + unless (target == int bh) $ do + exitLog logger "BlockHeight mismatch in BlockHistory query. This is a bug in the compaction tool. Please report it on the issue tracker or discord." + pure row + _ -> do + exitLog logger "getBlockHistoryRowAt query: invalid query" + +-- | Returns active @[(tablename, blockheight)]@ from VersionedTableMutation +getVersionedTableMutationRowsAt :: (Logger logger) + => logger + -> Database + -> BlockHeight + -> IO [[SType]] +getVersionedTableMutationRowsAt logger db target = do + r <- Pact.qry db "SELECT tablename, blockheight FROM VersionedTableMutation WHERE blockheight = ?1" [SInt (int target)] [RText, RInt] + forM r $ \case + row@[SText _, SInt bh] -> do + unless (target == int bh) $ do + exitLog logger "BlockHeight mismatch in VersionedTableMutation query. This is a bug in the compaction tool. Please report it." + pure row + _ -> do + exitLog logger "getVersionedTableMutationRowsAt query: invalid query" + +tbl :: Utf8 -> Utf8 +tbl u = "[" <> u <> "]" + +-- | Locate the latest "safe" target blockheight for compaction. +-- +-- In mainnet/testnet, this is determined +-- to be the @mininum (map latestBlockHeight chains) - 1000@. +-- +-- In devnet, this is just the latest common blockheight +-- (or @minimum (map latestBlockHeight chains)@). +locateLatestSafeTarget :: (Logger logger) + => logger + -> ChainwebVersion + -> FilePath + -> [ChainId] + -> IO BlockHeight +locateLatestSafeTarget logger v dbDir cids = do + let log = logFunctionText logger + + let logger' = set setLoggerLevel (l2l LL.Error) logger + latestCommon <- getLatestCommonBlockHeight logger' dbDir cids + earliestCommon <- getEarliestCommonBlockHeight logger' dbDir cids + + log LL.Debug $ "Latest Common BlockHeight: " <> sshow latestCommon + log LL.Debug $ "Earliest Common BlockHeight: " <> sshow earliestCommon + + -- Make sure we have at least 1k blocks of depth for prod. + -- In devnet or testing versions we don't care. + let safeDepth :: BlockHeight + safeDepth + | v == mainnet || v == testnet = BlockHeight 1_000 + | otherwise = BlockHeight 0 + + when (latestCommon - earliestCommon < safeDepth) $ do + exitLog logger "locateLatestSafeTarget: Not enough history to safely compact. Aborting." + + let target = latestCommon - safeDepth + log LL.Debug $ "Compaction target blockheight is: " <> sshow target + pure target + +-- | Log an error message, then exit with code 1. +exitLog :: (Logger logger) + => logger + -> Text + -> IO a +exitLog logger msg = do + logFunctionText logger LL.Error msg + exitFailure + +-- | Step through a prepared statement, then clear the statement's bindings +-- and reset the statement. +stepThenReset :: Lite.Statement -> IO Lite.StepResult +stepThenReset stmt = do + Lite.stepNoCB stmt `finally` (Lite.clearBindings stmt >> Lite.reset stmt) + +-- | This is either 'forM_' or 'pooledForConcurrently_', depending on +-- the 'ConcurrentChains' input. +forChains_ :: ConcurrentChains -> [ChainId] -> (ChainId -> IO a) -> IO () +forChains_ = \case + SingleChain -> forM_ + ManyChainsAtOnce -> pooledForConcurrently_ + +-- | Swallow a SQLite 'Lite.Error' and throw it. +throwSqlError :: IO (Either Lite.Error a) -> IO a +throwSqlError ioe = do + e <- ioe + case e of + Left err -> error (show err) + Right a -> pure a + +-- | Run the 'IO' action inside of a transaction. +inTx :: Database -> IO a -> IO a +inTx db io = do + bracket_ + (Pact.exec_ db "BEGIN;") + (Pact.exec_ db "COMMIT;") + io + +pactDir :: FilePath -> FilePath +pactDir db = db "0/sqlite" + +rocksDir :: FilePath -> FilePath +rocksDir db = db "0/rocksDb" + +-- | Copy over all CutHashes, all BlockHeaders, and only some Payloads. +compactRocksDb :: (Logger logger) + => logger + -> ChainwebVersion -- ^ cw version + -> [ChainId] -- ^ ChainIds + -> BlockHeight -- ^ minBlockHeight for payload copying + -> RocksDb -- ^ source db, should be opened read-only + -> RocksDb -- ^ target db + -> IO () +compactRocksDb logger cwVersion cids minBlockHeight srcDb targetDb = do + let log = logFunctionText logger + + -- Copy over entirety of CutHashes table + let srcCutHashes = cutHashesTable srcDb + let targetCutHashes = cutHashesTable targetDb + log LL.Info "Copying over CutHashes table" + withTableIterator (unCasify srcCutHashes) $ \srcIt -> do + let go = do + iterEntry srcIt >>= \case + Nothing -> do + pure () + Just (Entry k v) -> do + log LL.Debug $ "Copying over Cut " <> cutIdToText (k ^. _3) + tableInsert targetCutHashes k v + iterNext srcIt + go + go + + -- Migrate BlockHeaders and Payloads + let srcPayloads = newPayloadDb srcDb + let targetPayloads = newPayloadDb targetDb + + -- The target payload db has to be initialised. + log LL.Info "Initializing payload db" + initializePayloadDb cwVersion targetPayloads + + srcWbhdb <- initWebBlockHeaderDb srcDb cwVersion + targetWbhdb <- initWebBlockHeaderDb targetDb cwVersion + forM_ cids $ \cid -> do + let log' = logFunctionText (addChainIdLabel cid logger) + log' LL.Info $ "Starting chain " <> chainIdToText cid + srcBlockHeaderDb <- getWebBlockHeaderDb srcWbhdb cid + targetBlockHeaderDb <- getWebBlockHeaderDb targetWbhdb cid + + withTableIterator (_chainDbCas srcBlockHeaderDb) $ \it -> do + -- Grab the latest header, for progress logging purposes. + latestHeader <- do + iterLast it + iterValue it >>= \case + Nothing -> exitLog logger "Missing final payload. This is likely due to a corrupted database." + Just rbh -> pure (_getRankedBlockHeader rbh ^. blockHeight) + + -- Go to the earliest entry. We migrate all BlockHeaders, for now. + -- They are needed for SPV. + -- + -- Constructing SPV proofs actually needs the payloads, but validating + -- them does not. + iterFirst it + earliestHeader <- do + iterValue it >>= \case + Nothing -> exitLog logger "Missing first payload. This is likely due to a corrupted database." + Just rbh -> pure (_getRankedBlockHeader rbh ^. blockHeight) + + -- Ensure that we log progress 100 times per chain + -- I just made this number up as something that felt somewhat sensible + let offset = (latestHeader - earliestHeader) `div` 100 + let headerProgressPoints = [earliestHeader + i * offset | i <- [1..100]] + + let logHeaderProgress bHeight = do + when (bHeight `elem` headerProgressPoints) $ do + let percentDone = sshow $ 100 * fromIntegral @_ @Double (bHeight - earliestHeader) / fromIntegral @_ @Double (latestHeader - earliestHeader) + log' LL.Info $ percentDone <> "% done." + + let go = do + iterValue it >>= \case + Nothing -> do + log' LL.Info "Finished copying headers and payloads" + Just rankedBlockHeader -> do + let blkHeader = _getRankedBlockHeader rankedBlockHeader + let blkHeight = view blockHeight blkHeader + let blkHash = view blockHash blkHeader + + -- Migrate the ranked block table and rank table + -- unconditionally. + -- Right now, the headers are definitely needed (we can't delete any). + -- + -- Not sure about the rank table, though. We keep it to be + -- conservative. + log' LL.Debug $ "Copying over BlockHeader " <> toText blkHash + tableInsert (_chainDbCas targetBlockHeaderDb) (RankedBlockHash blkHeight blkHash) rankedBlockHeader + tableInsert (_chainDbRankTable targetBlockHeaderDb) blkHash blkHeight + + -- We only add the payloads for blocks that are in the + -- interesting range. + when (blkHeight >= minBlockHeight) $ do + -- Insert the payload into the new database + let payloadHash = blkHeader ^. blockPayloadHash + log' LL.Info $ "Migrating block payload " <> sshow payloadHash <> " for BlockHeight " <> sshow blkHeight + lookupPayloadWithHeight srcPayloads (Just blkHeight) payloadHash >>= \case + Nothing -> do + exitLog logger "Missing payload: This is likely due to a corrupted database." + Just payloadWithOutputs -> do + addNewPayload targetPayloads blkHeight payloadWithOutputs + + logHeaderProgress blkHeight + + iterNext it + go + go diff --git a/src/Chainweb/Pact/Backend/PactState.hs b/src/Chainweb/Pact/Backend/PactState.hs index 9808ffa53f..1d3d2ed0dc 100644 --- a/src/Chainweb/Pact/Backend/PactState.hs +++ b/src/Chainweb/Pact/Backend/PactState.hs @@ -1,11 +1,14 @@ +{-# LANGUAGE BangPatterns #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ImportQualifiedPost #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StrictData #-} @@ -26,6 +29,8 @@ module Chainweb.Pact.Backend.PactState , getPactTables , getLatestPactStateDiffable , getLatestPactStateAt + , withLatestPactStateAt + , getLatestPactTableNamesAt , getLatestPactStateAtDiffable , getLatestBlockHeight , getEarliestBlockHeight @@ -36,7 +41,10 @@ module Chainweb.Pact.Backend.PactState , withChainDb , addChainIdLabel , doesPactDbExist + , chainDbFileName , allChains + , qryStream + , bulkInsert , PactRow(..) , PactRowContents(..) @@ -45,42 +53,41 @@ module Chainweb.Pact.Backend.PactState ) where -import Control.Exception (bracket) -import Control.Monad (forM, forM_, when) +import Chainweb.BlockHeight (BlockHeight(..)) +import Chainweb.Logger (Logger, addLabel) +import Chainweb.Pact.Backend.Types (SQLiteEnv) +import Chainweb.Pact.Backend.Utils (fromUtf8, toUtf8, withSqliteDb) +import Chainweb.Utils (T2(..), int) +import Chainweb.Version (ChainId, ChainwebVersion, chainIdToText) +import Chainweb.Version.Utils (chainIdsAt) +import Control.Exception (bracket, finally) +import Control.Monad (forM, forM_, when, void) +import Control.Monad.Except (ExceptT(..), runExceptT, throwError) import Control.Monad.IO.Class (MonadIO(liftIO)) import Control.Monad.Trans.Class (lift) -import Control.Monad.Except (ExceptT(..), runExceptT, throwError) import Data.Aeson (ToJSON(..), (.=)) import Data.Aeson qualified as Aeson -import Data.Vector (Vector) -import Data.Vector qualified as Vector import Data.ByteString (ByteString) -import Data.Int (Int64) import Data.Foldable qualified as F +import Data.Function ((&)) +import Data.Int (Int64) import Data.List qualified as List import Data.Map (Map) import Data.Map.Strict qualified as M +import Data.Set (Set) +import Data.Set qualified as Set import Data.Text (Text) import Data.Text qualified as Text import Data.Text.Encoding qualified as Text +import Database.SQLite3 qualified as SQL import Database.SQLite3.Direct (Utf8(..), Database) -import Database.SQLite3.Direct qualified as SQL - -import Chainweb.BlockHeight (BlockHeight(..)) -import Chainweb.Logger (Logger, addLabel) -import Chainweb.Pact.Backend.Types (SQLiteEnv) -import Chainweb.Pact.Backend.Utils (fromUtf8, withSqliteDb) -import Chainweb.Utils (int) -import Chainweb.Version (ChainId, ChainwebVersion, chainIdToText) -import Chainweb.Version.Utils (chainIdsAt) - -import System.Directory (doesFileExist) -import System.FilePath (()) - +import Database.SQLite3.Direct qualified as Direct import Pact.Types.SQLite (SType(..), RType(..)) import Pact.Types.SQLite qualified as Pact import Streaming.Prelude (Stream, Of) import Streaming.Prelude qualified as S +import System.Directory (doesFileExist) +import System.FilePath (()) excludedTables :: [Utf8] excludedTables = checkpointerTables ++ compactionTables @@ -148,14 +155,14 @@ withChainDb cid logger' path f = do withSqliteDb cid logger path resetDb (f logger) -- | Get all Pact table names in the database. -getPactTableNames :: Database -> IO (Vector Utf8) -getPactTableNames db = do +getPactTableNames :: Database -> Stream (Of Utf8) IO () +getPactTableNames db = eachIO $ do let sortedTableNames :: [[SType]] -> [Utf8] sortedTableNames rows = List.sortOn (Text.toLower . fromUtf8) $ flip List.map rows $ \case [SText u] -> u _ -> error "getPactTableNames.sortedTableNames: expected text" - tables <- fmap sortedTableNames $ do + fmap sortedTableNames $ do let qryText = "SELECT name FROM sqlite_schema \ \WHERE \ @@ -163,8 +170,11 @@ getPactTableNames db = do \AND \ \ name NOT LIKE 'sqlite_%'" Pact.qry db qryText [] [RText] - - pure (Vector.fromList tables) + where + eachIO :: (Foldable f) => IO (f a) -> Stream (Of a) IO () + eachIO m_xs = do + xs <- liftIO m_xs + S.each xs -- | Get all of the rows for each table. The tables will be appear sorted -- lexicographically by table name. @@ -172,59 +182,82 @@ getPactTables :: Database -> Stream (Of Table) IO () getPactTables db = do let fmtTable x = "\"" <> x <> "\"" - tables <- liftIO $ getPactTableNames db - - forM_ tables $ \tbl -> do - if tbl `notElem` excludedTables - then do - let qryText = "SELECT rowkey, rowdata, txid FROM " - <> fmtTable tbl - userRows <- liftIO $ Pact.qry db qryText [] [RText, RBlob, RInt] - shapedRows <- forM userRows $ \case - [SText (Utf8 rowKey), SBlob rowData, SInt txId] -> do - pure $ PactRow {..} - _ -> error "getPactTableNames: unexpected shape of user table row" - S.yield $ Table (fromUtf8 tbl) shapedRows - else do - pure () + getPactTableNames db + & S.filter (\tbl -> tbl `notElem` excludedTables) + & S.mapM (\tbl -> do + let qryText = "SELECT rowkey, rowdata, txid FROM " + <> fmtTable tbl + userRows <- liftIO $ Pact.qry db qryText [] [RText, RBlob, RInt] + shapedRows <- forM userRows $ \case + [SText (Utf8 rowKey), SBlob rowData, SInt txId] -> do + pure $ PactRow {..} + _ -> error "getPactTableNames: unexpected shape of user table row" + pure $ Table (fromUtf8 tbl) shapedRows + ) -- streaming SQLite step; see Pact SQLite module -stepStatement :: SQL.Statement -> [RType] -> Stream (Of [SType]) IO (Either SQL.Error ()) +stepStatement :: Direct.Statement -> [RType] -> Stream (Of [SType]) IO (Either Direct.Error ()) stepStatement stmt rts = runExceptT $ do -- todo: rename from acc - let acc :: SQL.StepResult -> ExceptT SQL.Error (Stream (Of [SType]) IO) () + let acc :: Direct.StepResult -> ExceptT Direct.Error (Stream (Of [SType]) IO) () acc = \case - SQL.Done -> do + Direct.Done -> do pure () - SQL.Row -> do + Direct.Row -> do as <- forM (List.zip [0..] rts) $ \(colIx, expectedColType) -> do liftIO $ case expectedColType of - RInt -> SInt <$> SQL.columnInt64 stmt colIx - RDouble -> SDouble <$> SQL.columnDouble stmt colIx - RText -> SText <$> SQL.columnText stmt colIx - RBlob -> SBlob <$> SQL.columnBlob stmt colIx + RInt -> SInt <$> Direct.columnInt64 stmt colIx + RDouble -> SDouble <$> Direct.columnDouble stmt colIx + RText -> SText <$> Direct.columnText stmt colIx + RBlob -> SBlob <$> Direct.columnBlob stmt colIx lift $ S.yield as - liftIO (SQL.step stmt) >>= \case + liftIO (Direct.step stmt) >>= \case Left err -> do throwError err Right sr -> do acc sr -- maybe use stepNoCB - ExceptT (liftIO (SQL.step stmt)) >>= acc + ExceptT (liftIO (Direct.step stmt)) >>= acc -- | Prepare/execute query with params; stream the results -qry :: () +qryStream :: () => Database -> Utf8 -> [SType] -> [RType] - -> (Stream (Of [SType]) IO (Either SQL.Error ()) -> IO x) + -> (Stream (Of [SType]) IO (Either Direct.Error ()) -> IO x) -> IO x -qry db qryText args returnTypes k = do - bracket (Pact.prepStmt db qryText) SQL.finalize $ \stmt -> do - Pact.bindParams stmt args +qryStream db qryText args returnTypes k = do + bracket (SQL.prepareUtf8 db qryText) Direct.finalize $ \stmt -> do + bindParams stmt args k (stepStatement stmt returnTypes) + where + bindParams :: Direct.Statement -> [SType] -> IO () + bindParams s as = forM_ (List.zip [1..] as) $ \(argIndex, arg) -> do + case arg of + SInt a -> Direct.bindInt64 s argIndex a + SDouble a -> Direct.bindDouble s argIndex a + SText a -> Direct.bindText s argIndex a + SBlob a -> Direct.bindBlob s argIndex a + +bulkInsert :: Database -> Text -> [[SType]] -> IO () +bulkInsert db tblname values = do + let oneRow rowSize = "(" <> Text.intercalate ", " (List.replicate rowSize "?") <> ")" + let allRows rows = Text.intercalate ", " (List.map (\row -> oneRow (List.length row)) rows) + case values of + [] -> do + pure () + rows -> do + let q = toUtf8 $ "INSERT INTO [" <> tblname <> "] VALUES " <> allRows rows + bracket (SQL.prepareUtf8 db q) Direct.finalize $ \s -> do + forM_ (List.zip [1..] (List.concat values)) $ \(argIndex, arg) -> do + case arg of + SInt a -> Direct.bindInt64 s argIndex a + SDouble a -> Direct.bindDouble s argIndex a + SText a -> Direct.bindText s argIndex a + SBlob a -> Direct.bindBlob s argIndex a + void $ SQL.stepNoCB s `finally` (SQL.clearBindings s >> SQL.reset s) -- | Get the latest Pact state (in a ready-to-diff form). getLatestPactStateDiffable :: Database -> Stream (Of TableDiffable) IO () @@ -262,8 +295,26 @@ getLatestPactStateAt :: () getLatestPactStateAt db bh = do endingTxId <- liftIO $ getEndingTxId db bh - tables <- liftIO $ getPactTableNames db - + getLatestPactTableNamesAt db bh + & S.mapM (\tbl -> do + let qryText = "SELECT rowkey, rowdata, txid FROM " + <> "\"" <> tbl <> "\"" + <> " WHERE txid do + let go :: Map ByteString PactRowContents -> [SType] -> Map ByteString PactRowContents + go m = \case + [SText (Utf8 rowKey), SBlob rowData, SInt txId] -> + M.insertWith (\prc1 prc2 -> if prc1.txId > prc2.txId then prc1 else prc2) rowKey (PactRowContents rowData txId) m + _ -> error "getLatestPactState: unexpected shape of user table row" + S.fold_ go M.empty id rows + pure (fromUtf8 tbl, latestState) + ) + +getLatestPactTableNamesAt :: () + => Database + -> BlockHeight + -> Stream (Of Utf8) IO () +getLatestPactTableNamesAt db bh = do tablesCreatedAfter <- liftIO $ do let qryText = "SELECT tablename FROM VersionedTableCreation WHERE createBlockheight > ?1" rows <- Pact.qry db qryText [SInt (int bh)] [RText] @@ -271,19 +322,63 @@ getLatestPactStateAt db bh = do [SText tbl] -> pure tbl _ -> error "getLatestPactStateAt.tablesCreatedAfter: expected text" - forM_ tables $ \tbl -> do - when (tbl `notElem` (excludedTables ++ tablesCreatedAfter)) $ do - let qryText = "SELECT rowkey, rowdata, txid FROM " - <> "\"" <> tbl <> "\"" - <> " WHERE txid do - let go :: Map ByteString PactRowContents -> [SType] -> Map ByteString PactRowContents - go m = \case - [SText (Utf8 rowKey), SBlob rowData, SInt txId] -> - M.insertWith (\prc1 prc2 -> if prc1.txId > prc2.txId then prc1 else prc2) rowKey (PactRowContents rowData txId) m - _ -> error "getLatestPactState: unexpected shape of user table row" - S.fold_ go M.empty id rows - S.yield (fromUtf8 tbl, latestState) + let excludeThese = excludedTables ++ tablesCreatedAfter + getPactTableNames db + & S.filter (\tbl -> tbl `notElem` excludeThese) + +-- | Use the Pact State at the given height. +withLatestPactStateAt :: () + => Database + -> BlockHeight + -> (forall r. Text -> Stream (Of (T2 Int64 PactRow)) IO r -> IO ()) + -> IO () +withLatestPactStateAt db bh withTable = do + endingTxId <- liftIO $ getEndingTxId db bh + + getLatestPactTableNamesAt db bh + & S.mapM_ (\tbl -> do + -- ❯ sqlite3 pact-v1-chain-0.sqlite 'EXPLAIN QUERY PLAN SELECT rowid, rowkey, rowdata, txid FROM [coin_coin-table] WHERE txid < 100 ORDER BY rowid DESC' + -- QUERY PLAN + -- `--SCAN coin_coin-table + + let qryText = "SELECT rowid, rowkey, txid, rowdata FROM " + <> "[" <> tbl <> "]" + <> " WHERE txid < ?1" + <> " ORDER BY rowid DESC" + + qryStream db qryText [SInt endingTxId] [RInt, RText, RInt, RBlob] $ \rows -> do + let go :: () + => Set ByteString + -> Stream (Of [SType]) IO (Either Direct.Error ()) + -> Stream (Of (T2 Int64 PactRow)) IO () + go !seen s = do + e <- liftIO (S.next s) + case e of + Left (Left sqlErr) -> do + error $ "withLatestPactStateAt: Encountered SQLite error: " <> show sqlErr + Left (Right ()) -> do + pure () + Right (row, rest) -> do + case row of + [SInt rowid, SText (Utf8 rk), SInt tid, SBlob rd] -> do + let element = T2 rowid $ PactRow + { rowKey = rk + , rowData = rd + , txId = tid + } + if | tbl == "SYS:Pacts" && rd == "null" -> do + S.yield element + go (Set.insert rk seen) rest + | rk `Set.member` seen -> do + go seen rest + | otherwise -> do + S.yield element + go (Set.insert rk seen) rest + _ -> do + error "getLatestPactState: invalid query" + + withTable (fromUtf8 tbl) (go Set.empty rows) + ) -- | A pact table - just its name and its rows. data Table = Table @@ -330,13 +425,17 @@ data PactRowContents = PactRowContents -- contains the pact db for the given ChainId. doesPactDbExist :: ChainId -> FilePath -> IO Bool doesPactDbExist cid dbDir = do - let chainDbFileName = mconcat - [ "pact-v1-chain-" - , Text.unpack (chainIdToText cid) - , ".sqlite" - ] - let file = dbDir chainDbFileName - doesFileExist file + doesFileExist (chainDbFileName cid dbDir) + +-- | Given a pact database directory, return the SQLite +-- path chainweb uses for the given ChainId. +chainDbFileName :: ChainId -> FilePath -> FilePath +chainDbFileName cid dbDir = dbDir mconcat + [ "pact-v1-chain-" + , Text.unpack (chainIdToText cid) + , ".sqlite" + ] + addChainIdLabel :: (Logger logger) => ChainId diff --git a/src/Chainweb/Pact/Backend/PactState/Diff.hs b/src/Chainweb/Pact/Backend/PactState/Diff.hs index 7930752991..5dd52e29fc 100644 --- a/src/Chainweb/Pact/Backend/PactState/Diff.hs +++ b/src/Chainweb/Pact/Backend/PactState/Diff.hs @@ -5,6 +5,7 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -18,46 +19,43 @@ -- -- Diff Pact state between two databases. module Chainweb.Pact.Backend.PactState.Diff - ( pactDiffMain + ( main ) where -import Data.IORef (newIORef, readIORef, atomicModifyIORef') +import Streaming.Prelude qualified as S +import Chainweb.BlockHeight (BlockHeight) +import Chainweb.Logger (logFunctionText, logFunctionJson) +import Chainweb.Pact.Backend.Compaction qualified as C +import Chainweb.Pact.Backend.PactState (TableDiffable(..), getLatestPactStateAtDiffable, doesPactDbExist, withChainDb, allChains) +import Chainweb.Utils (fromText, toText) +import Chainweb.Version (ChainwebVersion(..), ChainId, chainIdToText) +import Chainweb.Version.Mainnet (mainnet) +import Chainweb.Version.Registry (lookupVersionByName) import Control.Monad (forM_, when, void) import Control.Monad.IO.Class (MonadIO(liftIO)) import Data.Aeson ((.=)) import Data.Aeson qualified as Aeson import Data.ByteString (ByteString) +import Data.IORef (newIORef, readIORef, atomicModifyIORef') import Data.Map (Map) import Data.Map.Merge.Strict qualified as Merge import Data.Map.Strict qualified as M import Data.Maybe (fromMaybe) import Data.Text (Text) import Data.Text qualified as Text -import Data.Text.IO qualified as Text import Data.Text.Encoding qualified as Text +import Data.Text.IO qualified as Text import Options.Applicative - -import Chainweb.Logger (logFunctionText, logFunctionJson) -import Chainweb.Utils (fromText, toText) -import Chainweb.Version (ChainwebVersion(..), ChainId, chainIdToText) -import Chainweb.Version.Mainnet (mainnet) -import Chainweb.Version.Registry (lookupVersionByName) -import Chainweb.Pact.Backend.Compaction (TargetBlockHeight(..)) -import Chainweb.Pact.Backend.Compaction qualified as C -import Chainweb.Pact.Backend.PactState (TableDiffable(..), getLatestPactStateAtDiffable, getLatestPactStateDiffable, doesPactDbExist, withChainDb, allChains) - +import Streaming.Prelude (Stream, Of) import System.Exit (exitFailure) import System.LogLevel (LogLevel(..)) -import Streaming.Prelude (Stream, Of) -import Streaming.Prelude qualified as S - data PactDiffConfig = PactDiffConfig { firstDbDir :: FilePath , secondDbDir :: FilePath , chainwebVersion :: ChainwebVersion - , target :: TargetBlockHeight + , target :: BlockHeight , logDir :: FilePath } @@ -72,8 +70,8 @@ instance Semigroup IsDifferent where instance Monoid IsDifferent where mempty = NoDifference -pactDiffMain :: IO () -pactDiffMain = do +main :: IO () +main = do cfg <- execParser opts when (cfg.firstDbDir == cfg.secondDbDir) $ do @@ -99,12 +97,7 @@ pactDiffMain = do withChainDb cid logger cfg.firstDbDir $ \_ db1 -> do withChainDb cid logger cfg.secondDbDir $ \_ db2 -> do logText Info "[Starting diff]" - let getPactState db = case cfg.target of - LatestUnsafe -> getLatestPactStateDiffable db - LatestSafe -> liftIO $ do - logText Error "LatestSafe is not supported by pact-diff, use Target instead" - exitFailure - Target bh -> getLatestPactStateAtDiffable db bh + let getPactState db = getLatestPactStateAtDiffable db cfg.target let diff :: Stream (Of (Text, Stream (Of RowKeyDiffExists) IO ())) IO () diff = diffLatestPactState (getPactState db1) (getPactState db2) isDifferent <- S.foldMap_ id $ flip S.mapM diff $ \(tblName, tblDiff) -> do @@ -136,29 +129,11 @@ pactDiffMain = do parser :: Parser PactDiffConfig parser = PactDiffConfig - <$> strOption - (long "first-database-dir" - <> metavar "PACT_DB_DIRECTORY" - <> help "First Pact database directory") - <*> strOption - (long "second-database-dir" - <> metavar "PACT_DB_DIRECTORY" - <> help "Second Pact database directory") - <*> (fmap parseChainwebVersion $ strOption - (long "graph-version" - <> metavar "CHAINWEB_VERSION" - <> help "Chainweb version for graph. Only needed for non-standard graphs." - <> value (toText (_versionName mainnet)) - <> showDefault)) - <*> (fmap Target (fromIntegral @Int <$> option auto - (long "target-blockheight" - <> metavar "BLOCKHEIGHT" - <> help "Target Blockheight")) <|> pure LatestUnsafe) - <*> strOption - (long "log-dir" - <> metavar "LOG_DIRECTORY" - <> help "Directory where logs will be placed" - <> value ".") + <$> strOption (long "first-database-dir" <> help "First Pact database directory") + <*> strOption (long "second-database-dir" <> help "Second Pact database directory") + <*> fmap parseChainwebVersion (strOption (long "graph-version" <> help "Chainweb version for graph. Only needed for non-standard graphs." <> value (toText (_versionName mainnet)) <> showDefault)) + <*> fmap (fromIntegral @Int) (option auto (long "target-blockheight" <> metavar "BLOCKHEIGHT" <> help "Target Blockheight")) + <*> strOption (long "log-dir" <> help "Directory where logs will be placed" <> value ".") parseChainwebVersion :: Text -> ChainwebVersion parseChainwebVersion = lookupVersionByName . fromMaybe (error "ChainwebVersion parse failed") . fromText @@ -235,6 +210,6 @@ diffLatestPactState = go error "right stream longer than left" (Right (t1, next1), Right (t2, next2)) -> do when (t1.name /= t2.name) $ do - error "diffLatestPactState: mismatched table names" + error $ "diffLatestPactState: mismatched table names: " <> Text.unpack t1.name <> " vs. " <> Text.unpack t2.name S.yield (t1.name, diffTables t1 t2) go next1 next2 diff --git a/test/Chainweb/Test/MultiNode.hs b/test/Chainweb/Test/MultiNode.hs index 2f99f7e951..a0439ea50e 100644 --- a/test/Chainweb/Test/MultiNode.hs +++ b/test/Chainweb/Test/MultiNode.hs @@ -47,7 +47,7 @@ import Control.Concurrent import Control.Concurrent.Async import Control.DeepSeq import Control.Exception -import Control.Lens (set, view, over, (^?!), ix) +import Control.Lens (set, view, (^?!), ix) import Control.Monad import Chronos qualified @@ -74,9 +74,9 @@ import Numeric.Natural import qualified Streaming.Prelude as S import Prelude hiding (log) +import System.Directory (createDirectoryIfMissing) import System.FilePath import System.IO.Temp -import System.Logger qualified as YAL import System.LogLevel import System.Timeout @@ -91,7 +91,7 @@ import Chainweb.Chainweb import Chainweb.Chainweb.Configuration import Chainweb.Chainweb.CutResources import Chainweb.Chainweb.PeerResources -import Chainweb.Pact.Backend.Compaction qualified as C +import Chainweb.Pact.Backend.Compaction qualified as Sigma import Chainweb.Pact.Backend.Utils (withSqliteDb) import Chainweb.Cut import Chainweb.CutDB @@ -105,7 +105,7 @@ import Chainweb.Pact.Backend.PactState.GrandHash.Calc qualified as GrandHash.Cal import Chainweb.Pact.Backend.PactState.GrandHash.Import qualified as GrandHash.Import import Chainweb.Pact.Backend.PactState.GrandHash.Utils qualified as GrandHash.Utils import Chainweb.Test.P2P.Peer.BootstrapConfig -import Chainweb.Test.Pact.Utils (compactUntilAvailable) +import Chainweb.Test.Pact.Utils (sigmaCompact) import Chainweb.Test.Utils import Chainweb.Time (Seconds(..)) import Chainweb.Utils @@ -330,14 +330,14 @@ compactLiveNodeTest :: () -> Natural -> RocksDb -> FilePath + -> FilePath -> (String -> IO ()) -> IO () -compactLiveNodeTest logLevel v n rocksDb pactDir step = do +compactLiveNodeTest logLevel v n rocksDb srcPactDir targetPactDir step = do let logFun = step . T.unpack let logger = genericLogger logLevel logFun logFun "Phase 1... creating blocks" - logFun $ T.pack pactDir -- N.B: This consensus state stuff counts the number of blocks -- in RocksDB, rather than the number of blocks in all chains @@ -347,21 +347,30 @@ compactLiveNodeTest logLevel v n rocksDb pactDir step = do let ct :: Int -> StartedChainweb logger -> IO () ct = harvestConsensusState logger stateVar do - runNodesForSeconds logLevel logFun (multiConfig v n) n 10 rocksDb pactDir ct + runNodesForSeconds logLevel logFun (multiConfig v n) n 10 rocksDb srcPactDir ct Just stats1 <- consensusStateSummary <$> swapMVar stateVar (emptyConsensusState v) assertGe "average block count before proceeding" (Actual $ _statBlockCount stats1) (Expected 50) logFun $ sshow stats1 let compactAll = Chronos.stopwatch_ $ do threadDelay 5_000_000 - C.withDefaultLogger logLevel $ \lgr -> do + Sigma.withDefaultLogger logLevel $ \lgr -> do forM_ [0 .. int @_ @Word n - 1] $ \nid -> do + -- We haven't gotten to run the node against the target yet, + -- and nothing has created its db directories, so we do that here. + createDirectoryIfMissing False (targetPactDir show nid) forM_ (allChains v) $ \cid -> do let logger' = addLabel ("nodeId", sshow nid) $ addLabel ("chainId", chainIdToText cid) lgr - withSqliteDb cid logger' (pactDir show nid) False $ \sqlEnv -> do - void $ compactUntilAvailable (C.Target (BlockHeight 25)) logger' sqlEnv [C.NoVacuum] + withSqliteDb cid logger' (srcPactDir show nid) False $ \srcDb -> do + withSqliteDb cid logger' (targetPactDir show nid) False $ \targetDb -> do + sigmaCompact srcDb targetDb (BlockHeight 25) + let run = Chronos.stopwatch_ $ do - runNodesForSeconds logLevel logFun (multiConfig v n) n 60 rocksDb pactDir ct + -- It may seem a bit strange that we never run the node against the + -- compacted state (see that we are using 'srcPactDir' here). This is + -- because we are only trying to see that compacting a node as it is, + -- does not disrupt it. + runNodesForSeconds logLevel logFun (multiConfig v n) n 60 rocksDb srcPactDir ct Just stats2 <- consensusStateSummary <$> swapMVar stateVar (emptyConsensusState v) assertGe "average block count before proceeding" (Actual $ _statBlockCount stats2) (Expected 100) logFun $ sshow stats2 @@ -500,7 +509,7 @@ pactImportTest logLevel v n rocksDb pactDir step = do -- Each node creates blocks -- We wait until they've made a sufficient amount of blocks -- We stop the nodes --- We open sqlite connections to some of the database dirs and compact them +-- We compact the databases of each node -- We restart all nodes with the same database dirs -- We observe that they can make progress compactAndResumeTest :: () @@ -508,10 +517,12 @@ compactAndResumeTest :: () -> ChainwebVersion -> Natural -> RocksDb + -> RocksDb + -> FilePath -> FilePath -> (String -> IO ()) -> IO () -compactAndResumeTest logLevel v n rdb pactDbDir step = do +compactAndResumeTest logLevel v n srcRocksDb targetRocksDb srcPactDir targetPactDir step = do let logFun = step . T.unpack let logger = genericLogger logLevel logFun @@ -523,26 +534,29 @@ compactAndResumeTest logLevel v n rdb pactDbDir step = do stateVar <- newMVar (emptyConsensusState v) let ct :: Int -> StartedChainweb logger -> IO () ct = harvestConsensusState logger stateVar - runNodesForSeconds logLevel logFun (multiConfig v n) n 60 rdb pactDbDir ct + runNodesForSeconds logLevel logFun (multiConfig v n) n 10 srcRocksDb srcPactDir ct Just stats1 <- consensusStateSummary <$> swapMVar stateVar (emptyConsensusState v) assertGe "average block count before compaction" (Actual $ _statBlockCount stats1) (Expected 50) logFun $ sshow stats1 logFun "phase 2... compacting" - let cid = unsafeChainId 0 - -- compact only half of them - let nids = filter even [0 .. int @_ @Int n - 1] - forM_ nids $ \nid -> do - let dir = pactDbDir show nid - withSqliteDb cid logger dir False $ \sqlEnv -> do - C.withDefaultLogger Warn $ \cLogger -> do - let cLogger' = over YAL.setLoggerScope (\scope -> ("nodeId",sshow nid) : ("chainId",sshow cid) : scope) cLogger - let flags = [C.NoVacuum] - let bh = BlockHeight 5 - void $ compactUntilAvailable (C.Target bh) cLogger' sqlEnv flags + + logFun "phase 2.1...compacting pact state" + forM_ [0 .. int @_ @Word n - 1] $ \nid -> do + forM_ (allChains v) $ \cid -> do + let logger' = addLabel ("nodeId", sshow nid) $ addLabel ("chainId", chainIdToText cid) logger + withSqliteDb cid logger' (srcPactDir show nid) False $ \srcDb -> do + withSqliteDb cid logger' (targetPactDir show nid) False $ \targetDb -> do + sigmaCompact srcDb targetDb (BlockHeight 25) + + logFun "phase 2.2...compacting RocksDB" + forM_ [0 .. int @_ @Word n - 1] $ \nid -> do + let srcRdb = srcRocksDb { _rocksDbNamespace = T.encodeUtf8 (toText nid) } + let tgtRdb = targetRocksDb { _rocksDbNamespace = T.encodeUtf8 (toText nid) } + Sigma.compactRocksDb (addLabel ("nodeId", sshow nid) logger) v (allChains v) 20 srcRdb tgtRdb logFun "phase 3... restarting nodes and ensuring progress" - runNodesForSeconds logLevel logFun (multiConfig v n) { _configFullHistoricPactState = False } n 60 rdb pactDbDir ct + runNodesForSeconds logLevel logFun (multiConfig v n) { _configFullHistoricPactState = False } n 10 targetRocksDb targetPactDir ct Just stats2 <- consensusStateSummary <$> swapMVar stateVar (emptyConsensusState v) -- We ensure that we've gotten to at least 1.5x the previous block count assertGe "average block count post-compaction" (Actual $ _statBlockCount stats2) (Expected (3 * _statBlockCount stats1 `div` 2)) diff --git a/test/Chainweb/Test/Pact/PactMultiChainTest.hs b/test/Chainweb/Test/Pact/PactMultiChainTest.hs index 49cdb38c11..1734b54c8f 100644 --- a/test/Chainweb/Test/Pact/PactMultiChainTest.hs +++ b/test/Chainweb/Test/Pact/PactMultiChainTest.hs @@ -24,6 +24,8 @@ import Data.IORef import Data.List(isPrefixOf) import Data.List qualified as List import Data.Maybe +import Data.Map qualified as M +import Data.Map.Merge.Strict qualified as M import Data.Set (Set) import Data.Set qualified as Set import qualified Data.Text as T @@ -31,8 +33,6 @@ import qualified Data.Vector as V import Test.Tasty import Test.Tasty.HUnit import System.IO.Unsafe -import System.Logger qualified as YAL -import System.LogLevel -- internal modules @@ -58,7 +58,6 @@ import Chainweb.Cut import Chainweb.Mempool.Mempool import Chainweb.Miner.Pact import Chainweb.Pact.Backend.Types -import Chainweb.Pact.Backend.Compaction qualified as C import Chainweb.Pact.PactService import Chainweb.Pact.Service.Types import Chainweb.Pact.TransactionExec (listErrMsg) @@ -91,6 +90,7 @@ data MultiEnv = MultiEnv { _menvBdb :: !TestBlockDb , _menvPact :: !WebPactExecutionService , _menvPacts :: !(HM.HashMap ChainId (SQLiteEnv, PactExecutionService)) + , _menvCompactedPacts :: !(HM.HashMap ChainId (SQLiteEnv, PactExecutionService)) , _menvMpa :: !(IO (IORef MemPoolAccess)) , _menvMiner :: !Miner , _menvChainId :: !ChainId @@ -160,9 +160,9 @@ tests = testGroup testName withTestBlockDb testVersion $ \bdb -> do (iompa,mpa) <- dmpio let logger = hunitDummyLogger step - withWebPactExecutionService logger testVersion pactConfig bdb mpa gasmodel $ \(pact,pacts) -> + withWebPactExecutionServiceCompaction logger testVersion pactConfig bdb mpa gasmodel $ \(pact,pacts,_cPact,cPacts) -> runReaderT f $ - MultiEnv bdb pact pacts (return iompa) noMiner cid + MultiEnv bdb pact pacts cPacts (return iompa) noMiner cid minerKeysetTest :: PactTestM () minerKeysetTest = do @@ -390,12 +390,6 @@ runLocalWithDepth nonce depth cid' cmd = do cwCmd <- buildCwCmd nonce testVersion cmd liftIO $ try @_ @PactException $ _pactLocal pact Nothing Nothing depth cwCmd -getSqlite :: ChainId -> PactTestM SQLiteEnv -getSqlite cid' = do - HM.lookup cid' <$> view menvPacts >>= \case - Just (dbEnv, _) -> return dbEnv - Nothing -> liftIO $ assertFailure $ "No SQLite found for chain id " ++ show cid' - getPactService :: ChainId -> PactTestM PactExecutionService getPactService cid' = do HM.lookup cid' <$> view menvPacts >>= \case @@ -1293,18 +1287,10 @@ compactAndSyncTest = do runBlockTest [ PactTxTest (buildBasic $ mkExec' "1") (assertTxSuccess "should allow innocent transaction" (pDecimal 1)) ] + -- save the cut with the tx, we'll return to it after compaction cutWithTx <- currentCut - currentCid <- view menvChainId - dbEnv <- getSqlite currentCid - - liftIO $ C.withDefaultLogger Warn $ \cLogger -> do - let cLogger' = over YAL.setLoggerScope (\scope -> ("chainId",sshow currentCid) : scope) cLogger - let flags = [C.NoVacuum] - -- compact to before the tx happened - void $ compactUntilAvailable (C.Target start) cLogger' dbEnv flags - -- now sync to after the tx and expect no errors - syncTo cutWithTx + withCompacted start $ syncTo cutWithTx compactionCompactsUnmodifiedTables :: PactTestM () compactionCompactsUnmodifiedTables = do @@ -1313,7 +1299,7 @@ compactionCompactsUnmodifiedTables = do runBlockTest -- create table [ PactTxTest - (buildBasicGas 70000 $ mkExec' $ mconcat + (buildBasicGas 70_000 $ mkExec' $ mconcat [ "(namespace 'free)" , "(module dbmod G (defcap G () true)" , " (defschema sch i:integer)" @@ -1340,16 +1326,10 @@ compactionCompactsUnmodifiedTables = do -- compact to the empty block, before we've written to the table but after -- creating it - currentCid <- view menvChainId - dbEnv <- getSqlite currentCid - liftIO $ C.withDefaultLogger Warn $ \cLogger -> do - let cLogger' = over YAL.setLoggerScope (\scope -> ("chainId",sshow currentCid) : scope) cLogger - let flags = [C.NoVacuum] - void $ compactUntilAvailable (C.Target (start + 2)) cLogger' dbEnv flags - - -- fast forward to after we did the write, expecting the write to not fail - -- due to a duplicate row - syncTo afterWrite + withCompacted (start + 2) $ do + -- fast forward to after we did the write, expecting the write to not fail + -- due to a duplicate row + syncTo afterWrite quirkTest :: TestTree quirkTest = do @@ -1381,8 +1361,8 @@ quirkTest = do withTestBlockDb realVersion $ \bdb -> do (iompa,mpa) <- dmpio let logger = hunitDummyLogger step - withWebPactExecutionService logger realVersion testPactServiceConfig bdb mpa getGasModel $ \(pact,pacts) -> - flip runReaderT (MultiEnv bdb pact pacts (return iompa) noMiner cid) $ do + withWebPactExecutionServiceCompaction logger realVersion testPactServiceConfig bdb mpa getGasModel $ \(pact,pacts,_cPact,cPacts) -> + flip runReaderT (MultiEnv bdb pact pacts cPacts (return iompa) noMiner cid) $ do runToHeight 99 -- run the command once without it being quirked, to establish @@ -1834,3 +1814,16 @@ cbResult = do (o,_h) <- getPWO chid liftIO $ decodeStrictOrThrow @_ @(CommandResult Hash) (_coinbaseOutput $ _payloadWithOutputsCoinbase o) + +-- Compact and return a new MultiEnv +withCompacted :: BlockHeight -> PactTestM x -> PactTestM x +withCompacted height pt = do + srcDbs <- fmap (M.fromList . HM.toList) $ view menvPacts + targetDbs <- fmap (M.fromList . HM.toList) $ view menvCompactedPacts + let dbs :: M.Map ChainId (SQLiteEnv, SQLiteEnv) + dbs = M.merge M.dropMissing M.dropMissing (M.zipWithMatched (\_ e1 e2 -> (fst e1, fst e2))) srcDbs targetDbs + forM_ (M.toList dbs) $ \(_, (srcDb, targetDb)) -> do + liftIO $ sigmaCompact srcDb targetDb height + + targetPacts <- view menvCompactedPacts + local (\me -> me & menvPacts .~ targetPacts) pt diff --git a/test/Chainweb/Test/Pact/PactSingleChainTest.hs b/test/Chainweb/Test/Pact/PactSingleChainTest.hs index 78755bc972..0b6a1bfb73 100644 --- a/test/Chainweb/Test/Pact/PactSingleChainTest.hs +++ b/test/Chainweb/Test/Pact/PactSingleChainTest.hs @@ -22,10 +22,13 @@ import Control.DeepSeq import Control.Lens hiding ((.=), matching) import Control.Monad import Control.Monad.Catch +import Data.Ord (Down(..)) import Patience qualified as PatienceL import Patience.Map qualified as PatienceM import Patience.Map (Delta(..)) +import Streaming.Prelude qualified as S +import Data.Int (Int64) import Data.Aeson (object, (.=), Value(..), eitherDecode) import qualified Data.ByteString.Lazy as BL import Data.Either (isLeft, isRight, fromRight) @@ -40,6 +43,7 @@ import Data.Text (Text) import qualified Data.Text.Encoding as T import qualified Data.Text.IO as T import qualified Data.Vector as V +import Database.SQLite3 qualified as Lite import GHC.Stack @@ -68,7 +72,6 @@ import Chainweb.Logger (genericLogger) import Chainweb.Mempool.Mempool import Chainweb.MerkleLogHash (unsafeMerkleLogHash) import Chainweb.Miner.Pact -import Chainweb.Pact.Backend.Compaction qualified as C import Chainweb.Pact.Backend.PactState.GrandHash.Algorithm (computeGrandHash) import Chainweb.Pact.Backend.PactState qualified as PS import Chainweb.Pact.Backend.Types hiding (RunnableBlock(..)) @@ -80,7 +83,7 @@ import Chainweb.Pact.Types import Chainweb.Pact.Utils (emptyPayload) import Chainweb.Payload import Chainweb.Test.Cut.TestBlockDb -import Chainweb.Test.Pact.Utils hiding (compact) +import Chainweb.Test.Pact.Utils import Chainweb.Test.Pact.Utils qualified as Utils import Chainweb.Test.Utils import Chainweb.Test.TestVersions @@ -90,6 +93,8 @@ import Chainweb.Utils import Chainweb.Version import Chainweb.Version.Utils import Chainweb.WebBlockHeaderDB (getWebBlockHeaderDb) +import Pact.Types.SQLite (SType(..), RType(..)) +import Pact.Types.SQLite qualified as Pact import Chainweb.Storage.Table.RocksDB @@ -133,6 +138,7 @@ tests rdb = testGroup testName , compactionUserTablesDropped rdb , compactionGrandHashUnchanged rdb , compactionDoesNotDisruptDuplicateDetection rdb + , compactionResilientToRowIdOrdering rdb ] where testName = "Chainweb.Test.Pact.PactSingleChainTest" @@ -341,22 +347,23 @@ rosettaFailsWithoutFullHistory :: () => RocksDb -> TestTree rosettaFailsWithoutFullHistory rdb = - withTemporaryDir $ \iodir -> - withSqliteDb cid iodir $ \sqlEnvIO -> + withTemporaryDir $ \srcDir -> withSqliteDb cid srcDir $ \srcSqlEnvIO -> + withTemporaryDir $ \targetDir -> withSqliteDb cid targetDir $ \targetSqlEnvIO -> withDelegateMempool $ \dm -> independentSequentialTestGroup "rosettaFailsWithoutFullHistory" [ -- Run some blocks and then compact - withPactTestBlockDb' testVersion cid rdb sqlEnvIO mempty testPactServiceConfig $ \reqIO -> + withPactTestBlockDb' testVersion cid rdb srcSqlEnvIO mempty testPactServiceConfig $ \reqIO -> testCase "runBlocksAndCompact" $ do - (sqlEnv, q, bdb) <- reqIO + (srcSqlEnv, q, bdb) <- reqIO mempoolRef <- fmap (pure . fst) dm setOneShotMempool mempoolRef =<< goldenMemPool replicateM_ 10 $ void $ runBlock q bdb second - Utils.compact Error [C.NoVacuum] sqlEnv (C.Target (BlockHeight 5)) + targetSqlEnv <- targetSqlEnvIO + Utils.sigmaCompact srcSqlEnv targetSqlEnv (BlockHeight 5) -- This needs to run after the previous test -- Annoyingly, we must inline the PactService util starts here. @@ -365,7 +372,7 @@ rosettaFailsWithoutFullHistory rdb = pactQueue <- newPactQueue 2000 blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid - sqlEnv <- sqlEnvIO + sqlEnv <- targetSqlEnvIO mempool <- fmap snd dm let payloadDb = _bdbPayloadDb blockDb let cfg = testPactServiceConfig { _pactFullHistoryRequired = True } @@ -385,14 +392,14 @@ rewindPastMinBlockHeightFails :: () -> TestTree rewindPastMinBlockHeightFails rdb = compactionSetup "rewindPastMinBlockHeightFails" rdb testPactServiceConfig $ \cr -> do - replicateM_ 10 $ runBlock cr.pactQueue cr.blockDb second + replicateM_ 10 $ runBlock cr.srcPactQueue cr.blockDb second - Utils.compact Error [C.NoVacuum] cr.sqlEnv (C.Target (BlockHeight 5)) + Utils.sigmaCompact cr.srcSqlEnv cr.targetSqlEnv (BlockHeight 5) -- Genesis block header; compacted away by now let bh = genesisBlockHeader testVersion cid - syncResult <- try (pactSyncToBlock bh cr.pactQueue) + syncResult <- try (pactSyncToBlock bh cr.targetPactQueue) case syncResult of Left (BlockHeaderLookupFailure {}) -> do return () @@ -417,49 +424,23 @@ pactStateSamePreAndPostCompaction rdb = $ defaultCmd replicateM_ numBlocks $ do - runTxInBlock_ cr.mempoolRef cr.pactQueue cr.blockDb + runTxInBlock_ cr.mempoolRef cr.srcPactQueue cr.blockDb $ \n _ _ bHeader -> makeTx n bHeader - let db = cr.sqlEnv + statePreCompaction <- getLatestPactState cr.srcSqlEnv + Utils.sigmaCompact cr.srcSqlEnv cr.targetSqlEnv (BlockHeight numBlocks) + statePostCompaction <- getLatestPactState cr.targetSqlEnv - statePreCompaction <- getLatestPactState db - Utils.compact Error [C.NoVacuum] cr.sqlEnv (C.Target (BlockHeight numBlocks)) - statePostCompaction <- getLatestPactState db - - let stateDiff = M.filter (not . PatienceM.isSame) (PatienceM.diff statePreCompaction statePostCompaction) - when (not (null stateDiff)) $ do - T.putStrLn "" - forM_ (M.toList stateDiff) $ \(tbl, delta) -> do - T.putStrLn "" - T.putStrLn tbl - case delta of - Same _ -> do - pure () - Old x -> do - putStrLn $ "a pre-only value appeared in the pre- and post-compaction diff: " ++ show x - New x -> do - putStrLn $ "a post-only value appeared in the pre- and post-compaction diff: " ++ show x - Delta x1 x2 -> do - let daDiff = M.filter (not . PatienceM.isSame) (PatienceM.diff x1 x2) - forM_ daDiff $ \item -> do - case item of - Old x -> do - putStrLn $ "old: " ++ show x - New x -> do - putStrLn $ "new: " ++ show x - Same _ -> do - pure () - Delta x y -> do - putStrLn $ "old: " ++ show x - putStrLn $ "new: " ++ show y - putStrLn "" - assertFailure "pact state check failed" + comparePactStateBeforeAndAfter statePreCompaction statePostCompaction compactionIsIdempotent :: () => RocksDb -> TestTree compactionIsIdempotent rdb = - compactionSetup "compactionIdempotent" rdb testPactServiceConfig $ \cr -> do + -- This requires a bit more than 'compactionSetup', since we + -- are compacting more than once. + withTemporaryDir $ \twiceDir -> withSqliteDb cid twiceDir $ \twiceSqlEnvIO -> + compactionSetup "compactionIsIdempotent" rdb testPactServiceConfig $ \cr -> do let numBlocks :: Num a => a numBlocks = 100 @@ -471,19 +452,19 @@ compactionIsIdempotent rdb = $ defaultCmd replicateM_ numBlocks $ do - runTxInBlock_ cr.mempoolRef cr.pactQueue cr.blockDb + runTxInBlock_ cr.mempoolRef cr.srcPactQueue cr.blockDb $ \n _ _ bHeader -> makeTx n bHeader - let db = cr.sqlEnv - - let compact h = - Utils.compact Error [C.NoVacuum] cr.sqlEnv h - - let compactionHeight = C.Target (BlockHeight numBlocks) - compact compactionHeight - statePostCompaction1 <- getPactUserTables db - compact compactionHeight - statePostCompaction2 <- getPactUserTables db + twiceSqlEnv <- twiceSqlEnvIO + let targetHeight = BlockHeight numBlocks + -- Compact 'src' into 'target' + Utils.sigmaCompact cr.srcSqlEnv cr.targetSqlEnv targetHeight + -- Get table contents of 'target' + statePostCompaction1 <- getPactUserTables cr.targetSqlEnv + -- Compact 'target' into 'twice' + Utils.sigmaCompact cr.targetSqlEnv twiceSqlEnv targetHeight + -- Get table state of 'twice' + statePostCompaction2 <- getPactUserTables twiceSqlEnv let stateDiff = M.filter (not . PatienceM.isSame) (PatienceM.diff statePostCompaction1 statePostCompaction2) when (not (null stateDiff)) $ do @@ -525,15 +506,13 @@ compactionDoesNotDisruptDuplicateDetection rdb = do $ set cbRPC (mkExec' "(coin.transfer \"sender00\" \"sender01\" 1.0)") $ defaultCmd - let run = do - runTxInBlock cr.mempoolRef cr.pactQueue cr.blockDb - $ \_ _ _ _ -> makeTx + e1 <- runTxInBlock cr.mempoolRef cr.srcPactQueue cr.blockDb (\_ _ _ _ -> makeTx) + assertBool "First tx submission succeeds" (isRight e1) - run >>= \e -> assertBool "First tx submission succeeds" (isRight e) - Utils.compact Error [C.NoVacuum] cr.sqlEnv C.LatestUnsafe - run >>= \e -> assertBool "First tx submission fails" (isLeft e) + Utils.sigmaCompact cr.srcSqlEnv cr.targetSqlEnv =<< PS.getLatestBlockHeight cr.srcSqlEnv - pure () + e2 <- runTxInBlock cr.mempoolRef cr.targetPactQueue cr.blockDb (\_ _ _ _ -> makeTx) + assertBool "First tx submission fails" (isLeft e2) -- | Test that user tables created before the compaction height are kept, -- while those created after the compaction height are dropped. @@ -600,23 +579,19 @@ compactionUserTablesDropped rdb = else do mkTable madeAfterTable afterTable } - void $ runBlock cr.pactQueue cr.blockDb second + void $ runBlock cr.srcPactQueue cr.blockDb second let freeBeforeTbl = "free.m0_" <> beforeTable let freeAfterTbl = "free.m1_" <> afterTable - let db = cr.sqlEnv - - statePre <- getPactUserTables db - let assertExists tbl = do - let msg = "Table " ++ T.unpack tbl ++ " should exist pre-compaction, but it doesn't." - assertBool msg (isJust (M.lookup tbl statePre)) - assertExists freeBeforeTbl - assertExists freeAfterTbl + statePre <- getPactUserTables cr.srcSqlEnv + forM_ [freeBeforeTbl, freeAfterTbl] $ \tbl -> do + let msg = "Table " ++ T.unpack tbl ++ " should exist pre-compaction, but it doesn't." + assertBool msg (isJust (M.lookup tbl statePre)) - Utils.compact Error [C.NoVacuum] cr.sqlEnv (C.Target (BlockHeight halfwayPoint)) + Utils.sigmaCompact cr.srcSqlEnv cr.targetSqlEnv (BlockHeight halfwayPoint) - statePost <- getPactUserTables db + statePost <- getPactUserTables cr.targetSqlEnv flip assertBool (isJust (M.lookup freeBeforeTbl statePost)) $ T.unpack beforeTable ++ " was dropped; it wasn't supposed to be." @@ -641,18 +616,103 @@ compactionGrandHashUnchanged rdb = $ defaultCmd replicateM_ numBlocks - $ runTxInBlock_ cr.mempoolRef cr.pactQueue cr.blockDb + $ runTxInBlock_ cr.mempoolRef cr.srcPactQueue cr.blockDb $ \n _ _ blockHeader -> makeTx n blockHeader - let db = cr.sqlEnv let targetHeight = BlockHeight numBlocks - hashPreCompaction <- computeGrandHash (PS.getLatestPactStateAt db targetHeight) - Utils.compact Error [C.NoVacuum] cr.sqlEnv (C.Target targetHeight) - hashPostCompaction <- computeGrandHash (PS.getLatestPactStateAt db targetHeight) + hashPreCompaction <- computeGrandHash (PS.getLatestPactStateAt cr.srcSqlEnv targetHeight) + Utils.sigmaCompact cr.srcSqlEnv cr.targetSqlEnv targetHeight + hashPostCompaction <- computeGrandHash (PS.getLatestPactStateAt cr.targetSqlEnv targetHeight) assertEqual "GrandHash pre- and post-compaction are the same" hashPreCompaction hashPostCompaction +compactionResilientToRowIdOrdering :: () + => RocksDb + -> TestTree +compactionResilientToRowIdOrdering rdb = + compactionSetup "compactionResilientToRowIdOrdering" rdb testPactServiceConfig $ \cr -> do + + let numBlocks :: Num a => a + numBlocks = 100 + + -- Just run a bunch of blocks + setOneShotMempool cr.mempoolRef =<< goldenMemPool + let makeTx :: Word -> BlockHeader -> IO ChainwebTransaction + makeTx nth bh = buildCwCmd (sshow nth) testVersion + $ set cbSigners [mkEd25519Signer' sender00 [mkGasCap, mkTransferCap "sender00" "sender01" 1.0]] + $ setFromHeader bh + $ set cbRPC (mkExec' "(coin.transfer \"sender00\" \"sender01\" 1.0)") + $ defaultCmd + replicateM_ numBlocks + $ runTxInBlock_ cr.mempoolRef cr.srcPactQueue cr.blockDb + $ \n _ _ blockHeader -> makeTx n blockHeader + + -- Get the state after running the blocks but before doing anything else + statePreCompaction <- getLatestPactState cr.srcSqlEnv + + -- Reverse all of the rowids in the table. We get all the rows in txid DESC order, like so: + -- rk1, txid=100, rowid=100 + -- rk1, txid=99, rowid=99 + -- ... + -- + -- Then we reverse the rowids, so that the table looks like this: + -- rk1, txid=100, rowid=0 + -- rk1, txid=99, rowid=1 + -- ... + -- + -- Since the compaction algorithm orders by rowid DESC, it will get the rows in reverse order to how they were inserted. + -- If compaction still results in the same end state, this confirms that the compaction algorithm is resilient to rowid ordering. + e <- PS.qryStream cr.srcSqlEnv "SELECT rowkey, txid FROM [coin_coin-table] ORDER BY txid ASC" [] [RText, RInt] $ \rows -> do + Lite.withStatement cr.srcSqlEnv "UPDATE [coin_coin-table] SET rowid = ?3 WHERE rowkey = ?1 AND txid = ?2" $ \stmt -> do + flip S.mapM_ (S.zip (S.enumFrom @_ @(Down Int64) 10_000) rows) $ \(Down newRowId, row) -> case row of + [SText rowkey, SInt txid] -> do + Pact.bindParams stmt [SText rowkey, SInt txid, SInt newRowId] + stepThenReset stmt + + _ -> error "unexpected row shape" + assertBool "Didn't encounter a sqlite error during rowid shuffling" (isRight e) + + -- Compact to the tip + Utils.sigmaCompact cr.srcSqlEnv cr.targetSqlEnv (BlockHeight numBlocks) + + -- Get the state post-randomisation and post-compaction + statePostCompaction <- getLatestPactState cr.targetSqlEnv + + -- Same logic as in 'pactStateSamePreAndPostCompaction' + comparePactStateBeforeAndAfter statePreCompaction statePostCompaction + +comparePactStateBeforeAndAfter :: (Ord k, Eq a, Show k, Show a) => M.Map Text (M.Map k a) -> M.Map Text (M.Map k a) -> IO () +comparePactStateBeforeAndAfter statePreCompaction statePostCompaction = do + let stateDiff = M.filter (not . PatienceM.isSame) (PatienceM.diff statePreCompaction statePostCompaction) + when (not (null stateDiff)) $ do + T.putStrLn "" + forM_ (M.toList stateDiff) $ \(tbl, delta) -> do + T.putStrLn "" + T.putStrLn tbl + case delta of + Same _ -> do + pure () + Old x -> do + putStrLn $ "a pre-only value appeared in the pre- and post-compaction diff: " ++ show x + New x -> do + putStrLn $ "a post-only value appeared in the pre- and post-compaction diff: " ++ show x + Delta x1 x2 -> do + let daDiff = M.filter (not . PatienceM.isSame) (PatienceM.diff x1 x2) + forM_ daDiff $ \item -> do + case item of + Old x -> do + putStrLn $ "old: " ++ show x + New x -> do + putStrLn $ "new: " ++ show x + Same _ -> do + pure () + Delta x y -> do + putStrLn $ "old: " ++ show x + putStrLn $ "new: " ++ show y + putStrLn "" + assertFailure "pact state check failed" + getHistory :: IO (IORef MemPoolAccess) -> IO (SQLiteEnv, PactQueue, TestBlockDb) -> TestTree getHistory refIO reqIO = testCase "getHistory" $ do (_, q, bdb) <- reqIO @@ -1087,8 +1147,10 @@ mempoolOf blocks = do data CompactionResources = CompactionResources { mempoolRef :: IO (IORef MemPoolAccess) , mempool :: MemPoolAccess - , sqlEnv :: SQLiteEnv - , pactQueue :: PactQueue + , srcSqlEnv :: SQLiteEnv + , targetSqlEnv :: SQLiteEnv + , srcPactQueue :: PactQueue + , targetPactQueue :: PactQueue , blockDb :: TestBlockDb } @@ -1100,30 +1162,36 @@ compactionSetup :: () -> (CompactionResources -> IO ()) -> TestTree compactionSetup pat rdb pactCfg f = - withTemporaryDir $ \iodir -> - withSqliteDb cid iodir $ \sqlEnvIO -> + withTemporaryDir $ \srcDir -> withSqliteDb cid srcDir $ \srcSqlEnvIO -> + withTemporaryDir $ \targetDir -> withSqliteDb cid targetDir $ \targetSqlEnvIO -> withDelegateMempool $ \dm -> testCase pat $ do blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid let payloadDb = _bdbPayloadDb blockDb - sqlEnv <- sqlEnvIO + srcSqlEnv <- srcSqlEnvIO + targetSqlEnv <- targetSqlEnvIO (mempoolRef, mempool) <- do (ref, nonRef) <- dm pure (pure ref, nonRef) - pactQueue <- newPactQueue 2000 + srcPactQueue <- newPactQueue 2_000 + targetPactQueue <- newPactQueue 2_000 let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - void $ forkIO $ runPactService testVersion cid logger Nothing pactQueue mempool bhDb payloadDb sqlEnv pactCfg + -- Start pact service for the src and target + void $ forkIO $ runPactService testVersion cid logger Nothing srcPactQueue mempool bhDb payloadDb srcSqlEnv pactCfg + void $ forkIO $ runPactService testVersion cid logger Nothing targetPactQueue mempool bhDb payloadDb targetSqlEnv pactCfg setOneShotMempool mempoolRef =<< goldenMemPool f $ CompactionResources { mempoolRef = mempoolRef , mempool = mempool - , sqlEnv = sqlEnv - , pactQueue = pactQueue + , srcSqlEnv = srcSqlEnv + , targetSqlEnv = targetSqlEnv + , srcPactQueue = srcPactQueue + , targetPactQueue = targetPactQueue , blockDb = blockDb } @@ -1162,3 +1230,9 @@ runTxInBlock_ mempoolRef pactQueue blockDb makeTx = do runTxInBlock mempoolRef pactQueue blockDb makeTx >>= \case Left e -> assertFailure $ "newBlockAndValidate: validate: got failure result: " ++ show e Right v -> pure v + +-- | Step through a prepared statement, then clear the statement's bindings +-- and reset the statement. +stepThenReset :: Lite.Statement -> IO Lite.StepResult +stepThenReset stmt = do + Lite.stepNoCB stmt `finally` (Lite.clearBindings stmt >> Lite.reset stmt) diff --git a/test/Chainweb/Test/Pact/RemotePactTest.hs b/test/Chainweb/Test/Pact/RemotePactTest.hs index 2eaadcfb2f..6c1db7d46f 100644 --- a/test/Chainweb/Test/Pact/RemotePactTest.hs +++ b/test/Chainweb/Test/Pact/RemotePactTest.hs @@ -89,7 +89,8 @@ import Chainweb.ChainId import Chainweb.Chainweb.Configuration import Chainweb.Graph import Chainweb.Mempool.Mempool -import Chainweb.Pact.Backend.Compaction qualified as C +import Chainweb.Pact.Backend.Compaction qualified as Sigma +import Chainweb.Pact.Backend.PactState (getLatestBlockHeight) import Chainweb.Pact.Backend.Utils qualified as Backend import Chainweb.Pact.RestAPI.Client import Chainweb.Pact.RestAPI.EthSpv @@ -231,7 +232,7 @@ txlogsCompactionTest rdb = runResourceT $ do -- we are dealing with both submitting /local txs -- and compaction, so picking an arbitrary node -- to run these two operations on is fine. - let pactDir = nodePactDbDir (head nodeDbDirs) + let srcPactDir = nodePactDbDir (head nodeDbDirs) iot <- liftIO $ toTxCreationTime @Integer <$> getCurrentTimeIntegral let cmd :: Text -> CmdBuilder cmd tx = do @@ -307,16 +308,17 @@ txlogsCompactionTest rdb = runResourceT $ do liftIO $ submitAndCheckTx cenv =<< createWriteTx =<< nextNonce -- phase 2: compact - liftIO $ C.withDefaultLogger Error $ \logger -> do - let flags = [C.NoVacuum] - let resetDb = False + targetPactDir <- withPactDir 0 + liftIO $ Sigma.withDefaultLogger Error $ \logger -> do + Backend.withSqliteDb cid logger srcPactDir False $ \srcDb -> do + Backend.withSqliteDb cid logger targetPactDir False $ \targetDb -> do + sigmaCompact srcDb targetDb =<< getLatestBlockHeight srcDb - Backend.withSqliteDb cid logger pactDir resetDb $ \dbEnv -> - compactUntilAvailable C.LatestUnsafe logger dbEnv flags + let newNodeDbDirs = (head nodeDbDirs) { nodePactDbDir = targetPactDir } : tail nodeDbDirs -- phase 3: restart nodes, query txlogs liftIO $ runResourceT $ do - net <- withNodesAtLatestBehavior v (configFullHistoricPactState .~ False) nodeDbDirs + net <- withNodesAtLatestBehavior v (configFullHistoricPactState .~ False) newNodeDbDirs let cenv = _getServiceClientEnv net let createTxLogsTx :: Word -> IO (Command Text) @@ -357,8 +359,8 @@ txlogsCompactionTest rdb = runResourceT $ do txLogs <- liftIO $ crGetTxLogs =<< local v cid cenv =<< createTxLogsTx =<< nextNonce let getLatestState :: IO (M.Map RowKey RowData) - getLatestState = C.withDefaultLogger Error $ \logger -> do - Backend.withSqliteDb cid logger pactDir False $ \db -> do + getLatestState = Sigma.withDefaultLogger Error $ \logger -> do + Backend.withSqliteDb cid logger targetPactDir False $ \db -> do st <- Utils.getLatestPactState db case M.lookup "free.m0_persons" st of Just ps -> fmap M.fromList $ forM (M.toList ps) $ \(rkBytes, rdBytes) -> do @@ -373,8 +375,8 @@ txlogsCompactionTest rdb = runResourceT $ do latestState <- liftIO getLatestState liftIO $ assertEqual "txlogs match latest state" - (map (\(rk, rd) -> (rk, J.toJsonViaEncode (_rdData rd))) (M.toList latestState)) - txLogs + (map (\(rk, rd) -> (rk, J.toJsonViaEncode (_rdData rd))) (M.toAscList latestState)) + (L.sort txLogs) localTest :: Pact.TxCreationTime -> ClientEnv -> IO () localTest t cenv = do diff --git a/test/Chainweb/Test/Pact/Utils.hs b/test/Chainweb/Test/Pact/Utils.hs index 40038799c2..c7bf07113a 100644 --- a/test/Chainweb/Test/Pact/Utils.hs +++ b/test/Chainweb/Test/Pact/Utils.hs @@ -90,6 +90,7 @@ module Chainweb.Test.Pact.Utils , withPactTestBlockDb , withPactTestBlockDb' , withWebPactExecutionService +, withWebPactExecutionServiceCompaction , withPactCtxSQLite , WithPactCtxSQLite -- * Other service creation @@ -110,8 +111,7 @@ module Chainweb.Test.Pact.Utils , Noncer , zeroNoncer -- * Pact State -, compact -, compactUntilAvailable +, sigmaCompact , PactRow(..) , getLatestPactState , getPactUserTables @@ -144,7 +144,6 @@ import Data.Foldable import qualified Data.HashMap.Strict as HM import Data.IORef import Data.List qualified as List -import Data.LogMessage import Data.Map (Map) import qualified Data.Map.Strict as M import Data.Maybe @@ -157,12 +156,10 @@ import qualified Data.Vector as V import Database.SQLite3.Direct (Database) import GHC.Generics -import GHC.IO.Exception(IOException(..)) import Streaming.Prelude qualified as S import System.Directory import System.IO.Temp (createTempDirectory) -import qualified System.Logger as YAL import System.LogLevel import Test.Tasty @@ -199,7 +196,7 @@ import Chainweb.ChainId import Chainweb.Graph import Chainweb.Logger import Chainweb.Miner.Pact -import Chainweb.Pact.Backend.Compaction qualified as C +import Chainweb.Pact.Backend.Compaction qualified as Sigma import Chainweb.Pact.Backend.PactState qualified as PactState import Chainweb.Pact.Backend.PactState (TableDiffable(..), Table(..), PactRow(..)) import Chainweb.Pact.Backend.RelationalCheckpointer (initRelationalCheckpointer) @@ -711,9 +708,7 @@ testPactCtxSQLite logger v cid bhdb pdb sqlenv conf gasmodel = do freeGasModel :: TxContext -> GasModel freeGasModel = const $ constGasModel 0 --- | A queue-less WebPactExecutionService (for all chains) --- with direct chain access map for local. -withWebPactExecutionService +withWebPactExecutionServiceCompaction :: (Logger logger) => logger -> ChainwebVersion @@ -721,22 +716,37 @@ withWebPactExecutionService -> TestBlockDb -> MemPoolAccess -> (TxContext -> GasModel) - -> ((WebPactExecutionService,HM.HashMap ChainId (SQLiteEnv, PactExecutionService)) -> IO a) + -> ((WebPactExecutionService, HM.HashMap ChainId (SQLiteEnv, PactExecutionService), WebPactExecutionService, HM.HashMap ChainId (SQLiteEnv, PactExecutionService)) -> IO a) -- TODO: second 'WebPactExecutionService' seems unnecessary? -> IO a -withWebPactExecutionService logger v pactConfig bdb mempoolAccess gasmodel act = - withDbs $ \sqlenvs -> do - pacts <- fmap HM.fromList - $ traverse (\(dbEnv, cid) -> (cid,) . (dbEnv,) <$> mkPact dbEnv cid) - $ zip sqlenvs - $ toList - $ chainIds v - act (mkWebPactExecutionService (snd <$> pacts), pacts) +withWebPactExecutionServiceCompaction logger v pactConfig bdb mempoolAccess gasmodel act = + withDbs $ \srcSqlEnvs -> + withDbs $ \targetSqlEnvs -> do + srcPacts <- mkPacts srcSqlEnvs + let srcWeb = mkWebPactExecutionService (snd <$> srcPacts) + + targetPacts <- mkPacts targetSqlEnvs + let targetWeb = mkWebPactExecutionService (snd <$> targetPacts) + + act (srcWeb, srcPacts, targetWeb, targetPacts) where + mkPacts :: [SQLiteEnv] -> IO (HM.HashMap ChainId (SQLiteEnv, PactExecutionService)) + mkPacts sqlEnvs = fmap HM.fromList + $ traverse (\(dbEnv, cid) -> (cid,) . (dbEnv,) <$> mkTestPactExecutionService dbEnv cid) + $ zip sqlEnvs + $ toList + $ chainIds v + + withDbs :: ([SQLiteEnv] -> IO x) -> IO x withDbs f = foldl' (\soFar _ -> withDb soFar) f (chainIds v) [] + + withDb :: ([SQLiteEnv] -> IO x) -> [SQLiteEnv] -> IO x withDb g envs = withTempSQLiteConnection chainwebPragmas $ \s -> g (s : envs) - mkPact :: SQLiteEnv -> ChainId -> IO PactExecutionService - mkPact sqlenv c = do + mkTestPactExecutionService :: () + => SQLiteEnv + -> ChainId + -> IO PactExecutionService + mkTestPactExecutionService sqlenv c = do bhdb <- getBlockHeaderDb c bdb ctx <- testPactCtxSQLite logger v c bhdb (_bdbPayloadDb bdb) sqlenv pactConfig gasmodel return $ PactExecutionService @@ -762,6 +772,22 @@ withWebPactExecutionService logger v pactConfig bdb mempoolAccess gasmodel act = evalPactServiceM_ ctx $ execReadOnlyReplay l u } +-- | A queue-less WebPactExecutionService (for all chains) +-- with direct chain access map for local. +withWebPactExecutionService + :: (Logger logger) + => logger + -> ChainwebVersion + -> PactServiceConfig + -> TestBlockDb + -> MemPoolAccess + -> (TxContext -> GasModel) + -> ((WebPactExecutionService, HM.HashMap ChainId (SQLiteEnv, PactExecutionService)) -> IO a) + -> IO a +withWebPactExecutionService logger v pactConfig bdb mempoolAccess gasmodel act = + withWebPactExecutionServiceCompaction logger v pactConfig bdb mempoolAccess gasmodel + $ \(pact, pacts, _, _) -> act (pact, pacts) + -- | Noncer for 'runCut' type Noncer = ChainId -> IO Nonce @@ -1013,7 +1039,7 @@ someBlockHeader v h = (!! (int h - 1)) -- the streaming version of this function from -- 'Chainweb.Pact.Backend.PactState'. getPactUserTables :: Database -> IO (Map Text [PactRow]) -getPactUserTables db = do +getPactUserTables db = fmap (M.map (List.sortOn (\pr -> (pr.rowKey, pr.txId)))) $ do S.foldM_ (\m tbl -> pure (M.insert tbl.name tbl.rows m)) (pure M.empty) @@ -1034,62 +1060,14 @@ getLatestPactState db = do pure (PactState.getLatestPactStateDiffable db) -locateTarget :: () +sigmaCompact :: () => SQLiteEnv - -> C.TargetBlockHeight - -> IO BlockHeight -locateTarget db = \case - C.Target height -> do - PactState.ensureBlockHeightExists db height - pure height - C.LatestUnsafe -> do - PactState.getLatestBlockHeight db - C.LatestSafe -> do - latest <- PactState.getLatestBlockHeight db - earliest <- PactState.getEarliestBlockHeight db - - let safeDepth = 1_000 - - when (latest - earliest < safeDepth) $ do - error "not enough history for Compaction.LatestSafe" - - pure (latest - safeDepth) - --- | Compaction utility for testing. --- Most of the time the flags will be ['C.NoVacuum'] -compact :: () - => LogLevel - -> [C.CompactFlag] -> SQLiteEnv - -> C.TargetBlockHeight + -> BlockHeight -> IO () -compact logLevel cFlags db target = do - C.withDefaultLogger logLevel $ \logger -> do - height <- locateTarget db target - void $ C.compact height logger db cFlags - --- | Compaction function that retries until the database is available. -compactUntilAvailable - :: C.TargetBlockHeight - -> YAL.Logger SomeLogMessage - -> SQLiteEnv - -> [C.CompactFlag] - -> IO () -compactUntilAvailable target logger db flags = do - height <- locateTarget db target - go height - where - go h = do - r <- try (C.compact h logger db flags) - case r of - Right _ -> pure () - Left err - | C.CompactExceptionDb e <- err - , Just ioErr <- fromException e - -- someone, somewhere, is calling "show" on an exception - , "ErrorBusy" `List.isInfixOf` ioe_description ioErr - -> putStrLn "Retrying compaction" >> go h - | otherwise -> throwM err +sigmaCompact srcDb targetDb targetBlockHeight = do + Sigma.withDefaultLogger Warn $ \logger -> do + Sigma.compactPactState logger Sigma.defaultRetainment targetBlockHeight srcDb targetDb getPWOByHeader :: BlockHeader -> TestBlockDb -> IO PayloadWithOutputs getPWOByHeader h (TestBlockDb _ pdb _) = diff --git a/test/Chainweb/Test/Utils.hs b/test/Chainweb/Test/Utils.hs index 73688b08db..ad63307441 100644 --- a/test/Chainweb/Test/Utils.hs +++ b/test/Chainweb/Test/Utils.hs @@ -120,6 +120,7 @@ module Chainweb.Test.Utils , interface , testRetryPolicy , withNodeDbDirs +, withPactDir , NodeDbDirs(..) ) where @@ -1051,6 +1052,15 @@ data NodeDbDirs = NodeDbDirs , nodeRocksDb :: RocksDb } +withPactDir :: Word -> ResourceT IO FilePath +withPactDir nid = do + fmap snd $ allocate + (do + targetDir <- getCanonicalTemporaryDirectory + createTempDirectory targetDir ("pactdb-dir-" ++ show nid) + ) + (\dir -> ignoringIOErrors $ removeDirectoryRecursive dir) + withNodeDbDirs :: RocksDb -> Word -> ResourceT IO [NodeDbDirs] withNodeDbDirs rdb n = do let create :: IO [NodeDbDirs] @@ -1075,9 +1085,9 @@ withNodeDbDirs rdb n = do (_, m) <- allocate create destroy pure m - where - ignoringIOErrors :: (MonadCatch m) => m () -> m () - ignoringIOErrors ioe = ioe `catch` (\(_ :: IOError) -> pure ()) + +ignoringIOErrors :: (MonadCatch m) => m () -> m () +ignoringIOErrors ioe = ioe `catch` (\(_ :: IOError) -> pure ()) deadbeef :: TransactionHash deadbeef = TransactionHash "deadbeefdeadbeefdeadbeefdeadbeef" diff --git a/test/SlowTests.hs b/test/SlowTests.hs index ebc053f0dc..e81ccf4645 100644 --- a/test/SlowTests.hs +++ b/test/SlowTests.hs @@ -34,13 +34,16 @@ loglevel = Warn suite :: TestTree suite = independentSequentialTestGroup "ChainwebSlowTests" [ testCaseSteps "compact-resume" $ \step -> - withTempRocksDb "compact-resume-test-rocks" $ \rdb -> - withSystemTempDirectory "compact-resume-test-pact" $ \pactDbDir -> do - Chainweb.Test.MultiNode.compactAndResumeTest loglevel (fastForkingCpmTestVersion pairChainGraph) 6 rdb pactDbDir step + withTempRocksDb "compact-resume-test-rocks-src" $ \srcRocksDb -> + withTempRocksDb "compact-resume-test-rocks-target" $ \targetRocksDb -> + withSystemTempDirectory "compact-resume-test-pact-src" $ \srcPactDbDir -> + withSystemTempDirectory "compact-resume-test-pact-target" $ \targetPactDbDir -> do + Chainweb.Test.MultiNode.compactAndResumeTest loglevel (fastForkingCpmTestVersion pairChainGraph) 6 srcRocksDb targetRocksDb srcPactDbDir targetPactDbDir step , testCaseSteps "compact-live-node" $ \step -> withTempRocksDb "pact-import-test-rocks" $ \rdb -> - withSystemTempDirectory "pact-import-test-pact" $ \pactDbDir -> do - Chainweb.Test.MultiNode.compactLiveNodeTest loglevel (fastForkingCpmTestVersion twentyChainGraph) 1 rdb pactDbDir step + withSystemTempDirectory "pact-import-test-pact-src" $ \srcPactDbDir -> + withSystemTempDirectory "pact-import-test-pact-target" $ \targetPactDbDir -> do + Chainweb.Test.MultiNode.compactLiveNodeTest loglevel (fastForkingCpmTestVersion twentyChainGraph) 1 rdb srcPactDbDir targetPactDbDir step , testCaseSteps "ConsensusNetwork - TimedConsensus - 10 nodes - 30 seconds" $ \step -> withTempRocksDb "multinode-tests-timedconsensus-peterson-twenty-rocks" $ \rdb -> withSystemTempDirectory "multinode-tests-timedconsensus-peterson-twenty-pact" $ \pactDbDir -> diff --git a/tools/cwtool/CwTool.hs b/tools/cwtool/CwTool.hs index 837b85fe07..4b66181fdc 100644 --- a/tools/cwtool/CwTool.hs +++ b/tools/cwtool/CwTool.hs @@ -10,7 +10,7 @@ import System.Exit import Text.Printf import Chainweb.Pact.Backend.Compaction (main) -import Chainweb.Pact.Backend.PactState.Diff (pactDiffMain) +--import Chainweb.Pact.Backend.PactState.Diff (pactDiffMain) import Chainweb.Pact.Backend.PactState.GrandHash.Calc (pactCalcMain) import Chainweb.Pact.Backend.PactState.GrandHash.Import (pactImportMain) @@ -108,10 +108,10 @@ topLevelCommands = "compact" "Compact pact database" Chainweb.Pact.Backend.Compaction.main - , CommandSpec - "pact-diff" - "Diff the latest state of two pact databases" - Chainweb.Pact.Backend.PactState.Diff.pactDiffMain + --, CommandSpec + -- "pact-diff" + -- "Diff the latest state of two pact databases" + -- Chainweb.Pact.Backend.PactState.Diff.pactDiffMain , CommandSpec "pact-calc" "Calculate the GrandHashes for a pact database at a particular blockheight"