Skip to content

Commit

Permalink
README update
Browse files Browse the repository at this point in the history
  • Loading branch information
idantavor committed Jul 29, 2024
1 parent 0b23bc6 commit 69e852e
Showing 1 changed file with 101 additions and 88 deletions.
189 changes: 101 additions & 88 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,23 @@ Consume a name string from kafka and produce a greeting string for that name bac
(:require [clojure.core.async :refer [chan close! <!! >!!]]
[ketu.async.source :as source]
[ketu.async.sink :as sink]))
(let [<names (chan 10)
source-opts {:name "greeter-consumer"
:brokers "broker1:9092"
:topic "names"
:group-id "greeter"

(let [<names (chan 10)
source-opts {:name "greeter-consumer"
:brokers "broker1:9092"
:topic "names"
:group-id "greeter"
:value-type :string
:shape :value}
source (source/source <names source-opts)
:shape :value}
source (source/source <names source-opts)

>greets (chan 10)
sink-opts {:name "greeter-producer"
:brokers "broker2:9091"
:topic "greetings"
:value-type :string
:shape :value}
sink (sink/sink >greets sink-opts)]
>greets (chan 10)
sink-opts {:name "greeter-producer"
:brokers "broker2:9091"
:topic "greetings"
:value-type :string
:shape :value}
sink (sink/sink >greets sink-opts)]

;; Consume a name and produce a greeting. You could also do this with e.g. clojure.core.async/pipeline.
(->> (<!! <names)
Expand All @@ -61,11 +61,13 @@ Consume a name string from kafka and produce a greeting string for that name bac

Anything that is not documented is not supported and might change.

Read more about the default values used by the underlying Kafka clients v3.3.1 [here](https://kafka.apache.org/33/documentation.html)
Read more about the default values used by the underlying Kafka clients
v3.3.1 [here](https://kafka.apache.org/33/documentation.html)

Note: `int` is used for brevity but can also mean `long`. Don't worry about it.

#### Common options (both source and sink accept these)

| Key | Type | Req? | Notes |
|------------------|-------------------------|----------|----------------------------------------------------------------------------------|
| :brokers | string | required | Comma separated `host:port` values e.g "broker1:9092,broker2:9092" |
Expand All @@ -76,30 +78,32 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |

#### Consumer-source options
| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|------------|---------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | fucntion | optional | ..... |

| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `fn [consumer-context pool-fn] -> [ConsumerRecord]` | optional | Decorates the internal pool function. when provided the decorator will be called with the following params:<br/>consumer-context: {:consumer-source/consumer consumer}<br/>pool-fn: fn [] -> ConsumerRecords <br/>Returns an iterable collection of consumerRecord.<br/>The decorator should call the poll-fn on behalf of the consumer source.<br/> |

#### Producer-sink options
| Key | Type | Req? | Notes |
|-------------------|------------------------------------------------------------------------------------------------------------------|------------|------------------------------------------------------------------------------------------------|
| :shape | `:value`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) |
| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer |
| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer |

| Key | Type | Req? | Notes |
|-------------------|------------------------------------------------------------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------|
| :shape | `:value`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) |
| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer |
| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer |

## Data shapes

You don't have to deal with ConsumerRecord or ProducerRecord objects.<br>
To get a clojure data structure with any of the ConsumerRecord fields, configure the consumer shape:

```clojure
; Value only:
{:topic "names"
:key-type :string
{:topic "names"
:key-type :string
:value-type :string
:shape :value}
:shape :value}
(<!! consumer-chan)
;=> "v"

Expand All @@ -113,12 +117,14 @@ To get a clojure data structure with any of the ConsumerRecord fields, configure
(<!! consumer-chan)
;=> {:key "k", :value "v", :topic "names"}
```

Similarly, to put a clojure data structure on the producer channel:

```clojure
; Value only:
{:key-type :string
{:key-type :string
:value-type :string
:shape :value}
:shape :value}
(>!! producer-chan "v")

; Vector:
Expand All @@ -131,77 +137,84 @@ Similarly, to put a clojure data structure on the producer channel:
(>!! producer-chan ["k2" "v2" "events"])
```

## Example of using the custom commands channel
## Consumer Decorator
The consumer decorator allows running custom logic on the consumer polling thread.
This allows custom control on the consumer behavior including manual offset management.
Custom decorator logic may require different consumer configurations. for example when managing the offset manually, auto-commit should usually set to false.

In this example we demonstare how to enable pause/resume of the consumer:

```clojure
(ns custom-commands-channel-example
(:require [clojure.core.async :as async]
(ns consumer-decorator-example
(:require [clojure.core.async :as async]
[ketu.async.source :as source]
[ketu.async.sink :as sink]))

(let [commands-chan (async/chan 10)
consumer-chan (async/chan 10)
consumer-opts {:name "consumer-example"
:brokers "broker1:9092"
:topic "example"
:group-id "example"
:value-type :string
:shape :value
:ketu.source/consumer-commands-chan commands-chan}
source (source/source consumer-chan consumer-opts)
consumer-opts {:name "consumer-example"
:brokers "broker1:9092"
:topic "example"
:group-id "example"
:value-type :string
:shape :value
:ketu.source/consumer-decorator (fn [consumer-ctx poll-fn]
(loop []
(when-let [command (async/poll! commands-chan)]
(command consumer-ctx)
(recur)))
(poll-fn))}
source (source/source consumer-chan consumer-opts)

producer-chan (async/chan 10)
sink-opts {:name "producer-example"
:brokers "broker1:9092"
:topic "example"
:value-type :string
:shape :value}
sink (sink/sink producer-chan sink-opts)
sink-opts {:name "producer-example"
:brokers "broker1:9092"
:topic "example"
:value-type :string
:shape :value}
sink (sink/sink producer-chan sink-opts)

; periodically produce data to the topic
producing (future
(dotimes [i 20]
(async/>!! producer-chan (str i))
(Thread/sleep 300))
(async/>!! producer-chan "done")
(async/close! producer-chan))
producing (future
(dotimes [i 20]
(async/>!! producer-chan (str i))
(Thread/sleep 300))
(async/>!! producer-chan "done")
(async/close! producer-chan))

; read from the consumer channel and print to the screen
processing (future
(loop []
(let [message (async/<!! consumer-chan)]
(println message)
(when (not= message "done")
(recur)))))]
(try
(Thread/sleep 2000) ; consumer is consuming normally
(let [paused (promise)
resumed (promise)]
; Send the commands channel a function that will pause the consumer
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(.pause consumer (.assignment consumer))
(deliver paused true)))
@paused
(println "consumer is paused")
(Thread/sleep 2000)

; Send the commands channel a function that will resume the consumer
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(.resume consumer (.paused consumer))
(deliver resumed true)))
@resumed
(println "consumer is resumed")

; Wait for all futures to finish
@producing
@processing)
(finally
(source/stop! source))))
processing (future
(loop []
(let [message (async/<!! consumer-chan)]
(println message)
(when (not= message "done")
(recur)))))]
(try
(Thread/sleep 2000) ; consumer is consuming normally
(let [paused (promise)
resumed (promise)]

; Send the commands channel a function that will pause the consumer
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(.pause consumer (.assignment consumer))
(deliver paused true)))

@paused
(println "consumer is paused")
(Thread/sleep 2000)

; Send the commands channel a function that will resume the consumer
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
(.resume consumer (.paused consumer))
(deliver resumed true)))

@resumed
(println "consumer is resumed")

; Wait for all futures to finish
@producing
@processing)
(finally
(source/stop! source))))
```

## Development & Contribution
Expand Down

0 comments on commit 69e852e

Please sign in to comment.