Add 'shard_iterator_type' and 'iterator_timestamp' parameters to AsyncKinesisConsumer

Add 'put_records' method to AsyncKinesisProducer
Add tests
Nikita Makeev committed Jan 9, 2019
1 parent 98fa5c7 commit 7f52fae
Showing 9 changed files with 244 additions and 26 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-0.0.3
+0.0.4
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
 aioboto3==6.0.1
-multidict==4.5.2
+multidict>=4.5.2
 botocore==1.12.49
 boto3==1.9.49
3 changes: 2 additions & 1 deletion setup.py
@@ -15,7 +15,8 @@
     url='https://github.com/whale2/async-kinesis-client',
 
     install_requires=[
-        'aioboto3==5.0.0',
+        'aioboto3==6.0.1',
+        'multidict>=4.5.2'
     ],
     packages=find_packages('src'),
     package_dir={'': 'src'},
2 changes: 1 addition & 1 deletion src/async_kinesis_client/dynamodb.py
@@ -19,7 +19,7 @@ class CheckpointTimeoutException(Exception):
 class DynamoDB:
     """
     Class for checkpointing stream using async calls to DynamoDB
-    Basically, state.py from kinesis-python client by Evan Borgstrom <[email protected]>,
+    Based on state.py from kinesis-python client by Evan Borgstrom <[email protected]>,
     but with async calls and tailored for per-shard operations
     """

15 changes: 13 additions & 2 deletions src/async_kinesis_client/kinesis_consumer.py
@@ -141,17 +141,23 @@ class AsyncKinesisConsumer(StoppableProcess):
     DEFAULT_CHECKPOINT_INTERVAL = 100
     DEFAULT_LOCK_DURATION = 30
 
-    def __init__(self, stream_name, checkpoint_table=None, host_key=None):
+    def __init__(
+            self, stream_name, checkpoint_table=None, host_key=None, shard_iterator_type=None, iterator_timestamp=None):
         """
         Initialize Async Kinesis Consumer
         :param stream_name: stream name to read from
         :param checkpoint_table: DynamoDB table for checkpointing; if not set, checkpointing is not used
         :param host_key: key to identify reader instance; if not set, defaults to FQDN
+        :param shard_iterator_type: type of shard iterator, see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.get_shard_iterator
+        :param iterator_timestamp: timestamp (datetime type) for shard iterator of type 'AT_TIMESTAMP'; see link above
         """
 
         super(AsyncKinesisConsumer, self).__init__()
 
         self.stream_name = stream_name
+        self.shard_iterator_type = shard_iterator_type
+        self.iterator_timestamp = iterator_timestamp
 
         self.kinesis_client = aioboto3.client('kinesis')
 
         self.checkpoint_table = checkpoint_table
@@ -249,6 +255,12 @@ async def get_shard_readers(self):
 
             log.debug("%s iterator arguments: %s", shard_id, iterator_args)
 
+            # override shard_iterator_type if given in constructor
+            if self.shard_iterator_type:
+                iterator_args['ShardIteratorType'] = self.shard_iterator_type
+                if self.shard_iterator_type == 'AT_TIMESTAMP':
+                    iterator_args['Timestamp'] = self.iterator_timestamp
+
             # get our initial iterator
             shard_iter = await self.kinesis_client.get_shard_iterator(
                 StreamName=self.stream_name,
@@ -272,4 +284,3 @@ async def get_shard_readers(self):
             # If interruptable_sleep returned false, we were signalled to stop
             if not await self.interruptable_sleep(self.lock_duration * 0.8):
                 return
-
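Taken together, the new parameters let a consumer start reading from an arbitrary point in the stream rather than from a checkpoint. A minimal usage sketch — the stream name and the one-hour replay window are assumptions, and get_shard_readers()/get_records() are the existing reader API exercised in tests/test_consumer.py below:

import asyncio
from datetime import datetime, timedelta

from src.async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer


async def replay_last_hour():
    # 'AT_TIMESTAMP' makes every shard iterator start at the given point in time
    consumer = AsyncKinesisConsumer(
        stream_name='example-stream',  # assumed stream name
        shard_iterator_type='AT_TIMESTAMP',
        iterator_timestamp=datetime.utcnow() - timedelta(hours=1))
    async for shard_reader in consumer.get_shard_readers():
        # get_records() yields lists of records, as in the tests below
        async for records in shard_reader.get_records():
            for record in records:
                print(record['Data'])


asyncio.get_event_loop().run_until_complete(replay_last_hour())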
91 changes: 89 additions & 2 deletions src/async_kinesis_client/kinesis_producer.py
@@ -1,10 +1,24 @@
 import logging
 import time
 
 import aioboto3
 
+from src.async_kinesis_client.utils import _sizeof
+
 log = logging.getLogger(__name__)
 
 
+# The following constants originate from
+# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
+MAX_RECORDS_IN_BATCH = 500
+MAX_RECORD_SIZE = 1024 * 1024  # 1 MB
+MAX_BATCH_SIZE = 5 * MAX_RECORD_SIZE  # 5 MB
+
+
+def _get_default_partition_key():
+    return '{0}{1}'.format(time.process_time(), time.time())
+
+
 class AsyncKinesisProducer:
 
     def __init__(self, stream_name, ordered=True):
@@ -14,14 +28,26 @@ def __init__(self, stream_name, ordered=True):
 
         self.seq = '0'
 
+        self.record_buf = []
+        self.buf_size = 0
+
         self.kinesis_client = aioboto3.client('kinesis')
         log.debug("Configured kinesis producer for stream '%s'; ordered=%s",
                   stream_name, ordered)
 
     async def put_record(self, record, partition_key=None, explicit_hash_key=None):
+        """
+        Put a single record into the Kinesis stream
+        :param record: record to put, bytes
+        :param partition_key: partition key to determine shard; if none, a time-based key is used
+        :param explicit_hash_key: hash value used to determine the shard explicitly, overriding partition key
+        :return: response from kinesis client, see boto3 docs
+        """
 
         if partition_key is None:
-            partition_key = '{0}{1}'.format(time.process_time(), time.time())
+            partition_key = _get_default_partition_key()
 
         kwargs = {
             'StreamName': self.stream_name,
@@ -32,11 +58,72 @@ async def put_record(self, record, partition_key=None, explicit_hash_key=None):
         if self.ordered:
             kwargs['SequenceNumberForOrdering'] = self.seq
 
+        kwargs['PartitionKey'] = partition_key or _get_default_partition_key()
         if explicit_hash_key:
             kwargs['ExplicitHashKey'] = explicit_hash_key
 
         resp = await self.kinesis_client.put_record(**kwargs)
         if self.ordered:
             self.seq = resp.get('SequenceNumber')
         return resp
 
+    async def put_records(self, records, partition_key=None, explicit_hash_key=None):
+        """
+        Put a list of records into the Kinesis stream
+        Records are buffered until the buffer outgrows the maximum allowed batch size (500 records
+        or 5 MB of data, including partition keys) or until explicitly flushed (see flush() below)
+        :param records: iterable with records to put; records should be of bytes type
+        :param partition_key: partition key to determine shard; if none, a time-based key is used
+        :param explicit_hash_key: hash value used to determine the shard explicitly, overriding partition key
+        :return: empty list if no records were flushed, list of responses from kinesis client otherwise
+        Raises ValueError if a single record exceeds 1 MB
+        """
+        resp = []
+        n = 1
+        for r in records:
+
+            if len(self.record_buf) == MAX_RECORDS_IN_BATCH:
+                resp.append(await self.flush())
+
+            record_size = _sizeof(r)
+
+            # I hope I'm implementing this correctly, as there are different hints about maximum
+            # data sizes in boto3 docs and general AWS docs
+            if record_size > MAX_RECORD_SIZE:
+                raise ValueError('Record #{} exceeded max record size of {}; size={}; record={}'.format(
+                    n, MAX_RECORD_SIZE, record_size, r))
+
+            datum = {}
+
+            if explicit_hash_key:
+                datum['ExplicitHashKey'] = explicit_hash_key
+            else:
+                datum['PartitionKey'] = partition_key or _get_default_partition_key()
+
+            datum['Data'] = r
+            datum_size = _sizeof(datum)
+
+            if self.buf_size + datum_size > MAX_BATCH_SIZE:
+                resp.append(await self.flush())
+
+            self.record_buf.append(datum)
+            self.buf_size += datum_size
+            n += 1
+
+        return resp
+
+    async def flush(self):
+
+        if len(self.record_buf) == 0:
+            return
+
+        resp = await self.kinesis_client.put_records(
+            Records=self.record_buf,
+            StreamName=self.stream_name
+        )
+        self.record_buf = []
+        self.buf_size = 0
+        return resp
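A short sketch of the new buffered API — the stream name and payloads are assumptions; note that put_records() only returns responses for batches it had to flush mid-iteration, so an explicit flush() delivers whatever remains:

import asyncio

from src.async_kinesis_client.kinesis_producer import AsyncKinesisProducer


async def produce():
    producer = AsyncKinesisProducer(stream_name='example-stream')  # assumed stream name

    # Three small records stay far below the 500-record / 5 MB limits,
    # so this call only buffers them and returns an empty list
    flushed = await producer.put_records([b'first', b'second', b'third'])
    assert flushed == []

    # flush() sends the buffered batch in a single PutRecords call
    # and returns the kinesis client response
    resp = await producer.flush()
    print(resp.get('FailedRecordCount'))


asyncio.get_event_loop().run_until_complete(produce())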
49 changes: 49 additions & 0 deletions src/async_kinesis_client/utils.py
@@ -0,0 +1,49 @@
+# The code below is essentially slightly modified code from https://github.com/NerdWalletOSS/kinesis-python,
+# licensed under the Apache 2.0 license.
+#
+# Copyright 2017 NerdWallet
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import sys
+
+
+def _sizeof(obj, seen=None):
+    """Recursively and fully calculate the size of an object"""
+    obj_id = id(obj)
+    try:
+        if obj_id in seen:
+            return 0
+    except TypeError:
+        seen = set()
+
+    seen.add(obj_id)
+
+    size = sys.getsizeof(obj)
+
+    # since strings are iterables we return their size explicitly first
+    if isinstance(obj, str):
+        return size
+    elif isinstance(obj, collections.Mapping):
+        return size + sum(
+            _sizeof(key, seen) + _sizeof(val, seen)
+            for key, val in obj.items()
+        )
+    elif isinstance(obj, collections.Iterable):
+        return size + sum(
+            _sizeof(item, seen)
+            for item in obj
+        )
+
+    return size
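Because the batch-size check in put_records() is applied to the whole datum dict, _sizeof() effectively counts partition keys as well as payloads. A small illustration of how it differs from sys.getsizeof() (exact numbers are CPython-specific):

import sys

from src.async_kinesis_client.utils import _sizeof

datum = {'PartitionKey': 'key-1', 'Data': b'x' * 1024}

# sys.getsizeof() counts only the dict object itself;
# _sizeof() also descends into the keys and values it contains
print(sys.getsizeof(datum))  # dict shell only
print(_sizeof(datum))        # dict plus contained strings/bytes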
30 changes: 15 additions & 15 deletions tests/test_consumer.py
@@ -7,6 +7,7 @@
 
 from src.async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer, ShardClosedException
 
+
 # TODO: Add tests for DynamoDB
 
 class TestConsumer(TestCase):
@@ -19,7 +20,7 @@ def setUp(self):
 
         self.sample_record = {
             'MillisBehindLatest': 0,
-            'Records': [ {'Data': 'xxxx'} ],
+            'Records': [{'Data': 'xxxx'}],
         }
 
         aioboto3.setup_default_session(botocore_session=MagicMock())
@@ -52,7 +53,7 @@ async def mock_describe_stream(self, StreamName):
                 ]
             }
         } if not self.shard_closed else {
-            'StreamDescription' : {
+            'StreamDescription': {
                 'Shards': []
             }
         }
@@ -62,21 +63,20 @@ async def mock_get_shard_iterator(self, StreamName, ShardId, **kwargs):
             'ShardIterator': {}
         }
 
-
     def test_consumer(self):
 
         async def read():
-                cnt = 0
-                async for shard_reader in self.consumer.get_shard_readers():
-                    try:
-                        async for record in shard_reader.get_records():
-                            self.test_data.append(record[0]['Data'])
-                    except ShardClosedException:
-                        if cnt > 0:
-                            # We should get second shard reader after first one gets closed
-                            # However we signal mocked method to stop returning shards after that
-                            self.shard_closed = True
-                    cnt += 1
+            cnt = 0
+            async for shard_reader in self.consumer.get_shard_readers():
+                try:
+                    async for record in shard_reader.get_records():
+                        self.test_data.append(record[0]['Data'])
+                except ShardClosedException:
+                    if cnt > 0:
+                        # We should get a second shard reader after the first one gets closed;
+                        # however, we signal the mocked method to stop returning shards after that
+                        self.shard_closed = True
+                cnt += 1
 
         async def stop_test():
             await asyncio.sleep(2)
@@ -91,4 +91,4 @@ async def test():
                 asyncio.ensure_future(read())
             )
 
-        self.event_loop.run_until_complete(test())
\ No newline at end of file
+        self.event_loop.run_until_complete(test())