Skip to content

Commit

Permalink
Document consumer-commands-chan
Browse files Browse the repository at this point in the history
  • Loading branch information
Yaron Thurm committed Jan 11, 2023
1 parent 58a64eb commit 313c38c
Showing 1 changed file with 78 additions and 4 deletions.
82 changes: 78 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

#### Consumer-source options
| Key | Type | Req? | Notes |
|-----|------|------|-------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| Key | Type | Req? | Notes |
|-----|------|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-commands-chan | channel | optional | Used for passing custom functions to be executed from within the poll loop. Items of this channel are expected to be of type `fn[x]`. One example for using this channel is to enable pausing/resuming of the underlying kafka consumer, since trying to do that outside the poll loop causes a `ConcurrentModificationException` to be thrown. [Code example](#example-of-using-the-custom-commands-channel) |

#### Producer-sink options
| Key | Type | Req? | Notes |
Expand Down Expand Up @@ -127,6 +128,79 @@ Similarly, to put a clojure data structure on the producer channel:
(>!! producer-chan ["k2" "v2" "events"])
```

## Example of using the custom commands channel

In this example we demonstare how to enable pause/resume of the consumer:

```clojure
(ns custom-commands-channel-example
(:require [clojure.core.async :as async]
[ketu.async.source :as source]
[ketu.async.sink :as sink]))

(let [commands-chan (async/chan 10)
consumer-chan (async/chan 10)
consumer-opts {:name "consumer-example"
:brokers "broker1:9092"
:topic "example"
:group-id "example"
:value-type :string
:shape :value
:ketu.source/consumer-commands-chan commands-chan}
source (source/source consumer-chan consumer-opts)

producer-chan (async/chan 10)
sink-opts {:name "producer-example"
:brokers "broker1:9092"
:topic "example"
:value-type :string
:shape :value}
sink (sink/sink producer-chan sink-opts)

; periodically produce data to the topic
producing (future
(dotimes [i 20]
(async/>!! producer-chan (str i))
(Thread/sleep 300))
(async/>!! producer-chan "done")
(async/close! producer-chan))

; read from the consumer channel and print to the screen
processing (future
(loop []
(let [message (async/<!! consumer-chan)]
(println message)
(when (not= message "done")
(recur)))))]
(try
(Thread/sleep 2000) ; consumer is consuming normally
(let [paused (promise)
resumed (promise)]

; Send the commands channel a function that will pause the consumer
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(.pause consumer (.assignment consumer))
(deliver paused true)))

@paused
(println "consumer is paused")
(Thread/sleep 2000)

; Send the commands channel a function that will resume the consumer
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(.resume consumer (.paused consumer))
(deliver resumed true)))

@resumed
(println "consumer is resumed")

; Wait for all futures to finish
@producing
@processing)
(finally
(source/stop! source))))
```

## Development & Contribution

We welcome feedback and would love to hear about use-cases other than ours. You can open issues, send pull requests,
Expand Down

0 comments on commit 313c38c

Please sign in to comment.