Skip to content

Commit

Permalink
Convert Run, MergingRun and users to the new Ref API
Browse files Browse the repository at this point in the history
Most of the changes are just replacing Run m h with Ref (Run m h) all
over the place, and replacing Run.removeReference with releaseRef.

The slightly more interesting changes are in the modules Run and
MergeSchedule, where we have to change the style of duplication from
incrementing reference countsto returning new references.
  • Loading branch information
dcoutts committed Nov 21, 2024
1 parent bc40f25 commit 9f6e3e4
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 270 deletions.
14 changes: 7 additions & 7 deletions bench/macro/lsm-tree-bench-lookups.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Control.Monad
import Control.Monad.Class.MonadST
import Control.Monad.Primitive
import Control.Monad.ST.Strict (ST, runST)
import Control.RefCount (Ref)
import Control.RefCount
import Data.Arena (ArenaManager, newArenaManager, withArena)
import Data.Bits ((.&.))
import Data.BloomFilter (Bloom)
Expand Down Expand Up @@ -213,7 +213,7 @@ benchmarks !caching = withFS $ \hfs hbio -> do

traceMarkerIO "Cleaning up"
putStrLn "Cleaning up"
V.mapM_ Run.removeReference runs
V.mapM_ releaseRef runs

traceMarkerIO "Computing statistics for prepLookups results"
putStr "<Computing statistics for prepLookups>"
Expand Down Expand Up @@ -331,7 +331,7 @@ lookupsEnv ::
-> FS.HasFS IO FS.HandleIO
-> FS.HasBlockIO IO FS.HandleIO
-> Run.RunDataCaching
-> IO ( V.Vector (Run IO FS.HandleIO)
-> IO ( V.Vector (Ref (Run IO FS.HandleIO))
, V.Vector (Bloom SerialisedKey)
, V.Vector IndexCompact
, V.Vector (FS.Handle FS.HandleIO)
Expand Down Expand Up @@ -372,9 +372,9 @@ lookupsEnv runSizes keyRng0 hfs hbio caching = do

-- return runs
runs <- V.fromList <$> mapM (Run.fromMutable caching) rbs
let blooms = V.map Run.runFilter runs
indexes = V.map Run.runIndex runs
handles = V.map Run.runKOpsFile runs
let blooms = V.map (\(DeRef r) -> Run.runFilter r) runs
indexes = V.map (\(DeRef r) -> Run.runIndex r) runs
handles = V.map (\(DeRef r) -> Run.runKOpsFile r) runs
pure $!! (runs, blooms, indexes, handles)

genLookupBatch :: StdGen -> Int -> (V.Vector SerialisedKey, StdGen)
Expand Down Expand Up @@ -466,7 +466,7 @@ benchLookupsIO ::
-> ResolveSerialisedValue
-> WB.WriteBuffer
-> Ref (WBB.WriteBufferBlobs IO h)
-> V.Vector (Run IO h)
-> V.Vector (Ref (Run IO h))
-> V.Vector (Bloom SerialisedKey)
-> V.Vector IndexCompact
-> V.Vector (FS.Handle h)
Expand Down
12 changes: 6 additions & 6 deletions bench/micro/Bench/Database/LSMTree/Internal/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ benchmarks = bgroup "Bench.Database.LSMTree.Internal.Lookup" [
benchLookups :: Config -> Benchmark
benchLookups conf@Config{name} =
withEnv $ \ ~(_dir, arenaManager, hasFS, hasBlockIO, rs, ks) ->
env ( pure ( V.map Run.runFilter rs
, V.map Run.runIndex rs
, V.map Run.runKOpsFile rs
env ( pure ( V.map (\(DeRef r) -> Run.runFilter r) rs
, V.map (\(DeRef r) -> Run.runIndex r) rs
, V.map (\(DeRef r) -> Run.runKOpsFile r) rs
)
) $ \ ~(blooms, indexes, kopsFiles) ->
bgroup name [
Expand Down Expand Up @@ -181,7 +181,7 @@ lookupsInBatchesEnv ::
, ArenaManager RealWorld
, FS.HasFS IO FS.HandleIO
, FS.HasBlockIO IO FS.HandleIO
, V.Vector (Run IO FS.HandleIO)
, V.Vector (Ref (Run IO FS.HandleIO))
, V.Vector SerialisedKey
)
lookupsInBatchesEnv Config {..} = do
Expand Down Expand Up @@ -213,13 +213,13 @@ lookupsInBatchesCleanup ::
, ArenaManager RealWorld
, FS.HasFS IO FS.HandleIO
, FS.HasBlockIO IO FS.HandleIO
, V.Vector (Run IO FS.HandleIO)
, V.Vector (Ref (Run IO FS.HandleIO))
, V.Vector SerialisedKey
)
-> IO ()
lookupsInBatchesCleanup (tmpDir, _arenaManager, _hasFS, hasBlockIO, rs, _) = do
FS.close hasBlockIO
forM_ rs Run.removeReference
forM_ rs releaseRef
removeDirectoryRecursive tmpDir

-- | Generate keys to store and keys to lookup
Expand Down
9 changes: 5 additions & 4 deletions bench/micro/Bench/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Bench.Database.LSMTree.Internal.Merge (benchmarks) where

import Control.Monad (when, zipWithM)
import Control.RefCount
import Criterion.Main (Benchmark, bench, bgroup)
import qualified Criterion.Main as Cr
import Data.Bifunctor (first)
Expand Down Expand Up @@ -206,7 +207,7 @@ benchMerge conf@Config{name} =
-- Make sure to immediately close resulting runs so we don't run
-- out of file handles. Ideally this would not be measured, but at
-- least it's pretty cheap.
Run.removeReference run
releaseRef run
]
where
withEnv =
Expand All @@ -228,7 +229,7 @@ merge ::
-> Config
-> Run.RunFsPaths
-> InputRuns
-> IO (Run IO FS.HandleIO)
-> IO (Ref (Run IO FS.HandleIO))
merge fs hbio Config {..} targetPaths runs = do
let f = fromMaybe const mergeMappend
m <- fromMaybe (error "empty inputs, no merge created") <$>
Expand All @@ -241,7 +242,7 @@ outputRunPaths = RunFsPaths (FS.mkFsPath []) (RunNumber 0)
inputRunPaths :: [Run.RunFsPaths]
inputRunPaths = RunFsPaths (FS.mkFsPath []) . RunNumber <$> [1..]

type InputRuns = V.Vector (Run IO FS.HandleIO)
type InputRuns = V.Vector (Ref (Run IO FS.HandleIO))

type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue

Expand Down Expand Up @@ -331,7 +332,7 @@ mergeEnvCleanup ::
)
-> IO ()
mergeEnvCleanup (tmpDir, _hasFS, hasBlockIO, runs) = do
traverse_ Run.removeReference runs
traverse_ releaseRef runs
removeDirectoryRecursive tmpDir
FS.close hasBlockIO

Expand Down
12 changes: 6 additions & 6 deletions src-extras/Database/LSMTree/Extras/RunData.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ withRun ::
-> HasBlockIO IO h
-> RunFsPaths
-> SerialisedRunData
-> (Run IO h -> IO a)
-> (Ref (Run IO h) -> IO a)
-> IO a
withRun hfs hbio path rd = do
bracket
(unsafeFlushAsWriteBuffer hfs hbio path $ serialiseRunData rd)
Run.removeReference
releaseRef

{-# INLINABLE withRuns #-}
-- | Create temporary 'Run's using 'unsafeFlushAsWriteBuffer'.
Expand All @@ -68,25 +68,25 @@ withRuns ::
=> HasFS IO h
-> HasBlockIO IO h
-> f (RunFsPaths, SerialisedRunData)
-> (f (Run IO h) -> IO a)
-> (f (Ref (Run IO h)) -> IO a)
-> IO a
withRuns hfs hbio xs = do
bracket
(forM xs $ \(path, rd) -> unsafeFlushAsWriteBuffer hfs hbio path rd)
(mapM_ Run.removeReference)
(mapM_ releaseRef)

-- | Flush serialised run data to disk as if it were a write buffer.
--
-- This might leak resources if not run with asynchronous exceptions masked.
-- Use helper functions like 'withRun' or 'withRuns' instead.
--
-- Use of this function should be paired with a 'Run.removeReference'.
-- Use of this function should be paired with a 'releaseRef.
unsafeFlushAsWriteBuffer ::
HasFS IO h
-> HasBlockIO IO h
-> RunFsPaths
-> SerialisedRunData
-> IO (Run IO h)
-> IO (Ref (Run IO h))
unsafeFlushAsWriteBuffer fs hbio fsPaths (RunData m) = do
let blobpath = addExtension (runBlobPath fsPaths) ".wb"
wbblobs <- WBB.new fs blobpath
Expand Down
20 changes: 8 additions & 12 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ import Database.LSMTree.Internal.Paths (SessionRoot (..),
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Range (Range (..))
import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
import qualified Database.LSMTree.Internal.RunReaders as Readers
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
Expand Down Expand Up @@ -880,7 +879,7 @@ data CursorEnv m h = CursorEnv {
, cursorReaders :: !(Maybe (Readers.Readers m h))
-- | The runs held open by the cursor. We must remove a reference when the
-- cursor gets closed.
, cursorRuns :: !(V.Vector (Run m h))
, cursorRuns :: !(V.Vector (Ref (Run m h)))

-- | The write buffer blobs, which like the runs, we have to keep open
-- untile the cursor is closed.
Expand Down Expand Up @@ -923,8 +922,7 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do
-- 'sessionOpenTables'.
withOpenSession cursorSession $ \_ -> do
withTempRegistry $ \reg -> do
(wb, wbblobs, cursorRuns) <-
allocTableContent reg (tableContent thEnv)
(wb, wbblobs, cursorRuns) <- dupTableContent reg (tableContent thEnv)
cursorReaders <-
allocateMaybeTemp reg
(Readers.new offsetKey (Just (wb, wbblobs)) cursorRuns)
Expand All @@ -942,19 +940,17 @@ newCursor !offsetKey t = withOpenTable t $ \thEnv -> do
pure . Map.insert cursorId cursor
pure $! cursor
where
-- The table contents escape the read access, but we just added
-- The table contents escape the read access, but we just duplicate
-- references to each run, so it is safe.
allocTableContent reg contentVar = do
dupTableContent reg contentVar = do
RW.withReadAccess contentVar $ \content -> do
let wb = tableWriteBuffer content
wbblobs = tableWriteBufferBlobs content
wbblobs' <- allocateTemp reg (dupRef wbblobs) releaseRef
let runs = cachedRuns (tableCache content)
V.forM_ runs $ \r -> do
allocateTemp reg
(Run.addReference r)
(\_ -> Run.removeReference r)
pure (wb, wbblobs', runs)
runs' <- V.forM runs $ \r ->
allocateTemp reg (dupRef r) releaseRef
pure (wb, wbblobs', runs')

{-# SPECIALISE closeCursor :: Cursor IO h -> IO () #-}
-- | See 'Database.LSMTree.Normal.closeCursor'.
Expand All @@ -976,7 +972,7 @@ closeCursor Cursor {..} = do
pure . Map.delete cursorId

forM_ cursorReaders $ freeTemp reg . Readers.close
V.forM_ cursorRuns $ freeTemp reg . Run.removeReference
V.forM_ cursorRuns $ freeTemp reg . releaseRef
freeTemp reg (releaseRef cursorWBB)
return CursorClosed

Expand Down
8 changes: 4 additions & 4 deletions src/Database/LSMTree/Internal/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ data ByteCountDiscrepancy = ByteCountDiscrepancy {
-> ResolveSerialisedValue
-> WB.WriteBuffer
-> Ref (WBB.WriteBufferBlobs IO h)
-> V.Vector (Run IO h)
-> V.Vector (Ref (Run IO h))
-> V.Vector (Bloom SerialisedKey)
-> V.Vector IndexCompact
-> V.Vector (Handle h)
Expand All @@ -180,7 +180,7 @@ lookupsIO ::
-> ResolveSerialisedValue
-> WB.WriteBuffer
-> Ref (WBB.WriteBufferBlobs m h)
-> V.Vector (Run m h) -- ^ Runs @rs@
-> V.Vector (Ref (Run m h)) -- ^ Runs @rs@
-> V.Vector (Bloom SerialisedKey) -- ^ The bloom filters inside @rs@
-> V.Vector IndexCompact -- ^ The indexes inside @rs@
-> V.Vector (Handle h) -- ^ The file handles to the key\/value files inside @rs@
Expand All @@ -205,7 +205,7 @@ lookupsIO !hbio !mgr !resolveV !wb !wbblobs !rs !blooms !indexes !kopsFiles !ks
ResolveSerialisedValue
-> WB.WriteBuffer
-> Ref (WBB.WriteBufferBlobs IO h)
-> V.Vector (Run IO h)
-> V.Vector (Ref (Run IO h))
-> V.Vector SerialisedKey
-> VP.Vector RunIxKeyIx
-> V.Vector (IOOp RealWorld h)
Expand All @@ -224,7 +224,7 @@ intraPageLookups ::
=> ResolveSerialisedValue
-> WB.WriteBuffer
-> Ref (WBB.WriteBufferBlobs m h)
-> V.Vector (Run m h)
-> V.Vector (Ref (Run m h))
-> V.Vector SerialisedKey
-> VP.Vector RunIxKeyIx
-> V.Vector (IOOp (PrimState m) h)
Expand Down
17 changes: 9 additions & 8 deletions src/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Control.Monad.Class.MonadST (MonadST)
import Control.Monad.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow)
import Control.Monad.Primitive (PrimState)
import Control.RefCount
import Data.Primitive.MutVar
import Data.Traversable (for)
import qualified Data.Vector as V
Expand Down Expand Up @@ -83,7 +84,7 @@ type Mappend = SerialisedValue -> SerialisedValue -> SerialisedValue
-> Level
-> Mappend
-> Run.RunFsPaths
-> V.Vector (Run IO h)
-> V.Vector (Ref (Run IO h))
-> IO (Maybe (Merge IO h)) #-}
-- | Returns 'Nothing' if no input 'Run' contains any entries.
-- The list of runs should be sorted from new to old.
Expand All @@ -96,7 +97,7 @@ new ::
-> Level
-> Mappend
-> Run.RunFsPaths
-> V.Vector (Run m h)
-> V.Vector (Ref (Run m h))
-> m (Maybe (Merge m h))
new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
-- no offset, no write buffer
Expand Down Expand Up @@ -135,7 +136,7 @@ abort Merge {..} = do

{-# SPECIALISE complete ::
Merge IO h
-> IO (Run IO h) #-}
-> IO (Ref (Run IO h)) #-}
-- | Complete a 'Merge', returning a new 'Run' as the result of merging the
-- input runs.
--
Expand All @@ -156,7 +157,7 @@ abort Merge {..} = do
complete ::
(MonadSTM m, MonadST m, MonadMask m)
=> Merge m h
-> m (Run m h)
-> m (Ref (Run m h))
complete Merge{..} = do
readMutVar mergeState >>= \case
Merging -> error "complete: Merge is not done"
Expand All @@ -171,15 +172,15 @@ complete Merge{..} = do
{-# SPECIALISE stepsToCompletion ::
Merge IO h
-> Int
-> IO (Run IO h) #-}
-> IO (Ref (Run IO h)) #-}
-- | Like 'steps', but calling 'complete' once the merge is finished.
--
-- Note: run with async exceptions masked. See 'complete'.
stepsToCompletion ::
(MonadMask m, MonadSTM m, MonadST m)
=> Merge m h
-> Int
-> m (Run m h)
-> m (Ref (Run m h))
stepsToCompletion m stepBatchSize = go
where
go = do
Expand All @@ -190,15 +191,15 @@ stepsToCompletion m stepBatchSize = go
{-# SPECIALISE stepsToCompletionCounted ::
Merge IO h
-> Int
-> IO (Int, Run IO h) #-}
-> IO (Int, Ref (Run IO h)) #-}
-- | Like 'steps', but calling 'complete' once the merge is finished.
--
-- Note: run with async exceptions masked. See 'complete'.
stepsToCompletionCounted ::
(MonadMask m, MonadSTM m, MonadST m)
=> Merge m h
-> Int
-> m (Int, Run m h)
-> m (Int, Ref (Run m h))
stepsToCompletionCounted m stepBatchSize = go 0
where
go !stepsSum = do
Expand Down
Loading

0 comments on commit 9f6e3e4

Please sign in to comment.