Custom checkpointing #19

Closed
kokes opened this issue Dec 7, 2018 · 1 comment

kokes commented Dec 7, 2018

Do I understand it correctly that the code checkpoints as soon as an element is yielded? The call in question:

    self.state.checkpoint(state_shard_id, item['SequenceNumber'])

The issue I have with this behaviour is that yielding an item does not imply it has been processed (whatever that means for me). So if my code fails after I've received a record but before I have processed it, it will be marked as processed in DynamoDB, so I will lose this piece of information.

Is my understanding correct? If so, do you think there should be a mechanism to manually checkpoint data? I don't think it's difficult to code up, it's just tricky in terms of API - since you're making shards transparent, it's not quite clear which shard can be checkpointed at what time.
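One possible shape for such an API, purely as an illustration: each record could be yielded together with a callable that already knows which shard it belongs to, so the caller never has to handle shard IDs directly. Everything below is hypothetical and not part of the library: `InMemoryState` stands in for the DynamoDB-backed state object, and `iter_with_checkpoints` and `commit` are made-up names.

```python
from functools import partial


class InMemoryState:
    """Hypothetical stand-in for the DynamoDB-backed state object."""

    def __init__(self):
        self.positions = {}

    def checkpoint(self, shard_id, sequence_number):
        self.positions[shard_id] = sequence_number


def iter_with_checkpoints(state, shard_records):
    """Yield (item, commit) pairs; calling commit() checkpoints that item's shard."""
    for shard_id, records in shard_records.items():
        for item in records:
            # Bind the shard and sequence number now, so the caller
            # can checkpoint later without knowing about shards.
            commit = partial(state.checkpoint, shard_id, item["SequenceNumber"])
            yield item, commit


state = InMemoryState()
shard_records = {
    "shard-0": [{"SequenceNumber": "1"}, {"SequenceNumber": "2"}],
    "shard-1": [{"SequenceNumber": "7"}],
}
for item, commit in iter_with_checkpoints(state, shard_records):
    if item["SequenceNumber"] != "2":  # pretend record "2" failed processing
        commit()

print(state.positions)  # {'shard-0': '1', 'shard-1': '7'}
```

With this shape the caller decides when a record counts as processed, and the shard bookkeeping stays inside the iterator.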

whale2 pushed a commit to whale2/async-kinesis-client that referenced this issue Jan 9, 2019
@borgstrom
Contributor

Hi @kokes,

Thanks for opening this issue.

I don't think that your understanding is correct.

Let's look at some more of the code block you referenced:

    for item in resp['Records']:
        if not self.run:
            break
        log.debug(item)
        yield item
        try:
            self.state.checkpoint(state_shard_id, item['SequenceNumber'])

Since we do not have any exception handling around the yield, if you fail to process an item then the whole consumer will fail and shut down BEFORE we checkpoint.

Only if you successfully process the item (i.e. no exception is thrown at line 206) will the position be written to the checkpoint.

This leaves retry mechanisms up to you to implement.
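The ordering can be seen in a small standalone sketch. It assumes a simplified `consume` generator in place of the library's loop and a plain list in place of the DynamoDB state; `consume` and `process_all` are illustrative names, not library functions. The statement after the `yield` runs only when the caller asks for the next item, so a record whose processing raises is never checkpointed.

```python
def consume(records, checkpoints):
    """Simplified stand-in for the consumer loop: yield a record, then checkpoint it."""
    for item in records:
        yield item
        # Reached only when the caller resumes the generator,
        # i.e. after it has finished with the yielded item.
        checkpoints.append(item["SequenceNumber"])


def process_all(records, fail_on):
    """Process records, raising on the one whose SequenceNumber equals fail_on."""
    checkpoints = []
    try:
        for item in consume(records, checkpoints):
            if item["SequenceNumber"] == fail_on:
                raise RuntimeError("processing failed")
    except RuntimeError:
        pass  # a real consumer would shut down or retry here
    return checkpoints


records = [{"SequenceNumber": str(n)} for n in (1, 2, 3)]
print(process_all(records, fail_on="2"))   # ['1']
print(process_all(records, fail_on=None))  # ['1', '2', '3']
```

In the failing run only record "1" is checkpointed, so record "2" would be delivered again after a restart.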

I do agree that this forces you to process items from all the shards serially, because the shards are handled transparently, but that was the desired behavior for the use case this library was built to solve. If you have a proposal for how you'd like the shards to be processed explicitly, I would be open to considering it.
