diff --git a/test/ketu/async/integration_test.clj b/test/ketu/async/integration_test.clj index 0c853b8..df58b4d 100644 --- a/test/ketu/async/integration_test.clj +++ b/test/ketu/async/integration_test.clj @@ -6,7 +6,7 @@ [clojure.core.async :as async] [ketu.clients.consumer :as consumer] [ketu.clients.producer :as producer] - [ketu.decorators.consumer.protocol :as cdp] + [ketu.decorators.consumer.protocol :refer [ConsumerDecorator]] [ketu.async.source :as source] [ketu.async.sink :as sink] [ketu.test.kafka-setup :as kafka-setup]) @@ -223,13 +223,13 @@ :group-id "clicks-test-consumer" :auto-offset-reset "earliest" :shape :value - :ketu.source/consumer-decorator (reify cdp/ConsumerDecorator + :ketu.source/consumer-decorator (reify ConsumerDecorator (poll! [_ {_consumer :ketu.source/consumer} poll-fn] (let [records (poll-fn)] (doseq [^ConsumerRecord record records] (async/>!! result-chan (String. ^"[B" (.value record)))) records)) - (validate [_ opts] + (validate [_ _] true))} source (source/source consumer-chan clicks-consumer-opts) clicks-producer-opts {:name "clicks-producer" @@ -260,10 +260,10 @@ :group-id "clicks-test-consumer" :auto-offset-reset "earliest" :shape :value - :ketu.source/consumer-decorator (reify cdp/ConsumerDecorator + :ketu.source/consumer-decorator (reify ConsumerDecorator (poll! [_ _ _] nil) - (validate [_ opts] + (validate [_ _] false))}] (is (thrown-with-msg? Exception #"Consumer decorator validation failed" (source/source consumer-chan clicks-consumer-opts)))))))