Skip to content

Commit

Permalink
remove commands channel
Browse files Browse the repository at this point in the history
  • Loading branch information
idantavor committed Jul 22, 2024
1 parent 91697cc commit 961820c
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 46 deletions.
10 changes: 1 addition & 9 deletions src/ketu/async/source.clj
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@
close-out-chan? (:ketu.source/close-out-chan? opts)
^long close-consumer? (:ketu.source/close-consumer? opts)
consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts)
commands-chan (:ketu.source/consumer-commands-chan opts)
interceptor-fn (or (some-> (:ketu.source/consumer-interceptor opts)
(partial consumer))
(partial {:ketu.source/consumer consumer}))
identity)
should-poll? (volatile! true)
abort-pending-put (async/chan)
Expand All @@ -115,12 +114,6 @@
->data (->data-fn opts)
put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put))

optionally-execute-commands! (if (some? commands-chan)
(fn [] (loop []
(when-let [command (async/poll! commands-chan)]
(command {:ketu.source/consumer consumer})
(recur))))
(fn []))
consumer-thread
(async/thread
(try
Expand All @@ -130,7 +123,6 @@
(subscribe! consumer)

(while @should-poll?
(optionally-execute-commands!)
(let [records (interceptor-fn (poll!))]
(run! put! records)))

Expand Down
1 change: 0 additions & 1 deletion src/ketu/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
(s/def :ketu.source/close-out-chan? boolean?)
(s/def :ketu.source/close-consumer? boolean?)
(s/def :ketu.source/create-rebalance-listener-obj fn?)
(s/def :ketu.source/consumer-commands-chan #(extends? clojure.core.async.impl.protocols/ReadPort (type %)))
(s/def :ketu.source/consumer-interceptor fn?)
(s/def :ketu.source.assign/topic :ketu/topic)
(s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?))
Expand Down
37 changes: 1 addition & 36 deletions test/ketu/async/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -212,41 +212,6 @@
(source/stop! source)
(.close ^AdminClient admin-client)))))

(deftest commands-channel
(let [commands-chan (async/chan 10)
consumer-chan (async/chan 10)
result-chan (async/chan 10)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:ketu.source/consumer-commands-chan commands-chan}
source (source/source consumer-chan clicks-consumer-opts)]
(try
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] (async/>!! result-chan [:1 consumer])))
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(try
(.pause consumer (.assignment consumer))
(async/>!! result-chan [:2 :success])
(catch Exception _
(async/>!! result-chan [:2 :failed])))))
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(try
(.resume consumer (.paused consumer))
(async/>!! result-chan [:3 :success])
(catch Exception _
(async/>!! result-chan [:3 :failed])))))

(let [expected [[:1 (:ketu.source/consumer source)]
[:2 :success]
[:3 :success]]
actual (doall (mapv (fn [_] (u/try-take! result-chan)) (range 3)))]
(is (= expected actual)))
(finally
(Thread/sleep 2000)
(source/stop! source)))))

(deftest consumer-interceptor
(let [consumer-chan (async/chan 10)
result-chan (async/chan 100)
Expand All @@ -256,7 +221,7 @@
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:shape :value
:ketu.source/consumer-interceptor (fn [_consumer records]
:ketu.source/consumer-interceptor (fn [{_consumer :ketu.source/consumer} records]
(doseq [^ConsumerRecord record records]
(async/>!! result-chan (String. ^"[B" (.value record))))
records)}
Expand Down

0 comments on commit 961820c

Please sign in to comment.