Mike Dewar edited this page Mar 15, 2014 · 20 revisions

Each block is briefly described below, along with the rules that configure it. To make a block in streamtools, double-click anywhere on the page and type the name of the block as it appears below. For programmatic access, see the API docs.

Blocks rely on some general concepts:

  • gojee path
  • gojee expression
  • duration string

generator blocks

These blocks emit messages on their own.

  • ticker. This block emits the current time at regular intervals. The time between emissions is set by the Interval rule.
    • Rules:
      • Interval: duration string (1s)

flow blocks

These blocks are useful for shaping the stream in one way or another.

  • join. This block joins two streams together. It waits until it has seen a message on both its inputs, then emits the joined message.

  • map. This block maps inbound data onto outbound data. The Map rule needs to be valid JSON, where each key is a string and each value is a valid gojee expression.

    • Rules:
      • Map: gojee expression
      • Additive: (True)
  • mask. This block allows you to select a subset of the inbound message. To create a mask, build a JSON object that mirrors the message you'd like to get out, using empty objects {} as its values. So, for example, if your inbound message looks like

      {
        "A":"foo",
        "B":"bar"
      }
    

    and you just want B to come out of the mask block then make the Mask rule:

      {
        "B":{}
      }
    

    You can supply any valid JSON as the Mask rule. If you specify an empty JSON object {}, then all values will pass.

    • Rules:
      • Mask: mask JSON
  • filter. The filter block applies the provided rule to incoming messages. If the rule evaluates to true, the message is emitted; if it evaluates to false, the message is discarded. The Filter rule can be any valid gojee expression. So, for example, if the inbound message looks like

      {
          "temperature": 43
      }
    

    and you only want to emit messages when the temperature value is above 50, then the filter rule would be

      .temperature > 50
    
    • Rules:
      • Filter: gojee expression (. != null)
  • pack. The pack block groups messages together based on a common value. This is almost like an online "group-by" operation, but care needs to be taken in the stream setting, as we have to decide when to emit the "packed" message. Imagine you have messages like

      {
           "option": "a"
      } 
    

    where option can be a, b or c. We would like to make a stream that packs all the a messages together into a single message, and similarly for the b and c messages. We only emit the packed message for a particular value of option once we haven't seen any messages with that value for 20 seconds, at which point we emit the whole bunch at once. Here we would specify 20s for the EmitAfter rule and .option as the Path rule. If we saw three a messages before such a 20s gap, the output of the pack block would look like

      {
        "pack": [{"option": "a"}, {"option": "a"}, {"option": "a"}]
      }

    Our main use case for this at the NYT is to create per-reader reading sessions. We set the Path to our user-id and emit after 20 minutes of not hearing anything from that reader's user-id. Every page-view our readers generate gets packed into a per-reader message, generating a stream of reading sessions.

    • Rules:
      • EmitAfter: duration string
      • Path: gojee path (https://github.com/nytlabs/gojee)
  • unpack. The unpack block takes an array of objects and emits each object as a separate message. See the citibike example, where we unpack a big array of citibike stations into individual messages we can filter.

  • sync. The sync block takes a disordered stream and creates a properly timed, ordered stream at the expense of introducing a lag. To explain this block, imagine you have a stream that looks like

     {"val":"a", "time":23 } ... {"val":"b", "time":14} ... {"val":"c", "time":10}
    

    Ideally you'd like the stream to be ordered by the timestamp in the message, so the c message comes first, the b message comes second and the a message comes third. In addition, you'd like the time between the messages to respect the timestamp inside the message.

    The sync block achieves this by storing the stream for a fixed amount of time (the Lag) and then emitting each message at the time inside it plus the lag. This means we have to wait a while to get our messages, but when we do get them, they're in a stream whose dynamics reflect the timestamps inside the messages.

    This can be very helpful if the plumbing between your sensor and streamtools introduces dynamics that would confuse your analysis. For example, it's quite common for a system to wait until it has a collection of messages from its sensor before it makes an HTTP request to post those messages to the next stage. This means that by the time those messages make it to a streamtools pattern, they're artificially grouped together into little pulses. You can use the sync block to recover the original stream generated by the sensor.

    • Rules:
      • Path: gojee path. This must point at a UNIX timestamp in seconds.
      • Lag: duration string
  • gethttp

    • Rules:
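
The join block waits for a message on each of its inputs before emitting. A minimal Go sketch of that pairing, assuming messages on each input are queued in arrival order and that the joined message nests the two inputs under keys "A" and "B" (both assumptions; the block's real input names and output shape are not documented here):

```go
package main

import "fmt"

// Joiner pairs messages from two inputs. Each input keeps a FIFO queue so
// nothing is lost while waiting for the other side.
type Joiner struct {
	queueA, queueB []interface{}
}

// Push adds msg to input "A" or "B" and returns a joined message once both
// inputs have at least one message waiting. The output keys "A" and "B"
// are hypothetical.
func (j *Joiner) Push(input string, msg interface{}) (map[string]interface{}, bool) {
	switch input {
	case "A":
		j.queueA = append(j.queueA, msg)
	case "B":
		j.queueB = append(j.queueB, msg)
	default:
		return nil, false
	}
	if len(j.queueA) == 0 || len(j.queueB) == 0 {
		return nil, false // still waiting on the other input
	}
	joined := map[string]interface{}{"A": j.queueA[0], "B": j.queueB[0]}
	j.queueA, j.queueB = j.queueA[1:], j.queueB[1:]
	return joined, true
}

func main() {
	var j Joiner
	j.Push("A", "first a")
	j.Push("A", "second a")
	if out, ok := j.Push("B", "first b"); ok {
		fmt.Println(out) // map[A:first a B:first b]
	}
}
```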
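
The map block evaluates one gojee expression per output key. Since gojee is not reproduced here, this Go sketch stands in for each expression with a Go function over the inbound message (the Expr type is hypothetical), and it assumes Additive means the mapped keys are added to a copy of the inbound message rather than replacing it:

```go
package main

import "fmt"

// Expr stands in for a compiled gojee expression (hypothetical type).
type Expr func(msg map[string]interface{}) interface{}

// applyMap evaluates each expression against msg. With additive set, the
// results are merged onto a copy of msg; otherwise only mapped keys remain.
func applyMap(msg map[string]interface{}, rules map[string]Expr, additive bool) map[string]interface{} {
	out := make(map[string]interface{})
	if additive {
		for k, v := range msg {
			out[k] = v
		}
	}
	for key, expr := range rules {
		out[key] = expr(msg)
	}
	return out
}

func main() {
	rules := map[string]Expr{
		// Hand-written stand-in for an expression doubling .temperature.
		"double": func(m map[string]interface{}) interface{} {
			return m["temperature"].(float64) * 2
		},
	}
	msg := map[string]interface{}{"temperature": 43.0}
	fmt.Println(applyMap(msg, rules, true))  // map[double:86 temperature:43]
	fmt.Println(applyMap(msg, rules, false)) // map[double:86]
}
```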

source blocks

These blocks hook into another system and collect messages to be emitted into streamtools.

  • fromhttpstream
    • Rules:
      • Endpoint:
      • Auth:
  • fromnsq
    • Rules:
      • ReadTopic:
      • LookupdAddr:
      • ReadChannel:
      • MaxInFlight: (0)
  • frompost
  • fromsqs
    • Rules:
      • SignatureVersion: (4)
      • AccessKey:
      • MaxNumberOfMessages: (10)
      • APIVersion: (2012-11-05)
      • SQSEndpoint:
      • WaitTimeSeconds: (0)
      • AccessSecret:
  • fromudp
    • Rules:
      • ConnectionString:
  • fromwebsocket
    • Rules:
      • url:

sink blocks

These blocks send data to external systems.

  • toelasticsearch
    • Rules:
      • Index:
      • Host:
      • IndexType:
      • Port:
  • tofile
    • Rules:
      • Filename:
  • tolog
  • tonsq
    • Rules:
      • Topic:
      • NsqdTCPAddrs:
  • tonsqmulti
    • Rules:
      • Topic:
      • Interval: duration string (1s)
      • NsqdTCPAddrs:
      • MaxBatch: (100)
  • towebsocket
    • Rules:
      • port:

state blocks

These blocks maintain state, storing something about the stream of data.

  • histogram
    • Rules:
      • Path: gojee path
      • Window: duration string (0)
  • count
    • Rules:
      • Window: duration string (0)
  • timeseries
    • Rules:
      • Path: gojee path
      • NumSamples: (0)
  • set
    • Rules:
  • movingaverage
    • Rules:
      • Path: gojee path
      • Window: duration string

random number blocks

These blocks emit random numbers when polled.

  • zipf
    • Rules:
      • s: (2)
      • v: (5)
      • N: (99)
  • gaussian
    • Rules:
      • StdDev: (1)
      • Mean: (0)
  • poisson
    • Rules:
      • Rate: (1)