blocks
Each block is briefly described below, along with the rules that define it. To make a block in streamtools, double click anywhere on the page and type the name of the block as it appears below. For programmatic access, see the API docs.
Blocks rely on some general concepts:
- gojee path: The path rules all use gojee syntax to specify which value you'd like to use in the block. Paths always start with a period, which indicates the top level of the message. So if you want to refer to the whole message, use `.`. If you want to refer to a specific value, its keys follow the first period. So if you have a message that looks like `{ "user":{ "username":"bob_the_user", "id": 1234 } }` and you'd like to refer to the username, then the gojee path would be `.user.username`.
- gojee expression: gojee also allows for expressions. So we can write expressions like `.id > 1230`, which are especially useful in the filter and map blocks.
- duration string: We use Go's duration strings to specify time periods. They are a number followed by a unit and are pretty intuitive. So `10ms` is 10 milliseconds, `5h` is 5 hours, and so on.
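To make the path concept concrete, here is a minimal Python sketch of how a simple dotted path could be resolved against a decoded message. The function name `resolve_path` is hypothetical, and the sketch only handles plain key lookups. It is not gojee itself, which supports a richer syntax.

```python
import json


def resolve_path(path, message):
    """Resolve a simple dotted path such as ".user.username" against a message.

    Illustrative only: handles plain object keys, not the full gojee syntax.
    """
    if not path.startswith("."):
        raise ValueError("paths must start with a period")
    value = message
    # "." refers to the whole message, so there are no keys to follow.
    for key in [k for k in path.split(".") if k]:
        value = value[key]
    return value


message = json.loads('{ "user": { "username": "bob_the_user", "id": 1234 } }')
print(resolve_path(".", message))
print(resolve_path(".user.username", message))
```

The first call returns the whole message and the second returns the username, mirroring the two cases described above.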
These blocks emit messages on their own.
- ticker. This block emits the time regularly. The time between emissions is specified by the `Interval` rule.
  - Rules:
    - `Interval`: duration string (`1s`)
These blocks are useful for shaping the stream in one way or another.
- join. This block joins two streams together. It waits until it has seen a message on both its inputs, then emits the joined message.
- map. This block maps inbound data onto outbound data. The `Map` rule needs to be valid JSON, where each key is a string and each value is a valid gojee expression.
  - Rules:
    - `Map`: gojee expression
    - `Additive`: (`True`)
- mask. This block allows you to select a subset of the inbound message. To create a mask, build up an empty JSON object that looks like the message you'd like out. So, for example, if your inbound message looks like `{ "A":"foo", "B":"bar" }` and you just want `B` to come out of the mask block, then make the `Mask` rule `{ "B":{} }`. You can supply any valid JSON to the Mask block. If you specify an empty JSON object `{}`, then all values will pass.
  - Rules:
    - `Mask`: mask JSON
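The masking rule can be sketched in Python as a recursive selection. This is only an illustration: `apply_mask` is a hypothetical name, and the handling of nested masks and missing keys is an assumption rather than the documented behaviour of the block.

```python
def apply_mask(mask, message):
    """Keep only the parts of `message` named by `mask`.

    Assumption: an empty mask ({}) at any level passes the whole value there,
    and keys missing from the message are skipped.
    """
    if not mask or not isinstance(message, dict):
        return message
    return {
        key: apply_mask(sub_mask, message[key])
        for key, sub_mask in mask.items()
        if key in message
    }


inbound = {"A": "foo", "B": "bar"}
only_b = apply_mask({"B": {}}, inbound)   # keeps just the "B" key
everything = apply_mask({}, inbound)      # empty mask passes all values
print(only_b, everything)
```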
- filter. The filter block applies the provided rule to incoming messages. If the rule evaluates to `true`, the message is emitted. If the rule evaluates to `false`, the message is discarded. The `Filter` rule can be any valid gojee expression. So, for example, if the inbound message looks like `{ "temperature": 43 }` and you only want to emit messages when the `temperature` value is above 50, then the filter rule would be `.temperature > 50`.
  - Rules:
    - `Filter`: gojee expression (`. != null`)
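As a rough illustration of the emit-or-discard behaviour, the sketch below uses a plain Python function as a stand-in for the gojee expression `.temperature > 50`. The names `filter_stream` and `above_50` are hypothetical, and the second and third inbound messages are made up for the example.

```python
def filter_stream(messages, rule):
    """Yield only the messages for which `rule` evaluates to true."""
    for message in messages:
        if rule(message):
            yield message


# Python stand-in for the gojee expression `.temperature > 50`.
def above_50(message):
    return message["temperature"] > 50


inbound = [{"temperature": 43}, {"temperature": 51}, {"temperature": 72}]
emitted = list(filter_stream(inbound, above_50))
print(emitted)
```

The message with a temperature of 43 is discarded, and the other two are emitted in their original order.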
- pack. The pack block groups messages together based on a common value. This is almost like an online "group-by" operation, but care needs to be taken in the stream setting as we have to decide when to emit the "packed" message. Imagine you have messages like `{ "option": "a" }` where `option` can be `a`, `b` or `c`. We would like to make a stream that packs all the `a`s together into a single message, and similarly for the `b`s and `c`s. We only emit the packed message for a particular value of `option` when we haven't seen any messages with that value for 20 seconds, at which point we emit the bunch all at once. Here we would specify `20s` for the `EmitAfter` rule and `.option` as the `Path` rule. If we saw three `a` messages in that 20s, the output of the pack block would look like `{ "pack":[{"option": "a"},{"option": "a"},{"option": "a"}] }`.

  Our main use case for this at the NYT is to create per-reader reading sessions. So we set the `Path` to our user-id and we emit after 20 minutes of not hearing anything from that reader's user-id. Every page-view our readers generate gets packed into a per-reader message, generating a stream of reading sessions.
  - Rules:
    - `EmitAfter`: duration string
    - `Path`: gojee path
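The quiet-period logic can be sketched in Python as follows. This is a simplified model under stated assumptions: the `Packer` class and its methods are hypothetical, time is passed in explicitly as seconds instead of being read from a clock, and a group counts as idle once exactly `emit_after` seconds have passed.

```python
class Packer:
    """Group messages by a key and emit each group after a quiet period."""

    def __init__(self, key_func, emit_after):
        self.key_func = key_func      # stand-in for the Path rule
        self.emit_after = emit_after  # stand-in for EmitAfter, in seconds
        self.groups = {}              # key -> list of messages
        self.last_seen = {}           # key -> time of the latest message

    def add(self, message, now):
        key = self.key_func(message)
        self.groups.setdefault(key, []).append(message)
        self.last_seen[key] = now

    def flush(self, now):
        """Return packed messages for every key idle for at least emit_after."""
        idle = [k for k, t in self.last_seen.items() if now - t >= self.emit_after]
        packed = []
        for key in idle:
            packed.append({"pack": self.groups.pop(key)})
            del self.last_seen[key]
        return packed


packer = Packer(key_func=lambda m: m["option"], emit_after=20)
packer.add({"option": "a"}, now=0)
packer.add({"option": "a"}, now=5)
packer.add({"option": "b"}, now=8)
packer.add({"option": "a"}, now=10)

early = packer.flush(now=25)  # "a" last seen at 10, "b" at 8: neither idle for 20s
late = packer.flush(now=30)   # "a" idle for 20s, "b" idle for 22s
print(early, late)
```

Each new message for a key resets that key's quiet period, which is why the three `a` messages end up in one packed message.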
- unpack. The unpack block takes an array of objects and emits each object as a separate message. See the citibike example, where we unpack a big array of citibike stations into individual messages we can filter.
  - Rules:
    - `Path`: gojee path
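A minimal Python sketch of the unpack behaviour is shown below. The `unpack` function, the `select_array` stand-in for the `Path` rule, and the field names in the sample message are all made up for illustration and are not taken from the citibike feed.

```python
def unpack(message, select_array):
    """Yield each element of the array selected from `message` as its own message."""
    for item in select_array(message):
        yield item


# Hypothetical inbound message holding an array of objects.
inbound = {"stations": [{"id": 1, "bikes": 4}, {"id": 2, "bikes": 0}]}

# Python stand-in for a Path rule such as `.stations`.
outbound = list(unpack(inbound, lambda m: m["stations"]))
print(outbound)
```

Each element of the array becomes a separate message, which a downstream filter block could then act on individually.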
- sync. The sync block takes a disordered stream and creates a properly timed, ordered stream at the expense of introducing a lag. To explain this block, imagine you have a stream that looks like `{"val":"a", "time":23 } ... {"val":"b", "time":14} ... {"val":"c", "time":10}`. Ideally you'd like the stream to be ordered by the timestamp in the message, so the `c` message comes first, the `b` message comes second and the `a` message comes third. In addition, you'd like the time between the messages to respect the timestamp inside the message.

  The sync block achieves this by storing the stream for a fixed amount of time (the `Lag`) and then emitting at the time inside the inbound messages plus the lag. This means we have to wait for a while to get our messages, but when we do get them, they're in a stream whose dynamics reflect the timestamp inside the message.

  This can be very helpful if the plumbing between your sensor and streamtools introduces dynamics that would confuse your analysis. For example, it's quite common for a system to wait until it has a collection of messages from its sensor before it makes an HTTP request to post those messages to the next stage. This means that by the time those messages make it to a streamtools pattern, they're artificially grouped together into little pulses. You can use the sync block to recover the original stream generated by the sensor.
  - Rules:
    - `Path`: gojee path. This must point at a UNIX timestamp in seconds.
    - `Lag`: duration string
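The reordering idea can be sketched in Python as a schedule computed from the embedded timestamps. This is a batch simplification, not the block's streaming implementation: `sync_schedule` is a hypothetical name, the lag of 30 seconds is arbitrary, and the sketch assumes every message arrives before its scheduled emission time.

```python
import heapq


def sync_schedule(arrivals, lag):
    """Return (emit_time, message) pairs ordered by the embedded timestamp.

    `arrivals` is an iterable of messages in arrival order; each message carries
    a UNIX timestamp in seconds under "time". Each message is scheduled for
    its own timestamp plus `lag` seconds.
    """
    heap = []
    for seq, message in enumerate(arrivals):
        # seq breaks ties so that dicts are never compared directly.
        heapq.heappush(heap, (message["time"] + lag, seq, message))
    schedule = []
    while heap:
        emit_time, _, message = heapq.heappop(heap)
        schedule.append((emit_time, message))
    return schedule


arrivals = [
    {"val": "a", "time": 23},
    {"val": "b", "time": 14},
    {"val": "c", "time": 10},
]
schedule = sync_schedule(arrivals, lag=30)
for emit_time, message in schedule:
    print(emit_time, message["val"])
```

The gaps between the scheduled emission times match the gaps between the original timestamps, which is what lets the output stream reflect the sensor's own dynamics.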
- gethttp
  - Rules:
    - `Path`: gojee path
These blocks hook into another system and collect messages to be emitted into streamtools.
- fromhttpstream
  - Rules:
    - `Endpoint`:
    - `Auth`:
- fromnsq
  - Rules:
    - `ReadTopic`:
    - `LookupdAddr`:
    - `ReadChannel`:
    - `MaxInFlight`: (`0`)
- frompost
- fromsqs
  - Rules:
    - `SignatureVersion`: (`4`)
    - `AccessKey`:
    - `MaxNumberOfMessages`: (`10`)
    - `APIVersion`: (`2012-11-05`)
    - `SQSEndpoint`:
    - `WaitTimeSeconds`: (`0`)
    - `AccessSecret`:
- fromudp
  - Rules:
    - `ConnectionString`:
- fromwebsocket
  - Rules:
    - `url`:
These blocks send data to external systems.
- toelasticsearch
  - Rules:
    - `Index`:
    - `Host`:
    - `IndexType`:
    - `Port`:
- tofile
  - Rules:
    - `Filename`:
- tolog
- tonsq
  - Rules:
    - `Topic`:
    - `NsqdTCPAddrs`:
- tonsqmulti
  - Rules:
    - `Topic`:
    - `Interval`: duration string (`1s`)
    - `NsqdTCPAddrs`:
    - `MaxBatch`: (`100`)
- towebsocket
  - Rules:
    - `port`:
These blocks maintain a state, storing something about the stream of data.
- histogram
  - Rules:
    - `Path`: gojee path
    - `Window`: duration string (`0`)
- count
  - Rules:
    - `Window`: duration string (`0`)
- timeseries
  - Rules:
    - `Path`: gojee path
    - `NumSamples`: (`0`)
- set
  - Rules:
    - `Path`: gojee path
- movingaverage
  - Rules:
    - `Path`: gojee path
    - `Window`: duration string
These blocks emit random numbers when polled.
- zipf
  - Rules:
    - `s`: (`2`)
    - `v`: (`5`)
    - `N`: (`99`)
- gaussian
  - Rules:
    - `StdDev`: (`1`)
    - `Mean`: (`0`)
- poisson
  - Rules:
    - `Rate`: (`1`)