Skip to content

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
idantavor committed Aug 6, 2024
1 parent 37a9630 commit f890121
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.

#### Consumer-source options

| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> [ConsumerRecord]` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:<br/>consumer-context: {:ketu.source/consumer consumer}<br/>pool-fn: fn [] -> ConsumerRecords <br/>Returns an iterable collection of consumerRecord.<br/>The decorator should call the poll-fn on behalf of the consumer source.<br/> |
| Key | Type | Req? | Notes |
|---------------------------------|-----------------------------------------------------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| :group-id | string | required | |
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
| :ketu.source/consumer-decorator | `fn [consumer-context poll-fn] -> Iterable<ConsumerRecord>` | optional | Decorates the internal poll function. when provided the decorator will be called with the following params:<br/>consumer-context: {:ketu.source/consumer consumer}<br/>pool-fn: fn [] -> Iterable<ConsumerRecord> <br/>Returns an iterable collection of consumerRecord.<br/>The decorator should call the poll-fn on behalf of the consumer source.<br/> |

#### Producer-sink options

Expand Down Expand Up @@ -138,12 +138,15 @@ Similarly, to put a clojure data structure on the producer channel:
```

## Consumer Decorator

The consumer decorator allows running custom logic on the consumer polling thread.
This allows custom control on the consumer behavior including manual offset management.
Custom decorator logic may require different consumer configurations.
for example when managing the offset manually, auto-commit should usually set to false.
for example when managing the offset manually, auto-commit should usually set to false.

In this example we demonstare how to enable pause/resume of the consumer:
In this example we use the decorator to run commands in the polling thread context.
The consumer is paused/resumed based on commands sent from the application.
The decorator processes all immediately available commands in the commands-chan, and only then calls (poll-fn).
```clojure
(ns consumer-decorator-example
(:require [clojure.core.async :as async]
Expand Down

0 comments on commit f890121

Please sign in to comment.