Can't commit offsets after broker migration #1538 (Closed)

DavidLiuXh opened this issue Nov 21, 2017 · 3 comments
DavidLiuXh commented Nov 21, 2017

Description

Offsets can no longer be committed after a broker is migrated to a new machine.

How to reproduce

  1. The broker list is A, B, C.

  2. Topic test_qbus has three partitions with replication factor 1, placed respectively on brokers A, B, C:
    Partition 0 -> Broker A
    Partition 1 -> Broker B
    Partition 2 -> Broker C

  3. Run a producer and a consumer for topic test_qbus (a minimal consumer sketch follows these steps).

  4. Set "log.dirs" to a new, empty directory in broker C's server.properties (simulating a machine crash and the migration of broker C to a new machine) and restart broker C.

  5. Data production is normal, but offsets can no longer be committed for partition 2.

  6. Relevant log excerpts (from the attached debug.tar.gz):
    | RD_KAFKA_LOG | level: 7 | fac: OFFSET | msg: [thrd:main]: Topic test_qbus [2]: stored offset 249469, committed offset 250455
    Tue Nov 21 07:26:29 2017
    | RD_KAFKA_LOG | level: 7 | fac: OFFSET | msg: [thrd:main]: Topic test_qbus [2]: setting offset INVALID for commit
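For reference, here is a minimal sketch of the consumer side of step 3, assuming librdkafka's high-level consumer C API; the group id is a placeholder and error handling is omitted. The debug=offset setting produces the OFFSET log lines quoted in step 6, and the rd_kafka_commit() call with a NULL offset list is the one that silently skips a partition once its stored offset falls below the committed offset:

    #include <librdkafka/rdkafka.h>

    int main (void) {
            char errstr[512];
            rd_kafka_conf_t *conf = rd_kafka_conf_new();

            /* "test_qbus_group" is a placeholder group id */
            rd_kafka_conf_set(conf, "group.id", "test_qbus_group",
                              errstr, sizeof(errstr));
            /* debug=offset emits the OFFSET log lines shown above */
            rd_kafka_conf_set(conf, "debug", "offset",
                              errstr, sizeof(errstr));

            rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                          errstr, sizeof(errstr));
            rd_kafka_poll_set_consumer(rk);

            rd_kafka_topic_partition_list_t *topics =
                    rd_kafka_topic_partition_list_new(1);
            rd_kafka_topic_partition_list_add(topics, "test_qbus",
                                              RD_KAFKA_PARTITION_UA);
            rd_kafka_subscribe(rk, topics);
            rd_kafka_topic_partition_list_destroy(topics);

            while (1) {
                    rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
                    if (!msg)
                            continue;
                    if (!msg->err) {
                            /* ... process msg; its offset is stored in
                             * memory automatically, since
                             * enable.auto.offset.store defaults to true */

                            /* Synchronously commit all stored offsets.
                             * After the migration, partition 2 is the one
                             * that logs "setting offset INVALID for commit"
                             * and is dropped from the commit request. */
                            rd_kafka_commit(rk, NULL, 0 /*sync*/);
                    }
                    rd_kafka_message_destroy(msg);
            }
    }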

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): 0.9.4 591103
  • Apache Kafka version: 0.9.0.1
  • librdkafka client configuration:
  • Operating system: CentOS 6
  • Using the legacy Consumer
  • Using the high-level KafkaConsumer
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
edenhill (Contributor) commented:

Your consumer has rewound to an earlier offset, but the offset-commit code checks whether the last stored offset (kept locally in memory for commit) is higher than the existing committed offset, and only commits if it is.

This design was intentional, to avoid committing an older offset by mistake, but it is highly questionable and the check should probably be removed.
The issue about allowing offsets to be re-committed is related as well: #1372

As a workaround you can set a specific offset to start consuming from in assign() (from your rebalance_cb), which prevents the consumer from fetching the existing committed offsets and lets you commit an older offset; see the sketch below.
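A sketch of that workaround, assuming the application keeps track of its own consume positions; lookup_my_offset() is a hypothetical helper standing in for whatever store the application uses:

    /* Rebalance callback that assigns explicit start offsets, so the
     * consumer never asks the broker for the committed offsets. */
    static int64_t lookup_my_offset (const char *topic, int32_t partition);

    static void rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
                              rd_kafka_topic_partition_list_t *partitions,
                              void *opaque) {
            switch (err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    for (int i = 0; i < partitions->cnt; i++)
                            partitions->elems[i].offset = lookup_my_offset(
                                    partitions->elems[i].topic,
                                    partitions->elems[i].partition);
                    /* With explicit offsets, fetching starts at our own
                     * position rather than at the (higher) committed
                     * offset, and older offsets can then be committed. */
                    rd_kafka_assign(rk, partitions);
                    break;

            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            default:
                    rd_kafka_assign(rk, NULL);
                    break;
            }
    }

    /* Registered on the configuration object before rd_kafka_new():
     *   rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);         */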

billygout (Contributor) commented Nov 27, 2017

@edenhill For the record, the following change solved a similar problem for me, and I believe this is what you meant by the questionable check mentioned above:

index d5a47f4b..53883a55 100644
--- a/libs/librdkafka/src/rdkafka_partition.c
+++ b/libs/librdkafka/src/rdkafka_partition.c
@@ -2687,14 +2687,8 @@ int rd_kafka_topic_partition_list_set_offsets (
                                     rktpar->topic, rktpar->partition,
                                     rktp->rktp_stored_offset,
                                     rktp->rktp_committed_offset);
-
-                       if (rktp->rktp_stored_offset >
-                           rktp->rktp_committed_offset) {
-                               verb = "setting stored";
-                               rktpar->offset = rktp->rktp_stored_offset;
-                       } else {
-                               rktpar->offset = RD_KAFKA_OFFSET_INVALID;
-                       }
+                       verb = "setting stored";
+                       rktpar->offset = rktp->rktp_stored_offset;
                         rd_kafka_toppar_unlock(rktp);
                 } else {
                        if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) {

The scenario I had was a partition being irretrievably lost because all of its replicas were lost. As a result, when the partition was recreated it started off again at offset 0, while my consumer's last committed offsets were far out of range. The change above allows the earlier offsets to be committed, which the removed check had blocked, and the consumer to recover from there.

It might be worthwhile to either make this change or add a configuration option to control the behaviour conditionally. The workaround that you mentioned above, while I'm sure it works, requires code changes and some understanding of how the rebalance_cb and the assign() function work, understanding that I expect many people, myself included, don't have, having merely copied the rebalance_cb snippet from your examples.

I should also mention that, starting with Kafka 0.11.0, the kafka-consumer-groups tool can "reset offsets" (https://issues.apache.org/jira/browse/KAFKA-4743), which would probably be a better workaround, although I have not tried it since I'm on Kafka 0.10.
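For example, something along these lines (the broker address and group name are placeholders, and I have not run this myself):

    kafka-consumer-groups.sh --bootstrap-server broker:9092 \
        --group my-group --topic test_qbus:2 \
        --reset-offsets --to-earliest --execute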

edenhill (Contributor) commented:

Right on!
We're discussing adding a configuration option to allow committing offsets that are <= the last committed offset, with the aim of eventually making that behaviour the default. It is not currently clear to me whether anyone relies on the existing behaviour.

A similar discussion is going on in #1372; closing this as a duplicate of it.
