diff --git a/README.md b/README.md index 0e28112..9f0a8a8 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,9 @@ Kinsky provides the following: **JSON**, **EDN** and a **keyword** serializer for keys. - Documentation +The `core.async` facade that used to be part of this project is now +[a separate project](https://github.com/fmjrey/kinsky-async). + ## Usage ```clojure @@ -50,6 +53,11 @@ Thanks a lot to these awesome contributors ### 0.1.25 - Removal of the asynchronous façade, transducers should suffice +- Consumer+Producer: added name and counter to support toString +- Producer: added send-cb! function that takes a callback fn +- Producer: added abort-transaction! function +- Consumer: added seek-beginning! and seek-end! +- Minor typo and code refactoring ### 0.1.24 diff --git a/src/kinsky/client.clj b/src/kinsky/client.clj old mode 100644 new mode 100755 index bfa1e77..74583dc --- a/src/kinsky/client.clj +++ b/src/kinsky/client.clj @@ -12,8 +12,10 @@ ConsumerRecords KafkaConsumer OffsetAndMetadata) - (org.apache.kafka.clients.producer KafkaProducer - ProducerRecord) + (org.apache.kafka.clients.producer Callback + KafkaProducer + ProducerRecord + RecordMetadata) (org.apache.kafka.common Node PartitionInfo TopicPartition) @@ -89,9 +91,9 @@ "Subscribe to a topic or list of topics. The topics argument can be: - - A simple string when subscribing to a single topic + - A simple string or keyword when subscribing to a single topic - A regex pattern to subscribe to matching topics - - A sequence of strings + - A collection of strings or keywords The optional listener argument is either a callback function or an implementation of @@ -115,10 +117,18 @@ (wake-up! [this] "Safely wake-up a consumer which may be blocking during polling.") (seek! [this] [this topic-partition offset] - "Overrides the fetch offsets that the consumer will use on the next poll") - (position! [this] [this topic-partition] + "Overrides the fetch offsets the consumer will use on the next poll. 
+ topic-partition must be a map with `:topic` and `:partition` entries, + and offset must be a long value.") + (seek-beginning! [this] [this topic-partitions] + "Seek to the first offset for each of the given topic-partitions. + topic-partitions must be a seq of maps with `:topic` and `:partition`.") + (seek-end! [this] [this topic-partitions] + "Seek to the last offset for each of the given topic-partitions. + topic-partitions must be a seq of maps with `:topic` and `:partition`.") + (position! [this] [this topic-partition] "Get the offset of the next record that will be fetched (if a record with that offset exists).") - (subscription [this] + (subscription [this] "Currently assigned topics")) (defprotocol ProducerDriver @@ -129,11 +139,22 @@ with the following possible keys is expected: `:key`, `:topic`, `:partition`, `:headers`, `:timestamp` and `:value`. ") + (send-cb! [this record cb] [this topic k v cb] [this topic k v headers cb] + "Produce a record on a topic with a callback function of 2 arguments: + - a producer record map as per rm->data, + - an exception thrown during the processing of a record. + One of the two arguments will be nil depending on the send result. + ") (flush! [this] "Ensure that produced messages are flushed.") - (init-transactions! [this]) - (begin-transaction! [this]) - (commit-transaction! [this])) + (init-transactions! [this] + "To be called once when a `transactional.id` is configured.") + (begin-transaction! [this] + "Initiate a new transaction.") + (commit-transaction! [this] + "Commits the ongoing transaction.") + (abort-transaction! [this] + "Aborts the ongoing transaction.")) (defprotocol GenericDriver (close! [this] [this timeout] @@ -417,9 +438,17 @@ :by-partition (group-by (juxt :topic :partition) edc)})) (defn ->topics - "Yield a valid object for subscription" + "Yield a valid topic object for subscription given a string, keyword, + regex pattern or collection of strings or keywords." 
^Collection [topics] + (assert (or (string? topics) + (keyword? topics) + (instance? Pattern topics) + (and (instance? Collection topics) + (every? (some-fn string? keyword?) topics))) + (str "topics argument must be a string, keyword, regex pattern or " + "collection of strings or keywords, received " topics)) (cond (keyword? topics) [(name topics)] (string? topics) [topics] @@ -427,9 +456,17 @@ (instance? Pattern topics) topics :else (throw (ex-info "topics argument is invalid" {:topics topics})))) +(def consumer-counter (atom 0)) +(defn opts->consumer-id + ^String + [opts] + (str "kinsky-consumer-" (swap! consumer-counter inc) "-" + (get opts "group.id" "") "-" + (get opts "client.id" "") "-")) + (defn consumer->driver - "Given a consumer-driver and an optional callback to callback - to call when stopping, yield a consumer driver. + "Given a KafkaConsumer and an optional callback to call when stopping, + yield a consumer driver. The consumer driver implements the following protocols: @@ -438,6 +475,7 @@ - `clojure.lang.IDeref`: `deref` to access underlying [KafkaConsumer](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaConsumer.html) instance. + - `java.lang.Object`: `.toString` produces the kinsky consumer `id`. The consumer-driver can also take options: @@ -448,69 +486,65 @@ (consumer->driver consumer nil)) ([^KafkaConsumer consumer {::keys [run-fn consumer-decoder-fn] - :or {consumer-decoder-fn consumer-records->data}}] - (reify - ConsumerDriver - (poll! [this timeout] - (consumer-decoder-fn (.poll consumer (java.time.Duration/ofMillis timeout)))) - (stop! [this] - (stop! this 0)) - (stop! [this timeout] - (when (fn? run-fn) - (run-fn)) - (.wakeup consumer)) - (pause! [this topic-partitions] - (.pause consumer - (map ->topic-partition topic-partitions))) - (resume! [this topic-partitions] - (.resume consumer - (map ->topic-partition topic-partitions))) - (subscribe! [this topics] - (assert (or (string? topics) - (keyword? 
topics) - (instance? Pattern topics) - (and (instance? Collection topics) - (every? (some-fn string? keyword?) topics))) - (str "topic argument must be a string, keyword, regex pattern or " - "collection of strings or keywords.")) - (.subscribe consumer (->topics topics))) - (subscribe! [this topics listener] - (assert (or (string? topics) - (keyword? topics) - (instance? Pattern topics) - (and (instance? Collection topics) - (every? (some-fn string? keyword?) topics))) - (str "topic argument must be a string, keyword, regex pattern or " - "collection of strings or keywords.")) - (.subscribe consumer (->topics topics) (rebalance-listener listener))) - (unsubscribe! [this] - (.unsubscribe consumer)) - (wake-up! [this] - (.wakeup consumer)) - (commit! [this] - (.commitSync consumer)) - (commit! [this topic-offsets] - (let [offsets (->> topic-offsets - (map (juxt ->topic-partition ->offset-metadata)) - (reduce merge {}))] - (.commitSync consumer ^Map offsets))) - (seek! [this topic-partition offset] - (.seek consumer - (->topic-partition topic-partition) - (long offset))) - (position! [this topic-partition] - (.position consumer (->topic-partition topic-partition))) - (subscription [this] - (.subscription consumer)) - GenericDriver - (close! [this] - (.close consumer)) - MetadataDriver - (partitions-for [this topic] - (mapv partition-info->data (.partitionsFor consumer topic))) - clojure.lang.IDeref - (deref [this] - consumer)))) + :or {consumer-decoder-fn consumer-records->data} + :as config}] + (let [id (opts->consumer-id config)] + (reify + ConsumerDriver + (poll! [this timeout] + (consumer-decoder-fn (.poll consumer (java.time.Duration/ofMillis timeout)))) + (stop! [this] + (stop! this 0)) + (stop! [this timeout] + (when (fn? run-fn) + (run-fn)) + (.wakeup consumer)) + (pause! [this topic-partitions] + (.pause consumer + (map ->topic-partition topic-partitions))) + (resume! 
[this topic-partitions] + (.resume consumer + (map ->topic-partition topic-partitions))) + (subscribe! [this topics] + (.subscribe consumer (->topics topics))) + (subscribe! [this topics listener] + (.subscribe consumer (->topics topics) (rebalance-listener listener))) + (unsubscribe! [this] + (.unsubscribe consumer)) + (wake-up! [this] + (.wakeup consumer)) + (commit! [this] + (.commitSync consumer)) + (commit! [this topic-offsets] + (let [offsets (->> topic-offsets + (map (juxt ->topic-partition ->offset-metadata)) + (reduce merge {}))] + (.commitSync consumer ^Map offsets))) + (seek! [this topic-partition offset] + (.seek consumer (->topic-partition topic-partition) (long offset))) + (seek-beginning! [this topic-partitions] + (.seekToBeginning consumer (map ->topic-partition topic-partitions))) + (seek-end! [this topic-partitions] + (.seekToEnd consumer (map ->topic-partition topic-partitions))) + (position! [this topic-partition] + (.position consumer (->topic-partition topic-partition))) + (subscription [this] + (.subscription consumer)) + GenericDriver + (close! [this] + (.close consumer)) + (close! [this timeout] + (if (nil? timeout) + (.close consumer) + (.close consumer (long timeout) TimeUnit/MILLISECONDS))) + MetadataDriver + (partitions-for [this topic] + (mapv partition-info->data (.partitionsFor consumer topic))) + clojure.lang.IDeref + (deref [this] + consumer) + Object + (toString [this] id))))) (defn safe-poll! "Implementation of poll which disregards wake-up exceptions" @@ -537,8 +571,9 @@ (map ->header headers)) (defn ->record - "Build a producer record from a clojure map. Leave ProducerRecord instances - untouched." + "Build a ProducerRecord from a clojure map. + Leave ProducerRecord instances untouched." + ^ProducerRecord [payload] (if (instance? ProducerRecord payload) payload @@ -556,6 +591,42 @@ key value (->headers headers))))) +(defn rm->data + "Yield a clojure representation of a producer RecordMetadata. 
+ The map returned is in the form: + {:topic \"topic-name\" + :partition 0 ;; nil if unknown + :offset 1234567890 + :timestamp 9876543210}" + [^RecordMetadata rm] + {:topic (.topic rm) + :partition (let [partition (.partition rm)] + (when (not= partition RecordMetadata/UNKNOWN_PARTITION) + partition)) + :offset (.offset rm) + :timestamp (.timestamp rm)}) + +(defn ->callback + "Return a producer Callback instance given a function taking 2 arguments: + - a producer record map as per rm->data, + - an exception thrown during the processing of a record. + One of the two arguments will be nil depending on the send result." + ^Callback + [f] + (when f + (reify + Callback + (onCompletion [_ record-metadata exception] + (f (some-> record-metadata rm->data) exception))))) + +(def producer-counter (atom 0)) +(defn opts->producer-id + ^String + [opts] + (str "kinsky-producer-" (swap! producer-counter inc) "-" + (get opts "transactional.id" "") "-" + (get opts "client.id" "") "-")) + (defn producer->driver "Yield a driver from a Kafka Producer. The producer driver implements the following protocols: @@ -564,38 +635,55 @@ - [MetadataDriver](#var-MetadataDriver) - `clojure.lang.IDeref`: `deref` to access underlying [KafkaProducer](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html) - instance." - [^KafkaProducer producer] - (reify - GenericDriver - (close! [this] - (.close producer)) - (close! [this timeout] - (if (nil? timeout) - (.close producer) - (.close producer (long timeout) TimeUnit/MILLISECONDS))) - ProducerDriver - (send! [this record] - (.send producer (->record record))) - (send! [this topic k v] - (.send producer (->record {:key k :value v :topic topic}))) - (send! [this topic k v headers] - "Defaults partition and timestamp to 0, if you need to set either, use the single arity version" - (.send producer (->record {:key k :value v :topic topic :headers headers}))) - (flush! [this] - (.flush producer)) - (init-transactions! 
[this] - (.initTransactions producer)) - (begin-transaction! [this] - (.beginTransaction producer)) - (commit-transaction! [this] - (.commitTransaction producer)) - MetadataDriver - (partitions-for [this topic] - (mapv partition-info->data (.partitionsFor producer topic))) - clojure.lang.IDeref - (deref [this] - producer))) + instance. + - `java.lang.Object`: `.toString` produces the kinsky producer `id`." + ([^KafkaProducer producer] + (producer->driver producer nil)) + ([^KafkaProducer producer config] + (let [id (opts->producer-id config)] + (reify + GenericDriver + (close! [this] + (.close producer)) + (close! [this timeout] + (if (nil? timeout) + (.close producer) + (.close producer (long timeout) TimeUnit/MILLISECONDS))) + ProducerDriver + (send! [this record] + (.send producer (->record record))) + (send! [this topic k v] + (.send producer (->record {:key k :value v :topic topic}))) + (send! [this topic k v headers] + (.send producer (->record {:key k :value v :topic topic :headers headers}))) + (send-cb! [this record cb] + (.send producer (->record record) (->callback cb))) + (send-cb! [this topic k v cb] + (.send producer + (->record {:key k :value v :topic topic}) + (->callback cb))) + (send-cb! [this topic k v headers cb] + (.send producer + (->record {:key k :value v :topic topic :headers headers}) + (->callback cb))) + (flush! [this] + (.flush producer)) + (init-transactions! [this] + (.initTransactions producer)) + (begin-transaction! [this] + (.beginTransaction producer)) + (commit-transaction! [this] + (.commitTransaction producer)) + (abort-transaction! [this] + (.abortTransaction producer)) + MetadataDriver + (partitions-for [this topic] + (mapv partition-info->data (.partitionsFor producer topic))) + clojure.lang.IDeref + (deref [this] + producer) + Object + (toString [this] id))))) (defn producer "Create a producer from a configuration and optional serializers. @@ -603,16 +691,17 @@ and values. 
If none are provided, the configuration is expected to hold serializer class names." ([config] - (producer->driver (KafkaProducer. (opts->props config)))) + (producer->driver (KafkaProducer. (opts->props config)) config)) ([config serializer] (producer->driver (KafkaProducer. (opts->props config) (->serializer serializer) - (->serializer serializer)))) + (->serializer serializer)) + config)) ([config kserializer vserializer] (producer->driver (KafkaProducer. (opts->props config) (->serializer kserializer) - (->serializer vserializer))))) - + (->serializer vserializer)) + config))) (defn consumer "Create a consumer from a configuration and optional deserializers. If a callback is given, call it when stopping the consumer. diff --git a/test/kinsky/client_test.clj b/test/kinsky/client_test.clj index a0548f6..d716951 100644 --- a/test/kinsky/client_test.clj +++ b/test/kinsky/client_test.clj @@ -1,5 +1,5 @@ (ns kinsky.client-test - (:require [clojure.test :refer :all :as t] + (:require [clojure.test :refer :all :as t] [clojure.pprint :as pp] [kinsky.client :as client] [kinsky.embedded :as e])) @@ -32,22 +32,22 @@ (testing "string serializer" (is (= "foo" (String. - (.serialize (client/string-serializer) "" "foo"))))) + (.serialize (client/string-serializer) "" "foo"))))) (testing "keyword serializer" (is (= "foo" (String. - (.serialize (client/keyword-serializer) "" :foo))))) + (.serialize (client/keyword-serializer) "" :foo))))) (testing "edn serializer" (is (= "{:a :b, :c :d}" (String. - (.serialize (client/edn-serializer) "" {:a :b :c :d}))))) + (.serialize (client/edn-serializer) "" {:a :b :c :d}))))) (testing "json serializer" (is (= "[0,1,2]" (String. 
- (.serialize (client/json-serializer) "" [0 1 2])))))) + (.serialize (client/json-serializer) "" [0 1 2])))))) (deftest deserializer (testing "string deserializer" @@ -69,12 +69,14 @@ (.getBytes "{\"a\": \"b\", \"c\": \"d\"}")))))) (deftest config-props - (testing "valid configuration properties" - (is (= {"foo.bar" "0"} - (client/opts->props {:foo.bar 0}))))) + (testing "configuration properties" + (is (= {"foo.bar" "0" "foo.baz" "1"} + (client/opts->props {:foo.bar 0 + "foo.baz" "1" + :qualified/kw :discarded}))))) (deftest rebalance-listener - (testing "idempotency" + (testing "identity when given a ConsumerRebalanceListener" (let [sink (client/rebalance-listener (fn [& _]))] (is (= sink (client/rebalance-listener sink)))))