Resharding the kinesis stream #9

Open
scattym opened this issue Nov 26, 2019 · 2 comments

@scattym
Contributor

scattym commented Nov 26, 2019

Resharding the Kinesis stream results in a `ShardClosedException`. Catching this exception is fine, but I think it causes the reader to be added to the `dead_readers` list, after which we try to reopen it. Then, on line 310 of `kinesis_consumer.py`, we get the following exception:

```
    return future.result()
  File "pull-akcl", line 104, in read_stream
    async for shard_reader in consumer.get_shard_readers():
  File "/home/matt/git/async-kinesis-client/src/async_kinesis_client/kinesis_consumer.py", line 310, in get_shard_readers
    **iterator_args
  File "/home/matt/.pyenv/versions/3.7.4/lib/python3.7/site-packages/aiobotocore/client.py", line 105, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the GetShardIterator operation: 1 validation error detected: Value '' at 'startingSequenceNumber' failed to satisfy constraint: Member must satisfy regular expression pattern: 0|([1-9]\d{0,128})
```
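The `ValidationException` is the service rejecting the empty string: `''` does not match the `StartingSequenceNumber` pattern quoted in the error message. A minimal sketch, assuming one wants to catch this on the client side before calling `GetShardIterator`; the helper `is_valid_sequence_number` is hypothetical and is not part of async-kinesis-client or botocore.

```python
import re

# Pattern copied verbatim from the ValidationException message above.
SEQUENCE_NUMBER_RE = re.compile(r"0|([1-9]\d{0,128})")


def is_valid_sequence_number(value):
    """Return True if value would satisfy the StartingSequenceNumber constraint."""
    # fullmatch requires the whole string to match, so '' (and None) are rejected.
    return isinstance(value, str) and SEQUENCE_NUMBER_RE.fullmatch(value) is not None


print(is_valid_sequence_number(""))      # False: the value that triggered the error
print(is_valid_sequence_number("0"))     # True
print(is_valid_sequence_number("4959"))  # True
```

A check like this only turns a server-side error into a local one; the reader still needs a sensible starting position when no checkpoint exists.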

Is there a way to mark the reader as closed so that it is not reopened? Apologies if there is an obvious way to do this; I just can't see it at the moment.

I did try to stop the reader with `.stop()`, but I don't think it behaves the way I was hoping.

I am invoking the reader with `AT_TIMESTAMP` and no DynamoDB instance.
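One way to get the behaviour asked for here is to track closed shards separately from dead ones, so that the rescan only reopens readers that failed for other reasons. A minimal sketch under assumed names: `ReaderRegistry`, `on_reader_error` and `shards_to_reopen` are hypothetical and do not exist in async-kinesis-client, and `ShardClosedException` below is a local stand-in for the library's exception.

```python
class ShardClosedException(Exception):
    """Local stand-in for the library's ShardClosedException (assumption)."""


class ReaderRegistry:
    """Hypothetical bookkeeping in which closed shards are never reopened."""

    def __init__(self):
        self.closed = set()  # shard ids that reached their end, e.g. after resharding
        self.dead = set()    # shard ids whose reader failed and may be retried

    def on_reader_error(self, shard_id, exc):
        # A closed shard will never yield more records, so do not retry it.
        if isinstance(exc, ShardClosedException):
            self.closed.add(shard_id)
            self.dead.discard(shard_id)
        else:
            self.dead.add(shard_id)

    def shards_to_reopen(self):
        # Only shards that failed for other reasons are candidates for a new reader.
        return sorted(self.dead - self.closed)


registry = ReaderRegistry()
registry.on_reader_error("shardId-000000000000", ShardClosedException())
registry.on_reader_error("shardId-000000000001", ConnectionError())
print(registry.shards_to_reopen())  # ['shardId-000000000001']
```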

@scattym
Contributor Author

scattym commented Nov 26, 2019

I can work around the problem by putting the following at line 306 of `kinesis_consumer.py`:

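```python
# Skip this shard if there is no checkpoint: GetShardIterator rejects '' (or a missing value)
if (iterator_args.get("ShardIteratorType") == "AT_SEQUENCE_NUMBER"
        and not iterator_args.get("StartingSequenceNumber")):
    continue
```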

But I suspect I am missing the point about how to properly handle this scenario :)

With the above change in place, I get the following messages:

```
2019-11-26 15:46:19,005 - 30652 - root - INFO - Got shard reader for shard id: shardId-000000000000
2019-11-26 15:46:19,134 - 30652 - root - ERROR - Shard closed with error: 
2019-11-26 15:46:19,248 - 30652 - async_kinesis_client - WARNING - shardId-000000000000: Can not get last saved checkpointed seq!
```

From what I can tell, this repeats on every rescan.
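The warning suggests that the reopen path appears to build `AT_SEQUENCE_NUMBER` arguments even when no checkpoint exists (there is no DynamoDB table in this setup). A minimal sketch of a fallback, assuming the reader remembers the iterator arguments it was first started with; `build_iterator_args` and its parameters are hypothetical, while the dictionary keys (`ShardId`, `ShardIteratorType`, `StartingSequenceNumber`, `Timestamp`) are real `GetShardIterator` request parameters.

```python
from datetime import datetime, timezone


def build_iterator_args(shard_id, checkpoint_seq, initial_args):
    """Return GetShardIterator keyword arguments for (re)opening a shard reader.

    checkpoint_seq: last checkpointed sequence number, or None/'' if unknown.
    initial_args: iterator arguments the reader was originally started with.
    """
    args = {"ShardId": shard_id}
    if checkpoint_seq:
        # Resume from the checkpoint, using AT_SEQUENCE_NUMBER as the existing code appears to.
        args["ShardIteratorType"] = "AT_SEQUENCE_NUMBER"
        args["StartingSequenceNumber"] = checkpoint_seq
    else:
        # No checkpoint: never send an empty StartingSequenceNumber;
        # reuse the original starting position (e.g. AT_TIMESTAMP) instead.
        args.update(initial_args)
    return args


start = {"ShardIteratorType": "AT_TIMESTAMP",
         "Timestamp": datetime(2019, 11, 26, tzinfo=timezone.utc)}
print(build_iterator_args("shardId-000000000000", "", start)["ShardIteratorType"])  # AT_TIMESTAMP
```

This only avoids the invalid request. A closed shard still has to be excluded from the rescan; otherwise it would simply be reread from the original start position on every pass.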

@whale2
Owner

whale2 commented Nov 26, 2019

Hi @scattym,
I'm afraid this case is not well tested and might be buggy. I'll look into it.
The initial idea was that when resharding happens, some readers would die and new readers would be returned, but I completely failed to think through what would happen to all the checkpointing business, which is a real shame.
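For context, Kinesis describes resharding through shard lineage: `ListShards` (and `DescribeStream`) return each shard's `ParentShardId`, an `AdjacentParentShardId` after a merge, and a `SequenceNumberRange` whose `EndingSequenceNumber` is present only once the shard is closed. A minimal sketch of how a consumer might use this to choose which shards to read, assuming shard dicts in that response shape and a hypothetical set `finished` of shard ids whose readers reached the end; the function name `readable_shards` is also hypothetical.

```python
def readable_shards(shards, finished):
    """Pick shards to read, draining parents before their children.

    shards: list of dicts shaped like the "Shards" entries from ListShards.
    finished: set of shard ids whose readers have reached the end of the shard.
    """
    known = {s["ShardId"] for s in shards}
    result = []
    for shard in shards:
        shard_id = shard["ShardId"]
        if shard_id in finished:
            continue  # closed and fully consumed: never reopen
        parents = [shard.get("ParentShardId"), shard.get("AdjacentParentShardId")]
        # A parent no longer listed (expired past retention) is treated as drained.
        if all(p is None or p not in known or p in finished for p in parents):
            result.append(shard_id)
    return result


shards = [
    {"ShardId": "shardId-000000000000",
     "SequenceNumberRange": {"StartingSequenceNumber": "1", "EndingSequenceNumber": "9"}},
    {"ShardId": "shardId-000000000001", "ParentShardId": "shardId-000000000000",
     "SequenceNumberRange": {"StartingSequenceNumber": "10"}},
    {"ShardId": "shardId-000000000002", "ParentShardId": "shardId-000000000000",
     "SequenceNumberRange": {"StartingSequenceNumber": "11"}},
]
print(readable_shards(shards, finished=set()))
# ['shardId-000000000000']
print(readable_shards(shards, finished={"shardId-000000000000"}))
# ['shardId-000000000001', 'shardId-000000000002']
```

Reading parents to the end before starting children keeps per-key ordering; a checkpoint store would also need to record that a parent reached its end so that this decision survives a restart.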
