Skip to content

Commit

Permalink
Change enable.partition.eof default from true to false (#2020)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Dec 4, 2018
1 parent 764b6fb commit a869fa1
Show file tree
Hide file tree
Showing 18 changed files with 56 additions and 23 deletions.
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ offset.store.method | C | none, file, broker | bro
consume_cb | C | | | low | Message consume callback (set with rd_kafka_conf_set_consume_cb()) <br>*Type: pointer*
rebalance_cb | C | | | low | Called after consumer group has been rebalanced (set with rd_kafka_conf_set_rebalance_cb()) <br>*Type: pointer*
offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()) <br>*Type: pointer*
enable.partition.eof | C | true, false | true | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
enable.partition.eof | C | true, false | false | low | Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition. <br>*Type: boolean*
check.crcs | C | true, false | false | medium | Verify CRC32 of consumed messages, ensuring no on-the-wire or on-disk corruption to the messages occurred. This check comes at slightly increased CPU usage. <br>*Type: boolean*
enable.idempotence | P | true, false | false | high | When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5` (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0), `acks=all`, `queuing.strategy=fifo`. Producer instantation will fail if user-supplied configuration is incompatible. <br>*Type: boolean*
enable.gapless.guarantee | P | true, false | true | high | When set to `true`, any error that could result in a gap in the produced message series when a batch of messages fails, will raise a fatal error (ERR__GAPLESS_GUARANTEE) and stop the producer. Requires `enable.idempotence=true`. <br>*Type: boolean*
Expand Down
2 changes: 2 additions & 0 deletions examples/kafkatest_verifiable_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,8 @@ int main (int argc, char **argv) {
conf->set("fetch.wait.max.ms", "500", errstr);
conf->set("fetch.min.bytes", "4096", errstr);

conf->set("enable.partition.eof", "true", errstr);

for (int i = 1 ; i < argc ; i++) {
const char *name = argv[i];
const char *val = i+1 < argc ? argv[i+1] : NULL;
Expand Down
3 changes: 3 additions & 0 deletions examples/rdkafka_consumer_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ int main (int argc, char **argv) {

/* Callback called on partition assignment changes */
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

rd_kafka_conf_set(conf, "enable.partition.eof", "true",
NULL, 0);
}

/* Create Kafka handle */
Expand Down
2 changes: 2 additions & 0 deletions examples/rdkafka_consumer_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ int main (int argc, char **argv) {
ExampleRebalanceCb ex_rebalance_cb;
conf->set("rebalance_cb", &ex_rebalance_cb, errstr);

conf->set("enable.partition.eof", "true", errstr);

while ((opt = getopt(argc, argv, "g:b:z:qd:eX:AM:qv")) != -1) {
switch (opt) {
case 'g':
Expand Down
3 changes: 3 additions & 0 deletions examples/rdkafka_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,9 @@ int main (int argc, char **argv) {
* Consumer
*/

rd_kafka_conf_set(conf, "enable.partition.eof", "true",
NULL, 0);

/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
errstr, sizeof(errstr)))) {
Expand Down
2 changes: 2 additions & 0 deletions examples/rdkafka_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ int main (int argc, char **argv) {
* Consumer mode
*/

conf->set("enable.partition.eof", "true", errstr);

if(topic_str.empty())
goto usage;

Expand Down
4 changes: 4 additions & 0 deletions examples/rdkafka_performance.c
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,10 @@ int main (int argc, char **argv) {

topic = topics->elems[0].topic;

if (mode == 'C' || mode == 'G')
rd_kafka_conf_set(conf, "enable.partition.eof", "true",
NULL, 0);

if (mode == 'P') {
/*
* Producer
Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
_RK(enable_partition_eof),
"Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the "
"consumer reaches the end of a partition.",
0, 1, 1 },
0, 1, 0 },
{ _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "check.crcs", _RK_C_BOOL,
_RK(check_crcs),
"Verify CRC32 of consumed messages, ensuring no on-the-wire or "
Expand Down
2 changes: 2 additions & 0 deletions tests/0012-produce_consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ static void consume_messages_with_queues (uint64_t testid, const char *topic,

test_conf_init(&conf, &topic_conf, 20);

test_conf_set(conf, "enable.partition.eof", "true");

/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

Expand Down
2 changes: 2 additions & 0 deletions tests/0014-reconsume-191.c
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ static void consume_messages_callback_multi (const char *desc,
test_conf_set(conf, "group.id", topic);
}

test_conf_set(conf, "enable.partition.eof", "true");

/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_CONSUMER, conf);

Expand Down
13 changes: 10 additions & 3 deletions tests/0026-consume_pause.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ static int consume_pause (void) {
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
const int partition_cnt = 3;
rd_kafka_t *rk;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *tconf;
rd_kafka_topic_partition_list_t *topics;
rd_kafka_resp_err_t err;
Expand All @@ -54,7 +55,9 @@ static int consume_pause (void) {
int fails = 0;
char group_id[32];

test_conf_init(NULL, &tconf, 60 + (test_session_timeout_ms * 3 / 1000));
test_conf_init(&conf, &tconf,
60 + (test_session_timeout_ms * 3 / 1000));
test_conf_set(conf, "enable.partition.eof", "true");
test_topic_conf_set(tconf, "auto.offset.reset", "smallest");

test_create_topic(topic, partition_cnt, 1);
Expand Down Expand Up @@ -94,8 +97,9 @@ static int consume_pause (void) {
it, iterations-1, group_id,
per_pause_msg_cnt, eof_cnt);

rk = test_create_consumer(group_id, NULL, NULL,
rd_kafka_topic_conf_dup(tconf));
rk = test_create_consumer(group_id, NULL,
rd_kafka_conf_dup(conf),
rd_kafka_topic_conf_dup(tconf));


TEST_SAY("Subscribing to %d topic(s): %s\n",
Expand Down Expand Up @@ -210,6 +214,7 @@ static int consume_pause (void) {
}

rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_conf_destroy(conf);
rd_kafka_topic_conf_destroy(tconf);

return 0;
Expand Down Expand Up @@ -261,6 +266,7 @@ static int consume_pause_resume_after_reassign (void) {
/**
* Create consumer.
*/
test_conf_set(conf, "enable.partition.eof", "true");
rk = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_assign("assign", rk, partitions);
Expand Down Expand Up @@ -411,6 +417,7 @@ static int consume_subscribe_assign_pause_resume (void) {
*/
rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
test_conf_set(conf, "session.timeout.ms", "6000");
test_conf_set(conf, "enable.partition.eof", "true");
rk = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_subscribe(rk, topic);
Expand Down
10 changes: 2 additions & 8 deletions tests/0030-offset_commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,7 @@ static void do_offset_test (const char *what, int auto_commit, int auto_store,

if (rkm->err == RD_KAFKA_RESP_ERR__TIMED_OUT)
TEST_FAIL("%s: Timed out waiting for message %d", what,cnt);
else if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
rd_kafka_message_destroy(rkm);
continue;
} else if (rkm->err)
else if (rkm->err)
TEST_FAIL("%s: Consumer error: %s",
what, rd_kafka_message_errstr(rkm));

Expand Down Expand Up @@ -306,10 +303,7 @@ static void do_offset_test (const char *what, int auto_commit, int auto_store,

if (rkm->err == RD_KAFKA_RESP_ERR__TIMED_OUT)
TEST_FAIL("%s: Timed out waiting for message %d", what,cnt);
else if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
rd_kafka_message_destroy(rkm);
continue;
} else if (rkm->err)
else if (rkm->err)
TEST_FAIL("%s: Consumer error: %s",
what, rd_kafka_message_errstr(rkm));

Expand Down
6 changes: 5 additions & 1 deletion tests/0034-offset_reset.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,16 @@ static void do_test_reset (const char *topic, int partition,
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
int eofcnt = 0, msgcnt = 0, errcnt = 0;
rd_kafka_conf_t *conf;

TEST_SAY("Test auto.offset.reset=%s, "
"expect %d msgs, %d EOFs, %d errors\n",
reset, exp_msgcnt, exp_eofcnt, exp_errcnt);

rk = test_create_consumer(NULL, NULL, NULL, NULL);
test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "enable.partition.eof", "true");

rk = test_create_consumer(NULL, NULL, conf, NULL);
rkt = test_create_topic_object(rk, topic, "auto.offset.reset", reset,
NULL);

Expand Down
9 changes: 6 additions & 3 deletions tests/0056-balanced_group_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ int main_0056_balanced_group_mt (int argc, char **argv) {
int partition_cnt = 2;
int partition;
uint64_t testid;
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *default_topic_conf;
rd_kafka_topic_partition_list_t *sub, *topics;
rd_kafka_resp_err_t err;
Expand All @@ -230,9 +231,11 @@ int main_0056_balanced_group_mt (int argc, char **argv) {
if (mtx_init(&lock, mtx_plain) != thrd_success)
TEST_FAIL("Cannot create mutex.");

test_conf_init(NULL, &default_topic_conf,
test_conf_init(&conf, &default_topic_conf,
(test_session_timeout_ms * 3) / 1000);

test_conf_set(conf, "enable.partition.eof", "true");

test_topic_conf_set(default_topic_conf, "auto.offset.reset",
"smallest");

Expand All @@ -242,8 +245,8 @@ int main_0056_balanced_group_mt (int argc, char **argv) {

/* Create consumers and start subscription */
rk_c = test_create_consumer(
topic /*group_id*/, rebalance_cb, NULL,
default_topic_conf);
topic /*group_id*/, rebalance_cb,
conf, default_topic_conf);

test_consumer_subscribe(rk_c, topic);

Expand Down
4 changes: 2 additions & 2 deletions tests/0061-consumer_lag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ static void do_test_consumer_lag (void) {
Test::Fail("set event_cb failed: " + errstr);
Test::conf_set(conf, "group.id", topic);
Test::conf_set(conf, "enable.auto.commit", "false");
Test::conf_set(conf, "enable.partition.eof", "false");
Test::conf_set(conf, "auto.offset.reset", "earliest");
Test::conf_set(conf, "statistics.interval.ms", "100");

Expand All @@ -159,7 +158,8 @@ static void do_test_consumer_lag (void) {
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR__PARTITION_EOF:
Test::Fail(tostr() << "Consume error after " << cnt << "/" << msgcnt << " messages: " << msg->errstr());
Test::Fail(tostr() << "Unexpected PARTITION_EOF (not enbaled) after "
<< cnt << "/" << msgcnt << " messages: " << msg->errstr());
break;

case RdKafka::ERR_NO_ERROR:
Expand Down
4 changes: 4 additions & 0 deletions tests/0067-empty_topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ static void do_test_empty_topic_consumer () {

Test::conf_init(&conf, NULL, 0);

Test::conf_set(conf, "enable.partition.eof", "true");

/* Create simple consumer */
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
if (!consumer)
Expand Down Expand Up @@ -97,6 +99,8 @@ static void do_test_empty_topic_consumer () {

Test::conf_set(conf, "group.id", topic);

Test::conf_set(conf, "enable.partition.eof", "true");

RdKafka::KafkaConsumer *kconsumer = RdKafka::KafkaConsumer::create(conf, errstr);
if (!kconsumer)
Test::Fail("Failed to create KafkaConsumer: " + errstr);
Expand Down
1 change: 0 additions & 1 deletion tests/0082-fetch_max_bytes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ static void do_test_fetch_max_bytes (void) {
switch (msg->err())
{
case RdKafka::ERR__TIMED_OUT:
case RdKafka::ERR__PARTITION_EOF:
break;

case RdKafka::ERR_NO_ERROR:
Expand Down
8 changes: 5 additions & 3 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2100,15 +2100,17 @@ test_consume_msgs_easy_mv (const char *group_id, const char *topic,
test_msgver_t *mv) {
rd_kafka_t *rk;
char grpid0[64];
rd_kafka_conf_t *conf;

if (!tconf)
test_conf_init(NULL, &tconf, 0);
test_conf_init(&conf, tconf ? NULL : &tconf, 0);

if (!group_id)
group_id = test_str_id_generate(grpid0, sizeof(grpid0));

test_topic_conf_set(tconf, "auto.offset.reset", "smallest");
rk = test_create_consumer(group_id, NULL, NULL, tconf);
if (exp_eofcnt != -1)
test_conf_set(conf, "enable.partition.eof", "true");
rk = test_create_consumer(group_id, NULL, conf, tconf);

rd_kafka_poll_set_consumer(rk);

Expand Down

0 comments on commit a869fa1

Please sign in to comment.