nik hanselmann edited this page Mar 18, 2014 · 20 revisions

Each block is briefly detailed below, along with the rules that define it. To make a block in streamtools, double-click anywhere on the page and type the name of the block as it appears below. For programmatic access, see the API docs.

Blocks rely on some general concepts:

  • gojee path: Path rules use gojee syntax to specify which value in the message you'd like the block to use. Paths always start with a period, which indicates the top level of the message, so to refer to the whole message use a single period (.). To refer to a specific value, follow the first period with the keys that lead to it, separated by periods. So if you have a message that looks like

      {
          "user":{
              "username":"bob_the_user",
              "id": 1234
          }
      }
    

    and you'd like to refer to the username then the gojee path would be .user.username.

  • gojee expression: gojee also allows for expressions. So we can write expressions like .user.id > 1230, which are especially useful in the filter and map blocks.

  • duration string: We use Go's duration strings to specify time periods. They are a number followed by a unit and are pretty intuitive. So 10ms is 10 milliseconds; 5h is 5 hours and so on.

  • route: Every block has a set of routes, which can be inbound, query, or outbound. Inbound routes receive data from somewhere and pass it to the block. Query routes are two-way: they accept an inbound query and return information to the requester. Outbound routes send data from a block to a connection.

generator blocks

These blocks emit messages on their own.

  • ticker. This block emits the time regularly. The time between emissions is specified by the Interval.
    • Rules:
      • Interval: duration string (1s)

flow blocks

These blocks are useful for shaping the stream in one way or another.

  • join. This block joins two streams together. It waits until it has seen a message on both its inputs, then emits the joined message.

  • map. This block maps inbound data onto outbound data. The Map rule needs to be valid JSON, where each key is a string and each value is a valid gojee expression.

    • Rules:
      • Map: gojee expression
      • Additive: (True)
  • mask. This block lets you select a subset of the inbound message. To create a mask, build a JSON object with the same structure as the message you'd like to emit, using an empty object {} in place of each value you want to keep. So, for example, if your inbound message looks like

      {
        "A":"foo",
        "B":"bar"
      }
    

    and you just want B to come out of the mask block then make the Mask rule:

      {
        "B":{}
      }
    

    You can supply any valid JSON to the Mask block. If you specify an empty JSON object {}, then all values will pass.

    • Rules:
      • Mask: mask JSON
  • filter. The filter block applies the provided rule to incoming messages. If the rule evaluates to true, the message is emitted; if it evaluates to false, the message is discarded. The Filter rule can be any valid gojee expression. So, for example, if the inbound message looks like

      {
          "temperature": 43
      }
    

    and you only want to emit messages when the temperature value is above 50, then the filter rule would be

      .temperature > 50
    
    • Rules:
      • Filter: gojee expression (. != null)
  • pack. The pack block groups messages together based on a common value. This is almost like an online "group-by" operation, but care needs to be taken in the stream setting because we have to decide when to emit the "packed" message. Imagine you have messages like

      {
           "option": "a"
      } 
    

    where option can be a, b or c. We would like to make a stream that packs all the a messages into a single message, and similarly for the b and c messages. We only emit the packed message for a particular value of option once we haven't seen any messages with that value for 20 seconds, at which point we emit the whole bunch at once. Here we would specify 20s for the EmitAfter rule and .option as the Path rule. If three a messages arrived before such a 20-second gap, the output of the pack block would look like

      {
           "pack":[{"option": "a"},{"option": "a"},{"option": "a"}]
      }
    

    Our main use case for this at the NYT is to create per-reader reading sessions. We set the Path to our user-id and emit after 20 minutes of not hearing anything from that reader's user-id. Every page-view a reader generates gets packed into a per-reader message, producing a stream of reading sessions.

    • Rules:
      • EmitAfter: duration string
      • Path: gojee path
  • unpack. The unpack block takes an array of objects and emits each object as a separate message. See the citibike example, where we unpack a big array of citibike stations into individual messages we can filter.

  • sync. The sync block takes a disordered stream and creates a properly timed, ordered stream at the expense of introducing a lag. To explain this block, imagine you have a stream that looks like

      {"val":"a", "time":23 } ... {"val":"b", "time":14} ... {"val":"c", "time":10}
    

    Ideally you'd like the stream to be ordered by the timestamp in the message, so the c message comes first, the b message comes second and the a message comes third. In addition, you'd like the time between the messages to respect the timestamp inside the message.

    The sync block achieves this by storing the stream for a fixed amount of time (the Lag) and emitting each message at the time inside the message plus the Lag. This means we have to wait a while to get our messages, but when we do get them, they're in a stream whose dynamics reflect the timestamps inside the messages.

    This can be very helpful if the plumbing between your sensor and streamtools introduces dynamics that would confuse your analysis. For example, it's quite common for a system to wait until it has a collection of messages from its sensor before it makes an HTTP request to post those messages to the next stage. This means that by the time those messages reach a streamtools pattern, they're artificially grouped together into little pulses. You can use the sync block to recover the original stream generated by the sensor.

    • Rules:
      • Path: gojee path. This must point at a UNIX epoch time in milliseconds.
      • Lag: duration string
  • gethttp. The gethttp block makes an HTTP GET request to a URL you specify in the inbound message. The HTTP endpoint must serve JSON. This block forms the backbone of any sort of polling pattern.

    • Rules:
      • Path: gojee path to a fully formed URL.

source blocks

These blocks hook into another system and collect messages to be emitted into streamtools.

  • fromhttpstream. This block allows you to listen to a long-lived HTTP stream. Each new JSON object that appears on the stream is emitted into streamtools. Try using the 1.usa.gov endpoint, available at http://developer.usa.gov/1usagov.
    • Rules:
      • Endpoint: endpoint string
      • Auth: authorisation string
  • fromnsq. This block implements an NSQ reader. For more details on how to specify the rule for this block check out the NSQ docs.
    • Rules:
      • ReadTopic: topic to read from.
      • LookupdAddr: nsqlookupd address
      • ReadChannel: name of the channel
      • MaxInFlight: how many messages to take from the queue at a time. (0)
  • frompost. This block emits any message that is POSTed to its IN route. This block isn't strictly needed as you can POST JSON to any inbound route on any block. Having said that, sometimes it's a bit clearer to have a dedicated block that listens for data.
  • fromsqs. This block connects to an Amazon Simple Queue Service (SQS) queue. Messages from SQS are XML; this block extracts the message string from this XML, which it assumes is newline-separated JSON. Each JSON object is emitted into streamtools as a separate message. See the SQS docs for more information about the rules of this block.
    • Rules:
      • SignatureVersion: the version number of the signature hash Amazon is expecting for this queue (4)
      • AccessKey: your access key
      • MaxNumberOfMessages: how many messages to pull off the queue at a time (10)
      • APIVersion: what version of the API are you using (2012-11-05)
      • SQSEndpoint: the endpoint (ARM) of the SQS queue you are reading
      • WaitTimeSeconds: how long each receive request waits for messages to arrive in the queue (long polling) (0)
      • AccessSecret: your access secret
  • fromudp. Listens for messages sent over UDP. Each message is emitted into streamtools.
    • Rules:
      • ConnectionString:
  • fromwebsocket. Connects to an existing websocket. Each message it hears from the websocket is emitted into streamtools.
    • Rules:
      • url: address of the websocket.

sink blocks

These blocks send data to external systems.

  • toelasticsearch. Send JSON to an elasticsearch instance.
    • Rules:
      • Index:
      • Host:
      • IndexType:
      • Port:
  • tofile. Writes a message as JSON to a file. Each message becomes a new line of JSON.
    • Rules:
      • Filename: file to write to
  • tolog. Send messages to the log. This is a quick way to look at the data in your stream.
  • tonsq. Send messages to an existing NSQ system.
    • Rules:
      • Topic: topic you will write to
      • NsqdTCPAddrs: address of the NSQ daemon.
  • tonsqmulti. Send messages to an NSQ system in batches. This is useful if you have a fast (>1 kHz) stream of data you need to send to NSQ. This block gathers messages for the duration of Interval and then sends them as one batch. It sends immediately if it gathers more than MaxBatch messages.
    • Rules:
      • Topic: topic you will write to
      • Interval: duration string (1s)
      • NsqdTCPAddrs: address of the NSQ daemon.
      • MaxBatch: size of largest batch (100)
  • towebsocket. Send messages to a dedicated websocket. Only use this if necessary: each block already produces a websocket and a long-lived HTTP connection you can use. This block is useful if you want to serve on a new port.
    • Rules:
      • port: port of the websocket.

state blocks

These blocks maintain a state, storing something about the stream of data.

  • histogram. Build a non-stationary histogram of the inbound messages. Currently this only works with discrete values.
    • Rules:
      • Path: gojee path to the value over which you'd like to build a histogram.
      • Window: duration string specifying how long to retain messages in the histogram (0)
  • count. This block counts the number of messages it has seen over the specified Window.
    • Rules:
      • Window: duration string (0)
  • timeseries. This block stores an array of the value specified by Path along with the timestamp at the time the message arrived.
    • Rules:
      • Path: gojee path
      • NumSamples: how many samples to store (0)
  • set. This block stores a set of values as specified by the block's Path. Add new members through the (idempotent) ADD route. If you send a message through the ISMEMBER route, the block will emit true or false. You can also query the cardinality of the set.
    • Rules:
  • movingaverage. Performs a moving average of the values specified by the Path over the duration of the Window.
    • Rules:
      • Path: gojee path
      • Window: duration string

random number blocks

These blocks emit random numbers when polled. So to generate a stream of random numbers, connect a generator block (like a ticker) to a random number block's POLL endpoint. Each of these blocks emits JSON of the form:

{
    "sample": 1234
}
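A minimal Go sketch of that polling pattern, with a time.Ticker standing in for the generator block connected to POLL. The poll function, the sampleMsg type and the counting sampler are hypothetical; the struct tag produces the {"sample": ...} shape shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// sampleMsg matches the {"sample": ...} shape the random number blocks emit.
type sampleMsg struct {
	Sample float64 `json:"sample"`
}

// poll draws one value from draw and encodes it as a JSON message.
func poll(draw func() float64) ([]byte, error) {
	return json.Marshal(sampleMsg{Sample: draw()})
}

func main() {
	// The ticker plays the role of a generator block wired to the POLL route.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	next := 0.0
	draw := func() float64 { next++; return next } // hypothetical sampler
	for i := 0; i < 3; i++ {
		<-ticker.C
		msg, err := poll(draw)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(msg))
	}
}
```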
  • zipf. This block draws a random number from a Zipf-Mandelbrot distribution when polled.
    • Rules:
      • s: (2)
      • v: (5)
      • N: (99)
  • gaussian. This block draws a random number from the Gaussian distribution when polled.
    • Rules:
      • StdDev: (1)
      • Mean: (0)
  • poisson. This block draws a random number from a Poisson distribution when polled.
    • Rules:
      • Rate: (1)