diff --git a/benchmark/Streamly/Benchmark/Data/Fold/Window.hs b/benchmark/Streamly/Benchmark/Data/Fold/Window.hs index bdd0bc4dda..6974dad0f6 100644 --- a/benchmark/Streamly/Benchmark/Data/Fold/Window.hs +++ b/benchmark/Streamly/Benchmark/Data/Fold/Window.hs @@ -32,26 +32,33 @@ sourceDescendingInt :: (Monad m, Stream.IsStream t) => Int -> Int -> t m Int sourceDescendingInt = sourceDescending {-# INLINE benchWith #-} -benchWith :: (Num a, NFData a) => - (Int -> a -> SerialT IO a) -> Int -> String -> Fold IO a a -> Benchmark +benchWith :: (Num a, NFData b) => + (Int -> a -> SerialT IO a) -> Int -> String -> Fold IO a b -> Benchmark benchWith src len name f = bench name $ nfIO $ randomRIO (1, 1 :: Int) >>= Stream.fold f . src len . fromIntegral {-# INLINE benchWithFold #-} -benchWithFold :: Int -> String -> Fold IO Double Double -> Benchmark +benchWithFold :: NFData a => Int -> String -> Fold IO Double a -> Benchmark benchWithFold = benchWith source {-# INLINE benchWithFoldInt #-} benchWithFoldInt :: Int -> String -> Fold IO Int Int -> Benchmark benchWithFoldInt = benchWith source +{-# INLINE benchScanWith #-} +benchScanWith :: Num a => + (Int -> a -> SerialT IO a) -> Int -> String -> Fold IO a b -> Benchmark +benchScanWith src len name f = + bench name + $ nfIO + $ randomRIO (1, 1 :: Int) + >>= Stream.drain . Stream.postscan f . src len . fromIntegral + {-# INLINE benchWithPostscan #-} -benchWithPostscan :: Int -> String -> Fold IO Double Double -> Benchmark -benchWithPostscan len name f = - bench name $ nfIO $ randomRIO (1, 1) >>= - Stream.drain . Stream.postscan f . source len +benchWithPostscan :: Int -> String -> Fold IO Double a -> Benchmark +benchWithPostscan = benchScanWith source {-# INLINE numElements #-} numElements :: Int @@ -63,23 +70,29 @@ main = [ bgroup "fold" [ benchWithFold numElements "minimum (window size 100)" - (Ring.slidingWindow 100 Window.minimum) + (Window.minimum 100) , benchWithFold numElements "minimum (window size 1000)" - (Ring.slidingWindow 1000 Window.minimum) + (Window.minimum 1000) , benchWith sourceDescendingInt numElements "minimum descending (window size 1000)" - (Ring.slidingWindow 1000 Window.minimum) + (Window.minimum 1000) + , benchWithFold numElements "maximum (window size 100)" - (Ring.slidingWindow 100 Window.maximum) + (Window.maximum 100) , benchWithFold numElements "maximum (window size 1000)" - (Ring.slidingWindow 1000 Window.maximum) + (Window.maximum 1000) , benchWith sourceDescendingInt numElements "maximum descending (window size 1000)" - (Ring.slidingWindow 1000 Window.maximum) + (Window.maximum 1000) + , benchWithFold numElements "range (window size 100)" - (Ring.slidingWindow 100 Window.range) + (Window.range 100) , benchWithFold numElements "range (window size 1000)" - (Ring.slidingWindow 1000 Window.range) + (Window.range 1000) + , benchWith sourceDescendingInt numElements + "range descending (window size 1000)" + (Window.range 1000) + , benchWithFoldInt numElements "sumInt (window size 100)" (Ring.slidingWindow 100 Window.sumInt) , benchWithFoldInt numElements "sum for Int (window size 100)" @@ -110,25 +123,51 @@ main = ] , bgroup "scan" - [ benchWithPostscan numElements "minimum (window size 100)" - (Ring.slidingWindow 100 Window.minimum) + [ benchWithPostscan numElements "minimum (window size 10)" + (Window.minimum 10) + -- Below window size 30 the linear search based impl performs better + -- than the dequeue based implementation. + , benchWithPostscan numElements "minimum (window size 30)" + (Window.minimum 30) , benchWithPostscan numElements "minimum (window size 1000)" - (Ring.slidingWindow 1000 Window.minimum) - , benchWithPostscan numElements "maximum (window size 100)" - (Ring.slidingWindow 100 Window.maximum) + (Window.minimum 1000) + , benchScanWith sourceDescendingInt numElements + "minimum descending (window size 1000)" + (Window.minimum 1000) + + , benchWithPostscan numElements "maximum (window size 10)" + (Window.maximum 10) + , benchWithPostscan numElements "maximum (window size 30)" + (Window.maximum 30) , benchWithPostscan numElements "maximum (window size 1000)" - (Ring.slidingWindow 1000 Window.maximum) - , benchWithPostscan numElements "range (window size 100)" - (Ring.slidingWindow 100 Window.range) + (Window.maximum 1000) + , benchScanWith sourceDescendingInt numElements + "maximum descending (window size 1000)" + (Window.maximum 1000) + + , benchWithPostscan numElements "range (window size 10)" + (Window.range 10) + , benchWithPostscan numElements "range (window size 30)" + (Window.range 30) , benchWithPostscan numElements "range (window size 1000)" - (Ring.slidingWindow 1000 Window.range) + (Window.range 1000) + , benchScanWith sourceDescendingInt numElements + "range descending (window size 1000)" + (Window.range 1000) + , benchWithPostscan numElements "sum (window size 100)" (Ring.slidingWindow 100 Window.sum) , benchWithPostscan numElements "sum (window size 1000)" (Ring.slidingWindow 1000 Window.sum) + , benchWithPostscan numElements "mean (window size 100)" (Ring.slidingWindow 100 Window.mean) , benchWithPostscan numElements "mean (window size 1000)" (Ring.slidingWindow 1000 Window.mean) + + , benchWithPostscan numElements "powerSum 2 (window size 100)" + (Ring.slidingWindow 100 (Window.powerSum 2)) + , benchWithPostscan numElements "powerSum 2 (window size 1000)" + (Ring.slidingWindow 1000 (Window.powerSum 2)) ] ] diff --git a/core/src/Streamly/Internal/Data/Fold/Window.hs b/core/src/Streamly/Internal/Data/Fold/Window.hs index 06f905528e..196d47fb5c 100644 --- a/core/src/Streamly/Internal/Data/Fold/Window.hs +++ b/core/src/Streamly/Internal/Data/Fold/Window.hs @@ -51,17 +51,18 @@ module Streamly.Internal.Data.Fold.Window ) where +import Control.Monad.IO.Class (MonadIO (liftIO)) import Data.Bifunctor(bimap) -import Data.Function ((&)) -import Data.Maybe (fromMaybe) +import Foreign (Storable(..)) import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..)) -import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..)) +import Streamly.Internal.Data.Tuple.Strict + (Tuple'(..), Tuple3Fused' (Tuple3Fused')) import Prelude hiding (length, sum, minimum, maximum) -import qualified Deque.Strict as DQ import qualified Streamly.Data.Fold as Fold +import qualified Streamly.Internal.Data.Ring.Foreign as Ring -- $setup -- >>> import Data.Bifunctor(bimap) @@ -213,111 +214,90 @@ powerSumFrac p = lmap (** p) sum -- Location ------------------------------------------------------------------------------- --- Theoretically, we can approximate minimum in a rolling window by using a --- 'powerMean' with sufficiently large negative power. --- --- XXX If we need to know the minimum in the window only once in a while then --- we can use linear search when it is extracted and not pay the cost all the --- time. +-- XXX Remove MonadIO constraint + +-- | Determine the maximum and minimum in a rolling window. -- --- | The minimum element in a rolling window. +-- If you want to compute the range of the entire stream @Fold.teeWith (,) +-- Fold.maximum Fold.minimum@ would be much faster. -- --- If you want to compute the minimum of the entire stream Fold.minimum from --- streamly package would be much faster. +-- /Space/: \(\mathcal{O}(n)\) where @n@ is the window size. -- -- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size. -- -{-# INLINE minimum #-} -minimum :: (Monad m, Ord a) => Fold m (a, Maybe a) a -minimum = Fold step initial extract +{-# INLINE range #-} +range :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe (a, a)) +range n = Fold step initial extract where - initial = return $ Partial $ Tuple3' (0 :: Int) (0 :: Int) - (mempty :: DQ.Deque (Int, a)) - - step (Tuple3' i w q) (a, ma) = - case ma of - Nothing -> - return $ Partial $ Tuple3' (i + 1) (w + 1) - (headCheck i q (w + 1) & dqloop (i, a)) - Just _ -> - return $ Partial $ Tuple3' (i + 1) w - (headCheck i q w & dqloop (i,a)) - - {-# INLINE headCheck #-} - headCheck i q w = - case DQ.uncons q of - Nothing -> q - Just (ia', q') -> - if fst ia' <= i - w - then q' - else q - - dqloop ia q = - case DQ.unsnoc q of - Nothing -> DQ.snoc ia q - -- XXX This can be improved for the case of `=` - Just (ia', q') -> - if snd ia <= snd ia' - then dqloop ia q' - else DQ.snoc ia q - - extract (Tuple3' _ _ q) = return $ snd - $ fromMaybe (0, error "min: Empty stream") - $ DQ.head q - --- Theoretically, we can approximate maximum in a rolling window by using a --- 'powerMean' with sufficiently large positive power. + -- XXX Use Ring unfold and then fold for composing maximum and minimum to + -- get the range. + + initial = + if n <= 0 + then error "range: window size must be > 0" + else + let f (a, b) = Partial $ Tuple3Fused' a b (0 :: Int) + in fmap f $ liftIO $ Ring.new n + + step (Tuple3Fused' rb rh i) a = do + rh1 <- liftIO $ Ring.unsafeInsert rb rh a + return $ Partial $ Tuple3Fused' rb rh1 (i + 1) + + -- XXX We need better Ring array APIs so that we can unfold the ring to a + -- stream and fold the stream using a fold of our choice. + -- + -- We could just scan the stream to get a stream of ring buffers and then + -- map required folds over those, but we need to be careful that all those + -- rings refer to the same mutable ring, therefore, downstream needs to + -- process those strictly before it can change. + foldFunc i + | i < n = Ring.unsafeFoldRingM + | otherwise = Ring.unsafeFoldRingFullM + + extract (Tuple3Fused' rb rh i) = + if i == 0 + then return Nothing + else do + x <- liftIO $ peek rh + let accum (mn, mx) a = return (min mn a, max mx a) + fmap Just $ foldFunc i rh accum (x, x) rb + +-- | Find the minimum element in a rolling window. +-- +-- This implementation traverses the entire window buffer to compute the +-- minimum whenever we demand it. It performs better than the dequeue based +-- implementation in @streamly-statistics@ package when any of the following +-- holds: +-- +-- * window size is small (< 30) +-- * we are using this as a fold instead of a scan. +-- +-- For other cases the implementation in the @streamly-statistics@ package +-- performs better. +-- +-- If you want to compute the minimum of the entire stream +-- 'Streamly.Data.Fold.minimum' is much faster. +-- +-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size. -- +{-# INLINE minimum #-} +minimum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a) +minimum n = fmap (fmap fst) $ range n + -- | The maximum element in a rolling window. -- --- If you want to compute the maximum of the entire stream Fold.maximum from --- streamly package would be much faster. +-- See the performance related comments in 'minimum'. +-- +-- If you want to compute the maximum of the entire stream 'Fold.maximum' would +-- be much faster. -- -- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size. -- {-# INLINE maximum #-} -maximum :: (Monad m, Ord a) => Fold m (a, Maybe a) a -maximum = Fold step initial extract - - where - - initial = return $ Partial $ Tuple3' (0 :: Int) (0 :: Int) - (mempty :: DQ.Deque (Int, a)) - - step (Tuple3' i w q) (a, ma) = - case ma of - Nothing -> - return $ Partial $ Tuple3' (i + 1) (w + 1) - (headCheck i q (w + 1) & dqloop (i, a)) - Just _ -> - return $ Partial $ Tuple3' (i + 1) w - (headCheck i q w & dqloop (i,a)) - - {-# INLINE headCheck #-} - headCheck i q w = - case DQ.uncons q of - Nothing -> q - Just (ia', q') -> - if fst ia' <= i - w - then q' - else q - - dqloop ia q = - case DQ.unsnoc q of - Nothing -> DQ.snoc ia q - -- XXX This can be improved for the case of `=` - Just (ia', q') -> - if snd ia >= snd ia' - then dqloop ia q' - else DQ.snoc ia q - - extract (Tuple3' _ _ q) = - return - $ snd - $ fromMaybe (0, error "max: Empty stream") - $ DQ.head q +maximum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a) +maximum n = fmap (fmap snd) $ range n -- | Arithmetic mean of elements in a sliding window: -- @@ -335,19 +315,3 @@ maximum = Fold step initial extract {-# INLINE mean #-} mean :: forall m a. (Monad m, Fractional a) => Fold m (a, Maybe a) a mean = Fold.teeWith (/) sum length - - --- | The difference between the maximum and minimum elements of a rolling window. --- --- >>> range = Fold.teeWith (-) maximum minimum --- --- If you want to compute the range of the entire stream @Fold.teeWith (-) --- Fold.maximum Fold.min@ from the streamly package would be much faster. --- --- /Space/: \(\mathcal{O}(n)\) where @n@ is the window size. --- --- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size. --- -{-# INLINE range #-} -range :: (Monad m, Num a, Ord a) => Fold m (a, Maybe a) a -range = Fold.teeWith (-) maximum minimum diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index e9278a1c77..ae8c81f2e6 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -343,7 +343,6 @@ library , transformers-base >= 0.4 && < 0.5 , primitive >= 0.5.4 && < 0.8 , heaps >= 0.3 && < 0.5 - , deque >= 0.4 && < 0.5 , hashable >= 1.3 && < 1.5 , unordered-containers >= 0.2 && < 0.3 if flag(use-unliftio) diff --git a/test/Streamly/Test/Data/Fold/Window.hs b/test/Streamly/Test/Data/Fold/Window.hs index e8ad4c4955..f6bb23d4ef 100644 --- a/test/Streamly/Test/Data/Fold/Window.hs +++ b/test/Streamly/Test/Data/Fold/Window.hs @@ -33,7 +33,9 @@ main = hspec $ do describe "Correctness" $ do let winSize = 3 - testCase1 = [31, 41, 59, 26, 53, 58, 97] :: [Double] + testCase1 = + [1.0, 4.0, 3.0, 2.1, -5.1, -2.0, 7.0, 3.0, -2.5] :: [Double] + testCase2 = replicate 5 1.0 ++ [7.0] testFunc tc f sI sW = do @@ -44,18 +46,17 @@ main = hspec $ do it "Infinite" $ a == sI it ("Finite " ++ show winSize) $ b == sW + testFunc2 tc expec f = do + let c = S.fromList tc + a <- runIO $ S.fold (f winSize) c + it (show tc) $ a == expec + describe "minimum" $ do - let scanInf = [31, 31, 31, 26, 26, 26, 26] :: [Double] - scanWin = [31, 31, 31, 26, 26, 26, 53] :: [Double] - testFunc testCase1 minimum scanInf scanWin + testFunc2 testCase1 (Just (-2.5)) minimum describe "maximum" $ do - let scanInf = [31, 41, 59, 59, 59, 59, 97] :: [Double] - scanWin = [31, 41, 59, 59, 59, 58, 97] :: [Double] - testFunc testCase1 maximum scanInf scanWin + testFunc2 testCase1 (Just 7.0) maximum describe "range" $ do - let scanInf = [0, 10, 28, 33, 33, 33, 71] :: [Double] - scanWin = [0, 10, 28, 33, 33, 32, 44] :: [Double] - testFunc testCase1 range scanInf scanWin + testFunc2 testCase1 (Just (-2.5, 7.0)) range describe "sum" $ do let scanInf = [1, 2, 3, 4, 5, 12] :: [Double] scanWin = [1, 2, 3, 3, 3, 9] :: [Double]