Consumer offset count reset issue #455
Comments
Restarting a broker alone is not enough to remove offsets. You will need to delete the backing volume as well, since it stores the broker's log data. Had you deleted the offsets, you would have received an offsets-out-of-range error, which would have prompted your consumer to fall back on its offset reset policy, which is smallest in your case.
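The fallback described above can be sketched as a toy model in plain Python. This is illustrative only, not librdkafka code; `resolve_fetch_offset` is a made-up name for this sketch:

```python
# Toy model of a fetch with an out-of-range fallback.
# NOT the librdkafka implementation; it only illustrates the behaviour
# described above: an invalid offset falls back to auto.offset.reset.

def resolve_fetch_offset(requested, log_start, log_end, reset_policy):
    """Return the offset a consumer would actually start reading from."""
    if log_start <= requested <= log_end:
        return requested  # committed offset is still valid
    # Offset out of range: fall back to the auto.offset.reset policy.
    if reset_policy == "smallest":   # a.k.a. "earliest"
        return log_start
    if reset_policy == "largest":    # a.k.a. "latest"
        return log_end
    raise RuntimeError("offset out of range and no reset policy configured")

# After the volume is deleted, the new log only holds offsets 0..54,
# but the consumer still asks for its old committed offset 899:
print(resolve_fetch_offset(899, 0, 54, "smallest"))  # -> 0
```

With 'smallest', the consumer restarts from the beginning of the log, which matches the "consumes everything again" behaviour reported below.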
Thank you for your reply. By 'delete the backing volume', did you mean using a command such as …?
You would need to show your compose file, but stopping and removing the container is one way to remove the volume. An alternative would be to mount a physical directory on your host machine into the container.
librdkafka will currently not re-commit an older offset than the last committed offset: when you remove and recreate a topic, the offsets are thus reset, but an existing consumer will not use the new offsets until they pass the old high-water offset.
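The commit rule above can be sketched with a small toy model. This is pure Python for illustration, not the actual librdkafka implementation; `CommittedOffsetStore` is a made-up name:

```python
# Toy model of the commit rule described above: an offset older than the
# last committed one is ignored. NOT the real librdkafka code.

class CommittedOffsetStore:
    def __init__(self):
        self.committed = {}  # (topic, partition) -> last committed offset

    def commit(self, topic, partition, offset):
        key = (topic, partition)
        current = self.committed.get(key, -1)
        if offset <= current:
            return False  # older (or equal) offset: rejected
        self.committed[key] = offset
        return True

store = CommittedOffsetStore()
store.commit("gateway_tester", 0, 899)  # offset from before the topic reset
store.commit("gateway_tester", 0, 54)   # new, smaller offset: ignored
print(store.committed[("gateway_tester", 0)])  # -> 899
```

Under this rule, the old consumer group stays pinned at 899 until the recreated topic grows past that offset, which is why creating a new group ("gateway_2") shows the expected counts immediately.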
Unrelated question, but what tool are you using to view your kafka topic like that? @DavidNeko |
Description
I've been trying to test the correctness of a chunk of data I send to Kafka. While experimenting with multiprocessing in Fabric, I messed up the process as well as the message consumer: the consumer did not shut down correctly at first, and then it stopped consuming messages altogether.
After that I re-started Kafka on my local machine (I'm using Docker). I used
docker-compose -f docker-compose-single-broker.yml rm
to remove the Kafka instance I'd been using for testing, and re-created a new one using
docker-compose -f docker-compose-single-broker.yml up
After kafka and kafka-manager were up and running, I found that although I hadn't transferred any messages to Kafka yet, the offset value of the topic I used for testing was not reset to 0.
For the data in the picture:
"gateway" is the consumer I've been using before and after I re-started Kafka.
"gateway_tester" is the topic that I used to send test messages to.
"End 54" (value in red) is the number of messages consumed from this topic after I re-started Kafka.
"Offset 899" (value in blue) is the number of messages consumed from this topic before I re-started Kafka.
I'm confused about why the offset number doesn't get reset after I re-start Kafka. When I used this consumer after the restart, it would consume all the data I sent to Kafka, because the number of messages was less than 899...
Then I created a new consumer called "gateway_2" to consume data from the same topic.
As shown in the picture, the offset count matched the End value this time, and everything works fine: if I send data to this topic and consume it with the new consumer "gateway_2", it consumes the new messages and ignores the messages it has consumed before. (My offset setting is
'auto.offset.reset': 'smallest'
.) I'm wondering: is there a way to reset the offset count on the consumer I used before, or is creating a new consumer the only way to solve this problem?
How to reproduce
1. Start Kafka, create a consumer, and consume some data to change the offset count on that consumer.
2. Shut down Kafka.
3. Re-start Kafka and use the same consumer to consume messages.
4. The consumer will consume all data in the topic until the amount of data in the topic reaches the offset count.
Checklist
Please provide the following information:
confluent_kafka.version(): 0.11.4
kafka-python: 1.3.5
(I could not find confluent_kafka.libversion() because the project I'm working on uses pip to manage Python packages and confluent_kafka.libversion doesn't show up in the requirements.txt file...)
KAFKA_HOST = '0.0.0.0'
KAFKA_PORT = 9092
KAFKA_HOST_PORT = '%(host)s:%(port)s' % {
'host': KAFKA_HOST,
'port': KAFKA_PORT,
}
kafka_configuration = {
'bootstrap.servers': KAFKA_HOST_PORT,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'},
}
(I set group.id to the value gateway, and to gateway_2 for the new consumer, in my class initializer.)
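As a side note that does not apply to the 0.11.4 client used here: to my knowledge, newer librdkafka-based client releases deprecate 'default.topic.config' in favour of top-level keys, so the same configuration could be written as follows (a plain-dict sketch for comparison, not a tested setup):

```python
# Same configuration as above, with the topic-level setting written at the
# top level (the form used by newer librdkafka releases, where
# 'default.topic.config' is deprecated). Plain dict, shown for comparison.

KAFKA_HOST = '0.0.0.0'
KAFKA_PORT = 9092

kafka_configuration = {
    'bootstrap.servers': '%s:%d' % (KAFKA_HOST, KAFKA_PORT),
    'group.id': 'gateway',            # or 'gateway_2' for the new consumer
    'session.timeout.ms': 6000,
    'auto.offset.reset': 'smallest',  # same effect as the nested form above
}
print(kafka_configuration['bootstrap.servers'])  # -> 0.0.0.0:9092
```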