diff --git a/README.md b/README.md index c10ccc1..8899605 100644 --- a/README.md +++ b/README.md @@ -79,11 +79,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it. #### Consumer-source options -| Key | Type | Req? | Notes | -|---------------------------------|-----------------------------------------------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| :group-id | string | required | | -| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | -| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> [ConsumerRecord]` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:
consumer-context: {:ketu.source/consumer consumer}
pool-fn: fn [] -> ConsumerRecords
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| +| Key | Type | Req? | Notes | +|---------------------------------|-----------------------------------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| :group-id | string | required | | +| :shape | `:value:`, `[:vector ]`,`[:map ]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) | +| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> Iterable` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:
consumer-context: {:ketu.source/consumer consumer}
pool-fn: fn [] -> Iterable
Returns an iterable collection of consumerRecord.
The decorator should call the poll-fn on behalf of the consumer source.
| #### Producer-sink options @@ -138,12 +138,15 @@ Similarly, to put a clojure data structure on the producer channel: ``` ## Consumer Decorator + The consumer decorator allows running custom logic on the consumer polling thread. This allows custom control on the consumer behavior including manual offset management. Custom decorator logic may require different consumer configurations. -for example when managing the offset manually, auto-commit should usually set to false. +for example when managing the offset manually, auto-commit should usually set to false. -In this example we demonstare how to enable pause/resume of the consumer: +In this example we use the decorator to run commands in the polling thread context. +The consumer is paused/resumed based on commands sent from the application. +The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn). ```clojure (ns consumer-decorator-example (:require [clojure.core.async :as async]