Not able to pause/resume consumption #13

Open
yaronthurm opened this issue Oct 13, 2022 · 3 comments

Comments

@yaronthurm

Hello,
I was looking for a way to pause/resume consumption with the library, but it seems this is not supported.
I tried to achieve it "manually" through the underlying Kafka consumer and the raw Java API, but since KafkaConsumer is not thread-safe, a ConcurrentModificationException is thrown as soon as I call into it from my own thread (in the trace below it is the .assignment call, evaluated as the argument to .pause, that throws).
It seems the only way to do this is to call pause/resume from within the poll loop, but that loop is run by the library itself and cannot be modified.

Sample Code

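(require '[clojure.core.async :refer [chan]]
         '[ketu.async.source :as source])
(import '(org.apache.kafka.clients.consumer KafkaConsumer))

(let [consumer-source (source/source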
                        (chan 10)
                        {:name       "test-consumer"
                         :brokers    "localhost:9092"
                         :topic      "test1"
                         :group-id   "group1"
                         :value-type :string
                         :shape      :value})
      ^KafkaConsumer kafka-consumer (:ketu.source/consumer consumer-source)]
  (Thread/sleep 1000) ; To allow consumer to start polling
  (try
    (.pause kafka-consumer (.assignment kafka-consumer))
    (catch Exception e
      (println e))
    (finally
      (source/stop! consumer-source))))

Output:

2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka version: 2.5.1  
2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka commitId: 0efa8fb0f4c73d92  
2022-10-13 10:12:25 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1665645145541  
2022-10-13 10:12:25 INFO - [ketu.async.source] [source=test-consumer] Start consumer thread  
2022-10-13 10:12:25 INFO - [o.a.k.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-group1-25, groupId=group1] Subscribed to topic(s): test1  
2022-10-13 10:12:25 INFO - [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-group1-25, groupId=group1] Cluster ID: m86DoQ0mQZW-sSgHvQvrCA  
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] Discovered group coordinator 10.100.102.8:9092 (id: 2147483647 rack: null)  
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] (Re-)joining group  
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group  
2022-10-13 10:12:25 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-25, groupId=group1] (Re-)joining group  
#error {
 :cause KafkaConsumer is not safe for multi-threaded access
 :via
 [{:type java.util.ConcurrentModificationException
   :message KafkaConsumer is not safe for multi-threaded access
   :at [org.apache.kafka.clients.consumer.KafkaConsumer acquire KafkaConsumer.java 2421]}]
 :trace
 [[org.apache.kafka.clients.consumer.KafkaConsumer acquire KafkaConsumer.java 2421]
  [org.apache.kafka.clients.consumer.KafkaConsumer acquireAndEnsureOpen KafkaConsumer.java 2405]
  [org.apache.kafka.clients.consumer.KafkaConsumer assignment KafkaConsumer.java 899]
  [ketu.example$eval12819 invokeStatic example.clj 12]
  [ketu.example$eval12819 invoke example.clj 7]
  [clojure.lang.Compiler eval Compiler.java 7177]
  [clojure.lang.Compiler eval Compiler.java 7132]
  [clojure.core$eval invokeStatic core.clj 3214]
  [clojure.core$eval invoke core.clj 3210]
  [clojure.main$repl$read_eval_print__9086$fn__9089 invoke main.clj 437]
  [clojure.main$repl$read_eval_print__9086 invoke main.clj 437]
  [clojure.main$repl$fn__9095 invoke main.clj 458]
  [clojure.main$repl invokeStatic main.clj 458]
  [clojure.main$repl doInvoke main.clj 368]
  [clojure.lang.RestFn invoke RestFn.java 1523]
  [nrepl.middleware.interruptible_eval$evaluate invokeStatic interruptible_eval.clj 79]
  [nrepl.middleware.interruptible_eval$evaluate invoke interruptible_eval.clj 55]
  [nrepl.middleware.interruptible_eval$interruptible_eval$fn__923$fn__927 invoke interruptible_eval.clj 142]
  [clojure.lang.AFn run AFn.java 22]
  [nrepl.middleware.session$session_exec$main_loop__1024$fn__1028 invoke session.clj 171]
  [nrepl.middleware.session$session_exec$main_loop__1024 invoke session.clj 170]
  [clojure.lang.AFn run AFn.java 22]
  [java.lang.Thread run Thread.java 832]]}
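The trace ends in KafkaConsumer.acquire, and as we read the Kafka 2.5 client source, that method is a fail-fast ownership check rather than a lock: a public call records the calling thread's id and throws immediately if a different thread is already inside the consumer, and poll keeps ownership for as long as it is blocked, up to its timeout (wakeup is the documented exception that may be called from any thread). The Clojure sketch below is a simplified model of that check, not Kafka's code; make-guard, acquire!, release! and with-guard are hypothetical names, and only the exception message is copied from the trace.

```clojure
(ns example.single-thread-guard
  (:import (java.util ConcurrentModificationException)
           (java.util.concurrent.atomic AtomicInteger AtomicLong)))

;; Sentinel meaning "no thread currently owns the guarded object".
(def ^:private no-owner -1)

(defn make-guard
  "Hypothetical: state for a fail-fast, re-entrant single-thread guard."
  []
  {:owner (AtomicLong. no-owner)
   :depth (AtomicInteger. 0)})

(defn acquire!
  "Claims the guard for the current thread, or throws immediately (it never
  waits) if another thread holds it. Nested calls on the owning thread pass."
  [{:keys [^AtomicLong owner ^AtomicInteger depth]}]
  (let [tid (.getId (Thread/currentThread))]
    (when (and (not= tid (.get owner))
               (not (.compareAndSet owner no-owner tid)))
      (throw (ConcurrentModificationException.
              "KafkaConsumer is not safe for multi-threaded access")))
    (.incrementAndGet depth)))

(defn release!
  "Drops one level of ownership; the outermost release frees the guard."
  [{:keys [^AtomicLong owner ^AtomicInteger depth]}]
  (when (zero? (.decrementAndGet depth))
    (.set owner no-owner)))

(defmacro with-guard
  "Runs body while holding the guard and always releases it afterwards."
  [guard & body]
  `(let [g# ~guard]
     (acquire! g#)
     (try ~@body
          (finally (release! g#)))))
```

Under this model, the REPL thread in the sample above loses the compareAndSet while ketu's consumer thread sits in poll, which is why the only safe place for pause/resume is the poll loop itself.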
@yaronthurm
Author

yaronthurm commented Oct 13, 2022

One possible solution is to add support for a custom function that would be called from within the poll loop, allowing clients of the library to add custom logic (which will also enable my use case of pause/resume).

I have tested it locally and it seems to work.

Sample code of suggested version

Changes to the poll loop inside the source-existing-consumer fn (accept a function in opts and invoke it from the poll loop):

(defn- source-existing-consumer [^Consumer consumer out-chan opts]
  '...
  (let [custom-poll-fn (:ketu.source/on-poll-fn opts)]
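    ;; Loop while the should-poll? flag (set up in the elided code) is true.
    (while @should-poll?
      (let [records (poll!)]
        ;; Call the user-supplied hook on the consumer thread after each poll.
        (when custom-poll-fn
          (custom-poll-fn {:consumer consumer}))
        (run! put! records))))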
  '...)

Client code:

(let [pause-requested? (atom false)
      on-poll-fn (let [paused? (atom false)]
                   (fn [{:keys [consumer]}]
                     (when (and @pause-requested? (not @paused?))
                       (.pause consumer (.assignment consumer))
                       (reset! paused? true)
                       (println (str (java.time.LocalDateTime/now)) :paused))

                     (when (and (not @pause-requested?) @paused?)
                       (.resume consumer (.paused consumer))
                       (reset! paused? false)
                       (println (str (java.time.LocalDateTime/now)) :resumed))))
      consumer-source (source/source
                        (chan 10)
                        {:name       "test-consumer"
                         :brokers    "localhost:9092"
                         :topic      "test1"
                         :group-id   "group1"
                         :value-type :string
                         :shape      :value
                         :on-poll-fn on-poll-fn})]
  (Thread/sleep 1000) ; To allow consumer to start polling
  (reset! pause-requested? true)
  (Thread/sleep 1000)
  (reset! pause-requested? false)
  (Thread/sleep 1000)
  (source/stop! consumer-source))

Output:

2022-10-13 11:55:14 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka version: 2.5.1  
2022-10-13 11:55:14 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka commitId: 0efa8fb0f4c73d92  
2022-10-13 11:55:14 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1665651314362  
2022-10-13 11:55:14 INFO - [ketu.async.source] [source=test-consumer] Start consumer thread  
2022-10-13 11:55:14 INFO - [o.a.k.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-group1-33, groupId=group1] Subscribed to topic(s): test1  
2022-10-13 11:55:14 INFO - [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-group1-33, groupId=group1] Cluster ID: m86DoQ0mQZW-sSgHvQvrCA  
2022-10-13 11:55:14 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-33, groupId=group1] Discovered group coordinator 10.100.102.8:9092 (id: 2147483647 rack: null)  
2022-10-13 11:55:14 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-33, groupId=group1] (Re-)joining group  
2022-10-13 11:55:14 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-33, groupId=group1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group  
2022-10-13 11:55:14 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-33, groupId=group1] (Re-)joining group  
2022-10-13T11:55:15.371100 :paused
2022-10-13T11:55:16.381883 :resumed
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-9, groupId=group1] Attempt to heartbeat failed since group is rebalancing  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-9, groupId=group1] Revoke previously assigned partitions   
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-9, groupId=group1] (Re-)joining group  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-10, groupId=group1] Attempt to heartbeat failed since group is rebalancing  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-10, groupId=group1] Revoke previously assigned partitions test1-0  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-10, groupId=group1] (Re-)joining group  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-9, groupId=group1] Finished assignment for group at generation 67: {consumer-group1-10-20014310-006b-4674-85f9-4ef222b5134f=Assignment(partitions=[test1-0]), consumer-group1-9-626b8cfa-7e93-4879-806c-31d72b935716=Assignment(partitions=[]), consumer-group1-33-cbf9f022-0133-4690-b14d-8bc7e06fb8ee=Assignment(partitions=[])}  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-9, groupId=group1] Successfully joined group with generation 67  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-33, groupId=group1] Successfully joined group with generation 67  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-9, groupId=group1] Adding newly assigned partitions:   
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-10, groupId=group1] Successfully joined group with generation 67  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-33, groupId=group1] Adding newly assigned partitions:   
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-10, groupId=group1] Adding newly assigned partitions: test1-0  
2022-10-13 11:55:16 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-10, groupId=group1] Setting offset for partition test1-0 to the committed offset FetchPosition{offset=39, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[10.100.102.8:9092 (id: 0 rack: null)], epoch=0}}  
2022-10-13 11:55:17 INFO - [ketu.async.source] [source=test-consumer] Done consuming  
2022-10-13 11:55:17 INFO - [ketu.async.source] [source=test-consumer] Close out channel  
2022-10-13 11:55:17 INFO - [ketu.async.source] [source=test-consumer] Close consumer  
2022-10-13 11:55:17 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-33, groupId=group1] Member consumer-group1-33-cbf9f022-0133-4690-b14d-8bc7e06fb8ee sending LeaveGroup request to coordinator 10.100.102.8:9092 (id: 2147483647 rack: null) due to the consumer is being closed  
2022-10-13 11:55:17 INFO - [ketu.async.source] [source=test-consumer] Exit consumer thread

@yaronthurm
Author

yaronthurm commented Oct 13, 2022

Another possible solution is a "commands" channel: the library gives its client a channel onto which the client can push custom functions, and the poll loop executes them.

Sample code of suggested version

Changes to the poll loop inside the source-existing-consumer fn (take a command from the commands chan, if one is available, and invoke it from the poll loop):

(defn- source-existing-consumer [^Consumer consumer out-chan opts]
  '...
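  (while @should-poll?
    (let [records (poll!)]
      ;; async/poll! never blocks: it returns nil when no command is queued.
      ;; commands-chan comes from the elided setup and is exposed to clients
      ;; as :ketu.source/commands-chan.
      (when-let [command (async/poll! commands-chan)]
        (command {:consumer consumer}))
      (run! put! records)))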
  '...)

Client code:

(require '[clojure.core.async :refer [chan >!!]]
         '[ketu.async.source :as source])

(let [consumer-source (source/source
                        (chan 10)
                        {:name       "test-consumer"
                         :brokers    "localhost:9092"
                         :topic      "test1"
                         :group-id   "group1"
                         :value-type :string
                         :shape      :value})
      commands-chan (:ketu.source/commands-chan consumer-source)]
  (Thread/sleep 1000) ; To allow consumer to start polling
  (>!! commands-chan (fn [{:keys [consumer]}]
                       (.pause consumer (.assignment consumer))
                       (println (str (java.time.LocalDateTime/now)) :paused)))
  (Thread/sleep 1000)
  (>!! commands-chan (fn [{:keys [consumer]}]
                       (.resume consumer (.paused consumer))
                       (println (str (java.time.LocalDateTime/now)) :resumed)))
  (Thread/sleep 1000)
  (source/stop! consumer-source))

Output:

2022-10-13 12:20:03 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka version: 2.5.1  
2022-10-13 12:20:03 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka commitId: 0efa8fb0f4c73d92  
2022-10-13 12:20:03 INFO - [o.a.kafka.common.utils.AppInfoParser] Kafka startTimeMs: 1665652803257  
2022-10-13 12:20:03 INFO - [ketu.async.source] [source=test-consumer] Start consumer thread  
2022-10-13 12:20:03 INFO - [o.a.k.clients.consumer.KafkaConsumer] [Consumer clientId=consumer-group1-4, groupId=group1] Subscribed to topic(s): test1  
2022-10-13 12:20:03 INFO - [org.apache.kafka.clients.Metadata] [Consumer clientId=consumer-group1-4, groupId=group1] Cluster ID: m86DoQ0mQZW-sSgHvQvrCA  
2022-10-13 12:20:03 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] Discovered group coordinator 10.100.102.8:9092 (id: 2147483647 rack: null)  
2022-10-13 12:20:03 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] (Re-)joining group  
2022-10-13 12:20:03 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group  
2022-10-13 12:20:03 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] (Re-)joining group  
2022-10-13 12:20:03 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] Finished assignment for group at generation 78: {consumer-group1-4-572ad8bd-4cd6-4046-8898-b135bfc8127e=Assignment(partitions=[test1-0])}  
2022-10-13 12:20:03 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] Successfully joined group with generation 78  
2022-10-13 12:20:03 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] Adding newly assigned partitions: test1-0  
2022-10-13 12:20:03 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] Setting offset for partition test1-0 to the committed offset FetchPosition{offset=62, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[10.100.102.8:9092 (id: 0 rack: null)], epoch=0}}  
2022-10-13T12:20:04.363999 :paused
2022-10-13T12:20:05.368136 :resumed
2022-10-13 12:20:06 INFO - [ketu.async.source] [source=test-consumer] Done consuming  
2022-10-13 12:20:06 INFO - [ketu.async.source] [source=test-consumer] Close out channel  
2022-10-13 12:20:06 INFO - [ketu.async.source] [source=test-consumer] Close consumer  
2022-10-13 12:20:06 INFO - [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] Revoke previously assigned partitions test1-0  
2022-10-13 12:20:06 INFO - [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=consumer-group1-4, groupId=group1] Member consumer-group1-4-572ad8bd-4cd6-4046-8898-b135bfc8127e sending LeaveGroup request to coordinator 10.100.102.8:9092 (id: 2147483647 rack: null) due to the consumer is being closed  
2022-10-13 12:20:06 INFO - [ketu.async.source] [source=test-consumer] Exit consumer thread

@yonatane
Contributor

@yaronthurm Thank you for the suggestions. I prefer the commands channel solution. Please submit a PR. My suggestions:

  • Allow users to turn on this feature by supplying a commands channel in opts under :ketu.source/consumer-commands-chan. (I prefer a specific key because this channel is narrowly scoped for now, and we might have a more general solution in the future.)
  • If the user doesn't supply this option, there's no need to poll the channel. We can later decide to create this channel for the user and always check it. Instead of a when check inside the loop, you could create a function that is either a no-op or checks the channel, depending on this option. This can be done in the enclosing let where subscribe! and put! are defined.
  • Check the commands channel before polling.
  • After internal discussion: the parameter map passed to each command should use the key :ketu.source/consumer (rather than :consumer), for consistency and easier use with clojure.spec.
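A minimal sketch of the loop shape these suggestions describe, assuming core.async as in the proposals above. make-run-command! and run-poll-loop! are hypothetical names, and poll!, put! and should-poll? are passed in as stand-ins for the bindings the real source-existing-consumer defines in its enclosing let; only the option key :ketu.source/consumer-commands-chan and the :ketu.source/consumer parameter key come from this thread. The no-op-or-check function is chosen once, outside the loop, and called before each poll.

```clojure
(ns example.consumer-commands
  (:require [clojure.core.async :as async]))

(defn make-run-command!
  "Hypothetical helper: returns a no-arg fn for the poll loop to call before
  each poll. Without :ketu.source/consumer-commands-chan in opts it is a
  no-op, so no channel is ever checked."
  [consumer opts]
  (if-let [commands-chan (:ketu.source/consumer-commands-chan opts)]
    (fn run-command! []
      ;; async/poll! never blocks; it returns nil when no command is queued.
      (when-let [command (async/poll! commands-chan)]
        (command {:ketu.source/consumer consumer})))
    (fn no-op [] nil)))

(defn run-poll-loop!
  "Hypothetical stand-in for the loop in source-existing-consumer.
  poll! returns a batch of records, put! forwards one record, and
  should-poll? is an atom; resetting it to false ends the loop."
  [consumer opts should-poll? poll! put!]
  (let [run-command! (make-run-command! consumer opts)]
    (while @should-poll?
      ;; Run at most one pending command on this (the consumer's) thread,
      ;; before polling.
      (run-command!)
      (run! put! (poll!)))))
```

On the client side, a pause command would then read the consumer from the new key, e.g. (fn [{consumer :ketu.source/consumer}] (.pause consumer (.assignment consumer))). Like the proposed loop, the sketch runs at most one command per iteration; draining every queued command before each poll would be a reasonable alternative if several commands can pile up between polls.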

Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants