Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add another implementation of Min #1612

Merged
merged 2 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 63 additions & 24 deletions benchmark/Streamly/Benchmark/Data/Fold/Window.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,33 @@ sourceDescendingInt :: (Monad m, Stream.IsStream t) => Int -> Int -> t m Int
sourceDescendingInt = sourceDescending

{-# INLINE benchWith #-}
benchWith :: (Num a, NFData a) =>
(Int -> a -> SerialT IO a) -> Int -> String -> Fold IO a a -> Benchmark
benchWith :: (Num a, NFData b) =>
(Int -> a -> SerialT IO a) -> Int -> String -> Fold IO a b -> Benchmark
benchWith src len name f =
bench name
$ nfIO
$ randomRIO (1, 1 :: Int) >>= Stream.fold f . src len . fromIntegral

{-# INLINE benchWithFold #-}
benchWithFold :: Int -> String -> Fold IO Double Double -> Benchmark
benchWithFold :: NFData a => Int -> String -> Fold IO Double a -> Benchmark
benchWithFold = benchWith source

{-# INLINE benchWithFoldInt #-}
benchWithFoldInt :: Int -> String -> Fold IO Int Int -> Benchmark
benchWithFoldInt = benchWith source

{-# INLINE benchScanWith #-}
benchScanWith :: Num a =>
(Int -> a -> SerialT IO a) -> Int -> String -> Fold IO a b -> Benchmark
benchScanWith src len name f =
bench name
$ nfIO
$ randomRIO (1, 1 :: Int)
>>= Stream.drain . Stream.postscan f . src len . fromIntegral

{-# INLINE benchWithPostscan #-}
benchWithPostscan :: Int -> String -> Fold IO Double Double -> Benchmark
benchWithPostscan len name f =
bench name $ nfIO $ randomRIO (1, 1) >>=
Stream.drain . Stream.postscan f . source len
benchWithPostscan :: Int -> String -> Fold IO Double a -> Benchmark
benchWithPostscan = benchScanWith source

{-# INLINE numElements #-}
numElements :: Int
Expand All @@ -63,23 +70,29 @@ main =
[ bgroup
"fold"
[ benchWithFold numElements "minimum (window size 100)"
(Ring.slidingWindow 100 Window.minimum)
(Window.minimum 100)
, benchWithFold numElements "minimum (window size 1000)"
(Ring.slidingWindow 1000 Window.minimum)
(Window.minimum 1000)
, benchWith sourceDescendingInt numElements
"minimum descending (window size 1000)"
(Ring.slidingWindow 1000 Window.minimum)
(Window.minimum 1000)

, benchWithFold numElements "maximum (window size 100)"
(Ring.slidingWindow 100 Window.maximum)
(Window.maximum 100)
, benchWithFold numElements "maximum (window size 1000)"
(Ring.slidingWindow 1000 Window.maximum)
(Window.maximum 1000)
, benchWith sourceDescendingInt numElements
"maximum descending (window size 1000)"
(Ring.slidingWindow 1000 Window.maximum)
(Window.maximum 1000)

, benchWithFold numElements "range (window size 100)"
(Ring.slidingWindow 100 Window.range)
(Window.range 100)
, benchWithFold numElements "range (window size 1000)"
(Ring.slidingWindow 1000 Window.range)
(Window.range 1000)
, benchWith sourceDescendingInt numElements
"range descending (window size 1000)"
(Window.range 1000)

, benchWithFoldInt numElements "sumInt (window size 100)"
(Ring.slidingWindow 100 Window.sumInt)
, benchWithFoldInt numElements "sum for Int (window size 100)"
Expand Down Expand Up @@ -110,25 +123,51 @@ main =
]
, bgroup
"scan"
[ benchWithPostscan numElements "minimum (window size 100)"
(Ring.slidingWindow 100 Window.minimum)
[ benchWithPostscan numElements "minimum (window size 10)"
(Window.minimum 10)
-- Below window size 30 the linear search based impl performs better
-- than the dequeue based implementation.
, benchWithPostscan numElements "minimum (window size 30)"
(Window.minimum 30)
, benchWithPostscan numElements "minimum (window size 1000)"
(Ring.slidingWindow 1000 Window.minimum)
, benchWithPostscan numElements "maximum (window size 100)"
(Ring.slidingWindow 100 Window.maximum)
(Window.minimum 1000)
, benchScanWith sourceDescendingInt numElements
"minimum descending (window size 1000)"
(Window.minimum 1000)

, benchWithPostscan numElements "maximum (window size 10)"
(Window.maximum 10)
, benchWithPostscan numElements "maximum (window size 30)"
(Window.maximum 30)
, benchWithPostscan numElements "maximum (window size 1000)"
(Ring.slidingWindow 1000 Window.maximum)
, benchWithPostscan numElements "range (window size 100)"
(Ring.slidingWindow 100 Window.range)
(Window.maximum 1000)
, benchScanWith sourceDescendingInt numElements
"maximum descending (window size 1000)"
(Window.maximum 1000)

, benchWithPostscan numElements "range (window size 10)"
(Window.range 10)
, benchWithPostscan numElements "range (window size 30)"
(Window.range 30)
, benchWithPostscan numElements "range (window size 1000)"
(Ring.slidingWindow 1000 Window.range)
(Window.range 1000)
, benchScanWith sourceDescendingInt numElements
"range descending (window size 1000)"
(Window.range 1000)

, benchWithPostscan numElements "sum (window size 100)"
(Ring.slidingWindow 100 Window.sum)
, benchWithPostscan numElements "sum (window size 1000)"
(Ring.slidingWindow 1000 Window.sum)

, benchWithPostscan numElements "mean (window size 100)"
(Ring.slidingWindow 100 Window.mean)
, benchWithPostscan numElements "mean (window size 1000)"
(Ring.slidingWindow 1000 Window.mean)

, benchWithPostscan numElements "powerSum 2 (window size 100)"
(Ring.slidingWindow 100 (Window.powerSum 2))
, benchWithPostscan numElements "powerSum 2 (window size 1000)"
(Ring.slidingWindow 1000 (Window.powerSum 2))
]
]
184 changes: 74 additions & 110 deletions core/src/Streamly/Internal/Data/Fold/Window.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,18 @@ module Streamly.Internal.Data.Fold.Window
)
where

import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Bifunctor(bimap)
import Data.Function ((&))
import Data.Maybe (fromMaybe)
import Foreign (Storable(..))

import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Tuple.Strict
(Tuple'(..), Tuple3Fused' (Tuple3Fused'))

import Prelude hiding (length, sum, minimum, maximum)

import qualified Deque.Strict as DQ
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Internal.Data.Ring.Foreign as Ring

-- $setup
-- >>> import Data.Bifunctor(bimap)
Expand Down Expand Up @@ -213,111 +214,90 @@ powerSumFrac p = lmap (** p) sum
-- Location
-------------------------------------------------------------------------------

-- Theoretically, we can approximate minimum in a rolling window by using a
-- 'powerMean' with sufficiently large negative power.
--
-- XXX If we need to know the minimum in the window only once in a while then
-- we can use linear search when it is extracted and not pay the cost all the
-- time.
-- XXX Remove MonadIO constraint

-- | Determine the maximum and minimum in a rolling window.
--
-- | The minimum element in a rolling window.
-- If you want to compute the range of the entire stream @Fold.teeWith (,)
-- Fold.maximum Fold.minimum@ would be much faster.
--
-- If you want to compute the minimum of the entire stream Fold.minimum from
-- streamly package would be much faster.
-- /Space/: \(\mathcal{O}(n)\) where @n@ is the window size.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE minimum #-}
minimum :: (Monad m, Ord a) => Fold m (a, Maybe a) a
minimum = Fold step initial extract
{-# INLINE range #-}
range :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe (a, a))
range n = Fold step initial extract

where

initial = return $ Partial $ Tuple3' (0 :: Int) (0 :: Int)
(mempty :: DQ.Deque (Int, a))

step (Tuple3' i w q) (a, ma) =
case ma of
Nothing ->
return $ Partial $ Tuple3' (i + 1) (w + 1)
(headCheck i q (w + 1) & dqloop (i, a))
Just _ ->
return $ Partial $ Tuple3' (i + 1) w
(headCheck i q w & dqloop (i,a))

{-# INLINE headCheck #-}
headCheck i q w =
case DQ.uncons q of
Nothing -> q
Just (ia', q') ->
if fst ia' <= i - w
then q'
else q

dqloop ia q =
case DQ.unsnoc q of
Nothing -> DQ.snoc ia q
-- XXX This can be improved for the case of `=`
Just (ia', q') ->
if snd ia <= snd ia'
then dqloop ia q'
else DQ.snoc ia q

extract (Tuple3' _ _ q) = return $ snd
$ fromMaybe (0, error "min: Empty stream")
$ DQ.head q

-- Theoretically, we can approximate maximum in a rolling window by using a
-- 'powerMean' with sufficiently large positive power.
-- XXX Use Ring unfold and then fold for composing maximum and minimum to
-- get the range.

initial =
if n <= 0
then error "range: window size must be > 0"
else
let f (a, b) = Partial $ Tuple3Fused' a b (0 :: Int)
in fmap f $ liftIO $ Ring.new n

step (Tuple3Fused' rb rh i) a = do
rh1 <- liftIO $ Ring.unsafeInsert rb rh a
return $ Partial $ Tuple3Fused' rb rh1 (i + 1)

-- XXX We need better Ring array APIs so that we can unfold the ring to a
-- stream and fold the stream using a fold of our choice.
--
-- We could just scan the stream to get a stream of ring buffers and then
-- map required folds over those, but we need to be careful that all those
-- rings refer to the same mutable ring, therefore, downstream needs to
-- process those strictly before it can change.
foldFunc i
| i < n = Ring.unsafeFoldRingM
| otherwise = Ring.unsafeFoldRingFullM

extract (Tuple3Fused' rb rh i) =
if i == 0
then return Nothing
else do
x <- liftIO $ peek rh
let accum (mn, mx) a = return (min mn a, max mx a)
fmap Just $ foldFunc i rh accum (x, x) rb

-- | Find the minimum element in a rolling window.
--
-- This implementation traverses the entire window buffer to compute the
-- minimum whenever we demand it. It performs better than the dequeue based
-- implementation in @streamly-statistics@ package when any of the following
-- holds:
--
-- * window size is small (< 30)
-- * we are using this as a fold instead of a scan.
--
-- For other cases the implementation in the @streamly-statistics@ package
-- performs better.
--
-- If you want to compute the minimum of the entire stream
-- 'Streamly.Data.Fold.minimum' is much faster.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE minimum #-}
minimum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
minimum n = fmap (fmap fst) $ range n

-- | The maximum element in a rolling window.
--
-- If you want to compute the maximum of the entire stream Fold.maximum from
-- streamly package would be much faster.
-- See the performance related comments in 'minimum'.
--
-- If you want to compute the maximum of the entire stream 'Fold.maximum' would
-- be much faster.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE maximum #-}
maximum :: (Monad m, Ord a) => Fold m (a, Maybe a) a
maximum = Fold step initial extract

where

initial = return $ Partial $ Tuple3' (0 :: Int) (0 :: Int)
(mempty :: DQ.Deque (Int, a))

step (Tuple3' i w q) (a, ma) =
case ma of
Nothing ->
return $ Partial $ Tuple3' (i + 1) (w + 1)
(headCheck i q (w + 1) & dqloop (i, a))
Just _ ->
return $ Partial $ Tuple3' (i + 1) w
(headCheck i q w & dqloop (i,a))

{-# INLINE headCheck #-}
headCheck i q w =
case DQ.uncons q of
Nothing -> q
Just (ia', q') ->
if fst ia' <= i - w
then q'
else q

dqloop ia q =
case DQ.unsnoc q of
Nothing -> DQ.snoc ia q
-- XXX This can be improved for the case of `=`
Just (ia', q') ->
if snd ia >= snd ia'
then dqloop ia q'
else DQ.snoc ia q

extract (Tuple3' _ _ q) =
return
$ snd
$ fromMaybe (0, error "max: Empty stream")
$ DQ.head q
maximum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
maximum n = fmap (fmap snd) $ range n

-- | Arithmetic mean of elements in a sliding window:
--
Expand All @@ -335,19 +315,3 @@ maximum = Fold step initial extract
{-# INLINE mean #-}
mean :: forall m a. (Monad m, Fractional a) => Fold m (a, Maybe a) a
mean = Fold.teeWith (/) sum length


-- | The difference between the maximum and minimum elements of a rolling window.
--
-- >>> range = Fold.teeWith (-) maximum minimum
--
-- If you want to compute the range of the entire stream @Fold.teeWith (-)
-- Fold.maximum Fold.min@ from the streamly package would be much faster.
--
-- /Space/: \(\mathcal{O}(n)\) where @n@ is the window size.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE range #-}
range :: (Monad m, Num a, Ord a) => Fold m (a, Maybe a) a
range = Fold.teeWith (-) maximum minimum
1 change: 0 additions & 1 deletion core/streamly-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ library
, transformers-base >= 0.4 && < 0.5
, primitive >= 0.5.4 && < 0.8
, heaps >= 0.3 && < 0.5
, deque >= 0.4 && < 0.5
, hashable >= 1.3 && < 1.5
, unordered-containers >= 0.2 && < 0.3
if flag(use-unliftio)
Expand Down
Loading