Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

assignor: API for a custom consumer group assignment strategy #3812

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

niamster
Copy link
Contributor

This addresses #2284

Exported API is sufficient enough to implement existing assignment strategies and allow users define their own.
This PR does not change how rebalance works, it reworks existing API so it can be accessible via <librdkafka/rdkafka.h>.
Some structures are split into "internal" and "public" to minimise the amount of changes. The conversion from one to another is done just before the callbacks are called.

Open questions:

  • Should this new API be somehow mangled as "experimental" by adding _v0 or _exp suffix to functions and structures?
  • Should callbacks receive a single structure as an argument to be able to extend it in the future w/o breaking compatibility?
  • I'm not fond of passing "opaque" object to the rd_kafka_assignor_register function. I would prefer to bass it via rd_kafka_t but I don't have a strong opinion.

NOTE: there are some changes related to formatting. I have setup my editor to automatically call clang-format on save with a given (default repository) configuration.

@@ -49,14 +49,6 @@
*/


/** @brief Assignor state from last rebalance */
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this state is not used ... so I removed altogether to simplify "public" interface.

@niamster
Copy link
Contributor Author

niamster commented Apr 12, 2022

Interesting, 0120_asymmetric_subscription fails on windows in Appveyor, while it does not fail on linux (Travis CI).
Passes fine on my local linux setup

TEST 20220412145306 (bare, scenario default) SUMMARY
#==================================================================#
| <MAIN>                                   |     PASSED |  13.003s |
| 0120_asymmetric_subscription             |     PASSED |  12.020s |
#==================================================================#
[<MAIN>                      / 13.003s] 0 thread(s) in use by librdkafka
[<MAIN>                      / 13.003s]
============== ALL TESTS PASSED ==============
###
###  ./test-runner in bare mode PASSED! ###
###

uname -a
Linux 5.13.0-23-generic #23-Ubuntu SMP Fri Nov 26 11:41:15 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

and in the OSX env:

TEST 20220412145911 (bare, scenario default) SUMMARY
#==================================================================#
| <MAIN>                                   |     PASSED |  11.032s |
| 0120_asymmetric_subscription             |     PASSED |  10.090s |
#==================================================================#
[<MAIN>                      / 11.034s] 0 thread(s) in use by librdkafka
[<MAIN>                      / 11.034s]
============== ALL TESTS PASSED ==============
###
###  ./test-runner in bare mode PASSED! ###
###

Darwin 20.6.0 Darwin Kernel Version 20.6.0: Tue Feb 22 21:10:41 PST 2022; root:xnu-7195.141.26~1/RELEASE_X86_64 x86_64 i386 MacBookPro15,2 Darwin

EDIT: it does not pass in Appveyor only, it passes windows build in Travis CI

@niamster niamster force-pushed the dm/custom-assigner-fn-squashed branch from bf5680e to e8cb829 Compare April 12, 2022 14:27
@niamster
Copy link
Contributor Author

niamster commented Apr 12, 2022

This is fixed, commit is squashed, the issues was in this loop where I relied on the order, but the array might be manipulated by the caller (sort, etc.).

Why it was failing only in Appveyor 🤔 (I will try to dig deeper)? I'm wondering how we can add fuzziness to such tests to make such issues easily reproducible.

EDIT: I've found the root cause. In Appveyor (which uses MSVC) strndup("", 0) return NULL .... while in other env it returns "".
The fix (committed, squashed) is to check rkgm_group_instance_id for NULL using RD_KAFKAP_STR_IS_NULL(), so rd_kafka_assignor_topic_cmp properly sorts by member id (and ignores group_instance_id).

This raises a question: should rd_kafka_assignor_topic_cmp be fixed to enforce comparison of member_id if group_instance_id are equal 🤔 ?

@niamster niamster force-pushed the dm/custom-assigner-fn-squashed branch from e8cb829 to 066e7f6 Compare April 13, 2022 22:31
@niamster
Copy link
Contributor Author

niamster commented May 9, 2022

@edenhill are you interested in this PR?

@niamster niamster force-pushed the dm/custom-assigner-fn-squashed branch from 64c57df to df6c72a Compare October 13, 2022 15:22
@niamster
Copy link
Contributor Author

niamster commented Oct 13, 2022

I can't reproduce style check failure with clang format 14.0.6.

@niamster niamster force-pushed the dm/custom-assigner-fn-squashed branch from df6c72a to 52af26c Compare October 13, 2022 16:37
@scanterog
Copy link

@edenhill @emasab are there any chances to get feedback on this PR?

@emasab
Copy link
Contributor

emasab commented May 29, 2023

Hello @scanterog @niamster sorry for the long wait. I want to point out that with KIP-881 the range assignor is being changed to co-partitioned (same partition numbers to the same members, given topics with same partition count) and rack-aware, also with KIP-848 the range assignor will be co-partitioned, rack-aware and sticky. The assignment will be done by the group coordinator.

The two properties: co-partitioned and sticky allow to create joins in a way similar to what Kafka Streams does, with local cache.

Given this roadmap, would it be enough for Datadog's needs? If there's something else that cannot be done without a customer assignor please comment.

Also, I'd like to know from other people about kinds of assignors they're thinking to, other than the cases already listed in #2284

@scanterog
Copy link

scanterog commented Jun 1, 2023

Hi @emasab! Thanks for checking this.

We need a way to compute the assignment logic on the client side and as far as I understood KIP-848 supports that.

The group coordinator will either directly compute a new assignment with its server side assignor or delegate the assignment to a member of the group if a client-side assignor must be used.

Co-partitioned is great when you have a balanced traffic across all partitions. In our case, we want to compute the assignment in order to address the issue of imbalanced partitions. On the client-side assignor we can measure the load on each partition (by using external signals) and based on that distribute the load evenly across the group members which does not necessarily translate to the same number of partitions to each member.

@emasab
Copy link
Contributor

emasab commented Jun 9, 2023

Thanks, that's an interesting use case and you can't handle it with manual assignment because it's not static and that'll mean losing the "transactional" property of consumer group assignments.
KIP-848 supports custom assignors, we were asking the use case to estimate the priority. Explicitly triggering a rebalance isn't supported in librdkafka (KIP-568) and you'd have to unsubscribe and subscribe again, without static membership, in one of your consumers. Unless it's implemented too, together with a custom assignor.

@scanterog
Copy link

Please let us know if you need further information. If you can also provide the estimate you have came across that would useful for us, thanks again @emasab.

@cla-assistant
Copy link

cla-assistant bot commented Aug 21, 2023

CLA assistant check
All committers have signed the CLA.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants