Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom commands #15

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 78 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

#### Consumer-source options
| Key | Type | Req? | Notes |
|-----|------|------|-------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| Key | Type | Req? | Notes |
|-----|------|------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-commands-chan | channel | optional | Used for passing custom functions to be executed from within the poll loop. Items of this channel are expected to be of type `fn[x]`. One example for using this channel is to enable pausing/resuming of the underlying kafka consumer, since trying to do that outside the poll loop causes a `ConcurrentModificationException` to be thrown. [Example](#example-of-using-the-custom-commands-channel) |

#### Producer-sink options
| Key | Type | Req? | Notes |
Expand Down Expand Up @@ -127,6 +128,79 @@ Similarly, to put a clojure data structure on the producer channel:
(>!! producer-chan ["k2" "v2" "events"])
```

## Example of using the custom commands channel

In this example we demonstare how to enable pause/resume of the consumer:

```clojure
(ns custom-commands-channel-example
(:require [clojure.core.async :as async]
[ketu.async.source :as source]
[ketu.async.sink :as sink]))

(let [commands-chan (async/chan 10)
consumer-chan (async/chan 10)
consumer-opts {:name "consumer-example"
:brokers "broker1:9092"
:topic "example"
:group-id "example"
:value-type :string
:shape :value
:ketu.source/consumer-commands-chan commands-chan}
source (source/source consumer-chan consumer-opts)

producer-chan (async/chan 10)
sink-opts {:name "producer-example"
:brokers "broker1:9092"
:topic "example"
:value-type :string
:shape :value}
sink (sink/sink producer-chan sink-opts)

; periodically produce data to the topic
producing (future
(dotimes [i 20]
(async/>!! producer-chan (str i))
(Thread/sleep 300))
(async/>!! producer-chan "done")
(async/close! producer-chan))

; read from the consumer channel and print to the screen
processing (future
(loop []
(let [message (async/<!! consumer-chan)]
(println message)
(when (not= message "done")
(recur)))))]
(try
(Thread/sleep 2000) ; consumer is consuming normally
(let [paused (promise)
resumed (promise)]

; Send the commands channel a function that will pause the consumer
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(.pause consumer (.assignment consumer))
(deliver paused true)))

@paused
(println "consumer is paused")
(Thread/sleep 2000)

; Send the commands channel a function that will resume the consumer
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(.resume consumer (.paused consumer))
(deliver resumed true)))

@resumed
(println "consumer is resumed")

; Wait for all futures to finish
@producing
@processing)
(finally
(source/stop! source))))
```

## Development & Contribution

We welcome feedback and would love to hear about use-cases other than ours. You can open issues, send pull requests,
Expand Down
6 changes: 6 additions & 0 deletions src/ketu/async/source.clj
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
close-out-chan? (:ketu.source/close-out-chan? opts)
^long close-consumer? (:ketu.source/close-consumer? opts)
consumer-close-timeout-ms (:ketu.source/consumer-close-timeout-ms opts)
commands-chan (:ketu.source/consumer-commands-chan opts)

should-poll? (volatile! true)
abort-pending-put (async/chan)
Expand All @@ -112,6 +113,10 @@
->data (->data-fn opts)
put! (fn [record] (put-or-abort-pending! out-chan (->data record) abort-pending-put))

maybe-execute-custom-command (if (some? commands-chan)
(fn [] (when-let [command (async/poll! commands-chan)]
(command {:ketu.source/consumer consumer})))
(fn []))
yaronthurm marked this conversation as resolved.
Show resolved Hide resolved
consumer-thread
(async/thread
(try
Expand All @@ -121,6 +126,7 @@
(subscribe! consumer)

(while @should-poll?
(maybe-execute-custom-command)
(let [records (poll!)]
(run! put! records)))

Expand Down
7 changes: 5 additions & 2 deletions src/ketu/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require [clojure.set]
[clojure.spec.alpha :as s]
[clojure.string]
[expound.alpha :as expound])
[expound.alpha :as expound]
[clojure.core.async.impl.protocols])
(:import (java.util.regex Pattern)
(org.apache.kafka.clients.producer Callback)
(org.apache.kafka.common.serialization Deserializer Serializer)))
Expand All @@ -27,6 +28,7 @@
(s/def :ketu.source/close-out-chan? boolean?)
(s/def :ketu.source/close-consumer? boolean?)
(s/def :ketu.source/create-rebalance-listener-obj fn?)
(s/def :ketu.source/consumer-commands-chan #(extends? clojure.core.async.impl.protocols/ReadPort (type %)))

(s/def :ketu.source.assign/topic :ketu/topic)
(s/def :ketu.source.assign/partition-nums (s/coll-of nat-int?))
Expand Down Expand Up @@ -76,7 +78,8 @@
:ketu.source/consumer-close-timeout-ms
:ketu.source/consumer-thread-timeout-ms
:ketu.source/close-out-chan?
:ketu.source/close-consumer?]))
:ketu.source/close-consumer?
:ketu.source/consumer-commands-chan]))

(s/def :ketu.apache.producer/config map?)
(s/def :ketu.sink/sender-threads-num pos-int?)
Expand Down
35 changes: 35 additions & 0 deletions test/ketu/async/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,38 @@
(source/stop! source-rebalance)
(source/stop! source)
(.close ^AdminClient admin-client)))))

(deftest commands-channel
(let [commands-chan (async/chan 10)
consumer-chan (async/chan 10)
result-chan (async/chan 10)
clicks-consumer-opts {:name "clicks-consumer"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
:ketu.source/consumer-commands-chan commands-chan}
source (source/source consumer-chan clicks-consumer-opts)]
(try
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}] (async/>!! result-chan [:1 consumer])))
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(try
(.pause consumer (.assignment consumer))
(async/>!! result-chan [:2 :success])
(catch Exception _
(async/>!! result-chan [:2 :failed])))))
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(try
(.resume consumer (.paused consumer))
(async/>!! result-chan [:3 :success])
(catch Exception _
(async/>!! result-chan [:3 :failed])))))

(let [expected [[:1 (:ketu.source/consumer source)]
[:2 :success]
[:3 :success]]
actual (doall (mapv (fn [_] (u/try-take! result-chan)) (range 3)))]
(is (= expected actual)))
(finally
(Thread/sleep 2000)
(source/stop! source)))))