From 7f52fae62a6f797c5cbcac71910ab0db14b78e68 Mon Sep 17 00:00:00 2001
From: Nikita Makeev
Date: Wed, 9 Jan 2019 12:29:14 +0100
Subject: [PATCH] Add 'shard_iterator_type' and 'iterator_timestamp' parameters
 to AsyncKinesisConsumer

Add 'put_records' method to AsyncKinesisProducer
Add tests
---
 VERSION                                      |  2 +-
 requirements.txt                             |  2 +-
 setup.py                                     |  3 +-
 src/async_kinesis_client/dynamodb.py         |  2 +-
 src/async_kinesis_client/kinesis_consumer.py | 15 +++-
 src/async_kinesis_client/kinesis_producer.py | 91 +++++++++++++++++++-
 src/async_kinesis_client/utils.py            | 49 +++++++++++
 tests/test_consumer.py                       | 30 +++----
 tests/test_producer.py                       | 76 +++++++++++++++-
 9 files changed, 244 insertions(+), 26 deletions(-)
 create mode 100644 src/async_kinesis_client/utils.py

diff --git a/VERSION b/VERSION
index bcab45a..81340c7 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.0.3
+0.0.4
diff --git a/requirements.txt b/requirements.txt
index 025c5fa..ac66400 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
 aioboto3==6.0.1
-multidict==4.5.2
+multidict>=4.5.2
 botocore==1.12.49
 boto3==1.9.49
diff --git a/setup.py b/setup.py
index 356c3a0..2db4ff3 100644
--- a/setup.py
+++ b/setup.py
@@ -15,7 +15,8 @@
     url='https://github.com/whale2/async-kinesis-client',

     install_requires=[
-        'aioboto3==5.0.0',
+        'aioboto3==6.0.1',
+        'multidict>=4.5.2'
     ],
     packages=find_packages('src'),
     package_dir={'': 'src'},
diff --git a/src/async_kinesis_client/dynamodb.py b/src/async_kinesis_client/dynamodb.py
index b964f57..6540466 100644
--- a/src/async_kinesis_client/dynamodb.py
+++ b/src/async_kinesis_client/dynamodb.py
@@ -19,7 +19,7 @@ class CheckpointTimeoutException(Exception):
 class DynamoDB:
     """
     Class for checkpointing stream using async calls to DynamoDB
-    Basically, state.py from kinesis-python client by Evan Borgstrom,
+    Based on state.py from kinesis-python client by Evan Borgstrom,
     but with async calls and tailored for per-shard operations
     """
diff --git a/src/async_kinesis_client/kinesis_consumer.py b/src/async_kinesis_client/kinesis_consumer.py
index 5243336..b62f81a 100644
--- a/src/async_kinesis_client/kinesis_consumer.py
+++ b/src/async_kinesis_client/kinesis_consumer.py
@@ -141,17 +141,23 @@ class AsyncKinesisConsumer(StoppableProcess):
     DEFAULT_CHECKPOINT_INTERVAL = 100
     DEFAULT_LOCK_DURATION = 30

-    def __init__(self, stream_name, checkpoint_table=None, host_key=None):
+    def __init__(
+            self, stream_name, checkpoint_table=None, host_key=None, shard_iterator_type=None, iterator_timestamp=None):
         """
-        Initialize Async Kinseis Consumer
+        Initialize Async Kinesis Consumer
         :param stream_name: stream name to read from
         :param checkpoint_table: DynamoDB table for checkpointing; If not set, checkpointing is not used
         :param host_key: Key to identify reader instance; If not set, defaults to FQDN.
+        :param shard_iterator_type: Type of shard iterator, see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.get_shard_iterator
+        :param iterator_timestamp: Timestamp (datetime type) for shard iterator of type 'AT_TIMESTAMP'; see link above
         """
         super(AsyncKinesisConsumer, self).__init__()

         self.stream_name = stream_name
+        self.shard_iterator_type = shard_iterator_type
+        self.iterator_timestamp = iterator_timestamp
+
         self.kinesis_client = aioboto3.client('kinesis')

         self.checkpoint_table = checkpoint_table
@@ -249,6 +255,12 @@ async def get_shard_readers(self):

                 log.debug("%s iterator arguments: %s", shard_id, iterator_args)

+                # Override shard iterator type if one was given in the constructor
+                if self.shard_iterator_type:
+                    iterator_args['ShardIteratorType'] = self.shard_iterator_type
+                if self.shard_iterator_type == 'AT_TIMESTAMP':
+                    iterator_args['Timestamp'] = self.iterator_timestamp
+
                 # get our initial iterator
                 shard_iter = await self.kinesis_client.get_shard_iterator(
                     StreamName=self.stream_name,
@@ -272,4 +284,3 @@ async def get_shard_readers(self):
             # If interruptable_sleep returned false, we were signalled to stop
             if not await self.interruptable_sleep(self.lock_duration * 0.8):
                 return
-
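
For context, a minimal sketch of how the new consumer parameters are intended to be used; the stream name and timestamp are illustrative placeholders, and the import path assumes the installed package layout (package_dir={'': 'src'}):

    import datetime

    from async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer

    async def read_from_timestamp():
        # Start reading every shard from a fixed point in time
        consumer = AsyncKinesisConsumer(
            stream_name='my-stream',
            shard_iterator_type='AT_TIMESTAMP',
            iterator_timestamp=datetime.datetime(2019, 1, 9, 12, 0))
        # get_shard_readers() yields a reader per shard; get_records() yields lists of records
        async for shard_reader in consumer.get_shard_readers():
            async for records in shard_reader.get_records():
                for record in records:
                    print(record['Data'])
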
diff --git a/src/async_kinesis_client/kinesis_producer.py b/src/async_kinesis_client/kinesis_producer.py
index 9259212..b7cb4b8 100644
--- a/src/async_kinesis_client/kinesis_producer.py
+++ b/src/async_kinesis_client/kinesis_producer.py
@@ -1,10 +1,24 @@
 import logging
 import time
+
 import aioboto3

+from src.async_kinesis_client.utils import _sizeof
+
 log = logging.getLogger(__name__)

+# The following constants originate from:
+# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
+MAX_RECORDS_IN_BATCH = 500
+MAX_RECORD_SIZE = 1024 * 1024  # 1 MB
+MAX_BATCH_SIZE = 5 * MAX_RECORD_SIZE  # 5 MB
+
+
+def _get_default_partition_key():
+    return '{0}{1}'.format(time.process_time(), time.time())
+
+
 class AsyncKinesisProducer:

     def __init__(self, stream_name, ordered=True):
@@ -14,14 +28,26 @@ def __init__(self, stream_name, ordered=True):

         self.seq = '0'

+        self.record_buf = []
+        self.buf_size = 0
+
         self.kinesis_client = aioboto3.client('kinesis')
         log.debug("Configured kinesis producer for stream '%s'; ordered=%s",
                   stream_name, ordered)

+    async def put_record(self, record, partition_key=None, explicit_hash_key=None):
+        """
+        Put a single record into the Kinesis stream
+
+        :param record: record to put, bytes
+        :param partition_key: partition key to determine shard; if None, a time-based key is used
+        :param explicit_hash_key: hash value used to determine the shard explicitly, overriding the partition key
+        :return: response from kinesis client, see boto3 docs
+        """
         if partition_key is None:
-            partition_key = '{0}{1}'.format(time.process_time(), time.time())
+            partition_key = _get_default_partition_key()

         kwargs = {
             'StreamName': self.stream_name,
@@ -32,11 +58,72 @@ async def put_record(self, record, partition_key=None, explicit_hash_key=None):

         if self.ordered:
             kwargs['SequenceNumberForOrdering'] = self.seq

+        kwargs['PartitionKey'] = partition_key or _get_default_partition_key()
         if explicit_hash_key:
             kwargs['ExplicitHashKey'] = explicit_hash_key

         resp = await self.kinesis_client.put_record(**kwargs)
         if self.ordered:
             self.seq = resp.get('SequenceNumber')
+        return resp
+
+    async def put_records(self, records, partition_key=None, explicit_hash_key=None):
+        """
+        Put a list of records into the Kinesis stream
+
+        Records are buffered until the buffer outgrows the maximum allowed batch size
+        (500 records or 5 MB of data, including partition keys) or until explicitly
+        flushed (see flush() below)
+
+        :param records: iterable with records to put; records should be of bytes type
+        :param partition_key: partition key to determine shard; if None, a time-based key is used
+        :param explicit_hash_key: hash value used to determine the shard explicitly, overriding the partition key
+        :return: empty list if no records were flushed, list of responses from kinesis client otherwise
+
+        Raises ValueError if a single record exceeds 1 MB
+        """
+        resp = []
+        n = 1
+        for r in records:
+
+            if len(self.record_buf) == MAX_RECORDS_IN_BATCH:
+                resp.append(await self.flush())
+
+            record_size = _sizeof(r)
+
+            # Note: boto3 docs and general AWS docs give somewhat different hints about
+            # the maximum data sizes; the limits used here follow the boto3 put_records docs
+            if record_size > MAX_RECORD_SIZE:
+                raise ValueError('Record # {} exceeded max record size of {}; size={}; record={}'.format(
+                    n, MAX_RECORD_SIZE, record_size, r))
+
+            datum = {}
+
+            if explicit_hash_key:
+                datum['ExplicitHashKey'] = explicit_hash_key
+            else:
+                datum['PartitionKey'] = partition_key or _get_default_partition_key()
+
+            datum['Data'] = r
+            datum_size = _sizeof(datum)
+
+            if self.buf_size + datum_size > MAX_BATCH_SIZE:
+                resp.append(await self.flush())
+
+            self.record_buf.append(datum)
+            self.buf_size += datum_size
+            n += 1
+
+        return resp
+
+    async def flush(self):
+
+        if len(self.record_buf) == 0:
+            return

-    # TODO: Add bulk put_records
\ No newline at end of file
+        resp = await self.kinesis_client.put_records(
+            Records=self.record_buf,
+            StreamName=self.stream_name
+        )
+        self.record_buf = []
+        self.buf_size = 0
+        return resp
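
Likewise, a short sketch of the intended put_records()/flush() flow; the stream name and payloads are placeholders:

    from async_kinesis_client.kinesis_producer import AsyncKinesisProducer

    async def write_batch():
        producer = AsyncKinesisProducer(stream_name='my-stream')
        # put_records() only fills the buffer until a batch limit is hit;
        # an explicit flush() sends whatever is still buffered
        await producer.put_records(records=[b'first', b'second'])
        await producer.flush()
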
diff --git a/src/async_kinesis_client/utils.py b/src/async_kinesis_client/utils.py
new file mode 100644
index 0000000..a4e2a1c
--- /dev/null
+++ b/src/async_kinesis_client/utils.py
@@ -0,0 +1,49 @@
+# The code below is a slightly modified version of code from
+# https://github.com/NerdWalletOSS/kinesis-python, licensed under the Apache 2.0 license.
+#
+# Copyright 2017 NerdWallet
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections.abc
+import sys
+
+
+def _sizeof(obj, seen=None):
+    """Recursively and fully calculate the size of an object"""
+    if seen is None:
+        seen = set()
+
+    obj_id = id(obj)
+    if obj_id in seen:
+        return 0
+    seen.add(obj_id)
+
+    size = sys.getsizeof(obj)
+
+    # since strings are iterables we return their size explicitly first
+    if isinstance(obj, str):
+        return size
+    elif isinstance(obj, collections.abc.Mapping):
+        return size + sum(
+            _sizeof(key, seen) + _sizeof(val, seen)
+            for key, val in obj.items()
+        )
+    elif isinstance(obj, collections.abc.Iterable):
+        return size + sum(
+            _sizeof(item, seen)
+            for item in obj
+        )
+
+    return size
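
To make the difference from sys.getsizeof() concrete, a small illustration (the exact numbers vary between Python builds, hence none are shown):

    import sys

    from async_kinesis_client.utils import _sizeof

    record = {'PartitionKey': 'key-1', 'Data': b'payload'}
    print(sys.getsizeof(record))  # size of the dict object alone
    print(_sizeof(record))        # dict plus all keys and values, recursively
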
diff --git a/tests/test_consumer.py b/tests/test_consumer.py
index 7c22f02..28d023d 100644
--- a/tests/test_consumer.py
+++ b/tests/test_consumer.py
@@ -7,6 +7,7 @@
 from src.async_kinesis_client.kinesis_consumer import AsyncKinesisConsumer, ShardClosedException

+
 # TODO: Add tests for DynamoDB
 class TestConsumer(TestCase):
@@ -19,7 +20,7 @@ def setUp(self):

         self.sample_record = {
             'MillisBehindLatest': 0,
-            'Records': [ {'Data': 'xxxx'} ],
+            'Records': [{'Data': 'xxxx'}],
         }

         aioboto3.setup_default_session(botocore_session=MagicMock())
@@ -52,7 +53,7 @@ async def mock_describe_stream(self, StreamName):
                 ]
             }
         } if not self.shard_closed else {
-            'StreamDescription' : {
+            'StreamDescription': {
                 'Shards': []
             }
         }
@@ -62,21 +63,20 @@ async def mock_get_shard_iterator(self, StreamName, ShardId, **kwargs):
             'ShardIterator': {}
         }

-
-    def test_consmuer(self):
+    def test_consumer(self):

         async def read():
-          cnt = 0
-          async for shard_reader in self.consumer.get_shard_readers():
-              try:
-                  async for record in shard_reader.get_records():
-                      self.test_data.append(record[0]['Data'])
-              except ShardClosedException:
-                  if cnt > 0:
-                      # We should get second shard reader after first one gets closed
-                      # However we signal mocked method to stop returning shards after that
-                      self.shard_closed = True
-                  cnt += 1
+            cnt = 0
+            async for shard_reader in self.consumer.get_shard_readers():
+                try:
+                    async for record in shard_reader.get_records():
+                        self.test_data.append(record[0]['Data'])
+                except ShardClosedException:
+                    if cnt > 0:
+                        # We should get a second shard reader after the first one is closed;
+                        # after that we signal the mocked method to stop returning shards
+                        self.shard_closed = True
+                    cnt += 1

         async def stop_test():
             await asyncio.sleep(2)
@@ -91,4 +91,4 @@ async def test():
             asyncio.ensure_future(read())
         )

-        self.event_loop.run_until_complete(test())
\ No newline at end of file
+        self.event_loop.run_until_complete(test())
diff --git a/tests/test_producer.py b/tests/test_producer.py
index 7eb67aa..63e606e 100644
--- a/tests/test_producer.py
+++ b/tests/test_producer.py
@@ -6,6 +6,7 @@
 import aioboto3

 from src.async_kinesis_client.kinesis_producer import AsyncKinesisProducer
+import src.async_kinesis_client.kinesis_producer


 class TestProducer(TestCase):
@@ -20,6 +21,7 @@ def setUp(self):

         client = MagicMock()
         client.put_record = asyncio.coroutine(self.mock_put_record)
+        client.put_records = asyncio.coroutine(self.mock_put_records)

         self.producer = AsyncKinesisProducer(stream_name='test-stream')
         self.producer.kinesis_client = client
@@ -31,12 +33,15 @@ def setUp(self):

     async def mock_put_record(self, **record):
         self.records.append(record)
-        return { 'SequenceNumber': '1' }
+        return {'SequenceNumber': '1'}
+
+    async def mock_put_records(self, **records):
+        self.records.extend(records['Records'])
+        return [{}]

     def test_producer(self):

         async def test():
-
             await self.producer.put_record({'Data': 'zzzz'})
             await self.producer.put_record({'Data': 'wwww'})

             self.assertEqual(len(self.records), 2)
@@ -44,4 +49,69 @@ async def test():
             self.assertEqual(self.records[1].get('Data').get('Data'), 'wwww')
             self.assertEqual(self.records[1].get('SequenceNumberForOrdering'), '1')

-        self.event_loop.run_until_complete(test())
\ No newline at end of file
+        self.event_loop.run_until_complete(test())
+
+    def test_multiple_records(self):
+
+        async def test():
+            records = [
+                {'Data': 'zzzz'},
+                {'Data': 'wwww'}
+            ]
+            await self.producer.put_records(records=records)
+            await self.producer.flush()
+
+            self.assertEqual(len(self.records), 2)
+            self.assertEqual(self.records[0].get('Data').get('Data'), 'zzzz')
+            self.assertEqual(self.records[1].get('Data').get('Data'), 'wwww')
+
+        self.event_loop.run_until_complete(test())
+
+    def test_limits(self):
+
+        src.async_kinesis_client.kinesis_producer.MAX_RECORDS_IN_BATCH = 3
+        src.async_kinesis_client.kinesis_producer.MAX_RECORD_SIZE = 350
+        src.async_kinesis_client.kinesis_producer.MAX_BATCH_SIZE = 1000
+
+        async def test():
+
+            # Check that the 4th record triggers a flush
+            records = [
+                {'Data': 'zzzz'},
+                {'Data': 'wwww'},
+                {'Data': 'qqqq'},
+                {'Data': 'dddd'}
+            ]
+            await self.producer.put_records(records=records)
+
+            self.assertEqual(len(self.records), 3)
+            self.assertEqual(len(self.producer.record_buf), 1)
+
+            await self.producer.flush()
+
+            # Check that a record that is too big raises ValueError
+            records = [
+                {'Data': 'looongcatislooong' * 10}
+            ]
+            with self.assertRaises(ValueError):
+                await self.producer.put_records(records=records)
+
+            # Check that exceeding MAX_BATCH_SIZE triggers a flush
+            records = [
+                {'Data': 'zzzz'},
+                {'Data': 'wwww'},
+                {'Data': 'qqqq'}
+            ]
+
+            self.records = []
+            await self.producer.put_records(records=records)
+
+            self.assertEqual(len(self.records), 2)
+            self.assertEqual(len(self.producer.record_buf), 1)
+
+        self.event_loop.run_until_complete(test())