Channels
A channel represents a stream of messages. It consists of two parts, a node, which propagates messages downstream, and a queue, which stores messages when there are no downstream nodes.
We can pass messages into the channel using (enqueue ch & messages). If no one is consuming messages from a channel, they will be placed in the queue. We can see the resulting channel using (view-graph & channels).
> (use 'lamina.core 'lamina.viz)
nil
> (def ch (channel))
#'ch
> (enqueue ch 1)
<< ... >>
> (view-graph ch)
If we enqueue another message, it will be placed behind the first. We can see this in action using (view-propagation ch message):
> (view-propagation ch 2)
We can also create a channel that already contains messages using (channel & messages).
> (def ch (channel 1 2))
#'ch
> ch
<== [1 2 ...]
> (view-graph ch)
Notice that the REPL representation of the channel prints out the contents of the queue.
To consume a message from a channel, we can use (read-channel ch), which returns an async-promise representing the next message passed into the channel. If there is already a message in the queue, the async-promise will be realized immediately:
> (read-channel ch)
<< 1 >>
> (view-graph ch)
If the channel is empty when read-channel is called, the async-promise will remain unrealized until a message is enqueued.
> (def ch (channel))
#'ch
> (def msg (read-channel ch))
#'msg
> msg
<< ... >>
> (enqueue ch 1)
:lamina/realized
> ch
<== [...]
> msg
<< 1 >>
Notice that since there’s already a consumer for the message, the channel’s queue remains empty. However, this only applies to the first message. If we enqueue a message without calling read-channel again, the second message will be placed in the queue.
> (enqueue ch 2)
<< ... >>
> ch
<== [2 ...]
If we call read-channel multiple times before a message is enqueued, each async-promise will consume one message, in the order the reads were made.
> (def ch (channel))
#'ch
> (def a (read-channel ch))
#'a
> (def b (read-channel ch))
#'b
> (enqueue ch 1 2)
(:lamina/realized :lamina/realized)
> a
<< 1 >>
> b
<< 2 >>
A single channel can be useful on its own, when used as an asynchronous variant of a LinkedBlockingQueue, but most of the time we don’t want to handle each message individually.
Consider Clojure’s map operator: rather than explicitly iterating over each element in the sequence, it simply defines a transform that should be applied to all elements. Lamina provides a similar operator for channels, map*:
> (def ch (channel))
#'ch
> (map* inc ch)
<== [...]
> (view-propagation ch 42)
Calling map* returns a new channel. It will not modify the message values in the original channel, but it will change how messages are propagated from the original channel.
Notice that there are two nodes in the above diagram; the original channel is on the left, and the new channel with the inc operator is on the right. The new channel is downstream of the original channel, and will receive all messages passed into it. The original channel’s queue will no longer be used, so it is not displayed.
Messages will be propagated to downstream nodes synchronously; enqueue will not return until it has walked the entire downstream graph.
If our original channel already has messages in its queue, the queue will be drained and its messages sent through the downstream node.
> (def ch (channel 1 2 3))
#'ch
> (view-graph ch)
> (map* inc ch)
<== [2 3 4 ...]
> (view-graph ch)
Multiple operators can be applied to the same channel:
> (def ch (channel))
#'ch
> (map* inc ch)
<== [...]
> (map* dec ch)
<== [...]
> (view-propagation ch 1)
Notice that all downstream nodes receive the same message.
However, consider what happens if the above code is applied to a channel that already contains messages:
> (def ch (channel 1 2))
#'ch
> (map* inc ch)
<== [2 3 ...]
> (map* dec ch)
<== [...]
> (view-propagation ch 3)
Since the first operator consumes all the messages in the queue, the second operator will only receive subsequent messages. Luckily, we can fork channels:
> (def ch (channel 1 2))
#'ch
> (fork ch)
<== [1 2 ...]
> (view-graph ch)
Forking a channel creates a parallel channel that starts with the same queue contents and will receive the same subsequent messages. This does not affect the original channel in any way.
> (def ch (channel 1 2))
#'ch
> (map* inc (fork ch))
<== [2 3 ...]
> (map* dec (fork ch))
<== [0 1 ...]
> (view-propagation ch 3)
However, perhaps after forking the channel for the two operators, we no longer wish messages to accumulate in the original channel’s queue. In this case, we can ground the channel.
> (ground ch)
true
> (view-graph ch)
This creates a permanent downstream node that will consume and discard all messages. Grounding is idempotent.
Channels have equivalents for many of Clojure’s sequence operators, including map*, filter*, reductions*, take*, take-while*, drop*, drop-while*, partition*, partition-all*, and mapcat*. These operators, of course, can be composed:
> (def ch (channel))
#'ch
> (->> ch (map* inc) (filter* even?))
<== [...]
> (view-propagation ch 1)
While map* and filter* don’t care what order messages come in, or whether messages are enqueued on many threads at once, the output of an operator like reductions* depends on the order of messages, and therefore can only consume messages one at a time. As a result, under a high enough volume of messages the queue may be non-empty, and is therefore still displayed in the visualization:
> (def ch (channel 1 3 2))
#'ch
> (reductions* max ch)
<== [1 3 3 ...]
> (view-graph ch)
Operators such as take* and take-while* will consume a limited number of messages before detaching and closing the original channel.
> (def ch (channel 1 2))
#'ch
> (def next-four (take* 4 ch))
#'next-four
> (view-graph ch)
If we want to separate the first n messages of a channel from the rest, we can compose fork, take*, and drop*:
(defn split-channel [n ch]
  [(take* n (fork ch))
   (drop* n ch)])
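As a rough usage sketch, the split can be destructured and each half consumed separately. This assumes lamina.core is on the classpath; the names source, head, and tail are illustrative only and do not come from the library.

```clojure
(use 'lamina.core)

;; Same definition as above, repeated so the sketch stands alone.
(defn split-channel [n ch]
  [(take* n (fork ch))
   (drop* n ch)])

;; Hypothetical source channel with a few queued messages.
(def source (channel 1 2 3 4 5))

;; head should see the first two messages, tail everything after them.
(let [[head tail] (split-channel 2 source)]
  (receive-all head #(println "head:" %))
  (receive-all tail #(println "tail:" %)))
```

The fork is what lets take* and drop* each see the full stream; without it, the first operator would drain the queue before the second was attached.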
So far, we’ve seen ways to receive messages one at a time, and ways to transform entire streams of messages. As may be expected, there is a way to receive entire streams of messages as well: (receive-all ch callback). This will consume all messages from a channel, and pass each one to a callback. This gives us an easy way to, for instance, print out all messages once they’ve passed through a chain of operators:
> (def ch (channel))
#'ch
> (receive-all
    (->> ch (map* inc) (filter* even?))
    println)
true
> (view-graph ch)
> (enqueue ch 1)
2
nil
> (enqueue ch 2)
:lamina/filtered
Notice that the return value of the first enqueue call is nil. This is because println returns nil, and enqueue will always return the value from the first callback that receives a message. This also works with read-channel, as long as there are callbacks registered for the async-promise.
You may have noticed that a reduce* operator was not mentioned above. This is because while map, filter, and company can operate on infinite sequences, reduce requires that a sequence is finite. A reduce* operator requires that the stream of messages represented by a channel can have an end.
Luckily, there is such a mechanism: (close ch). A channel which is closed cannot have any further messages enqueued into it.
> (def ch (channel 1 2 3))
#'ch
> (close ch)
true
> (enqueue ch 4)
:lamina/closed!
> ch
<== [1 2 3]
> (view-graph ch)
Notice that the REPL representation of the channel no longer has the trailing ellipses (“…”), indicating that there will be no further messages. However, messages can still be consumed from this channel. Once the queue of a closed channel is empty, it is said to be drained.
> (receive-all ch println)
1
2
3
nil
> ch
<== []
> (view-graph ch)
If we call read-channel on a drained channel, or if the channel is closed before any message is enqueued, the async-promise will be realized as an error:
> (def ch (channel))
#'ch
> (def r (read-channel ch))
#'r
> r
<< ... >>
> (close ch)
true
> r
<< ERROR: :lamina/drained! >>
To register a callback for when a channel is closed or drained, use (on-closed ch callback) or (on-drained ch callback), which both take a callback with zero arguments.
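A minimal sketch of registering both callbacks on one channel, assuming lamina.core is on the classpath. The printed strings are illustrative, and the exact moment each callback fires is not asserted here.

```clojure
(use 'lamina.core)

(def ch (channel 1 2))

;; Both callbacks take zero arguments, as described above.
(on-closed ch #(println "closed: no further messages can be enqueued"))
(on-drained ch #(println "drained: closed and the queue is empty"))

(close ch)                ; the channel is closed but still holds 1 and 2
(receive-all ch println)  ; consuming the queued messages empties the channel
```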
reduce* returns an async-promise which will not be realized until the source channel is drained. To create a channel which is already closed, we use (closed-channel & messages).
> (reduce* + (closed-channel 1 2 3))
<< 6 >>
We may also use reduce* to create an async-promise representing all messages that have passed through a channel:
> (reduce* conj [] (closed-channel 4 5 6))
<< [4 5 6] >>
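The dependence of reduce* on the channel being drained can be sketched with a channel that is closed only after messages arrive. This assumes lamina.core is on the classpath and that an async-promise can be dereferenced with @ to wait for its value, which is an assumption not stated in the text above. The names ch and total are illustrative.

```clojure
(use 'lamina.core)

(def ch (channel))

;; The promise stays unrealized while ch is still open.
(def total (reduce* + ch))

(enqueue ch 1 2 3)

;; Closing the channel marks the end of the stream, so the reduction can finish.
(close ch)

;; Assumed: dereferencing waits for the realized value of the async-promise.
(println @total)
```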
last* is similar to reduce*, returning an async-promise that will be realized as the last message from the channel before it was drained. While there is no first* operator, read-channel fills a similar role.
When we map* a function over a channel, we’re effectively saying that the original channel exists to feed messages into the downstream channel.
> (def ch (channel))
#'ch
> (def ch* (map* inc ch))
#'ch*
So what happens if we close the downstream channel? Without the downstream channel to consume the messages, they’ll just accumulate in the queue, which is almost certainly not what we want. If the upstream channel no longer has anything downstream, it should close.
And this is exactly what happens.
> (close ch*)
true
> (closed? ch*)
true
> (closed? ch)
true
Note that this is only true if there are no remaining downstream channels.
> (def ch (channel))
#'ch
> (def a (->> ch (map* inc) (map* dec)))
#'a
> (def b (->> ch (map* dec) (map* inc)))
#'b
> (view-graph ch)
If we close one of the downstream paths, it will only backpropagate along channels that have no remaining downstream channels. When it reaches the original channel, which has another downstream path, it stops.
> (close b)
true
> (view-graph ch)
However, if we close the original channel, it will close all downstream channels, since they no longer have any active message source.
> (def ch (channel))
#'ch
> (def a (->> ch (map* inc) (map* dec)))
#'a
> (def b (->> ch (map* dec) (map* inc)))
#'b
> (close ch)
true
> (closed? a)
true
> (closed? b)
true
Sometimes we want to see what’s passing through a channel, but not keep it open if no one else is paying attention. To do this, we can tap a channel.
> (def ch (channel 1 2 3))
#'ch
> (tap ch)
<== [1 2 3 ...]
> (map* inc ch)
<== [2 3 4 ...]
> (view-graph ch)
Like fork, our tapped channel contains a full copy of the queue. Notice that in the graph view, the tapped edge is dotted.
What happens if our map* function throws an exception?
> (def ch (channel))
#'ch
> (defn reciprocal [x] (/ 1 x))
#'reciprocal
> (def ch* (map* reciprocal ch))
#'ch*
> (view-propagation ch 2)
So far, so good. But if we enqueue a 0 into the channel…
> (enqueue ch 0)
:lamina/error!
> ch*
<== | ERROR: java.lang.ArithmeticException: Divide by zero |
> ch
<== []
> (view-graph ch ch*)
The downstream channel has been put into an error state, and the upstream channel has been closed. If we call read-channel on a channel in an error state, or if the channel is put into an error state before a message is enqueued, the async-promise will be realized as the channel’s exception.
> (read-channel ch*)
<< ERROR: java.lang.ArithmeticException: Divide by zero >>
By composing together channel operators, we’re implicitly creating a topology describing the message flow. However, these operators can only be used to extend existing topologies. If we want to connect two existing channels, we can’t accomplish this with map* or any of the other operators.
We could forward messages using receive-all and enqueue, like so:
> (def a (channel))
#'a
> (def b (channel))
#'b
> (receive-all a #(enqueue b %))
true
But this only creates an implicit relationship between a and b. If we close b, a will remain unaffected, because we haven’t encoded the relationship in the topology. To explicitly connect two channels, we can use siphon and join.
Using (join src dst) forwards all messages from src into dst. It also creates a bidirectional relationship between the two channels; if either channel is closed, so is the other.
> (def a (channel 1 2 3))
#'a
> (def b (channel))
#'b
> (join a b)
<== [...]
> (view-graph a)
Using (siphon src dst) also forwards the messages, but it creates a weaker relationship between the channels. siphon assumes that there will be multiple source channels that feed into the destination, so the source channel closing shouldn’t affect the destination. However, the destination closing will always close the source.
The default relationship between nodes in the topology is a join. If siphon is used, the edge will be specially labelled.
> (def a (channel 1 2 3))
#'a
> (def b (channel))
#'b
> (siphon a b)
<== [...]
> (view-graph a)
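The difference in close behavior between join and siphon can be sketched side by side. This assumes lamina.core is on the classpath, and the results noted in the comments are what the descriptions above imply rather than captured REPL output. The channel names are illustrative.

```clojure
(use 'lamina.core)

;; join: closing the source should also close the destination.
(def join-src (channel))
(def join-dst (channel))
(join join-src join-dst)
(close join-src)
(println "join destination closed?" (closed? join-dst))     ; expected: true

;; siphon: closing the source should leave the destination open.
(def siphon-src (channel))
(def siphon-dst (channel))
(siphon siphon-src siphon-dst)
(close siphon-src)
(println "siphon destination closed?" (closed? siphon-dst)) ; expected: false
```

siphon is the safer choice when several sources feed one destination, since one source finishing should not end the stream for the others.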
Both siphon and join are variadic, and can take more than two channels. If (join a b c) is called, then a will be joined to b, and b will be joined to c. Both siphon and join return the last channel passed in.
> (def a (channel 1 2 3))
#'a
> (def b (channel))
#'b
> (def c (channel))
#'c
> (join a b c)
<== [...]
> (view-graph a)
This can be used to cancel connections created by siphon or join, using an intermediate channel:
(defn cancellable-join [a b]
  (let [bridge (channel)]
    (join a bridge b)
    #(close bridge)))
This returns a function which will close the intermediate channel, thus ending the transitive connection between a and b.
On the surface, channels seem very similar to Clojure’s seq abstraction. However, channels have certain advantages:
- easier to reason about complex data flows
- easier to deal with temporal data flows (operators like partition-every and sample-every)
- avoids memory leaks due to holding onto the head of a sequence
Channels also don’t require dedicated threads, which can be useful in larger-scale applications.
However, Clojure is designed to work with seqs, so there needs to be easy interop between the two representations. Lamina provides several functions, including lazy-seq->channel, channel->lazy-seq, and channel->seq. The first two do exactly what you’d expect, converting one type to the other until the channel is closed or the seq is exhausted. Unlike channel->lazy-seq, however, which will consume and return all messages that flow through the channel, channel->seq will only consume and return the messages currently in the channel’s queue.
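A small sketch of the contrast between the two channel-to-seq conversions, assuming lamina.core is on the classpath and that both functions accept a single channel argument. The names open-ch, queued, done-ch, and everything are illustrative.

```clojure
(use 'lamina.core)

;; channel->seq takes a snapshot of the queue; the channel itself stays open.
(def open-ch (channel 1 2 3))
(def queued (channel->seq open-ch))
(println "queued now:" queued)

;; channel->lazy-seq follows the channel until it is closed, so a closed
;; channel is used here to keep the resulting seq finite.
(def done-ch (closed-channel 4 5 6))
(def everything (doall (channel->lazy-seq done-ch)))
(println "whole stream:" everything)
```

Realizing channel->lazy-seq on a channel that never closes would wait indefinitely for further messages, which is why the sketch only forces it on a closed channel.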
To learn more about channels, read how they can be used to model bidirectional communication or instrument and monitor code.