Kinesis is a platform for streaming data on AWS, offering powerful services to make it easy to load and analyze streaming data, and also providing the ability for you to build custom streaming data applications for specialized needs. This gem is a helpful utility for parsing data from a Kinesis stream.
Add this line to your application's Gemfile:
gem 'kinesis-stream-reader', github: 'ello/kinesis-stream-reader', require: 'stream_reader'
And then execute:
$ bundle
Set up your AWS keys:
AWS_REGION
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
A Redis server is required to help keep track of the reader's location within the Kinesis stream:
REDIS_URL
Add the Kinesis stream name and prefix:
KINESIS_STREAM_NAME
KINESIS_STREAM_NAME_PREFIX
This gem assumes that you're storing your data in Kinesis with Avro format. More documentation on Avro can be found here: http://avro.apache.org/
Configure the stream reader to read from a particular stream:
stream = StreamReader.new(
stream_name: ENV['KINESIS_STREAM_NAME'],
prefix: ENV['KINESIS_STREAM_PREFIX'] || ''
)
Run the stream reader.
stream.run! do |record, opts|
DoSomethingFromStream.perform(record: record, kind: opts[:schema_name])
end
It will grab data matching the given stream_name
, where record
is the parsed data and opts
is a hash containing:
schema_name
- the name of the event/schema stored in Kinesisraw_data
- the raw, unparsed avro datasequence_number
- the sequence number of the Kinesis eventshard_id
- the shard_id that the event is being processed on
As of version 0.2.0, a stream with multiple shards will have a thread spawned per-shard to process the records from that shard. You should ensure that whatever happens in your processing block is thread-safe, particularly as it pertains to using external resources. For instance, you'll likely want to use ActiveRecord::Base.with_connection
to wrap any database calls.
Version 0.2.0 introduced a threading model for spawning multiple readers to service streams with more than one shard. As a result of that change, the sequence number tracker key structure changed as well, to be able to track the position of a reader for each shard. When upgrading, you'll need to do one of two things:
- If your processing blocks are idempotent and fast, you can simply do nothing and let the processors re-process old data starting from the
TRIM_HORIZON
. - Otherwise, you can manually obtain the
shard_id
value from aDescribeStream
call and move the value of the last processed sequence number to a new key in Redis. You'll need to stop your workers while doing this. This will avoid re-processing the same records multiple times.
If you specify the LIBRATO_USER
and LIBRATO_TOKEN
environment variables, shard processor stats will be periodically reported to Librato (with the metrics stream_reader.process_record.duration
and stream_reader.process_record.latency
and a custom source that identifies the prefix and shard id). It is highly recommended that you monitor at least the latency metric to ensure that your processors do not fall so far behind that data is dropped from the stream before it can be processed. If you monitor that metric, you should also have a "dead man's switch"-type metric that alerts if it stops reporting.
Bug reports and pull requests are welcome on GitHub at https://github.com/ello/kinesis-stream-reader.
Released under the MIT License
Ello was created by idealists who believe that the essential nature of all human beings is to be kind, considerate, helpful, intelligent, responsible, and respectful of others. To that end, we will be enforcing the Ello rules within all of our open source projects. If you don’t follow the rules, you risk being ignored, banned, or reported for abuse.