Feature request: Support pluggable partition assignment algorithms for consumer #2284

Open
jeffwidman opened this issue Apr 18, 2019 · 19 comments

Comments

@jeffwidman

Please add support for pluggable partition assignment algorithms for the consumers.

Today I was in a design review for a handful of upcoming services that will need this.

This was originally requested back in #1135.

Using this ticket to track progress on this feature.

@chris-zen

I would like to implement a service like Kafka Streams Interactive Queries on top of librdkafka. After looking at the source code of both projects, the only critical piece I'm missing in librdkafka is the ability to provide a custom assignor, so that I can manage standby replicas of local state.

According to @edenhill here, here and the code here, it is possible to configure custom assignors internally, but it is not exposed at the API level, and only the builtin range and roundrobin assignors are allowed.

I was wondering whether there is any plan to implement it, how difficult it would be, and what the chances are of getting mentoring to contribute it.

@chris-zen

Some more context on why being able to customise the assignor is so relevant. Kafka Streams is a good example: it runs many stateful consumers, which makes rebalances quite expensive when they are not done carefully. Currently Kafka Streams uses an assignor called Sticky that minimises state migration between instances (and also accounts for standby replicas).

There is a recent proposal, KIP-429, to make the rebalance protocol more incremental; it gives an interesting view of the current state and of why this matters.
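To make the "minimise state migration" idea concrete, here is a minimal sketch of a sticky-style assignment step in C (librdkafka's language). It is a simplification, not how Kafka Streams' or librdkafka's sticky assignors are actually implemented: partitions and members are plain array indices, `sticky_assign` is a hypothetical name, and the sketch assumes the previous owner of each partition is known to the assignor. Partitions stay with their previous owner as long as the result stays balanced; only the remainder moves, to the least-loaded member.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sticky-style assignment, NOT librdkafka's internal assignor.
 * prev_owner[p]: member index that owned partition p before the rebalance,
 *                or -1 if it was unowned or its owner left the group.
 * new_owner[p]:  output, member index that owns partition p afterwards.
 * counts:        caller-provided scratch array of length n_members. */
void sticky_assign(const int *prev_owner, int *new_owner, size_t n_partitions,
                   int *counts, int n_members) {
    assert(n_members > 0);
    int quota = (int)(n_partitions / (size_t)n_members);  /* every member gets at least this */
    int extras = (int)(n_partitions % (size_t)n_members); /* members allowed quota + 1 */

    for (int m = 0; m < n_members; m++)
        counts[m] = 0;

    /* Pass 1: keep a partition on its previous owner as long as balance allows. */
    for (size_t p = 0; p < n_partitions; p++) {
        int m = prev_owner[p];
        new_owner[p] = -1;
        if (m < 0 || m >= n_members)
            continue;
        if (counts[m] < quota) {
            new_owner[p] = m;
            counts[m]++;
        } else if (counts[m] == quota && extras > 0) {
            new_owner[p] = m;
            counts[m]++;
            extras--;
        }
    }

    /* Pass 2: give each released or unowned partition to the least-loaded member. */
    for (size_t p = 0; p < n_partitions; p++) {
        if (new_owner[p] != -1)
            continue;
        int best = 0;
        for (int m = 1; m < n_members; m++)
            if (counts[m] < counts[best])
                best = m;
        new_owner[p] = best;
        counts[best]++;
    }
}
```

For example, if members 0 and 1 each owned two of four partitions and member 2 joins, only one partition moves (to member 2); a naive reassignment from scratch could move more.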

@edenhill
Contributor

Let's look at making the assignor API public when we implement KIP-429 (et.al) to make sure all use-cases are covered.

@Buttered

Is there any update on this feature?

@rhoban13

rhoban13 commented Apr 3, 2020

I am also interested in using this feature. Is it likely that the proposed API will look like what @edenhill referenced here? Is making this API public something you might accept a PR for, or is the larger change referenced above going to make such a PR obsolete?

@dshivashankar1994

+1 for the feature

@edenhill
Contributor

edenhill commented Jun 9, 2020

What is your use-case for supplying a custom partition assignor?

@dshivashankar1994

I wanted to ensure that only one consumer is active in a consumer group. The topics I am using have only one partition, and we recommend that our users create a single consumer per consumer group. But if more consumers join by mistake, there is a problem.
Currently I am using RebalanceCb to handle this. Say consumers A and B subscribe to the same set of 10 topics: then only one of the consumers will receive all the topics, while the other will not start (it fails). This works fine.

The problem is when consumer A subscribes to 2 topics and consumer B subscribes to 10 topics, 2 of which overlap with A's. Now, after the RebalanceCb, consumer A will start receiving from its 2 topics and B from the remaining 8. B is not even aware that it is not receiving messages from those 2 topics because of A. To avoid this, I wanted to implement my own partition assignment algorithm.
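One way to express the requirement above as an assignment rule is "all or nothing": a consumer gets its whole subscription or nothing at all, never a subset. The following is a hypothetical sketch in C, assuming every topic has a single partition (as described above) and that the group's topics can be numbered and packed into a bitmask; `all_or_nothing_assign` is an illustrative name, not a librdkafka function.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical all-or-nothing assignment for single-partition topics.
 * Topic i of the group is bit i of a uint32_t mask (at most 32 topics in
 * this sketch). Members are visited in a deterministic order (e.g. sorted
 * by member id); a member receives its whole subscription only if none of
 * its topics was already granted to an earlier member, otherwise nothing.
 * Returns the number of members left with an empty assignment. */
size_t all_or_nothing_assign(const uint32_t *subscriptions, uint32_t *granted,
                             size_t n_members) {
    assert(subscriptions != NULL && granted != NULL);
    uint32_t claimed = 0;
    size_t idle = 0;
    for (size_t m = 0; m < n_members; m++) {
        if ((subscriptions[m] & claimed) == 0) {
            granted[m] = subscriptions[m];
            claimed |= subscriptions[m];
        } else {
            granted[m] = 0; /* overlap: empty assignment instead of a partial one */
            idle++;
        }
    }
    return idle;
}
```

With A subscribed to topics 0-1 and B to topics 0-9, visiting A first grants A its 2 topics and leaves B with an empty assignment instead of the silent 8-of-10 split, which B could presumably detect in its rebalance callback and fail on. The outcome depends on the visit order, so that order has to be deterministic across rebalances.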

@ChadJessup

@edenhill - would you consider a PR implementing a public API for custom assignors? It looks like a pattern similar to the one for custom partitioners would work.

@edenhill
Contributor

@ChadJessup I think a good start would be a PR containing only the public API (rdkafka.h), but I suggest waiting until the incremental rebalancing PRs have been merged, since there will be internal changes to the assignor APIs; see the incrreb branch.

@Sats4Dev

Sats4Dev commented Feb 5, 2021

@edenhill I am also looking for partition assignor support - to implement Active/Passive (Hot-Hot) Consumers

My use case: I have a Kafka topic with 6 partitions. When the first consumer starts, it receives all partitions and consumes and processes the messages. If a second consumer then starts with the same group.id, I don't want it to be assigned any partitions, since the first (active) consumer is up and processing messages. But when the first active consumer goes down, all partitions should be assigned to the second (or next) consumer, which takes over and starts processing. If the user then restarts the first process/consumer, it should go into standby mode. I would like to achieve this by implementing a custom SingleConsumerGroup partition assignor (a use case like the one described here, under Failover strategy: https://medium.com/streamthoughts/understanding-kafka-partition-assignment-strategies-and-how-to-write-your-own-custom-assignor-ebeda1fc06f3).

I see more use cases with the same or a similar requirement, where only a single consumer in a consumer group should get all partitions and other/new instances should sit idle. So I think it would be best if this were available as an out-of-the-box option alongside the existing 3 strategies, RangeAssigner, RoundRobinAssigner and StickyAssigner: something like a SingleConsumerGroupAssigner.
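The core decision of such a failover assignor is which single member receives every partition. A minimal C sketch under stated assumptions: `group_member` and `failover_pick_active` are hypothetical names, and the sketch assumes the assignor knows which member was active before the rebalance (for example from per-member user data sent with the subscription). Ties are broken by member id only so that the choice is deterministic.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical member description for the failover sketch. */
typedef struct {
    const char *member_id;
    int was_active; /* nonzero if this member held the partitions before the rebalance */
} group_member;

/* Returns the index of the one member that should receive every partition.
 * The current active member keeps its role while it is still in the group;
 * otherwise the member with the lexicographically smallest id is promoted. */
size_t failover_pick_active(const group_member *members, size_t n) {
    assert(n > 0);
    size_t best = 0;
    for (size_t i = 0; i < n; i++) {
        if (members[i].was_active)
            return i; /* active consumer still present: no failover */
        if (strcmp(members[i].member_id, members[best].member_id) < 0)
            best = i;
    }
    return best;
}
```

All partitions would then go to the returned member and every other member would get an empty assignment, so a restarted former active consumer rejoins in standby while the current active one keeps running.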

@edenhill
Contributor

edenhill commented Feb 8, 2021

I'd really like to add support for pluggable assignors; the problem is that it is a big and fluid API surface, and with our API and ABI guarantees we only get one shot: we can't change it once it has shipped.
So the way to make it extensible and future-proof is to make all types private and provide accessor methods, which is a lot of tedious work.

Now, since this is sort of a niche feature which will initially only be used by advanced users (that are capable of writing an assignor), what we could perhaps do is expose the internal assignor API as an experimental API without API/ABI stability guarantees.
This would require you as a user to rebuild your assignor when you upgrade librdkafka, but maybe that's an acceptable alternative to move things forward here. The long term plan should be to add it as a proper public API.

What do you all think?
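The "private types plus accessor methods" approach is the usual opaque-handle pattern in C, which librdkafka already uses for handles such as `rd_kafka_t`. A minimal sketch of what it could look like for an assignor-facing type; all `assignor_member_*` names are hypothetical and not part of any librdkafka API.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* --- Hypothetical public header: only an incomplete type is exposed. --- */
typedef struct assignor_member_s assignor_member_t;
assignor_member_t *assignor_member_new(const char *member_id, int topic_cnt);
void assignor_member_destroy(assignor_member_t *m);
const char *assignor_member_id(const assignor_member_t *m);
int assignor_member_topic_cnt(const assignor_member_t *m);

/* --- Private implementation: the layout can change between releases
 *     because applications never see sizeof() or field offsets. --- */
struct assignor_member_s {
    char *member_id;
    int topic_cnt;
    /* fields can be added or reordered later without breaking the ABI */
};

assignor_member_t *assignor_member_new(const char *member_id, int topic_cnt) {
    assignor_member_t *m = malloc(sizeof(*m));
    if (m == NULL)
        return NULL;
    size_t len = strlen(member_id) + 1;
    m->member_id = malloc(len);
    if (m->member_id == NULL) {
        free(m);
        return NULL;
    }
    memcpy(m->member_id, member_id, len);
    m->topic_cnt = topic_cnt;
    return m;
}

void assignor_member_destroy(assignor_member_t *m) {
    if (m == NULL)
        return;
    free(m->member_id);
    free(m);
}

const char *assignor_member_id(const assignor_member_t *m) {
    assert(m != NULL);
    return m->member_id;
}

int assignor_member_topic_cnt(const assignor_member_t *m) {
    assert(m != NULL);
    return m->topic_cnt;
}
```

Because callers only hold a pointer and go through functions, the struct can gain fields in a later release without recompiling existing assignors. Exposing the internal structs directly does not have that property, which is why the experimental route would require rebuilding assignors on upgrade.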

@niamster
Contributor

Hi @edenhill

I have attempted to export the minimal API needed to support custom assignors in #3812.
Can you please take a look?

@look

look commented Aug 23, 2022

My team is also interested in this feature (we use librdkafka via rdkafka in Rust). We would appreciate a review of @niamster's PR. Thank you!

@jgoulah

jgoulah commented Oct 14, 2022

Hi @edenhill - we're hoping to get another look at this. The PR has been updated to be in sync with the current master and passes all tests. Based on the likes on the original issue (which is now 3+ years old), at least 20 people are interested in this feature. Can you let us know what needs to happen to get this considered for merging? Thanks so much

@jgoulah

jgoulah commented Feb 2, 2023

bump

@massive

massive commented Jun 6, 2023

We are also interested in this.

@ciobion

ciobion commented Jan 12, 2024

I am also interested in implementing a custom strategy.
My use case is: I have a consumer group consuming from more than one topic and I want the partitions assignment to be fair per topic.
In more detail: I have 3 consumer instances consuming from 3 topics, T1, T2 and T3, each with 3 partitions (P11, P12, ..., P21, P22, ..., P31, P32, ...). I don't want to end up with each consumer instance consuming from just one topic, e.g. C1(P11, P12, P13), C2(P21, P22, P23), C3(P31, P32, P33). I want Cx(P1x, P2x, P3x). The alternative is to create a consumer group per topic, but I think that uses more resources.
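The desired Cx(P1x, P2x, P3x) layout amounts to dealing each topic's partitions out round-robin across the consumers. A minimal C sketch, assuming every consumer subscribes to every topic; `per_topic_assign` is a hypothetical name, and indices are 0-based (consumer index 0 corresponds to C1 and partition index 0 to Px1 in the notation above).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-topic fair assignment. Partitions are visited topic by
 * topic (topic-major order) and dealt to consumers 0, 1, ..., n-1, 0, ...
 * Because the cursor always advances by one, each topic's partitions land
 * on consecutive consumers, so no consumer gets more than
 * ceil(partitions / consumers) of any single topic.
 * owner[k] receives the consumer index for the k-th partition overall. */
void per_topic_assign(const int *partition_cnt, size_t n_topics,
                      int n_consumers, int *owner) {
    assert(n_consumers > 0);
    int cursor = 0; /* carries over between topics so small topics don't all start at consumer 0 */
    size_t k = 0;
    for (size_t t = 0; t < n_topics; t++) {
        for (int p = 0; p < partition_cnt[t]; p++) {
            owner[k++] = cursor;
            cursor = (cursor + 1) % n_consumers;
        }
    }
}
```

For three topics with three partitions each and three consumers, consumer x ends up owning partition x of every topic, i.e. C1(P11, P21, P31), C2(P12, P22, P32) and C3(P13, P23, P33).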

@dtmistry

+1

We already use a custom CooperativeStickyAssignor for our Java based consumers to support our deployment model. When a new version of a consuming service is released, it joins the same consumer group as the existing version. As traffic is ramped up for the new version, partitions are assigned based on the percentage of traffic handled by each version.

We would like to do the same for our Python consumers.
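The first step of such a traffic-weighted assignor is turning traffic percentages into whole partition counts. A hypothetical C sketch using the largest-remainder method (a swapped-in technique, not necessarily what the custom CooperativeStickyAssignor described above does); `weighted_counts` is an illustrative name.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical traffic-weighted split: decide how many partitions each
 * member (e.g. each deployed version) should own, in proportion to
 * traffic_pct[i], using the largest-remainder method so the counts always
 * add up to n_partitions. Weights must be non-negative with a positive sum. */
void weighted_counts(const int *traffic_pct, size_t n_members,
                     int n_partitions, int *counts) {
    long total = 0;
    for (size_t i = 0; i < n_members; i++)
        total += traffic_pct[i];
    assert(total > 0);

    int assigned = 0;
    for (size_t i = 0; i < n_members; i++) {
        /* floor of the exact proportional share */
        counts[i] = (int)((long)n_partitions * traffic_pct[i] / total);
        assigned += counts[i];
    }

    /* Leftovers (always fewer than n_members) go one each to the largest remainders. */
    while (assigned < n_partitions) {
        size_t best = 0;
        long best_rem = -1;
        for (size_t i = 0; i < n_members; i++) {
            long share = (long)n_partitions * traffic_pct[i];
            if (counts[i] > share / total)
                continue; /* already received its extra partition */
            long rem = share % total;
            if (rem > best_rem) {
                best_rem = rem;
                best = i;
            }
        }
        counts[best]++;
        assigned++;
    }
}
```

For example, a 90/10 traffic split over 12 partitions yields 11 and 1 partitions. Choosing which concrete partitions each version owns is a separate step; keeping partitions on their previous owner where possible limits movement while the percentages change.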
