From df6c72a22c8d099741ec8e4a54343c9bc58acec5 Mon Sep 17 00:00:00 2001 From: Dmytro Milinevskyi Date: Wed, 6 Apr 2022 15:04:35 +0200 Subject: [PATCH] assignor: API for a custom consumer group assignment strategy --- .gitignore | 1 + examples/.gitignore | 3 + examples/Makefile | 6 +- examples/consumer_custom_assignor.c | 365 +++++++++++++++++++++++ src/rdkafka.c | 2 + src/rdkafka.h | 177 ++++++++++++ src/rdkafka_assignor.c | 432 +++++++++++++++++++++------- src/rdkafka_assignor.h | 131 +++------ src/rdkafka_cgrp.c | 74 ++--- src/rdkafka_cgrp.h | 4 +- src/rdkafka_partition.c | 7 +- src/rdkafka_partition.h | 15 +- src/rdkafka_range_assignor.c | 45 ++- src/rdkafka_request.c | 51 ++-- src/rdkafka_request.h | 21 +- src/rdkafka_roundrobin_assignor.c | 26 +- src/rdkafka_sticky_assignor.c | 307 +++++++------------- 17 files changed, 1139 insertions(+), 528 deletions(-) create mode 100644 examples/consumer_custom_assignor.c diff --git a/.gitignore b/.gitignore index 31c5061e33..96672ac04c 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,4 @@ cov-int gdbrun*.gdb TAGS vcpkg_installed +.DS_Store \ No newline at end of file diff --git a/examples/.gitignore b/examples/.gitignore index 84e64fc4f5..bb0e972ef7 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -7,7 +7,10 @@ kafkatest_verifiable_client producer producer_cpp consumer +consumer_custom_assignor idempotent_producer +misc +openssl_engine_example_cpp rdkafka_consume_batch transactions delete_records diff --git a/examples/Makefile b/examples/Makefile index fc6eccc56f..158bbe8aa9 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -2,6 +2,7 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \ rdkafka_complex_consumer_example rdkafka_complex_consumer_example_cpp \ kafkatest_verifiable_client \ producer consumer idempotent_producer transactions \ + consumer_custom_assignor \ delete_records \ openssl_engine_example_cpp \ misc @@ -44,6 +45,10 @@ consumer: ../src/librdkafka.a consumer.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) +consumer_custom_assignor: ../src/librdkafka.a consumer_custom_assignor.c + $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ + ../src/librdkafka.a $(LIBS) + idempotent_producer: ../src/librdkafka.a idempotent_producer.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) @@ -114,4 +119,3 @@ misc: ../src/librdkafka.a misc.c clean: rm -f $(EXAMPLES) - diff --git a/examples/consumer_custom_assignor.c b/examples/consumer_custom_assignor.c new file mode 100644 index 0000000000..5b12b20185 --- /dev/null +++ b/examples/consumer_custom_assignor.c @@ -0,0 +1,365 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. 
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Simple high-level balanced Apache Kafka consumer
+ * using the Kafka driver from librdkafka
+ * (https://github.com/edenhill/librdkafka)
+ */
+
+#include <stdio.h>
+#include <signal.h>
+#include <string.h>
+#include <ctype.h>
+#include <stdlib.h>
+
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+//#include <librdkafka/rdkafka.h>
+#include "rdkafka.h"
+
+
+static volatile sig_atomic_t run = 1;
+
+/**
+ * @brief Signal termination of program
+ */
+static void stop(int sig) {
+        run = 0;
+}
+
+
+
+/**
+ * @returns 1 if all bytes are printable, else 0.
+ */
+static int is_printable(const char *buf, size_t size) {
+        size_t i;
+
+        for (i = 0; i < size; i++)
+                if (!isprint((int)buf[i]))
+                        return 0;
+
+        return 1;
+}
+
+static int rd_kafka_group_member_cmp(const void *_a, const void *_b) {
+        const rd_kafka_group_member_t *a = (const rd_kafka_group_member_t *)_a;
+        const rd_kafka_group_member_t *b = (const rd_kafka_group_member_t *)_b;
+
+        int res = strcmp(a->rkgm_userdata->data, b->rkgm_userdata->data);
+
+        if (res != 0) {
+                return res;
+        }
+
+        return strcmp(a->rkgm_member_id, b->rkgm_member_id);
+}
+
+static rd_kafka_resp_err_t
+rd_kafka_custom_assignor_assign_cb(rd_kafka_t *rk,
+                                   void *opaque,
+                                   const char *member_id,
+                                   const rd_kafka_metadata_t *metadata,
+                                   rd_kafka_group_member_t *members,
+                                   size_t member_cnt,
+                                   rd_kafka_assignor_topic_t *eligible_topics,
+                                   size_t eligible_topic_cnt,
+                                   char *errstr,
+                                   size_t errstr_size) {
+        fprintf(stderr, "Rebalancing Leader \"%s\"\n", member_id);
+
+        if (eligible_topic_cnt != 1) {
+                fprintf(stderr, "Invalid number of topics %zu\n",
+                        eligible_topic_cnt);
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+        }
+
+        rd_kafka_assignor_topic_t *eligible_topic = &eligible_topics[0];
+
+        if (member_cnt == 0) {
+                fprintf(stderr, "Invalid number of members %zu\n", member_cnt);
+                return RD_KAFKA_RESP_ERR__INVALID_ARG;
+        }
+
+        qsort(members, member_cnt, sizeof(*members), rd_kafka_group_member_cmp);
+
+        /* Take member with the highest priority. */
+        rd_kafka_group_member_t *member = &members[0];
+
+        /* Assign all eligible partitions of a given topic.
+         */
+        int p;
+        for (p = 0; p < eligible_topic->metadata->partition_cnt; p++) {
+                int32_t partition = eligible_topic->metadata->partitions[p].id;
+
+                fprintf(stderr,
+                        "Member \"%s\"%s: "
+                        "assigned topic %s partition %d (metadata: %.*s)\n",
+                        member->rkgm_member_id,
+                        strcmp(member_id, member->rkgm_member_id) == 0 ?
" (me)" + : "", + eligible_topic->metadata->topic, partition, + (int)member->rkgm_userdata->len - 1, + (char *)member->rkgm_userdata->data); + + rd_kafka_topic_partition_list_add( + member->rkgm_assignment, eligible_topic->metadata->topic, + partition); + } + + + return 0; +} + +static rd_kafka_member_userdata_serialized_t * +rd_kafka_custom_assignor_get_empty_userdata( + void *opaque, + const char *member_id, + const rd_kafka_topic_partition_list_t *owned_partitions, + int32_t rkcg_generation_id) { + const char *prio = opaque; + return rd_kafka_member_userdata_serialized_new(prio, strlen(prio) + 1); +} + + +int main(int argc, char **argv) { + rd_kafka_t *rk; /* Consumer instance handle */ + rd_kafka_conf_t *conf; /* Temporary configuration object */ + rd_kafka_resp_err_t err; /* librdkafka API error code */ + char errstr[512]; /* librdkafka API error reporting buffer */ + const char *brokers; /* Argument: broker list */ + const char *groupid; /* Argument: Consumer group id */ + char *topic; /* Argument: topic to subscribe to */ + char *prio; /* Argument: subscriber priority */ + + /* + * Argument validation + */ + if (argc < 5) { + fprintf(stderr, + "%% Usage: " + "%s \n", + argv[0]); + return 1; + } + + brokers = argv[1]; + groupid = argv[2]; + topic = argv[3]; + prio = argv[4]; + + /* + * Register custom assigned + */ + err = rd_kafka_assignor_register( + "custom", RD_KAFKA_REBALANCE_PROTOCOL_EAGER, + rd_kafka_custom_assignor_assign_cb, + rd_kafka_custom_assignor_get_empty_userdata, prio); + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + fprintf(stderr, "failed to register assignor: %s\n", + rd_kafka_err2str(err)); + return 1; + } + + /* + * Create Kafka client configuration place-holder + */ + conf = rd_kafka_conf_new(); + + /* Set bootstrap broker(s) as a comma-separated list of + * host or host:port (default port 9092). + * librdkafka will use the bootstrap brokers to acquire the full + * set of brokers from the cluster. */ + if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + /* Set the consumer group id. + * All consumers sharing the same group id will join the same + * group, and the subscribed topic' partitions will be assigned + * according to the partition.assignment.strategy + * (consumer config property) to the consumers in the group. */ + if (rd_kafka_conf_set(conf, "group.id", groupid, errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + /* If there is no previously committed offset for a partition + * the auto.offset.reset strategy will be used to decide where + * in the partition to start fetching messages. + * By setting this to earliest the consumer will read all messages + * in the partition if there was no previously committed offset. */ + if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + /* Enable client debug. Most useful for us is "assignor,cgrp" + * + */ + if (getenv("DEBUG") != NULL) { + if (rd_kafka_conf_set(conf, "debug", getenv("DEBUG"), errstr, + sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + } + + /* Set assignor registered above. 
+ */ + if (rd_kafka_conf_set(conf, "partition.assignment.strategy", "custom", + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { + fprintf(stderr, "%s\n", errstr); + rd_kafka_conf_destroy(conf); + return 1; + } + + /* + * Create consumer instance. + * + * NOTE: rd_kafka_new() takes ownership of the conf object + * and the application must not reference it again after + * this call. + */ + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + if (!rk) { + fprintf(stderr, "%% Failed to create new consumer: %s\n", + errstr); + return 1; + } + + conf = NULL; /* Configuration object is now owned, and freed, + * by the rd_kafka_t instance. */ + + + /* Redirect all messages from per-partition queues to + * the main queue so that messages can be consumed with one + * call from all assigned partitions. + * + * The alternative is to poll the main queue (for events) + * and each partition queue separately, which requires setting + * up a rebalance callback and keeping track of the assignment: + * but that is more complex and typically not recommended. */ + rd_kafka_poll_set_consumer(rk); + + + rd_kafka_topic_partition_list_t *subscription; + subscription = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(subscription, topic, + /* the partition is ignored + * by subscribe() */ + RD_KAFKA_PARTITION_UA); + err = rd_kafka_subscribe(rk, subscription); + if (err) { + fprintf(stderr, "%% Failed to subscribe to topic: %s\n", + rd_kafka_err2str(err)); + rd_kafka_topic_partition_list_destroy(subscription); + rd_kafka_destroy(rk); + return 1; + } + fprintf(stderr, + "%% Subscribed to %s topic, " + "waiting for rebalance and messages...\n", + topic); + rd_kafka_topic_partition_list_destroy(subscription); + + + /* Signal handler for clean shutdown */ + signal(SIGINT, stop); + + /* Subscribing to topic will trigger a group rebalance + * which may take some time to finish, but there is no need + * for the application to handle this idle period in a special way + * since a rebalance may happen at any time. + * Start polling for messages. */ + + while (run) { + rd_kafka_message_t *rkm; + + rkm = rd_kafka_consumer_poll(rk, 100); + if (!rkm) + continue; /* Timeout: no message within 100ms, + * try again. This short timeout allows + * checking for `run` at frequent intervals. + */ + + /* consumer_poll() will return either a proper message + * or a consumer error (rkm->err is set). */ + if (rkm->err) { + /* Consumer errors are generally to be considered + * informational as the consumer will automatically + * try to recover from all types of errors. */ + fprintf(stderr, "%% Consumer error: %s\n", + rd_kafka_message_errstr(rkm)); + rd_kafka_message_destroy(rkm); + continue; + } + + /* Proper message. */ + printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n", + rd_kafka_topic_name(rkm->rkt), rkm->partition, + rkm->offset); + + /* Print the message key. */ + if (rkm->key && is_printable(rkm->key, rkm->key_len)) + printf(" Key: %.*s\n", (int)rkm->key_len, + (const char *)rkm->key); + else if (rkm->key) + printf(" Key: (%d bytes)\n", (int)rkm->key_len); + + /* Print the message value/payload. */ + if (rkm->payload && is_printable(rkm->payload, rkm->len)) + printf(" Value: %.*s\n", (int)rkm->len, + (const char *)rkm->payload); + else if (rkm->payload) + printf(" Value: (%d bytes)\n", (int)rkm->len); + + rd_kafka_message_destroy(rkm); + } + + + /* Close the consumer: commit final offsets and leave the group. 
*/ + fprintf(stderr, "%% Closing consumer\n"); + rd_kafka_consumer_close(rk); + + + /* Destroy the consumer */ + rd_kafka_destroy(rk); + + return 0; +} diff --git a/src/rdkafka.c b/src/rdkafka.c index 33a6b939b7..abcf5c5a87 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -158,6 +158,8 @@ static void rd_kafka_global_init0(void) { #if WITH_CURL rd_http_global_init(); #endif + + rd_kafka_assignor_global_init(); } /** diff --git a/src/rdkafka.h b/src/rdkafka.h index b424b21869..d2474471e1 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -8368,6 +8368,183 @@ RD_EXPORT rd_kafka_error_t *rd_kafka_abort_transaction(rd_kafka_t *rk, int timeout_ms); +/**@}*/ + +/** + * @name Custom Consumer Partition Assignor API + * @{ + */ + +/** + * @enum rd_kafka_rebalance_protocol_t + * + * @brief Enumerates the different rebalance protocol types. + * + * @sa + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol + * @sa rd_kafka_rebalance_protocol() + */ +typedef enum rd_kafka_rebalance_protocol_t { + RD_KAFKA_REBALANCE_PROTOCOL_NONE, /**< Rebalance protocol is + unknown */ + RD_KAFKA_REBALANCE_PROTOCOL_EAGER, /**< Eager rebalance + protocol */ + RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE /**< Cooperative + rebalance protocol*/ +} rd_kafka_rebalance_protocol_t; + + +/** + * @brief Custom user data associated with a given member and preserved in Kafka + * itself. + */ +typedef struct rd_kafka_member_userdata_serialized_s { + /** Pointer to the serialized data. */ + const void *data; + /** Length of the serialized data. */ + size_t len; +} rd_kafka_member_userdata_serialized_t; + + +/** + * @brief Initializes user matadata object. + * + * @param data Pointer the serialized user data. + * It can be NULL, in that case len must be 0. + * @param len Length of the serialized data. + * + * @returns A newly allocated object that could be returned from the + * rd_kafka_assignor_get_user_metadata_cb_t() callback or passed to the + * rd_kafka_assignor_assign_cb_t() callback. + * + * @remark Use rd_kafka_member_userdata_serialized_destroy() to free the + * resources. + */ +RD_EXPORT +rd_kafka_member_userdata_serialized_t * +rd_kafka_member_userdata_serialized_new(const void *data, size_t len); + + +/** + * @brief Destroys object created with rd_kafka_member_userdata_serialized_new() + */ +RD_EXPORT +void rd_kafka_member_userdata_serialized_destroy( + rd_kafka_member_userdata_serialized_t *mdata); + + +/** + * @brief Represents a member of the assignment. + */ +typedef struct rd_kafka_group_member_s { + /** Subscribed topics (partition field is ignored). */ + const rd_kafka_topic_partition_list_t *rkgm_subscription; + /** Partitions assigned to this member after running the assignor. + * E.g., the current assignment coming out of the rebalance. */ + rd_kafka_topic_partition_list_t *rkgm_assignment; + /** Partitions reported as currently owned by the member, read + * from consumer metadata. E.g., the current assignment going into + * the rebalance. */ + const rd_kafka_topic_partition_list_t *rkgm_owned; + /** Member id (e.g., client.id-some-uuid). */ + const char *rkgm_member_id; + /** Group instance id. */ + const char *rkgm_group_instance_id; + /** Member-specific opaque userdata. */ + const rd_kafka_member_userdata_serialized_t *rkgm_userdata; + /** Group generation id. */ + int rkgm_generation; +} rd_kafka_group_member_t; + + +/** + * @brief Returns serialized member user metadata. + * This callback is called when member joins the group. 
+ *
+ * @param opaque Opaque data passed to rd_kafka_assignor_register().
+ * @param member_id The consumer's group member id.
+ * @param owned_partitions List of partitions currently owned by the member.
+ * @param generation_id Consumer group generation ID.
+ *
+ * @remark The callback may return NULL if there is nothing to persist.
+ */
+typedef rd_kafka_member_userdata_serialized_t *(
+    *rd_kafka_assignor_get_user_metadata_cb_t)(
+    void *opaque,
+    const char *member_id,
+    const rd_kafka_topic_partition_list_t *owned_partitions,
+    int32_t generation_id);
+
+
+/**
+ * @brief Structure to hold metadata for a single topic and all its
+ *        subscribing members.
+ */
+typedef struct rd_kafka_assignor_topic_s {
+        /** Topic information. */
+        const rd_kafka_metadata_topic_t *metadata;
+        /** List of members subscribed to the given topic. */
+        rd_kafka_group_member_t *members;
+        /** Number of elements in \p members. */
+        size_t member_cnt;
+} rd_kafka_assignor_topic_t;
+
+
+/**
+ * @brief rd_kafka_assignor_assign_cb_t is called to perform the group
+ * assignment given the member subscriptions and current cluster metadata.
+ * It does so by populating each member's rkgm_assignment list in \p members.
+ */
+typedef rd_kafka_resp_err_t (*rd_kafka_assignor_assign_cb_t)(
+    /** Client instance. */
+    rd_kafka_t *rk,
+    /** Opaque data passed to rd_kafka_assignor_register(). */
+    void *opaque,
+    /** Member ID of the group leader that is performing the assignment. */
+    const char *member_id,
+    /** Kafka cluster metadata. */
+    const rd_kafka_metadata_t *metadata,
+    /** List of members eligible for the assignment. This is an "output"
+        array: the callback fills in each member's rkgm_assignment. */
+    rd_kafka_group_member_t *members,
+    /** Number of elements in \p members. */
+    size_t member_cnt,
+    /**
+     * List of the eligible topics for the assignment.
+     * The callback is free to manipulate \p eligible_topics in any way;
+     * it is not used by the caller and its resources are destroyed afterwards.
+     */
+    rd_kafka_assignor_topic_t *eligible_topics,
+    /** Number of elements in \p eligible_topics. */
+    size_t eligible_topic_cnt,
+    /** A human readable error string (nul-terminated) is written to
+     *  this location, but only if a fatal error occurs.
+     */
+    char *errstr,
+    /** Writable size in \p errstr. */
+    size_t errstr_size);
+
+
+/**
+ * @brief Register a new partition assignor.
+ *
+ * @param protocol_name Name of the assignor (as set via the
+ * "partition.assignment.strategy" option).
+ * @param rebalance_protocol Rebalance protocol to use.
+ * @sa rd_kafka_rebalance_protocol_t
+ * @param assign_cb Callback that performs the consumer group assignment.
+ * @param get_user_metadata_cb Callback that generates the consumer group
+ * member user data. Optional, may be NULL.
+ * @param opaque Opaque pointer passed to the assignor callbacks.
+ */ +RD_EXPORT +rd_kafka_resp_err_t rd_kafka_assignor_register( + const char *protocol_name, + rd_kafka_rebalance_protocol_t rebalance_protocol, + rd_kafka_assignor_assign_cb_t assign_cb, + rd_kafka_assignor_get_user_metadata_cb_t get_user_metadata_cb, + void *opaque); + /**@}*/ /* @cond NO_DOC */ diff --git a/src/rdkafka_assignor.c b/src/rdkafka_assignor.c index dfd1c775f3..937d1d427d 100644 --- a/src/rdkafka_assignor.c +++ b/src/rdkafka_assignor.c @@ -32,10 +32,33 @@ #include +static once_flag rd_kafka_assignor_global_registry_init_once = ONCE_FLAG_INIT; +static mtx_t rd_kafka_assignor_global_registry_lock; +static rd_list_t rd_kafka_assignor_global_registry; + + +static void rd_kafka_assignor_global_init0(void) { + mtx_init(&rd_kafka_assignor_global_registry_lock, mtx_plain); + rd_list_init(&rd_kafka_assignor_global_registry, 0, NULL); + + rd_kafka_range_assignor_register(); + rd_kafka_roundrobin_assignor_register(); + rd_kafka_sticky_assignor_register(); +} + +/** + * @brief Initialize assignor registry + */ +void rd_kafka_assignor_global_init(void) { + call_once(&rd_kafka_assignor_global_registry_init_once, + rd_kafka_assignor_global_init0); +} + /** * Clear out and free any memory used by the member, but not the rkgm itself. */ -void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm) { +void rd_kafka_group_member_internal_clear( + rd_kafka_group_member_internal_t *rkgm) { if (rkgm->rkgm_owned) rd_kafka_topic_partition_list_destroy(rkgm->rkgm_owned); @@ -64,11 +87,13 @@ void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm) { /** - * @brief Group member comparator (takes rd_kafka_group_member_t *) + * @brief Group member comparator (takes rd_kafka_group_member_internal_t *) */ -int rd_kafka_group_member_cmp(const void *_a, const void *_b) { - const rd_kafka_group_member_t *a = (const rd_kafka_group_member_t *)_a; - const rd_kafka_group_member_t *b = (const rd_kafka_group_member_t *)_b; +int rd_kafka_group_member_internal_cmp(const void *_a, const void *_b) { + const rd_kafka_group_member_internal_t *a = + (const rd_kafka_group_member_internal_t *)_a; + const rd_kafka_group_member_internal_t *b = + (const rd_kafka_group_member_internal_t *)_b; /* Use the group instance id to compare static group members */ if (!RD_KAFKAP_STR_IS_NULL(a->rkgm_group_instance_id) && @@ -80,6 +105,23 @@ int rd_kafka_group_member_cmp(const void *_a, const void *_b) { } +/** + * @brief Group member comparator (takes rd_kafka_group_member_t *) + */ +int rd_kafka_group_member_cmp(const void *_a, const void *_b) { + const rd_kafka_group_member_t *a = (const rd_kafka_group_member_t *)_a; + const rd_kafka_group_member_t *b = (const rd_kafka_group_member_t *)_b; + + /* Use the group instance id to compare static group members */ + if (a->rkgm_group_instance_id != NULL && + b->rkgm_group_instance_id != NULL) + return strcmp(a->rkgm_group_instance_id, + b->rkgm_group_instance_id); + + return strcmp(a->rkgm_member_id, b->rkgm_member_id); +} + + /** * Returns true if member subscribes to topic, else false. 
*/ @@ -93,8 +135,8 @@ int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, const rd_kafka_topic_partition_t *rktpar = &rkgm->rkgm_subscription->elems[i]; - if (rd_kafka_topic_partition_match(rk, rkgm, rktpar, topic, - NULL)) + if (rd_kafka_topic_partition_match(rk, rkgm->rkgm_member_id, + rktpar, topic, NULL)) return 1; } @@ -163,14 +205,24 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( } +rd_kafka_member_userdata_serialized_t * +rd_kafka_member_userdata_serialized_new(const void *data, size_t len) { + rd_kafka_member_userdata_serialized_t *mdata; + mdata = rd_malloc(sizeof(*mdata) + len); + mdata->data = (void *)(mdata + 1); + if (data == NULL) { + mdata->data = NULL; + } else { + memcpy((void *)mdata->data, data, len); + } + mdata->len = len; + return mdata; +} -rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata( - const rd_kafka_assignor_t *rkas, - void *assignor_state, - const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions) { - return rd_kafka_consumer_protocol_member_metadata_new(topics, NULL, 0, - owned_partitions); + +void rd_kafka_member_userdata_serialized_destroy( + rd_kafka_member_userdata_serialized_t *mdata) { + rd_free(mdata); } @@ -178,11 +230,11 @@ rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata( /** * Returns 1 if all subscriptions are satifised for this member, else 0. */ -static int rd_kafka_member_subscription_match( +static int rd_kafka_member_internal_subscription_match( rd_kafka_cgrp_t *rkcg, - rd_kafka_group_member_t *rkgm, + rd_kafka_group_member_internal_t *rkgm, const rd_kafka_metadata_topic_t *topic_metadata, - rd_kafka_assignor_topic_t *eligible_topic) { + rd_kafka_assignor_topic_internal_t *eligible_topic) { int i; int has_regex = 0; int matched = 0; @@ -193,9 +245,12 @@ static int rd_kafka_member_subscription_match( &rkgm->rkgm_subscription->elems[i]; int matched_by_regex = 0; - if (rd_kafka_topic_partition_match(rkcg->rkcg_rk, rkgm, rktpar, - topic_metadata->topic, - &matched_by_regex)) { + char *member_id; + RD_KAFKAP_STR_DUPA(&member_id, rkgm->rkgm_member_id); + + if (rd_kafka_topic_partition_match( + rkcg->rkcg_rk, member_id, rktpar, topic_metadata->topic, + &matched_by_regex)) { rd_list_add(&rkgm->rkgm_eligible, (void *)topic_metadata); matched++; @@ -214,16 +269,15 @@ static int rd_kafka_member_subscription_match( } -static void rd_kafka_assignor_topic_destroy(rd_kafka_assignor_topic_t *at) { +static void rd_kafka_assignor_topic_internal_destroy( + rd_kafka_assignor_topic_internal_t *at) { rd_list_destroy(&at->members); rd_free(at); } int rd_kafka_assignor_topic_cmp(const void *_a, const void *_b) { - const rd_kafka_assignor_topic_t *a = - *(const rd_kafka_assignor_topic_t *const *)_a; - const rd_kafka_assignor_topic_t *b = - *(const rd_kafka_assignor_topic_t *const *)_b; + const rd_kafka_assignor_topic_t *a = _a; + const rd_kafka_assignor_topic_t *b = _b; return strcmp(a->metadata->topic, b->metadata->topic); } @@ -234,17 +288,17 @@ int rd_kafka_assignor_topic_cmp(const void *_a, const void *_b) { * complete set of members that are subscribed to it. The result is * returned in `eligible_topics`. 
*/ -static void -rd_kafka_member_subscriptions_map(rd_kafka_cgrp_t *rkcg, - rd_list_t *eligible_topics, - const rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt) { +static void rd_kafka_member_internal_subscriptions_map( + rd_kafka_cgrp_t *rkcg, + rd_list_t *eligible_topics, + const rd_kafka_metadata_t *metadata, + rd_kafka_group_member_internal_t *members, + int member_cnt) { int ti; - rd_kafka_assignor_topic_t *eligible_topic = NULL; + rd_kafka_assignor_topic_internal_t *eligible_topic = NULL; rd_list_init(eligible_topics, RD_MIN(metadata->topic_cnt, 10), - (void *)rd_kafka_assignor_topic_destroy); + (void *)rd_kafka_assignor_topic_internal_destroy); /* For each topic in the cluster, scan through the member list * to find matching subscriptions. */ @@ -273,7 +327,7 @@ rd_kafka_member_subscriptions_map(rd_kafka_cgrp_t *rkcg, for (i = 0; i < member_cnt; i++) { /* Match topic against existing metadata, incl regex matching. */ - rd_kafka_member_subscription_match( + rd_kafka_member_internal_subscription_match( rkcg, &members[i], &metadata->topics[ti], eligible_topic); } @@ -293,23 +347,25 @@ rd_kafka_member_subscriptions_map(rd_kafka_cgrp_t *rkcg, } -rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, - const rd_kafka_assignor_t *rkas, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt, - char *errstr, - size_t errstr_size) { +rd_kafka_resp_err_t +rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_internal_t *members_internal, + int member_cnt, + char *errstr, + size_t errstr_size) { rd_kafka_resp_err_t err; rd_ts_t ts_start = rd_clock(); int i; - rd_list_t eligible_topics; + rd_list_t eligible_topics_internal; int j; /* Construct eligible_topics, a map of: * topic -> set of members that are subscribed to it. 
*/ - rd_kafka_member_subscriptions_map(rkcg, &eligible_topics, metadata, - members, member_cnt); + rd_kafka_member_internal_subscriptions_map( + rkcg, &eligible_topics_internal, metadata, members_internal, + member_cnt); if (rkcg->rkcg_rk->rk_conf.debug & @@ -320,10 +376,11 @@ rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, "%d member(s) and " "%d eligible subscribed topic(s):", rkcg->rkcg_group_id->str, rkas->rkas_protocol_name->str, - member_cnt, eligible_topics.rl_cnt); + member_cnt, eligible_topics_internal.rl_cnt); for (i = 0; i < member_cnt; i++) { - const rd_kafka_group_member_t *member = &members[i]; + const rd_kafka_group_member_internal_t *member = + &members_internal[i]; rd_kafka_dbg( rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_ASSIGNOR, @@ -349,11 +406,106 @@ rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, } } + rd_kafka_assignor_topic_t *eligible_topics = + rd_calloc(rd_list_cnt(&eligible_topics_internal), + sizeof(rd_kafka_assignor_topic_t)); + rd_kafka_assignor_topic_internal_t *eligible_topic_internal; + RD_LIST_FOREACH(eligible_topic_internal, &eligible_topics_internal, i) { + rd_kafka_assignor_topic_t *eligible_topic = &eligible_topics[i]; + eligible_topic->metadata = eligible_topic_internal->metadata; + eligible_topic->members = + rd_calloc(rd_list_cnt(&eligible_topic_internal->members), + sizeof(rd_kafka_group_member_internal_t)); + rd_kafka_group_member_internal_t *member_internal; + RD_LIST_FOREACH(member_internal, + &eligible_topic_internal->members, j) { + rd_kafka_group_member_t *member = + &eligible_topic->members[j]; + + member->rkgm_subscription = + member_internal->rkgm_subscription; + member->rkgm_assignment = + member_internal->rkgm_assignment; + member->rkgm_owned = member_internal->rkgm_owned; + member->rkgm_member_id = + RD_KAFKAP_STR_DUP(member_internal->rkgm_member_id); + if (!RD_KAFKAP_STR_IS_NULL( + member_internal->rkgm_group_instance_id)) { + member->rkgm_group_instance_id = + RD_KAFKAP_STR_DUP( + member_internal + ->rkgm_group_instance_id); + } + if (member_internal->rkgm_userdata != NULL) { + member->rkgm_userdata = + rd_kafka_member_userdata_serialized_new( + member_internal->rkgm_userdata->data, + member_internal->rkgm_userdata->len); + } else { + member->rkgm_userdata = + rd_kafka_member_userdata_serialized_new( + NULL, 0); + } + member->rkgm_generation = + member_internal->rkgm_generation; + } + eligible_topic->member_cnt = + rd_list_cnt(&eligible_topic_internal->members); + } + + rd_kafka_group_member_t *members = + rd_calloc(member_cnt, sizeof(rd_kafka_group_member_t)); + for (i = 0; i < member_cnt; i++) { + rd_kafka_group_member_t *member = &members[i]; + rd_kafka_group_member_internal_t *member_internal = + &members_internal[i]; + + member->rkgm_subscription = member_internal->rkgm_subscription; + member->rkgm_assignment = member_internal->rkgm_assignment; + member->rkgm_owned = member_internal->rkgm_owned; + member->rkgm_member_id = + RD_KAFKAP_STR_DUP(member_internal->rkgm_member_id); + if (!RD_KAFKAP_STR_IS_NULL( + member_internal->rkgm_group_instance_id)) { + member->rkgm_group_instance_id = RD_KAFKAP_STR_DUP( + member_internal->rkgm_group_instance_id); + } + if (member_internal->rkgm_userdata != NULL) { + member->rkgm_userdata = + rd_kafka_member_userdata_serialized_new( + member_internal->rkgm_userdata->data, + member_internal->rkgm_userdata->len); + } else { + member->rkgm_userdata = + rd_kafka_member_userdata_serialized_new(NULL, 0); + } + member->rkgm_generation = member_internal->rkgm_generation; + } + /* 
Call assignors assign callback */ err = rkas->rkas_assign_cb( - rkcg->rkcg_rk, rkas, rkcg->rkcg_member_id->str, metadata, members, - member_cnt, (rd_kafka_assignor_topic_t **)eligible_topics.rl_elems, - eligible_topics.rl_cnt, errstr, errstr_size, rkas->rkas_opaque); + rkcg->rkcg_rk, rkas->rkas_opaque, rkcg->rkcg_member_id->str, + metadata, members, member_cnt, eligible_topics, + eligible_topics_internal.rl_cnt, errstr, errstr_size); + + /* Recover assignment (member->rkgm_assignment) for the caller. + * NOTE: members array might have been manipulated, we can't rely on + * order */ + for (i = 0; i < member_cnt; i++) { + rd_kafka_group_member_t *member = &members[i]; + for (j = 0; j < member_cnt; j++) { + rd_kafka_group_member_internal_t *member_internal = + &members_internal[j]; + char *member_id; + RD_KAFKAP_STR_DUPA(&member_id, + member_internal->rkgm_member_id); + if (strcmp(member->rkgm_member_id, member_id) == 0) { + member_internal->rkgm_assignment = + member->rkgm_assignment; + break; + } + } + } if (err) { rd_kafka_dbg( @@ -373,16 +525,17 @@ rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, for (i = 0; i < member_cnt; i++) { const rd_kafka_group_member_t *member = &members[i]; - rd_kafka_dbg(rkcg->rkcg_rk, - CGRP | RD_KAFKA_DBG_ASSIGNOR, "ASSIGN", - " Member \"%.*s\"%s assigned " - "%d partition(s):", - RD_KAFKAP_STR_PR(member->rkgm_member_id), - !rd_kafkap_str_cmp(member->rkgm_member_id, - rkcg->rkcg_member_id) - ? " (me)" - : "", - member->rkgm_assignment->cnt); + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_ASSIGNOR, + "ASSIGN", + " Member \"%s\"%s assigned " + "%d partition(s):", + member->rkgm_member_id, + !rd_kafkap_str_cmp_str2(member->rkgm_member_id, + rkcg->rkcg_member_id) + ? " (me)" + : "", + member->rkgm_assignment->cnt); for (j = 0; j < member->rkgm_assignment->cnt; j++) { const rd_kafka_topic_partition_t *p = &member->rkgm_assignment->elems[j]; @@ -394,7 +547,35 @@ rd_kafka_resp_err_t rd_kafka_assignor_run(rd_kafka_cgrp_t *rkcg, } } - rd_list_destroy(&eligible_topics); + for (i = 0; i < member_cnt; i++) { + rd_kafka_group_member_t *member = &members[i]; + + rd_free((void *)member->rkgm_member_id); + rd_free((void *)member->rkgm_group_instance_id); + rd_kafka_member_userdata_serialized_destroy( + (rd_kafka_member_userdata_serialized_t *) + member->rkgm_userdata); + } + rd_free(members); + + for (i = 0; i < rd_list_cnt(&eligible_topics_internal); i++) { + rd_kafka_assignor_topic_t *eligible_topic = &eligible_topics[i]; + + for (j = 0; j < (int)eligible_topic->member_cnt; j++) { + rd_kafka_group_member_t *member = + &eligible_topic->members[j]; + + rd_free((void *)member->rkgm_member_id); + rd_free((void *)member->rkgm_group_instance_id); + rd_kafka_member_userdata_serialized_destroy( + (rd_kafka_member_userdata_serialized_t *) + member->rkgm_userdata); + } + rd_free(eligible_topic->members); + } + rd_free(eligible_topics); + + rd_list_destroy(&eligible_topics_internal); return err; } @@ -467,29 +648,8 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( const char *protocol_type, const char *protocol_name, rd_kafka_rebalance_protocol_t rebalance_protocol, - rd_kafka_resp_err_t (*assign_cb)( - rd_kafka_t *rk, - const struct rd_kafka_assignor_s *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - char *errstr, - size_t errstr_size, - void *opaque), - rd_kafkap_bytes_t *(*get_metadata_cb)( - const struct 
rd_kafka_assignor_s *rkas, - void *assignor_state, - const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions), - void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas, - void **assignor_state, - const rd_kafka_topic_partition_list_t *assignment, - const rd_kafkap_bytes_t *userdata, - const rd_kafka_consumer_group_metadata_t *rkcgm), - void (*destroy_state_cb)(void *assignor_state), + rd_kafka_assignor_assign_cb_t assign_cb, + rd_kafka_assignor_get_user_metadata_cb_t get_user_metadata_cb, int (*unittest_cb)(void), void *opaque) { rd_kafka_assignor_t *rkas; @@ -508,16 +668,14 @@ rd_kafka_resp_err_t rd_kafka_assignor_add( rkas = rd_calloc(1, sizeof(*rkas)); - rkas->rkas_protocol_name = rd_kafkap_str_new(protocol_name, -1); - rkas->rkas_protocol_type = rd_kafkap_str_new(protocol_type, -1); - rkas->rkas_protocol = rebalance_protocol; - rkas->rkas_assign_cb = assign_cb; - rkas->rkas_get_metadata_cb = get_metadata_cb; - rkas->rkas_on_assignment_cb = on_assignment_cb; - rkas->rkas_destroy_state_cb = destroy_state_cb; - rkas->rkas_unittest = unittest_cb; - rkas->rkas_opaque = opaque; - rkas->rkas_index = INT_MAX; + rkas->rkas_protocol_name = rd_kafkap_str_new(protocol_name, -1); + rkas->rkas_protocol_type = rd_kafkap_str_new(protocol_type, -1); + rkas->rkas_protocol = rebalance_protocol; + rkas->rkas_assign_cb = assign_cb; + rkas->rkas_get_user_metadata_cb = get_user_metadata_cb; + rkas->rkas_unittest = unittest_cb; + rkas->rkas_opaque = opaque; + rkas->rkas_index = INT_MAX; rd_list_add(&rk->rk_conf.partition_assignors, rkas); @@ -546,6 +704,23 @@ static int rd_kafka_assignor_cmp_idx(const void *ptr1, const void *ptr2) { } +/* Initialize builtin assignors (ignore errors) */ +static void rd_kafka_assignors_init_registered(rd_kafka_t *rk) { + rd_kafka_assignor_t *rkas; + int i; + + mtx_lock(&rd_kafka_assignor_global_registry_lock); + RD_LIST_FOREACH(rkas, &rd_kafka_assignor_global_registry, i) { + rd_kafka_assignor_add(rk, rkas->rkas_protocol_type->str, + rkas->rkas_protocol_name->str, + rkas->rkas_protocol, rkas->rkas_assign_cb, + rkas->rkas_get_user_metadata_cb, + rkas->rkas_unittest, rkas->rkas_opaque); + } + mtx_unlock(&rd_kafka_assignor_global_registry_lock); +} + + /** * Initialize assignor list based on configuration. 
*/ @@ -557,10 +732,7 @@ int rd_kafka_assignors_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { rd_list_init(&rk->rk_conf.partition_assignors, 3, (void *)rd_kafka_assignor_destroy); - /* Initialize builtin assignors (ignore errors) */ - rd_kafka_range_assignor_init(rk); - rd_kafka_roundrobin_assignor_init(rk); - rd_kafka_sticky_assignor_init(rk); + rd_kafka_assignors_init_registered(rk); rd_strdupa(&wanted, rk->rk_conf.partition_assignment_strategy); @@ -635,6 +807,61 @@ void rd_kafka_assignors_term(rd_kafka_t *rk) { } +rd_kafka_resp_err_t rd_kafka_assignor_register( + const char *protocol_name, + rd_kafka_rebalance_protocol_t rebalance_protocol, + rd_kafka_assignor_assign_cb_t assign_cb, + rd_kafka_assignor_get_user_metadata_cb_t get_user_metadata_cb, + void *opaque) { + rd_kafka_assignor_global_init(); + + return rd_kafka_assignor_register_internal( + protocol_name, rebalance_protocol, assign_cb, get_user_metadata_cb, + NULL, opaque); +} + + +rd_kafka_resp_err_t rd_kafka_assignor_register_internal( + const char *protocol_name, + rd_kafka_rebalance_protocol_t rebalance_protocol, + rd_kafka_assignor_assign_cb_t assign_cb, + rd_kafka_assignor_get_user_metadata_cb_t get_user_metadata_cb, + int (*unittest_cb)(void), + void *opaque) { + if (rebalance_protocol != RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && + rebalance_protocol != RD_KAFKA_REBALANCE_PROTOCOL_EAGER) + return RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL; + + if (assign_cb == NULL) { + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + mtx_lock(&rd_kafka_assignor_global_registry_lock); + + if ((rd_list_find(&rd_kafka_assignor_global_registry, protocol_name, + rd_kafka_assignor_cmp_str))) { + mtx_unlock(&rd_kafka_assignor_global_registry_lock); + return RD_KAFKA_RESP_ERR__CONFLICT; + } + + rd_kafka_assignor_t *rkas; + + rkas = rd_calloc(1, sizeof(*rkas)); + + rkas->rkas_protocol_name = rd_kafkap_str_new(protocol_name, -1); + rkas->rkas_protocol_type = rd_kafkap_str_new("consumer", -1); + rkas->rkas_protocol = rebalance_protocol; + rkas->rkas_assign_cb = assign_cb; + rkas->rkas_get_user_metadata_cb = get_user_metadata_cb; + rkas->rkas_unittest = unittest_cb; + rkas->rkas_opaque = opaque; + + rd_list_add(&rd_kafka_assignor_global_registry, rkas); + + mtx_unlock(&rd_kafka_assignor_global_registry_lock); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} /** * @brief Unittest for assignors @@ -880,7 +1107,7 @@ static int ut_assignors(void) { for (i = 0; tests[i].name; i++) { int ie, it, im; rd_kafka_metadata_t metadata; - rd_kafka_group_member_t *members; + rd_kafka_group_member_internal_t *members; /* Create topic metadata */ metadata.topic_cnt = tests[i].topic_cnt; @@ -901,7 +1128,7 @@ static int ut_assignors(void) { memset(members, 0, sizeof(*members) * tests[i].member_cnt); for (im = 0; im < tests[i].member_cnt; im++) { - rd_kafka_group_member_t *rkgm = &members[im]; + rd_kafka_group_member_internal_t *rkgm = &members[im]; rkgm->rkgm_member_id = rd_kafkap_str_new(tests[i].members[im].name, -1); rkgm->rkgm_group_instance_id = @@ -954,7 +1181,8 @@ static int ut_assignors(void) { /* Verify assignments */ for (im = 0; im < tests[i].member_cnt; im++) { - rd_kafka_group_member_t *rkgm = &members[im]; + rd_kafka_group_member_internal_t *rkgm = + &members[im]; int ia; if (rkgm->rkgm_assignment->cnt != @@ -1034,8 +1262,8 @@ static int ut_assignors(void) { } for (im = 0; im < tests[i].member_cnt; im++) { - rd_kafka_group_member_t *rkgm = &members[im]; - rd_kafka_group_member_clear(rkgm); + rd_kafka_group_member_internal_t *rkgm = &members[im]; + 
rd_kafka_group_member_internal_clear(rkgm); } } diff --git a/src/rdkafka_assignor.h b/src/rdkafka_assignor.h index b90e7dc980..5f54dcf3f1 100644 --- a/src/rdkafka_assignor.h +++ b/src/rdkafka_assignor.h @@ -29,24 +29,13 @@ #define _RDKAFKA_ASSIGNOR_H_ - -/*! - * Enumerates the different rebalance protocol types. - * - * @sa rd_kafka_rebalance_protocol() - */ -typedef enum rd_kafka_rebalance_protocol_t { - RD_KAFKA_REBALANCE_PROTOCOL_NONE, /**< Rebalance protocol is - unknown */ - RD_KAFKA_REBALANCE_PROTOCOL_EAGER, /**< Eager rebalance - protocol */ - RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE /**< Cooperative - rebalance protocol*/ -} rd_kafka_rebalance_protocol_t; +void rd_kafka_assignor_global_init(void); - -typedef struct rd_kafka_group_member_s { +/** + * Internal view of rd_kafka_group_member_t + */ +typedef struct rd_kafka_group_member_internal_s { /** Subscribed topics (partition field is ignored). */ rd_kafka_topic_partition_list_t *rkgm_subscription; /** Partitions assigned to this member after running the assignor. @@ -69,29 +58,26 @@ typedef struct rd_kafka_group_member_s { rd_kafkap_bytes_t *rkgm_member_metadata; /** Group generation id. */ int rkgm_generation; -} rd_kafka_group_member_t; +} rd_kafka_group_member_internal_t; - -int rd_kafka_group_member_cmp(const void *_a, const void *_b); +int rd_kafka_group_member_internal_cmp(const void *_a, const void *_b); int rd_kafka_group_member_find_subscription(rd_kafka_t *rk, const rd_kafka_group_member_t *rkgm, const char *topic); +int rd_kafka_group_member_cmp(const void *_a, const void *_b); /** - * Structure to hold metadata for a single topic and all its - * subscribing members. + * Internal view of rd_kafka_assignor_topic_t */ -typedef struct rd_kafka_assignor_topic_s { +typedef struct rd_kafka_assignor_topic_internal_s { const rd_kafka_metadata_topic_t *metadata; - rd_list_t members; /* rd_kafka_group_member_t * */ -} rd_kafka_assignor_topic_t; - + rd_list_t members; /* rd_kafka_group_member_internal_t * */ +} rd_kafka_assignor_topic_internal_t; int rd_kafka_assignor_topic_cmp(const void *_a, const void *_b); - typedef struct rd_kafka_assignor_s { rd_kafkap_str_t *rkas_protocol_type; rd_kafkap_str_t *rkas_protocol_name; @@ -103,33 +89,8 @@ typedef struct rd_kafka_assignor_s { rd_kafka_rebalance_protocol_t rkas_protocol; - rd_kafka_resp_err_t (*rkas_assign_cb)( - rd_kafka_t *rk, - const struct rd_kafka_assignor_s *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - char *errstr, - size_t errstr_size, - void *opaque); - - rd_kafkap_bytes_t *(*rkas_get_metadata_cb)( - const struct rd_kafka_assignor_s *rkas, - void *assignor_state, - const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions); - - void (*rkas_on_assignment_cb)( - const struct rd_kafka_assignor_s *rkas, - void **assignor_state, - const rd_kafka_topic_partition_list_t *assignment, - const rd_kafkap_bytes_t *assignment_userdata, - const rd_kafka_consumer_group_metadata_t *rkcgm); - - void (*rkas_destroy_state_cb)(void *assignor_state); + rd_kafka_assignor_assign_cb_t rkas_assign_cb; + rd_kafka_assignor_get_user_metadata_cb_t rkas_get_user_metadata_cb; int (*rkas_unittest)(void); @@ -137,34 +98,22 @@ typedef struct rd_kafka_assignor_s { } rd_kafka_assignor_t; +rd_kafka_resp_err_t rd_kafka_assignor_register_internal( + const char *protocol_name, + rd_kafka_rebalance_protocol_t rebalance_protocol, + 
rd_kafka_assignor_assign_cb_t assign_cb, + rd_kafka_assignor_get_user_metadata_cb_t get_user_metadata_cb, + int (*unittest_cb)(void), + void *opaque); + + rd_kafka_resp_err_t rd_kafka_assignor_add( rd_kafka_t *rk, const char *protocol_type, const char *protocol_name, rd_kafka_rebalance_protocol_t rebalance_protocol, - rd_kafka_resp_err_t (*assign_cb)( - rd_kafka_t *rk, - const struct rd_kafka_assignor_s *rkas, - const char *member_id, - const rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, - size_t eligible_topic_cnt, - char *errstr, - size_t errstr_size, - void *opaque), - rd_kafkap_bytes_t *(*get_metadata_cb)( - const struct rd_kafka_assignor_s *rkas, - void *assignor_state, - const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions), - void (*on_assignment_cb)(const struct rd_kafka_assignor_s *rkas, - void **assignor_state, - const rd_kafka_topic_partition_list_t *assignment, - const rd_kafkap_bytes_t *userdata, - const rd_kafka_consumer_group_metadata_t *rkcgm), - void (*destroy_state_cb)(void *assignor_state), + rd_kafka_assignor_assign_cb_t assign_cb, + rd_kafka_assignor_get_user_metadata_cb_t get_user_metadata_cb, int (*unittest_cb)(void), void *opaque); @@ -174,25 +123,20 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new( size_t userdata_size, const rd_kafka_topic_partition_list_t *owned_partitions); -rd_kafkap_bytes_t *rd_kafka_assignor_get_metadata_with_empty_userdata( - const rd_kafka_assignor_t *rkas, - void *assignor_state, - const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions); - void rd_kafka_assignor_update_subscription( const rd_kafka_assignor_t *rkas, const rd_kafka_topic_partition_list_t *subscription); -rd_kafka_resp_err_t rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, - const rd_kafka_assignor_t *rkas, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt, - char *errstr, - size_t errstr_size); +rd_kafka_resp_err_t +rd_kafka_assignor_run(struct rd_kafka_cgrp_s *rkcg, + const rd_kafka_assignor_t *rkas, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_internal_t *members, + int member_cnt, + char *errstr, + size_t errstr_size); rd_kafka_assignor_t *rd_kafka_assignor_find(rd_kafka_t *rk, const char *protocol); @@ -202,11 +146,12 @@ void rd_kafka_assignors_term(rd_kafka_t *rk); -void rd_kafka_group_member_clear(rd_kafka_group_member_t *rkgm); +void rd_kafka_group_member_internal_clear( + rd_kafka_group_member_internal_t *rkgm); -rd_kafka_resp_err_t rd_kafka_range_assignor_init(rd_kafka_t *rk); -rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_init(rd_kafka_t *rk); -rd_kafka_resp_err_t rd_kafka_sticky_assignor_init(rd_kafka_t *rk); +rd_kafka_resp_err_t rd_kafka_range_assignor_register(void); +rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_register(void); +rd_kafka_resp_err_t rd_kafka_sticky_assignor_register(void); #endif /* _RDKAFKA_ASSIGNOR_H_ */ diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index 4934130ac3..c5caf0a6e7 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -221,12 +221,12 @@ static void rd_kafka_cgrp_clear_wait_resp(rd_kafka_cgrp_t *rkcg, * @struct Auxillary glue type used for COOPERATIVE rebalance set operations. 
*/ typedef struct PartitionMemberInfo_s { - const rd_kafka_group_member_t *member; + const rd_kafka_group_member_internal_t *member; rd_bool_t members_match; } PartitionMemberInfo_t; static PartitionMemberInfo_t * -PartitionMemberInfo_new(const rd_kafka_group_member_t *member, +PartitionMemberInfo_new(const rd_kafka_group_member_internal_t *member, rd_bool_t members_match) { PartitionMemberInfo_t *pmi; @@ -369,9 +369,6 @@ void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) { rd_list_destroy(&rkcg->rkcg_toppars); rd_list_destroy(rkcg->rkcg_subscribed_topics); rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics); - if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb) - rkcg->rkcg_assignor->rkas_destroy_state_cb( - rkcg->rkcg_assignor_state); rd_free(rkcg); } @@ -1226,7 +1223,7 @@ static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) { * else rkgm_assignment partitions will be collected. */ static map_toppar_member_info_t * -rd_kafka_collect_partitions(const rd_kafka_group_member_t *members, +rd_kafka_collect_partitions(const rd_kafka_group_member_internal_t *members, size_t member_cnt, size_t par_cnt, rd_bool_t collect_owned) { @@ -1240,7 +1237,7 @@ rd_kafka_collect_partitions(const rd_kafka_group_member_t *members, for (i = 0; i < member_cnt; i++) { size_t j; - const rd_kafka_group_member_t *rkgm = &members[i]; + const rd_kafka_group_member_internal_t *rkgm = &members[i]; const rd_kafka_topic_partition_list_t *toppars = collect_owned ? rkgm->rkgm_owned : rkgm->rkgm_assignment; @@ -1289,9 +1286,9 @@ rd_kafka_member_partitions_intersect(map_toppar_member_info_t *a, if (b_v == NULL) continue; - members_match = - a_v->member && b_v->member && - rd_kafka_group_member_cmp(a_v->member, b_v->member) == 0; + members_match = a_v->member && b_v->member && + rd_kafka_group_member_internal_cmp( + a_v->member, b_v->member) == 0; RD_MAP_SET(intersection, rd_kafka_topic_partition_copy(key), PartitionMemberInfo_new(b_v->member, members_match)); @@ -1343,7 +1340,7 @@ rd_kafka_member_partitions_subtract(map_toppar_member_info_t *a, */ static void rd_kafka_cooperative_protocol_adjust_assignment( rd_kafka_cgrp_t *rkcg, - rd_kafka_group_member_t *members, + rd_kafka_group_member_internal_t *members, int member_cnt) { /* https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafk\ @@ -1394,7 +1391,7 @@ static void rd_kafka_cooperative_protocol_adjust_assignment( (int)(RD_MAP_CNT(assigned) / member_cnt) + 4; for (i = 0; i < member_cnt; i++) { - rd_kafka_group_member_t *rkgm = &members[i]; + rd_kafka_group_member_internal_t *rkgm = &members[i]; rd_kafka_topic_partition_list_destroy(rkgm->rkgm_assignment); rkgm->rkgm_assignment = rd_kafka_topic_partition_list_new( @@ -1513,21 +1510,6 @@ static void rd_kafka_cgrp_handle_SyncGroup_memberstate( done: rd_kafka_cgrp_update_session_timeout(rkcg, rd_true /*reset timeout*/); - rd_assert(rkcg->rkcg_assignor); - if (rkcg->rkcg_assignor->rkas_on_assignment_cb) { - char *member_id; - RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id); - rd_kafka_consumer_group_metadata_t *cgmd = - rd_kafka_consumer_group_metadata_new_with_genid( - rkcg->rkcg_rk->rk_conf.group_id_str, - rkcg->rkcg_generation_id, member_id, - rkcg->rkcg_rk->rk_conf.group_instance_id); - rkcg->rkcg_assignor->rkas_on_assignment_cb( - rkcg->rkcg_assignor, &(rkcg->rkcg_assignor_state), - assignment, &UserData, cgmd); - rd_kafka_consumer_group_metadata_destroy(cgmd); - } - // FIXME: Remove when we're done debugging. 
rd_kafka_topic_partition_list_log(rkcg->rkcg_rk, "ASSIGNMENT", RD_KAFKA_DBG_CGRP, assignment); @@ -1656,12 +1638,13 @@ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk, /** * @brief Run group assignment. */ -static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, - rd_kafka_assignor_t *rkas, - rd_kafka_resp_err_t err, - rd_kafka_metadata_t *metadata, - rd_kafka_group_member_t *members, - int member_cnt) { +static void +rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, + rd_kafka_assignor_t *rkas, + rd_kafka_resp_err_t err, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_internal_t *members, + int member_cnt) { char errstr[512]; if (err) { @@ -1760,7 +1743,7 @@ rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk, */ static int rd_kafka_group_MemberMetadata_consumer_read( rd_kafka_broker_t *rkb, - rd_kafka_group_member_t *rkgm, + rd_kafka_group_member_internal_t *rkgm, const rd_kafkap_bytes_t *MemberMetadata) { rd_kafka_buf_t *rkbuf; @@ -1893,12 +1876,7 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, "Unsupported assignment strategy \"%s\"", protocol_name); if (rkcg->rkcg_assignor) { - if (rkcg->rkcg_assignor->rkas_destroy_state_cb) - rkcg->rkcg_assignor - ->rkas_destroy_state_cb( - rkcg->rkcg_assignor_state); - rkcg->rkcg_assignor_state = NULL; - rkcg->rkcg_assignor = NULL; + rkcg->rkcg_assignor = NULL; } ErrorCode = RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL; } @@ -1930,16 +1908,10 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, goto err; } - if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) { - if (rkcg->rkcg_assignor->rkas_destroy_state_cb) - rkcg->rkcg_assignor->rkas_destroy_state_cb( - rkcg->rkcg_assignor_state); - rkcg->rkcg_assignor_state = NULL; - } rkcg->rkcg_assignor = rkas; if (i_am_leader) { - rd_kafka_group_member_t *members; + rd_kafka_group_member_internal_t *members; int i; int sub_cnt = 0; rd_list_t topics; @@ -1961,7 +1933,7 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, for (i = 0; i < member_cnt; i++) { rd_kafkap_str_t MemberId; rd_kafkap_bytes_t MemberMetadata; - rd_kafka_group_member_t *rkgm; + rd_kafka_group_member_internal_t *rkgm; rd_kafkap_str_t GroupInstanceId = RD_KAFKAP_STR_INITIALIZER; @@ -3894,7 +3866,7 @@ static void rd_kafka_cgrp_group_leader_reset(rd_kafka_cgrp_t *rkcg, int i; for (i = 0; i < rkcg->rkcg_group_leader.member_cnt; i++) - rd_kafka_group_member_clear( + rd_kafka_group_member_internal_clear( &rkcg->rkcg_group_leader.members[i]); rkcg->rkcg_group_leader.member_cnt = 0; rd_free(rkcg->rkcg_group_leader.members); @@ -5714,8 +5686,8 @@ static int unittest_set_intersect(void) { char *id = "id"; rd_kafkap_str_t id1 = RD_KAFKAP_STR_INITIALIZER; rd_kafkap_str_t id2 = RD_KAFKAP_STR_INITIALIZER; - rd_kafka_group_member_t *gm1; - rd_kafka_group_member_t *gm2; + rd_kafka_group_member_internal_t *gm1; + rd_kafka_group_member_internal_t *gm2; id1.len = 2; id1.str = id; diff --git a/src/rdkafka_cgrp.h b/src/rdkafka_cgrp.h index 1d0d0cacde..c37b2b6514 100644 --- a/src/rdkafka_cgrp.h +++ b/src/rdkafka_cgrp.h @@ -117,7 +117,7 @@ typedef struct rd_kafka_cgrp_s { /* State when group leader */ struct { - rd_kafka_group_member_t *members; + rd_kafka_group_member_internal_t *members; int member_cnt; } rkcg_group_leader; @@ -184,8 +184,6 @@ typedef struct rd_kafka_cgrp_s { rd_kafka_assignor_t *rkcg_assignor; /**< The current partition * assignor. used by both * leader and members. 
*/ - void *rkcg_assignor_state; /**< current partition - * assignor state */ int32_t rkcg_coord_id; /**< Current coordinator id, * or -1 if not known. */ diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 2d889e09dc..fcd43763f0 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -3145,7 +3145,7 @@ int rd_kafka_topic_partition_list_del( * On match, if rktpar is a regex pattern then 'matched_by_regex' is set to 1. */ int rd_kafka_topic_partition_match(rd_kafka_t *rk, - const rd_kafka_group_member_t *rkgm, + const char *group_member_id, const rd_kafka_topic_partition_t *rktpar, const char *topic, int *matched_by_regex) { @@ -3159,9 +3159,8 @@ int rd_kafka_topic_partition_match(rd_kafka_t *rk, if (ret == -1) { rd_kafka_dbg(rk, CGRP, "SUBMATCH", "Invalid regex for member " - "\"%.*s\" subscription \"%s\": %s", - RD_KAFKAP_STR_PR(rkgm->rkgm_member_id), - rktpar->topic, errstr); + "\"%s\" subscription \"%s\": %s", + group_member_id, rktpar->topic, errstr); return 0; } diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index c86022e2a6..96451caad4 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -231,12 +231,13 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */ int32_t rktp_fetch_version; /* Op version of curr fetch. (broker thread) */ - enum { RD_KAFKA_TOPPAR_FETCH_NONE = 0, - RD_KAFKA_TOPPAR_FETCH_STOPPING, - RD_KAFKA_TOPPAR_FETCH_STOPPED, - RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY, - RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT, - RD_KAFKA_TOPPAR_FETCH_ACTIVE, + enum { + RD_KAFKA_TOPPAR_FETCH_NONE = 0, + RD_KAFKA_TOPPAR_FETCH_STOPPING, + RD_KAFKA_TOPPAR_FETCH_STOPPED, + RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY, + RD_KAFKA_TOPPAR_FETCH_OFFSET_WAIT, + RD_KAFKA_TOPPAR_FETCH_ACTIVE, } rktp_fetch_state; /* Broker thread's state */ #define RD_KAFKA_TOPPAR_FETCH_IS_STARTED(fetch_state) \ @@ -626,7 +627,7 @@ void rd_kafka_topic_partition_list_add_list( (RKTPAR) >= &(TPLIST)->elems[0]; RKTPAR--) int rd_kafka_topic_partition_match(rd_kafka_t *rk, - const rd_kafka_group_member_t *rkgm, + const char *group_member_id, const rd_kafka_topic_partition_t *rktpar, const char *topic, int *matched_by_regex); diff --git a/src/rdkafka_range_assignor.c b/src/rdkafka_range_assignor.c index c83f1f1a44..7dd9d9f405 100644 --- a/src/rdkafka_range_assignor.c +++ b/src/rdkafka_range_assignor.c @@ -52,53 +52,54 @@ rd_kafka_resp_err_t rd_kafka_range_assignor_assign_cb(rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, + void *opaque, const char *member_id, const rd_kafka_metadata_t *metadata, rd_kafka_group_member_t *members, size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, + rd_kafka_assignor_topic_t *eligible_topics, size_t eligible_topic_cnt, char *errstr, - size_t errstr_size, - void *opaque) { + size_t errstr_size) { unsigned int ti; - int i; + unsigned int i; /* The range assignor works on a per-topic basis. */ for (ti = 0; ti < eligible_topic_cnt; ti++) { - rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti]; - int numPartitionsPerConsumer; - int consumersWithExtraPartition; + rd_kafka_assignor_topic_t *eligible_topic = + &eligible_topics[ti]; + unsigned int numPartitionsPerConsumer; + unsigned int consumersWithExtraPartition; /* For each topic, we lay out the available partitions in * numeric order and the consumers in lexicographic order. 
*/ - rd_list_sort(&eligible_topic->members, - rd_kafka_group_member_cmp); + qsort(eligible_topic->members, eligible_topic->member_cnt, + sizeof(*eligible_topic->members), + rd_kafka_group_member_cmp); /* We then divide the number of partitions by the total number * of consumers to determine the number of partitions to assign * to each consumer. */ numPartitionsPerConsumer = eligible_topic->metadata->partition_cnt / - rd_list_cnt(&eligible_topic->members); + eligible_topic->member_cnt; /* If it does not evenly divide, then the first few consumers * will have one extra partition. */ consumersWithExtraPartition = eligible_topic->metadata->partition_cnt % - rd_list_cnt(&eligible_topic->members); + eligible_topic->member_cnt; rd_kafka_dbg(rk, CGRP, "ASSIGN", "range: Topic %s with %d partition(s) and " - "%d subscribing member(s)", + "%zu subscribing member(s)", eligible_topic->metadata->topic, eligible_topic->metadata->partition_cnt, - rd_list_cnt(&eligible_topic->members)); + eligible_topic->member_cnt); - for (i = 0; i < rd_list_cnt(&eligible_topic->members); i++) { + for (i = 0; i < eligible_topic->member_cnt; i++) { rd_kafka_group_member_t *rkgm = - rd_list_elem(&eligible_topic->members, i); + &eligible_topic->members[i]; int start = numPartitionsPerConsumer * i + RD_MIN(i, consumersWithExtraPartition); int length = @@ -111,7 +112,7 @@ rd_kafka_range_assignor_assign_cb(rd_kafka_t *rk, rd_kafka_dbg(rk, CGRP, "ASSIGN", "range: Member \"%s\": " "assigned topic %s partitions %d..%d", - rkgm->rkgm_member_id->str, + rkgm->rkgm_member_id, eligible_topic->metadata->topic, start, start + length - 1); rd_kafka_topic_partition_list_add_range( @@ -129,10 +130,8 @@ rd_kafka_range_assignor_assign_cb(rd_kafka_t *rk, /** * @brief Initialzie and add range assignor. */ -rd_kafka_resp_err_t rd_kafka_range_assignor_init(rd_kafka_t *rk) { - return rd_kafka_assignor_add( - rk, "consumer", "range", RD_KAFKA_REBALANCE_PROTOCOL_EAGER, - rd_kafka_range_assignor_assign_cb, - rd_kafka_assignor_get_metadata_with_empty_userdata, NULL, NULL, - NULL, NULL); +rd_kafka_resp_err_t rd_kafka_range_assignor_register(void) { + return rd_kafka_assignor_register_internal( + "range", RD_KAFKA_REBALANCE_PROTOCOL_EAGER, + rd_kafka_range_assignor_assign_cb, NULL, NULL, NULL); } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 5b2290b0de..c4578da895 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -1409,9 +1409,9 @@ rd_kafka_OffsetDeleteRequest(rd_kafka_broker_t *rkb, * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to * enveloping buffer \p rkbuf. 
*/ -static void -rd_kafka_group_MemberState_consumer_write(rd_kafka_buf_t *env_rkbuf, - const rd_kafka_group_member_t *rkgm) { +static void rd_kafka_group_MemberState_consumer_write( + rd_kafka_buf_t *env_rkbuf, + const rd_kafka_group_member_internal_t *rkgm) { rd_kafka_buf_t *rkbuf; rd_slice_t slice; @@ -1438,16 +1438,17 @@ rd_kafka_group_MemberState_consumer_write(rd_kafka_buf_t *env_rkbuf, /** * Send SyncGroupRequest */ -void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb, - const rd_kafkap_str_t *group_id, - int32_t generation_id, - const rd_kafkap_str_t *member_id, - const rd_kafkap_str_t *group_instance_id, - const rd_kafka_group_member_t *assignments, - int assignment_cnt, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { +void rd_kafka_SyncGroupRequest( + rd_kafka_broker_t *rkb, + const rd_kafkap_str_t *group_id, + int32_t generation_id, + const rd_kafkap_str_t *member_id, + const rd_kafkap_str_t *group_instance_id, + const rd_kafka_group_member_internal_t *assignments, + int assignment_cnt, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { rd_kafka_buf_t *rkbuf; int i; int16_t ApiVersion; @@ -1471,7 +1472,7 @@ void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_i32(rkbuf, assignment_cnt); for (i = 0; i < assignment_cnt; i++) { - const rd_kafka_group_member_t *rkgm = &assignments[i]; + const rd_kafka_group_member_internal_t *rkgm = &assignments[i]; rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id); rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm); @@ -1534,15 +1535,31 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt); RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) { + rd_kafka_member_userdata_serialized_t *member_userdata = NULL; rd_kafkap_bytes_t *member_metadata; + char *member_id_str; + if (!rkas->rkas_enabled) continue; + RD_KAFKAP_STR_DUPA(&member_id_str, member_id); rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name); - member_metadata = rkas->rkas_get_metadata_cb( - rkas, rk->rk_cgrp->rkcg_assignor_state, topics, - rk->rk_cgrp->rkcg_group_assignment); + if (rkas->rkas_get_user_metadata_cb != NULL) { + member_userdata = rkas->rkas_get_user_metadata_cb( + rkas->rkas_opaque, member_id_str, + rk->rk_cgrp->rkcg_group_assignment, + rk->rk_cgrp->rkcg_generation_id); + } + if (member_userdata == NULL) { + member_userdata = + rd_kafka_member_userdata_serialized_new(NULL, 0); + } + member_metadata = + rd_kafka_consumer_protocol_member_metadata_new( + topics, member_userdata->data, member_userdata->len, + rk->rk_cgrp->rkcg_group_assignment); rd_kafka_buf_write_kbytes(rkbuf, member_metadata); rd_kafkap_bytes_destroy(member_metadata); + rd_kafka_member_userdata_serialized_destroy(member_userdata); } rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index 1c2675d51b..828079c965 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -175,16 +175,17 @@ void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_buf_t *request, void *opaque); -void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb, - const rd_kafkap_str_t *group_id, - int32_t generation_id, - const rd_kafkap_str_t *member_id, - const rd_kafkap_str_t *group_instance_id, - const rd_kafka_group_member_t *assignments, - int assignment_cnt, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque); +void rd_kafka_SyncGroupRequest( + rd_kafka_broker_t *rkb, + const 
rd_kafkap_str_t *group_id, + int32_t generation_id, + const rd_kafkap_str_t *member_id, + const rd_kafkap_str_t *group_instance_id, + const rd_kafka_group_member_internal_t *assignments, + int assignment_cnt, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque); void rd_kafka_handle_SyncGroup(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, diff --git a/src/rdkafka_roundrobin_assignor.c b/src/rdkafka_roundrobin_assignor.c index 6cb9193645..fbb476281d 100644 --- a/src/rdkafka_roundrobin_assignor.c +++ b/src/rdkafka_roundrobin_assignor.c @@ -51,28 +51,28 @@ rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_assign_cb( rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, + void *opaque, const char *member_id, const rd_kafka_metadata_t *metadata, rd_kafka_group_member_t *members, size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, + rd_kafka_assignor_topic_t *eligible_topics, size_t eligible_topic_cnt, char *errstr, - size_t errstr_size, - void *opaque) { + size_t errstr_size) { unsigned int ti; int next = -1; /* Next member id */ /* Sort topics by name */ - qsort(eligible_topics, eligible_topic_cnt, sizeof(*eligible_topics), - rd_kafka_assignor_topic_cmp); + qsort(eligible_topics, eligible_topic_cnt, + sizeof(rd_kafka_assignor_topic_t), rd_kafka_assignor_topic_cmp); /* Sort members by name */ qsort(members, member_cnt, sizeof(*members), rd_kafka_group_member_cmp); for (ti = 0; ti < eligible_topic_cnt; ti++) { - rd_kafka_assignor_topic_t *eligible_topic = eligible_topics[ti]; + rd_kafka_assignor_topic_t *eligible_topic = + &eligible_topics[ti]; int partition; /* For each topic+partition, assign one member (in a cyclic @@ -95,7 +95,7 @@ rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_assign_cb( rd_kafka_dbg(rk, CGRP, "ASSIGN", "roundrobin: Member \"%s\": " "assigned topic %s partition %d", - rkgm->rkgm_member_id->str, + rkgm->rkgm_member_id, eligible_topic->metadata->topic, partition); @@ -114,10 +114,8 @@ rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_assign_cb( /** * @brief Initialzie and add roundrobin assignor. */ -rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_init(rd_kafka_t *rk) { - return rd_kafka_assignor_add( - rk, "consumer", "roundrobin", RD_KAFKA_REBALANCE_PROTOCOL_EAGER, - rd_kafka_roundrobin_assignor_assign_cb, - rd_kafka_assignor_get_metadata_with_empty_userdata, NULL, NULL, - NULL, NULL); +rd_kafka_resp_err_t rd_kafka_roundrobin_assignor_register(void) { + return rd_kafka_assignor_register_internal( + "roundrobin", RD_KAFKA_REBALANCE_PROTOCOL_EAGER, + rd_kafka_roundrobin_assignor_assign_cb, NULL, NULL, NULL); } diff --git a/src/rdkafka_sticky_assignor.c b/src/rdkafka_sticky_assignor.c index d0a6e03167..54f524c83b 100644 --- a/src/rdkafka_sticky_assignor.c +++ b/src/rdkafka_sticky_assignor.c @@ -49,14 +49,6 @@ */ -/** @brief Assignor state from last rebalance */ -typedef struct rd_kafka_sticky_assignor_state_s { - rd_kafka_topic_partition_list_t *prev_assignment; - int32_t generation_id; -} rd_kafka_sticky_assignor_state_t; - - - /** * Auxilliary glue types */ @@ -151,6 +143,9 @@ static int ConsumerGenerationPair_cmp_generation(const void *_a, * Where the keytype and valuetype are spoken names of the types and * not the specific C types (since that'd be too long). 
*/ +typedef RD_MAP_TYPE(const char *, const rd_kafka_topic_partition_list_t *) + map_str_toppar_list_const_t; + typedef RD_MAP_TYPE(const char *, rd_kafka_topic_partition_list_t *) map_str_toppar_list_t; @@ -1126,7 +1121,7 @@ static void prepopulateCurrentAssignments( rd_kafka_t *rk, rd_kafka_group_member_t *members, size_t member_cnt, - map_str_toppar_list_t *subscriptions, + map_str_toppar_list_const_t *subscriptions, map_str_toppar_list_t *currentAssignment, map_toppar_cgpair_t *prevAssignment, map_toppar_str_t *currentPartitionConsumer, @@ -1159,14 +1154,14 @@ static void prepopulateCurrentAssignments( rd_kafka_group_member_t *consumer = &members[i]; int j; - RD_MAP_SET(subscriptions, consumer->rkgm_member_id->str, + RD_MAP_SET(subscriptions, consumer->rkgm_member_id, consumer->rkgm_subscription); - RD_MAP_SET(currentAssignment, consumer->rkgm_member_id->str, + RD_MAP_SET(currentAssignment, consumer->rkgm_member_id, rd_kafka_topic_partition_list_new(10)); RD_MAP_SET(consumer2AllPotentialPartitions, - consumer->rkgm_member_id->str, + consumer->rkgm_member_id, rd_kafka_topic_partition_list_new( (int)estimated_partition_cnt)); @@ -1184,28 +1179,27 @@ static void prepopulateCurrentAssignments( rd_list_find( consumers, &consumer->rkgm_generation, ConsumerGenerationPair_cmp_generation)) { - rd_kafka_log( - rk, LOG_WARNING, "STICKY", - "Sticky assignor: " - "%s [%" PRId32 - "] is assigned to " - "multiple consumers with same " - "generation %d: " - "skipping member %.*s", - partition->topic, partition->partition, - consumer->rkgm_generation, - RD_KAFKAP_STR_PR(consumer->rkgm_member_id)); + rd_kafka_log(rk, LOG_WARNING, "STICKY", + "Sticky assignor: " + "%s [%" PRId32 + "] is assigned to " + "multiple consumers with same " + "generation %d: " + "skipping member %s", + partition->topic, + partition->partition, + consumer->rkgm_generation, + consumer->rkgm_member_id); continue; } - rd_list_add(consumers, - ConsumerGenerationPair_new( - consumer->rkgm_member_id->str, - consumer->rkgm_generation)); + rd_list_add(consumers, ConsumerGenerationPair_new( + consumer->rkgm_member_id, + consumer->rkgm_generation)); RD_MAP_SET(currentPartitionConsumer, rd_kafka_topic_partition_copy(partition), - consumer->rkgm_member_id->str); + consumer->rkgm_member_id); } } @@ -1252,8 +1246,7 @@ populatePotentialMaps(const rd_kafka_assignor_topic_t *atopic, map_toppar_list_t *partition2AllPotentialConsumers, map_str_toppar_list_t *consumer2AllPotentialPartitions, size_t estimated_partition_cnt) { - int i; - const rd_kafka_group_member_t *rkgm; + size_t i; /* for each eligible (subscribed and available) topic (\p atopic): * for each member subscribing to that topic: @@ -1263,8 +1256,9 @@ populatePotentialMaps(const rd_kafka_assignor_topic_t *atopic, * consumer2AllPotentialPartitions */ - RD_LIST_FOREACH(rkgm, &atopic->members, i) { - const char *consumer = rkgm->rkgm_member_id->str; + for (i = 0; i < atopic->member_cnt; i++) { + const rd_kafka_group_member_t *rkgm = &atopic->members[i]; + const char *consumer = rkgm->rkgm_member_id; rd_kafka_topic_partition_list_t *partitions = RD_MAP_GET(consumer2AllPotentialPartitions, consumer); int j; @@ -1561,7 +1555,7 @@ static void assignToMembers(map_str_toppar_list_t *currentAssignment, for (i = 0; i < member_cnt; i++) { rd_kafka_group_member_t *rkgm = &members[i]; const rd_kafka_topic_partition_list_t *partitions = - RD_MAP_GET(currentAssignment, rkgm->rkgm_member_id->str); + RD_MAP_GET(currentAssignment, rkgm->rkgm_member_id); if (rkgm->rkgm_assignment) 
rd_kafka_topic_partition_list_destroy( rkgm->rkgm_assignment); @@ -1578,21 +1572,20 @@ static void assignToMembers(map_str_toppar_list_t *currentAssignment, */ rd_kafka_resp_err_t rd_kafka_sticky_assignor_assign_cb(rd_kafka_t *rk, - const rd_kafka_assignor_t *rkas, + void *opaque, const char *member_id, const rd_kafka_metadata_t *metadata, rd_kafka_group_member_t *members, size_t member_cnt, - rd_kafka_assignor_topic_t **eligible_topics, + rd_kafka_assignor_topic_t *eligible_topics, size_t eligible_topic_cnt, char *errstr, - size_t errstr_size, - void *opaque) { + size_t errstr_size) { /* FIXME: Let the cgrp pass the actual eligible partition count */ size_t partition_cnt = member_cnt * 10; /* FIXME */ /* Map of subscriptions. This is \p member turned into a map. */ - map_str_toppar_list_t subscriptions = + map_str_toppar_list_const_t subscriptions = RD_MAP_INITIALIZER(member_cnt, rd_map_str_cmp, rd_map_str_hash, NULL /* refs members.rkgm_member_id */, NULL /* refs members.rkgm_subscription */); @@ -1665,7 +1658,7 @@ rd_kafka_sticky_assignor_assign_cb(rd_kafka_t *rk, * consumer2AllPotentialPartitions maps by each eligible topic. */ for (i = 0; i < (int)eligible_topic_cnt; i++) populatePotentialMaps( - eligible_topics[i], &partition2AllPotentialConsumers, + &eligible_topics[i], &partition2AllPotentialConsumers, &consumer2AllPotentialPartitions, partition_cnt); @@ -1811,96 +1804,6 @@ rd_kafka_sticky_assignor_assign_cb(rd_kafka_t *rk, -/** @brief FIXME docstring */ -static void rd_kafka_sticky_assignor_on_assignment_cb( - const rd_kafka_assignor_t *rkas, - void **assignor_state, - const rd_kafka_topic_partition_list_t *partitions, - const rd_kafkap_bytes_t *assignment_userdata, - const rd_kafka_consumer_group_metadata_t *rkcgm) { - rd_kafka_sticky_assignor_state_t *state = - (rd_kafka_sticky_assignor_state_t *)*assignor_state; - - if (!state) - state = rd_calloc(1, sizeof(*state)); - else - rd_kafka_topic_partition_list_destroy(state->prev_assignment); - - state->prev_assignment = rd_kafka_topic_partition_list_copy(partitions); - state->generation_id = rkcgm->generation_id; - - *assignor_state = state; -} - -/** @brief FIXME docstring */ -static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata( - const rd_kafka_assignor_t *rkas, - void *assignor_state, - const rd_list_t *topics, - const rd_kafka_topic_partition_list_t *owned_partitions) { - rd_kafka_sticky_assignor_state_t *state; - rd_kafka_buf_t *rkbuf; - rd_kafkap_bytes_t *metadata; - rd_kafkap_bytes_t *kbytes; - size_t len; - - /* - * UserData (Version: 1) => [previous_assignment] generation - * previous_assignment => topic [partitions] - * topic => STRING - * partitions => partition - * partition => INT32 - * generation => INT32 - * - * If there is no previous assignment, UserData is NULL. - */ - - if (!assignor_state) { - return rd_kafka_consumer_protocol_member_metadata_new( - topics, NULL, 0, owned_partitions); - } - - state = (rd_kafka_sticky_assignor_state_t *)assignor_state; - - rkbuf = rd_kafka_buf_new(1, 100); - rd_assert(state->prev_assignment != NULL); - rd_kafka_buf_write_topic_partitions( - rkbuf, state->prev_assignment, rd_false /*skip invalid offsets*/, - rd_false /*any offset*/, rd_false /*write offsets*/, - rd_false /*write epoch*/, rd_false /*write metadata*/); - rd_kafka_buf_write_i32(rkbuf, state->generation_id); - - /* Get binary buffer and allocate a new Kafka Bytes with a copy. 
*/ - rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf); - len = rd_slice_remains(&rkbuf->rkbuf_reader); - kbytes = rd_kafkap_bytes_new(NULL, (int32_t)len); - rd_slice_read(&rkbuf->rkbuf_reader, (void *)kbytes->data, len); - rd_kafka_buf_destroy(rkbuf); - - metadata = rd_kafka_consumer_protocol_member_metadata_new( - topics, kbytes->data, kbytes->len, owned_partitions); - - rd_kafkap_bytes_destroy(kbytes); - - return metadata; -} - - -/** - * @brief Destroy assignor state - */ -static void rd_kafka_sticky_assignor_state_destroy(void *assignor_state) { - rd_kafka_sticky_assignor_state_t *state = - (rd_kafka_sticky_assignor_state_t *)assignor_state; - - rd_assert(assignor_state); - - rd_kafka_topic_partition_list_destroy(state->prev_assignment); - rd_free(state); -} - - - /** * @name Sticky assignor unit tests * @@ -1920,7 +1823,7 @@ static void rd_kafka_sticky_assignor_state_destroy(void *assignor_state) { * its new assignment and including it in the next rebalance as its * owned-partitions. */ -static void ut_set_owned(rd_kafka_group_member_t *rkgm) { +static void ut_set_owned(rd_kafka_group_member_internal_t *rkgm) { if (rkgm->rkgm_owned) rd_kafka_topic_partition_list_destroy(rkgm->rkgm_owned); @@ -1937,7 +1840,7 @@ static void ut_set_owned(rd_kafka_group_member_t *rkgm) { static int verifyValidityAndBalance0(const char *func, int line, - rd_kafka_group_member_t *members, + rd_kafka_group_member_internal_t *members, size_t member_cnt, const rd_kafka_metadata_t *metadata) { int fails = 0; @@ -2059,7 +1962,7 @@ static int verifyValidityAndBalance0(const char *func, */ static int isFullyBalanced0(const char *function, int line, - const rd_kafka_group_member_t *members, + const rd_kafka_group_member_internal_t *members, size_t member_cnt) { int min_assignment = INT_MAX; int max_assignment = -1; @@ -2109,7 +2012,7 @@ ut_print_toppar_list(const rd_kafka_topic_partition_list_t *partitions) { */ static int verifyAssignment0(const char *function, int line, - rd_kafka_group_member_t *rkgm, + rd_kafka_group_member_internal_t *rkgm, ...) { va_list ap; int cnt = 0; @@ -2165,10 +2068,11 @@ static int verifyAssignment0(const char *function, * * va-args is a NULL-terminated list of (const char *) topics. * - * Use rd_kafka_group_member_clear() to free fields. + * Use rd_kafka_group_member_internal_clear() to free fields. */ -static void -ut_init_member(rd_kafka_group_member_t *rkgm, const char *member_id, ...) { +static void ut_init_member(rd_kafka_group_member_internal_t *rkgm, + const char *member_id, + ...) 
{ va_list ap; const char *topic; @@ -2197,7 +2101,7 @@ static int ut_testOneConsumerNoTopic(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[1]; + rd_kafka_group_member_internal_t members[1]; metadata = rd_kafka_metadata_new_topic_mock(NULL, 0); ut_init_member(&members[0], "consumer1", "topic1", NULL); @@ -2211,7 +2115,7 @@ static int ut_testOneConsumerNoTopic(rd_kafka_t *rk, verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[0]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2223,7 +2127,7 @@ static int ut_testOneConsumerNonexistentTopic(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[1]; + rd_kafka_group_member_internal_t members[1]; metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 0); ut_init_member(&members[0], "consumer1", "topic1", NULL); @@ -2237,7 +2141,7 @@ static int ut_testOneConsumerNonexistentTopic(rd_kafka_t *rk, verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[0]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2250,7 +2154,7 @@ static int ut_testOneConsumerOneTopic(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[1]; + rd_kafka_group_member_internal_t members[1]; metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 3); ut_init_member(&members[0], "consumer1", "topic1", NULL); @@ -2269,7 +2173,7 @@ static int ut_testOneConsumerOneTopic(rd_kafka_t *rk, verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[0]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2283,7 +2187,7 @@ static int ut_testOnlyAssignsPartitionsFromSubscribedTopics( rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[1]; + rd_kafka_group_member_internal_t members[1]; metadata = rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 3); @@ -2300,7 +2204,7 @@ static int ut_testOnlyAssignsPartitionsFromSubscribedTopics( verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[0]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2312,7 +2216,7 @@ static int ut_testOneConsumerMultipleTopics(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[1]; + rd_kafka_group_member_internal_t members[1]; metadata = rd_kafka_metadata_new_topic_mockv(2, "topic1", 1, "topic2", 2); @@ -2329,7 +2233,7 @@ static int ut_testOneConsumerMultipleTopics(rd_kafka_t *rk, verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[0]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2341,7 +2245,7 @@ ut_testTwoConsumersOneTopicOnePartition(rd_kafka_t *rk, rd_kafka_resp_err_t 
err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[2]; + rd_kafka_group_member_internal_t members[2]; metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 1); ut_init_member(&members[0], "consumer1", "topic1", NULL); @@ -2358,8 +2262,8 @@ ut_testTwoConsumersOneTopicOnePartition(rd_kafka_t *rk, verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); - rd_kafka_group_member_clear(&members[1]); + rd_kafka_group_member_internal_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[1]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2372,7 +2276,7 @@ ut_testTwoConsumersOneTopicTwoPartitions(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[2]; + rd_kafka_group_member_internal_t members[2]; metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 2); ut_init_member(&members[0], "consumer1", "topic1", NULL); @@ -2389,8 +2293,8 @@ ut_testTwoConsumersOneTopicTwoPartitions(rd_kafka_t *rk, verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); - rd_kafka_group_member_clear(&members[1]); + rd_kafka_group_member_internal_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[1]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2404,7 +2308,7 @@ static int ut_testMultipleConsumersMixedTopicSubscriptions( rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[3]; + rd_kafka_group_member_internal_t members[3]; metadata = rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 2); @@ -2424,9 +2328,9 @@ static int ut_testMultipleConsumersMixedTopicSubscriptions( verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); - rd_kafka_group_member_clear(&members[1]); - rd_kafka_group_member_clear(&members[2]); + rd_kafka_group_member_internal_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[1]); + rd_kafka_group_member_internal_clear(&members[2]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2439,7 +2343,7 @@ ut_testTwoConsumersTwoTopicsSixPartitions(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[2]; + rd_kafka_group_member_internal_t members[2]; metadata = rd_kafka_metadata_new_topic_mockv(2, "topic1", 3, "topic2", 3); @@ -2459,8 +2363,8 @@ ut_testTwoConsumersTwoTopicsSixPartitions(rd_kafka_t *rk, verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); - rd_kafka_group_member_clear(&members[1]); + rd_kafka_group_member_internal_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[1]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2472,7 +2376,7 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[2]; + rd_kafka_group_member_internal_t members[2]; metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 3); ut_init_member(&members[0], "consumer1", "topic1", NULL); @@ -2515,8 +2419,8 @@ static int ut_testAddRemoveConsumerOneTopic(rd_kafka_t *rk, 
isFullyBalanced(&members[1], 1); // FIXME: isSticky(); - rd_kafka_group_member_clear(&members[0]); - rd_kafka_group_member_clear(&members[1]); + rd_kafka_group_member_internal_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[1]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2549,7 +2453,7 @@ ut_testPoorRoundRobinAssignmentScenario(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[4]; + rd_kafka_group_member_internal_t members[4]; metadata = rd_kafka_metadata_new_topic_mockv( 5, "topic1", 2, "topic2", 1, "topic3", 2, "topic4", 1, "topic5", 2); @@ -2576,10 +2480,10 @@ ut_testPoorRoundRobinAssignmentScenario(rd_kafka_t *rk, verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); - rd_kafka_group_member_clear(&members[1]); - rd_kafka_group_member_clear(&members[2]); - rd_kafka_group_member_clear(&members[3]); + rd_kafka_group_member_internal_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[1]); + rd_kafka_group_member_internal_clear(&members[2]); + rd_kafka_group_member_internal_clear(&members[3]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2592,7 +2496,7 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[2]; + rd_kafka_group_member_internal_t members[2]; metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 3); ut_init_member(&members[0], "consumer1", "topic1", "topic2", NULL); @@ -2651,8 +2555,8 @@ static int ut_testAddRemoveTopicTwoConsumers(rd_kafka_t *rk, isFullyBalanced(members, RD_ARRAYSIZE(members)); // FIXME: isSticky(); - rd_kafka_group_member_clear(&members[0]); - rd_kafka_group_member_clear(&members[1]); + rd_kafka_group_member_internal_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[1]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2665,7 +2569,7 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[19]; + rd_kafka_group_member_internal_t members[19]; int member_cnt = RD_ARRAYSIZE(members); rd_kafka_metadata_topic_t mt[19]; int topic_cnt = RD_ARRAYSIZE(mt); @@ -2709,7 +2613,7 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, /* * Remove consumer10. 
*/ - rd_kafka_group_member_clear(&members[9]); + rd_kafka_group_member_internal_clear(&members[9]); memmove(&members[9], &members[10], sizeof(*members) * (member_cnt - 10)); member_cnt--; @@ -2722,7 +2626,7 @@ ut_testReassignmentAfterOneConsumerLeaves(rd_kafka_t *rk, // FIXME: isSticky(); for (i = 0; i < member_cnt; i++) - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2735,7 +2639,7 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[9]; + rd_kafka_group_member_internal_t members[9]; int member_cnt = RD_ARRAYSIZE(members); int i; @@ -2775,7 +2679,7 @@ ut_testReassignmentAfterOneConsumerAdded(rd_kafka_t *rk, // FIXME: isSticky(); for (i = 0; i < member_cnt; i++) - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2787,7 +2691,7 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[9]; + rd_kafka_group_member_internal_t members[9]; int member_cnt = RD_ARRAYSIZE(members); rd_kafka_metadata_topic_t mt[15]; int topic_cnt = RD_ARRAYSIZE(mt); @@ -2825,7 +2729,7 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, /* * Remove consumer5 */ - rd_kafka_group_member_clear(&members[5]); + rd_kafka_group_member_internal_clear(&members[5]); memmove(&members[5], &members[6], sizeof(*members) * (member_cnt - 6)); member_cnt--; @@ -2837,7 +2741,7 @@ static int ut_testSameSubscriptions(rd_kafka_t *rk, // FIXME: isSticky(); for (i = 0; i < member_cnt; i++) - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); rd_kafka_metadata_destroy(metadata); rd_kafka_topic_partition_list_destroy(subscription); @@ -2852,7 +2756,7 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[200]; + rd_kafka_group_member_internal_t members[200]; int member_cnt = RD_ARRAYSIZE(members); rd_kafka_metadata_topic_t mt[40]; int topic_cnt = RD_ARRAYSIZE(mt); @@ -2898,7 +2802,7 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( * Remove every 4th consumer (~50) */ for (i = member_cnt - 1; i >= 0; i -= 4) { - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); memmove(&members[i], &members[i + 1], sizeof(*members) * (member_cnt - (i + 1))); member_cnt--; @@ -2912,7 +2816,7 @@ static int ut_testLargeAssignmentWithMultipleConsumersLeaving( // FIXME: isSticky(); for (i = 0; i < member_cnt; i++) - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2924,7 +2828,7 @@ static int ut_testNewSubscription(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[3]; + rd_kafka_group_member_internal_t members[3]; int member_cnt = RD_ARRAYSIZE(members); int i; @@ -2974,7 +2878,7 @@ static int ut_testNewSubscription(rd_kafka_t *rk, // FIXME: isSticky(); for (i = 0; i < member_cnt; i++) - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -2986,7 +2890,7 @@ 
static int ut_testMoveExistingAssignments(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[4]; + rd_kafka_group_member_internal_t members[4]; int member_cnt = RD_ARRAYSIZE(members); rd_kafka_topic_partition_list_t *assignments[4] = RD_ZERO_INIT; int i; @@ -3054,7 +2958,7 @@ static int ut_testMoveExistingAssignments(rd_kafka_t *rk, for (i = 0; i < member_cnt; i++) { - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); if (assignments[i]) rd_kafka_topic_partition_list_destroy(assignments[i]); } @@ -3069,7 +2973,7 @@ static int ut_testStickiness(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[3]; + rd_kafka_group_member_internal_t members[3]; int member_cnt = RD_ARRAYSIZE(members); int i; @@ -3112,7 +3016,7 @@ static int ut_testStickiness(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { for (i = 0; i < member_cnt; i++) - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -3126,7 +3030,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[3]; + rd_kafka_group_member_internal_t members[3]; int member_cnt = RD_ARRAYSIZE(members); int i; @@ -3195,7 +3099,7 @@ static int ut_testStickiness2(rd_kafka_t *rk, const rd_kafka_assignor_t *rkas) { "topic1", 3, "topic1", 4, "topic1", 5, NULL); for (i = 0; i < member_cnt; i++) - rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -3208,7 +3112,7 @@ ut_testAssignmentUpdatedForDeletedTopic(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[1]; + rd_kafka_group_member_internal_t members[1]; metadata = rd_kafka_metadata_new_topic_mockv(2, "topic1", 1, "topic3", 100); @@ -3227,7 +3131,7 @@ ut_testAssignmentUpdatedForDeletedTopic(rd_kafka_t *rk, "Expected %d assigned partitions, not %d", 1 + 100, members[0].rkgm_assignment->cnt); - rd_kafka_group_member_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[0]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -3241,7 +3145,7 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[1]; + rd_kafka_group_member_internal_t members[1]; metadata = rd_kafka_metadata_new_topic_mockv(1, "topic1", 3); @@ -3269,7 +3173,7 @@ static int ut_testNoExceptionThrownWhenOnlySubscribedTopicDeleted( verifyValidityAndBalance(members, RD_ARRAYSIZE(members), metadata); isFullyBalanced(members, RD_ARRAYSIZE(members)); - rd_kafka_group_member_clear(&members[0]); + rd_kafka_group_member_internal_clear(&members[0]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -3282,7 +3186,7 @@ ut_testConflictingPreviousAssignments(rd_kafka_t *rk, rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_metadata_t *metadata; - rd_kafka_group_member_t members[2]; + rd_kafka_group_member_internal_t members[2]; int member_cnt = RD_ARRAYSIZE(members); int i; @@ -3330,7 +3234,7 @@ ut_testConflictingPreviousAssignments(rd_kafka_t *rk, /* FIXME: isSticky() */ for (i = 0; i < member_cnt; i++) - 
rd_kafka_group_member_clear(&members[i]); + rd_kafka_group_member_internal_clear(&members[i]); rd_kafka_metadata_destroy(metadata); RD_UT_PASS(); @@ -3415,12 +3319,9 @@ static int rd_kafka_sticky_assignor_unittest(void) { /** * @brief Initialzie and add sticky assignor. */ -rd_kafka_resp_err_t rd_kafka_sticky_assignor_init(rd_kafka_t *rk) { - return rd_kafka_assignor_add(rk, "consumer", "cooperative-sticky", - RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE, - rd_kafka_sticky_assignor_assign_cb, - rd_kafka_sticky_assignor_get_metadata, - rd_kafka_sticky_assignor_on_assignment_cb, - rd_kafka_sticky_assignor_state_destroy, - rd_kafka_sticky_assignor_unittest, NULL); +rd_kafka_resp_err_t rd_kafka_sticky_assignor_register(void) { + return rd_kafka_assignor_register_internal( + "cooperative-sticky", RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE, + rd_kafka_sticky_assignor_assign_cb, NULL, + rd_kafka_sticky_assignor_unittest, NULL); }
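
Illustration (not part of the patch): a minimal sketch of a custom assignment callback written against the reworked signature used by the built-in assignors above, i.e. a flat rd_kafka_assignor_topic_t array, plain char * member ids, and the assignor opaque passed directly to the callback. How such a callback is registered goes through the new public API added to rdkafka.h, which is not shown in this excerpt; the body below only relies on names visible in the diffs (rkgm_assignment, rkgm_member_id, eligible_topic->metadata, eligible_topic->members/member_cnt) plus the existing public rd_kafka_topic_partition_list_add().

#include <librdkafka/rdkafka.h>

/* Hedged sketch: per-topic cyclic assignment using the new flat-array
 * callback shape.  Partition ids are taken from the topic metadata and
 * appended to each member's pre-allocated rkgm_assignment list, the same
 * way the reworked range/roundrobin assignors above do. */
static rd_kafka_resp_err_t
my_assign_cb(rd_kafka_t *rk,
             void *opaque,
             const char *member_id,
             const rd_kafka_metadata_t *metadata,
             rd_kafka_group_member_t *members,
             size_t member_cnt,
             rd_kafka_assignor_topic_t *eligible_topics,
             size_t eligible_topic_cnt,
             char *errstr,
             size_t errstr_size) {
        size_t ti;

        for (ti = 0; ti < eligible_topic_cnt; ti++) {
                rd_kafka_assignor_topic_t *topic = &eligible_topics[ti];
                int p;

                /* Defensive: skip topics without subscribing members. */
                if (topic->member_cnt == 0)
                        continue;

                for (p = 0; p < topic->metadata->partition_cnt; p++) {
                        /* Pick a subscriber for this partition in a
                         * simple round-robin fashion. */
                        rd_kafka_group_member_t *rkgm =
                            &topic->members[p % topic->member_cnt];

                        rd_kafka_topic_partition_list_add(
                            rkgm->rkgm_assignment, topic->metadata->topic,
                            topic->metadata->partitions[p].id);
                }
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

Since member ids are now plain strings, a qsort() with a strcmp-based comparator (as the reworked range and roundrobin assignors do above) is sufficient whenever the callback needs an ordering over members.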
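
Illustration (not part of the patch): the JoinGroupRequest change above invokes an optional rkas_get_user_metadata_cb with (opaque, member id, current assignment, generation id) and embeds whatever it returns as the UserData of the member metadata. A sketch of such a callback follows; the exact public prototype (parameter const-ness and naming) lives in the rdkafka.h additions not shown in this excerpt, and the (data, len) shape of rd_kafka_member_userdata_serialized_new() is inferred from its (NULL, 0) use above.

#include <string.h>
#include <librdkafka/rdkafka.h>

/* Hedged sketch: serialize a per-member priority tag into the JoinGroup
 * UserData.  librdkafka treats the bytes as opaque and hands them back to
 * the elected leader in each member's userdata, where a custom assign_cb
 * can use them, e.g. to rank members by priority. */
static rd_kafka_member_userdata_serialized_t *
my_get_user_metadata_cb(void *opaque,
                        const char *member_id,
                        const rd_kafka_topic_partition_list_t *owned,
                        int32_t generation_id) {
        const char *priority = (const char *)opaque; /* e.g. "0", "1", ... */

        /* Include the terminating NUL so the leader can strcmp() the
         * payload directly. */
        return rd_kafka_member_userdata_serialized_new(priority,
                                                       strlen(priority) + 1);
}

Returning NULL is also valid: as shown in the request-building hunk above, the caller then falls back to rd_kafka_member_userdata_serialized_new(NULL, 0), i.e. an empty UserData.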