-- | Result of taking a single step in a stream data Step s a where Yield :: a -> s -> Step s a Stop :: Step s a -- | Representation of a loop, step and state. Step returns the next value and -- the next state. We iterate on the state to keep producing more values. data Stream m a = forall s. Stream (s -> m (Step s a)) s
nil :: Monad m => Stream m a nil = Stream (const (return Stop)) () -- | A single value fromPure :: Applicative m => a -> Stream m a fromPure x = Stream step True where step True = return $ Yield x False step False = return $ Stop fromList :: Applicative m => [a] -> Stream m a fromList = Stream step where step _ (x:xs) = pure $ Yield x xs step _ [] = pure Stop
foldrM :: Monad m => (a -> m b -> m b) -> m b -> Stream m a -> m b foldrM f z (Stream step state) = go state where go st = do r <- step st case r of Yield x s -> f x (go s) Stop -> z
In the above code the state is explicit and being threaded around in a recursive loop. The state from the previous iteration is to be consumed by the next iteration to generate the next value. This mandates a closed recursive loop. This is straightforward translation of imperative loops to functional paradigm.
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b mapM f (Stream step state) = Stream step' state where step' st = do r <- step st case r of Yield x s -> f x >>= \a -> return $ Yield a s Stop -> return Stop
Some operations like "drop" (uniq, intersperse, deleteBy, insertBy) may have to introduce a branch in the code by wrapping the state in another layer:
drop :: Monad m => Int -> Stream m a -> Stream m a drop n (Stream step state) = Stream step' (state, Just n) where step' (st, Just i) | i > 0 = do r <- step st return $ case r of Yield _ s -> step' (s, Just (i - 1)) Stop -> Stop | otherwise = step' (st, Nothing) step' (st, Nothing) = do r <- step st return $ case r of Yield x s -> Yield x (s, Nothing) Stop -> Stop
Note that the branch is always checked for every element in the stream. It is still quite efficient because the code fuses.
We should use rewrite rules to fuse such consecutive operations together as they introduce branching which could be costly.
Composing two independent loops together is not scalable, we need to create a wrapper state, wrapping the state of the old stream:
cons :: Monad m => m a -> Stream m a -> Stream m a cons m (Stream step state) = Stream step1 Nothing where step1 Nothing = m >>= \x -> return $ Yield x (Just state) step1 (Just st) = do r <- step st return $ case r of Yield a s -> Yield a (Just s) Stop -> Stop
As we keep consing we keep creating more layers wrapping the state in
Maybe
. These layers need to be traversed every time we run the step
function of the composed stream. In the above example step1
needs
to always branch on Nothing/Just
to reach to the wrapped stream. More
layers we add the more branching needs to occur to generate an element
of the stream. If we have n
"cons" operations we need to go through:
- 1 branch at the top level to generate the first element
- 2 branches to generate the next element
- 3 branches to generate the third element
- n branches to generate the nth element
The total number of branches that we need to take is: 1 + 2 + 3 ... n
i.e.
n * (n + 1)/2 = O(n^2)
where n
is the number of cons operations.
Conceptually, to avoid the introduction of a branch we could use a mutable step function and state to modify step1/state after yielding the first element. The next time we call it, it would be a different function that i.e. "step" and its state "st". However, that would introduce an indirection and mutability. There is a better way to do it with immutability i.e. CPS.
In the direct representation we represented a stream using a step and a state. This model requires us to iterate the step on the state creating an explicit loop. The state machine implemented by the step function is incrementally modified by adding new layers in the state which introduce branches to be traversed every time we go through the loop.
newtype Stream m a = Stream { runStream :: forall r. (a -> Stream m a -> m r) -> m r -> m r }
Here we represent the stream as a single function. Instead, the function is provided with functions to be called next.
Notice, the function does not have to be called again and again to iterate on a state for generating new values. Therefore, there is no closed recursion. There is no explicit loop.
We (the current runStream
function) can choose which one of the supplied
functions (continuations) to call next. If we decide to terminate
the stream execution we call the "stop" continuation. If we decide to
generate a value we call the "yield" continuation.
A stream execution is composed of a progression of such continuations until one of those decides to call the stop continuation. It is a composition of functions, a tree of functions composed together.
The yield continuation is provided with the generated value "a" and the "Stream m a", the function representing the rest of the stream. Notice that the stream function is done with one shot execution, there is no closed loop or recursion, the future execution of the stream is the responsibility of the continuation.
The continuation consumes the element "a" and then proceeds to call "Stream m a" using a "yield" and "stop" continuation. By modifying the "yield" and "stop" continuations that it passes to call "Stream m a", it can control the execution of the stream.
nil :: Stream m a nil = Stream $ \_ stp -> stp fromPure :: a -> Stream m a fromPure a = Stream $ \yield _ -> yield a nil cons :: a -> Stream m a -> Stream m a cons a r = Stream $ \yld _ -> yld a r fromList :: [a] -> Stream m a fromList = Prelude.foldr cons nil
foldrM :: (a -> m b -> m b) -> m b -> Stream m a -> m b foldrM step acc m = go m where go m1 = let stop = acc yieldk a r = step a (go r) in runStream yieldk stop m1
Note that unlike in direct style fold, there is no generator state being threaded around here instead the function yielded by the continuation is being executed.
map :: (a -> b) -> Stream m a -> Stream m b map f = go where go m1 = Stream $ \yld stp -> let yieldk a r = yld (f a) (go r) in runStream yieldk stp m1
In direct style we had to examine the constructors to determine the current state and execute code based on that. Here, we have to make the next function call at each step. The former is much more efficient because the compiler can optimize well to remove the constructors and generate code with direct branches not involving the constructors. On the other hand placing a function call is costlier. Though in some cases it can be avoided by using foldr/build fusion but not always.
drop :: Int -> Stream m a -> Stream m a drop n = Stream $ go n where go n1 m1 = Stream $ \yld stp -> let yieldk _ r = runStream yld stp $ go (n1 - 1) r in if n1 <= 0 then runStream yld stp m1 else runStream yieldk stp m1
This shows a crucial difference between direct style and CPS. In direct style we have to always check for the branch to determine if we are dropping the elements or consuming. In this case we can see that once we have taken the "then" path we never have to check the condition "n1 <= 0", it is out of the way.
Basically CPS provides us the ability to take an exit path and forget about the past code forever. So if we have a million "drop" composed together CPS would have no problem, after the final drop there won't be any branches in the way whereas direct style would introduce a million branches to be traversed forever.
cons :: a -> Stream m a -> Stream m a cons a r = Stream $ \yld _ -> yld a r
Unlike in direct style representation, the performance of stream generation is independent of the number of "cons" operations. There is no quadratic complexity, we simply call the next continuation at each step.
Similarly appending streams is independent of number of appends.
serial :: Stream m a -> Stream m a -> Stream m a serial m1 m2 = go m1 where go m = Stream $ \yld stp -> let stop = runStream yld stp m2 yieldk a r = yld a (go r) in runStream yieldk stop m
In the direct style we have an explicit stream generator. A consumer
generates values using the generator and consumes them. In CPS the
consumer and generator are interleaved. The runStream
function is a
generator and the "yield" continuation is a consumer. The consumer then
calls the generator again and so on.
The direct style representation is like a structured loop with well defined exit points. Whereas the CPS representation can exit from anywhere. Therefore, exception handling and resource management in direct style is much simpler to implement.