Committed offset is reset periodically #215
Do your consumers by chance go long periods of time without processing or committing new offsets? More specifically, is it possible your consumer will not commit a new offset within the broker's offset retention window? I ask because even with auto offset commits, librdkafka only commits new offsets. It will not recommit the same offset, which means it's possible for your consumer's offsets to be cleaned up by the broker. In the event this happens you will fall back on your offset reset policy.
@rnpridgeon, very interesting. I'll dig in this direction as well. My consumer is short-lived; it lasts a few seconds. I've checked the logs from the moments when the offset is reset and nothing that might cause a timeout is there. I handle all errors, but the log is clean; the offset just starts from the beginning.
Enable debug logging.
@edenhill, I'll do that, though the bug hasn't shown up for a week. On the other hand, I do have offsets in my logs. The offset resets to -1001 and then increments as usual: 1, 2, 3, ...
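For reference, enabling librdkafka debug output means adding the standard `"debug"` property to the `ConfigMap`; the contexts listed here (`cgrp,topic,fetch`) are one reasonable set for diagnosing offset resets, not the only option:

```go
// Add to the kafka.ConfigMap passed to kafka.NewConsumer. "debug" is a
// standard librdkafka property; the context list is just a suggestion.
"debug": "cgrp,topic,fetch",
```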
This is a duplicate of #1102.
Description
We have a consumer which reads the last uncommitted message, processes it, and commits it. It was working fine until we discovered that all messages are periodically re-consumed. It looks like the last offset is reset and consumer.Committed() returns -1001. It happened every other day without a strong pattern. About 5 days ago the problem went away.
The consumer code is below.
```go
// Set up Kafka consumer
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers":        config.KafkaHost,
	"group.id":                 "XXXSync",
	"session.timeout.ms":       6000,
	"go.events.channel.enable": true,
	"enable.auto.commit":       false,
	"default.topic.config":     kafka.ConfigMap{"auto.offset.reset": "earliest"}})
if err != nil {
	return fmt.Errorf("xxx")
}
defer consumer.Close()

// Read only from the first partition
var partition int32 = 0

// Get the offset of the last committed message
lastOffsets, err := consumer.Committed([]kafka.TopicPartition{{Topic: &topic, Partition: partition}}, 5000)
if err != nil {
	return fmt.Errorf("xxx")
}

// Assign the offset to the consumer. Committed() returns one entry per
// requested partition, so index 0 corresponds to our single request
// (indexing by partition number only works here because partition == 0).
var offset kafka.Offset
if len(lastOffsets) > 0 {
	offset = lastOffsets[0].Offset
}
err = consumer.Assign([]kafka.TopicPartition{{Topic: &topic, Partition: partition, Offset: offset}})
if err != nil {
	return fmt.Errorf("xxx")
}
```
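One way to harden the snippet above is to treat the -1001 sentinel explicitly instead of passing it straight to Assign(), where it silently falls back on `auto.offset.reset=earliest` and re-consumes everything. This is a standalone sketch using local stand-ins for the confluent-kafka-go sentinels (`kafka.OffsetInvalid`, `kafka.OffsetEnd`); `chooseStartOffset` is a hypothetical helper, not library API:

```go
package main

import "fmt"

// Local stand-ins for the confluent-kafka-go sentinels so this sketch runs
// without a broker; real code would use kafka.OffsetInvalid and kafka.OffsetEnd.
const (
	offsetInvalid int64 = -1001
	offsetEnd     int64 = -1
)

// chooseStartOffset is a hypothetical helper mapping the result of Committed()
// to the offset handed to Assign(): the invalid sentinel is replaced with an
// explicit choice instead of silently falling back on auto.offset.reset.
func chooseStartOffset(committed int64) int64 {
	if committed == offsetInvalid {
		return offsetEnd // assumption: prefer skipping history over reprocessing it
	}
	return committed
}

func main() {
	fmt.Println(chooseStartOffset(offsetInvalid)) // -1
	fmt.Println(chooseStartOffset(7))             // 7
}
```

Whether to map the sentinel to the end, the beginning, or an error is an application decision; the point is that the fallback becomes visible in the code rather than implicit in broker-side offset expiry.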
We have kafka in docker provided by confluentinc/cp-kafka, confluentinc/cp-zookeeper. The kafka container is run in the following way:
```shell
docker run -d \
  --name=kafka \
  --hostname=kafka \
  --publish=0.0.0.0:xxxx:xxxx \
  -e KAFKA_ZOOKEEPER_CONNECT=x.x.x.x:xxxxx \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://x.x.x.x:xxxx \
  -e KAFKA_BROKER_ID=2 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:latest
```
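If expiring committed offsets turn out to be the cause, the broker's retention window can be widened. The cp-kafka image maps `KAFKA_*` environment variables onto `server.properties` keys, so a line like the following (10080 minutes = 7 days, an illustrative value) could be added to the docker run above:

```shell
  -e KAFKA_OFFSETS_RETENTION_MINUTES=10080 \
```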
Checklist
Please provide the following information:
- confluent-kafka-go and librdkafka version (`LibraryVersion()`): 722175, 0.11.4-12-gf26b9b
- Client configuration (`ConfigMap{...}`): See above.
- Client logs (with `"debug": ".."` as necessary)