Skip to content

Commit

Permalink
restructure poll loop to allow decorators commit
Browse files Browse the repository at this point in the history
  • Loading branch information
idantavor committed Jul 24, 2024
1 parent a2f758e commit ac90ed2
Showing 1 changed file with 24 additions and 21 deletions.
45 changes: 24 additions & 21 deletions src/ketu/async/source.clj
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,22 @@
(fn [consumer]
(consumer/assign! consumer (consumer/topic-partitions topic partitions))))))

(defn- poll-fn [^Consumer consumer opts]
(let [source-name (:ketu/name opts)
poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))
catching-poll? (:ketu.source.legacy/catching-poll? opts)]
(if catching-poll?
;TODO Eliminate catching poll ASAP.
; Just in case of a production issue and generic error handling wasn't implemented yet.
(fn []
(try
(consumer/poll! consumer poll-timeout-duration)
(catch Exception e
(log/error logger "[source={}] Caught poll exception" source-name e)
nil)))
(fn []
(consumer/poll! consumer poll-timeout-duration)))))
(defn- poll-fn [^Consumer consumer should-poll? opts]
(when @should-poll?
(let [source-name (:ketu/name opts)
poll-timeout-duration (Duration/ofMillis (:ketu.source/poll-timeout-ms opts))
catching-poll? (:ketu.source.legacy/catching-poll? opts)]
(if catching-poll?
;TODO Eliminate catching poll ASAP.
; Just in case of a production issue and generic error handling wasn't implemented yet.
(fn []
(try
(consumer/poll! consumer poll-timeout-duration)
(catch Exception e
(log/error logger "[source={}] Caught poll exception" source-name e)
[])))
(fn []
(consumer/poll! consumer poll-timeout-duration))))))

(defn- ->data-fn [{:keys [ketu.source/shape] :as opts}]
(cond
Expand All @@ -102,15 +103,16 @@
close-out-chan? (:ketu.source/close-out-chan? opts)
^long close-consumer? (:ketu.source/close-consumer? opts)
consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts)
should-poll? (volatile! true)
decorator-fn (some-> (:ketu.source/consumer-decorator opts)
(partial {:ketu.source/consumer consumer}))
(partial {:ketu.source/consumer consumer
:ketu.source/should-poll? should-poll?}))

should-poll? (volatile! true)
abort-pending-put (async/chan)
done-putting (async/chan)

subscribe! (or (subscribe-fn opts) (assign-fn opts))
poll-impl (poll-fn consumer opts)
poll-impl (poll-fn consumer should-poll? opts)
poll! (if (some? decorator-fn)
(partial decorator-fn poll-impl)
poll-impl)
Expand All @@ -125,9 +127,10 @@

(subscribe! consumer)

(while @should-poll?
(let [records (poll!)]
(run! put! records)))
(loop []
(when-let [records (poll!)]
(run! put! records)
(recur)))

(catch WakeupException e
; We wakeup the consumer on graceful shutdown after should-poll? is false.
Expand Down

0 comments on commit ac90ed2

Please sign in to comment.