diff --git a/data-plane/THIRD-PARTY.txt b/data-plane/THIRD-PARTY.txt index 300cf412ce..39d5e5436c 100644 --- a/data-plane/THIRD-PARTY.txt +++ b/data-plane/THIRD-PARTY.txt @@ -1,5 +1,5 @@ -Lists of 231 third-party dependencies. +Lists of 232 third-party dependencies. (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.2.11 - http://logback.qos.ch/logback-classic) (Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.2.11 - http://logback.qos.ch/logback-core) (Apache License 2.0) brotli4j (com.aayushatharva.brotli4j:brotli4j:1.12.0 - https://github.com/hyperxpro/Brotli4j/brotli4j) @@ -34,6 +34,7 @@ Lists of 231 third-party dependencies. (Unknown license) contract (dev.knative.eventing.kafka.broker:contract:1.0-SNAPSHOT - no url defined) (Unknown license) core (dev.knative.eventing.kafka.broker:core:1.0-SNAPSHOT - no url defined) (Unknown license) dispatcher (dev.knative.eventing.kafka.broker:dispatcher:1.0-SNAPSHOT - no url defined) + (Unknown license) dispatcher-loom (dev.knative.eventing.kafka.broker:dispatcher-loom:1.0-SNAPSHOT - no url defined) (Unknown license) dispatcher-vertx (dev.knative.eventing.kafka.broker:dispatcher-vertx:1.0-SNAPSHOT - no url defined) (Unknown license) receiver (dev.knative.eventing.kafka.broker:receiver:1.0-SNAPSHOT - no url defined) (Unknown license) receiver-loom (dev.knative.eventing.kafka.broker:receiver-loom:1.0-SNAPSHOT - no url defined) diff --git a/data-plane/dispatcher-loom/pom.xml b/data-plane/dispatcher-loom/pom.xml index a6b0ea5b97..2546ba55f0 100644 --- a/data-plane/dispatcher-loom/pom.xml +++ b/data-plane/dispatcher-loom/pom.xml @@ -30,6 +30,7 @@ 20 + --enable-preview @@ -39,6 +40,32 @@ dispatcher ${project.version} + + dev.knative.eventing.kafka.broker + receiver-loom + ${project.version} + + + + io.vertx + vertx-junit5 + test + + + org.junit.jupiter + junit-jupiter + test + + + org.assertj + assertj-core + test + + + org.mockito + mockito-junit-jupiter + test + diff --git a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFactory.java b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFactory.java index 58106e03e7..cbbda41d92 100644 --- a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFactory.java +++ b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFactory.java @@ -15,6 +15,16 @@ */ package dev.knative.eventing.kafka.broker.dispatcherloom; -public class LoomConsumerFactory { - // TODO implement +import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory; +import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer; +import io.vertx.core.Vertx; +import java.util.Map; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +public class LoomConsumerFactory implements ReactiveConsumerFactory { + + @Override + public ReactiveKafkaConsumer create(Vertx vertx, Map configs) { + return new LoomKafkaConsumer<>(vertx, new KafkaConsumer<>(configs)); + } } diff --git a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java index bdf09b8884..aac391373f 100644 --- a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java +++ b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java @@ -15,6 +15,202 @@ */ package dev.knative.eventing.kafka.broker.dispatcherloom; -public class LoomKafkaConsumer { - // TODO implement +import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoomKafkaConsumer implements ReactiveKafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(LoomKafkaConsumer.class); + + private final Consumer consumer; + private final BlockingQueue taskQueue; + private final AtomicBoolean isClosed; + private final Thread taskRunnerThread; + private Handler exceptionHandler; + + public LoomKafkaConsumer(Vertx vertx, Consumer consumer) { + this.consumer = consumer; + this.taskQueue = new LinkedBlockingQueue<>(); + this.isClosed = new AtomicBoolean(false); + + taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue); + } + + private void addTask(Runnable task, Promise promise) { + if (isClosed.get()) { + promise.fail("Consumer is closed"); + } + taskQueue.add(task); + } + + private void processTaskQueue() { + while (!isClosed.get() || !taskQueue.isEmpty()) { + try { + taskQueue.take().run(); + } catch (InterruptedException e) { + logger.debug("Interrupted while waiting for task", e); + } + } + } + + @Override + public Future> commit(Map offset) { + final Promise> promise = Promise.promise(); + addTask( + () -> { + consumer.commitAsync(offset, (offsetMap, exception) -> { + if (exception != null) { + promise.fail(exception); + } else { + promise.complete(offsetMap); + } + }); + }, + promise); + return promise.future(); + } + + @Override + public Future close() { + final Promise promise = Promise.promise(); + isClosed.set(true); + taskQueue.add(() -> { + try { + consumer.close(); + } catch (Exception e) { + promise.fail(e); + } + }); + + Thread.ofVirtual().start(() -> { + try { + while (!taskQueue.isEmpty()) { + Thread.sleep(2000L); + } + taskRunnerThread.interrupt(); + taskRunnerThread.join(); + promise.complete(); + } catch (InterruptedException e) { + logger.debug("Interrupted while waiting for taskRunnerThread to finish", e); + promise.fail(e); + } + }); + return promise.future(); + } + + @Override + public Future pause(Collection partitions) { + final Promise promise = Promise.promise(); + addTask( + () -> { + try { + consumer.pause(partitions); + promise.complete(); + } catch (Exception e) { + promise.fail(e); + } + }, + promise); + return promise.future(); + } + + @Override + public Future> poll(Duration timeout) { + final Promise> promise = Promise.promise(); + addTask( + () -> { + try { + ConsumerRecords records = consumer.poll(timeout); + promise.complete(records); + } catch (Exception e) { + promise.fail(e); + } + }, + promise); + return promise.future(); + } + + @Override + public Future resume(Collection partitions) { + final Promise promise = Promise.promise(); + addTask( + () -> { + try { + consumer.resume(partitions); + promise.complete(); + } catch (Exception e) { + promise.fail(e); + } + }, + promise); + return promise.future(); + } + + @Override + public Future subscribe(Collection topics) { + final Promise promise = Promise.promise(); + addTask( + () -> { + try { + consumer.subscribe(topics); + promise.complete(); + } catch (Exception e) { + promise.fail(e); + } + }, + promise); + return promise.future(); + } + + @Override + public Future subscribe(Collection topics, ConsumerRebalanceListener listener) { + final Promise promise = Promise.promise(); + addTask( + () -> { + try { + consumer.subscribe(topics, listener); + promise.complete(); + } catch (Exception e) { + promise.fail(e); + } + }, + promise); + return promise.future(); + } + + @Override + public Consumer unwrap() { + return this.consumer; + } + + @Override + public ReactiveKafkaConsumer exceptionHandler(Handler handler) { + this.exceptionHandler = handler; + return this; + } + + // functions needed for test + public int getTaskQueueSize() { + return taskQueue.size(); + } + + public boolean isTaskRunnerThreadAlive() { + return taskRunnerThread.isAlive(); + } } diff --git a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/Main.java b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/Main.java index 641d3fa7dd..5de6d7c707 100644 --- a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/Main.java +++ b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/Main.java @@ -15,8 +15,12 @@ */ package dev.knative.eventing.kafka.broker.dispatcherloom; +import dev.knative.eventing.kafka.broker.receiverloom.LoomProducerFactory; +import java.io.IOException; + public class Main { - public static void main(String[] args) { - System.out.println("Hello World!"); + public static void main(String[] args) throws IOException { + dev.knative.eventing.kafka.broker.dispatcher.main.Main.start( + args, new LoomConsumerFactory<>(), new LoomProducerFactory<>()); } } diff --git a/data-plane/dispatcher-loom/src/test/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFacortyTest.java b/data-plane/dispatcher-loom/src/test/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFacortyTest.java new file mode 100644 index 0000000000..4e3dd5263a --- /dev/null +++ b/data-plane/dispatcher-loom/src/test/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFacortyTest.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcherloom; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer; +import io.vertx.core.Vertx; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.Test; + +public class LoomConsumerFacortyTest { + + private LoomConsumerFactory consumerFactory = new LoomConsumerFactory<>(); + + @Test + public void testCreate() { + // Create Vertx instance + Vertx vertx = Vertx.vertx(); + + // Create Kafka consumer configuration + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + ReactiveKafkaConsumer loomConsumer = consumerFactory.create(vertx, configs); + + // Verify that the created consumer is an instance of LoomKafkaConsumer + assertTrue(loomConsumer instanceof LoomKafkaConsumer); + // Verify that the unwrapped KafkaConsumer is the same as the one created manually + Consumer unwrappedConsumer = loomConsumer.unwrap(); + assertNotNull(unwrappedConsumer); + } +} diff --git a/data-plane/dispatcher-loom/src/test/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumerTest.java b/data-plane/dispatcher-loom/src/test/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumerTest.java new file mode 100644 index 0000000000..a476356b11 --- /dev/null +++ b/data-plane/dispatcher-loom/src/test/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumerTest.java @@ -0,0 +1,192 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.dispatcherloom; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(VertxExtension.class) +public class LoomKafkaConsumerTest { + + private Vertx vertx; + private LoomKafkaConsumer consumer; + private MockConsumer mockConsumer; + + @BeforeEach + public void setUp() { + vertx = Vertx.vertx(); + mockConsumer = new MockConsumer(OffsetResetStrategy.LATEST); + consumer = new LoomKafkaConsumer(vertx, mockConsumer); + } + + @AfterEach + public void tearDown() { + consumer.close(); + } + + @Test + public void testTaskRunnerThreadIsStopAfterClose(VertxTestContext testContext) throws InterruptedException { + + final var checkpoints = testContext.checkpoint(); + + // add task to TaskRunnerThread + consumer.subscribe(Collections.singletonList("test-topic")); + consumer.poll(Duration.ofMillis(1000)); + consumer.commit(new HashMap<>()); + + // Verify that the task runner thread is running + assertTrue(consumer.isTaskRunnerThreadAlive()); + + // Close the consumer + consumer.close() + .onComplete(ar -> { + ar.succeeded(); + // Verify that the task runner thread has stopped + assertFalse(consumer.isTaskRunnerThreadAlive()); + checkpoints.flag(); + }) + .onFailure(testContext::failNow); + } + + @Test + public void checkTaskQueueIsEmptyAfterClose(VertxTestContext testContext) { + + final var checkpoints = testContext.checkpoint(); + + // add task to TaskQueue + consumer.subscribe(Collections.singletonList("test-topic")); + consumer.poll(Duration.ofMillis(1000)); + consumer.commit(new HashMap<>()); + + // Close the consumer + consumer.close() + .onComplete(ar -> { + ar.succeeded(); + // Verify that the task queue is empty + assertEquals(0, consumer.getTaskQueueSize()); + checkpoints.flag(); + }) + .onFailure(testContext::failNow); + } + + @Test + public void testNoTaskAddingAfterClose(VertxTestContext testContext) { + + // close the consumer + consumer.close().onFailure(testContext::failNow); + + // try to call method after close + consumer.subscribe(Collections.singletonList("test-topic")).onComplete(ar -> { + testContext.verify(() -> { + assertTrue(ar.failed()); + assertEquals("Consumer is closed", ar.cause().getMessage()); + testContext.completeNow(); + }); + }); + } + + @Test + public void testSubscribeAndPoll(VertxTestContext testContext) { + // Test data + final String topic = "test-topic"; + final Duration duration = Duration.ofMillis(1000); + + final var checkpoints = testContext.checkpoint(2); + + // Subscribe to the topic + consumer.subscribe(Collections.singletonList(topic)) + .onComplete(ar -> { + ar.succeeded(); + // verify that the consumer is subscribed to the topic + assertEquals(1, mockConsumer.subscription().size()); + assertTrue(mockConsumer.subscription().contains(topic)); + checkpoints.flag(); + }) + .onFailure(testContext::failNow); + + final var rec1 = new ConsumerRecord<>(topic, 0, 0, "key1", 1); + final var rec2 = new ConsumerRecord<>(topic, 0, 1, "key2", 2); + addRecordAndSeek(topic, mockConsumer, List.of(rec1, rec2)); + + consumer.poll(duration) + .onComplete(ar -> { + ar.succeeded(); + // Verify that the consumer has polled the records + var iter = ar.result().iterator(); + assertEquals(rec1, iter.next()); + assertEquals(rec2, iter.next()); + assertFalse(iter.hasNext()); + + checkpoints.flag(); + }) + .onFailure(testContext::failNow); + } + + @Test + public void testCommitOffsets(VertxTestContext testContext) throws InterruptedException { + // Test data + final String topic = "test-topic"; + final int partition = 0; + int offset = 123; + Map offsetMap = new ConcurrentHashMap<>(); + offsetMap.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset)); + + // Commit offsets + consumer.commit(offsetMap) + .onComplete(testContext.succeeding(result -> { + // Verify that the offsets are committed successfully + assertEquals(offsetMap, result); + testContext.completeNow(); + })) + .onFailure(testContext::failNow); + } + + private void addRecordAndSeek( + String topic, MockConsumer mockConsumer, List> records) { + // Mock consumers need to seek manually since they cannot automatically reset offsets + mockConsumer.rebalance(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1))); + + HashMap offsets = new HashMap<>(); + offsets.put(new TopicPartition(topic, 0), 0L); + offsets.put(new TopicPartition(topic, 1), 0L); + mockConsumer.updateEndOffsets(offsets); + mockConsumer.seek(new TopicPartition(topic, 0), 0); + for (ConsumerRecord record : records) { + mockConsumer.addRecord(record); + } + } +} diff --git a/data-plane/tests/pom.xml b/data-plane/tests/pom.xml index 68e8f6df73..ed68e229ad 100644 --- a/data-plane/tests/pom.xml +++ b/data-plane/tests/pom.xml @@ -30,6 +30,7 @@ 20 + --enable-preview @@ -58,7 +59,7 @@ dev.knative.eventing.kafka.broker - receiver-loom + dispatcher-loom ${project.version} diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/LoomDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/LoomDataPlaneTest.java index 1574666f1d..3a02201579 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/LoomDataPlaneTest.java +++ b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/LoomDataPlaneTest.java @@ -17,7 +17,7 @@ import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory; import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory; -import dev.knative.eventing.kafka.broker.dispatchervertx.VertxConsumerFactory; +import dev.knative.eventing.kafka.broker.dispatcherloom.LoomConsumerFactory; import dev.knative.eventing.kafka.broker.receiverloom.LoomProducerFactory; public class LoomDataPlaneTest extends AbstractDataPlaneTest { @@ -29,9 +29,6 @@ protected ReactiveProducerFactory getReactiveProducerFactory() { @Override protected ReactiveConsumerFactory getReactiveConsumerFactory() { - // for now, we don't have a loom consumer factory - // so we use the vertx consumer factory instead - // TODO: replace it with Loom Consumer Factory - return new VertxConsumerFactory<>(); + return new LoomConsumerFactory<>(); } }