Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Dispatcher loom based module Implementation #3257

Merged
3 changes: 2 additions & 1 deletion data-plane/THIRD-PARTY.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Lists of 231 third-party dependencies.
Lists of 232 third-party dependencies.
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.2.11 - http://logback.qos.ch/logback-classic)
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.2.11 - http://logback.qos.ch/logback-core)
(Apache License 2.0) brotli4j (com.aayushatharva.brotli4j:brotli4j:1.12.0 - https://github.com/hyperxpro/Brotli4j/brotli4j)
Expand Down Expand Up @@ -34,6 +34,7 @@ Lists of 231 third-party dependencies.
(Unknown license) contract (dev.knative.eventing.kafka.broker:contract:1.0-SNAPSHOT - no url defined)
(Unknown license) core (dev.knative.eventing.kafka.broker:core:1.0-SNAPSHOT - no url defined)
(Unknown license) dispatcher (dev.knative.eventing.kafka.broker:dispatcher:1.0-SNAPSHOT - no url defined)
(Unknown license) dispatcher-loom (dev.knative.eventing.kafka.broker:dispatcher-loom:1.0-SNAPSHOT - no url defined)
(Unknown license) dispatcher-vertx (dev.knative.eventing.kafka.broker:dispatcher-vertx:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver (dev.knative.eventing.kafka.broker:receiver:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver-loom (dev.knative.eventing.kafka.broker:receiver-loom:1.0-SNAPSHOT - no url defined)
Expand Down
27 changes: 27 additions & 0 deletions data-plane/dispatcher-loom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

<properties>
<java.version>20</java.version>
<argLine>--enable-preview</argLine>
debasishbsws marked this conversation as resolved.
Show resolved Hide resolved
</properties>


Expand All @@ -39,6 +40,32 @@
<artifactId>dispatcher</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>receiver-loom</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
*/
package dev.knative.eventing.kafka.broker.dispatcherloom;

public class LoomConsumerFactory<K, V> {
// TODO implement
import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import io.vertx.core.Vertx;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LoomConsumerFactory<K, V> implements ReactiveConsumerFactory<K, V> {

Check warning on line 24 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFactory.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFactory.java#L24

Added line #L24 was not covered by tests

@Override
public ReactiveKafkaConsumer<K, V> create(Vertx vertx, Map<String, Object> configs) {
return new LoomKafkaConsumer<>(vertx, new KafkaConsumer<>(configs));

Check warning on line 28 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFactory.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomConsumerFactory.java#L28

Added line #L28 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,202 @@
*/
package dev.knative.eventing.kafka.broker.dispatcherloom;

public class LoomKafkaConsumer {
// TODO implement
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoomKafkaConsumer<K, V> implements ReactiveKafkaConsumer<K, V> {

private static final Logger logger = LoggerFactory.getLogger(LoomKafkaConsumer.class);

Check warning on line 39 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L39

Added line #L39 was not covered by tests

private final Consumer<K, V> consumer;
private final BlockingQueue<Runnable> taskQueue;
private final AtomicBoolean isClosed;
private final Thread taskRunnerThread;
private Handler<Throwable> exceptionHandler;

public LoomKafkaConsumer(Vertx vertx, Consumer<K, V> consumer) {
this.consumer = consumer;
this.taskQueue = new LinkedBlockingQueue<>();
this.isClosed = new AtomicBoolean(false);

Check warning on line 50 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L47-L50

Added lines #L47 - L50 were not covered by tests

taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue);
}

Check warning on line 53 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L52-L53

Added lines #L52 - L53 were not covered by tests

private void addTask(Runnable task, Promise<?> promise) {
if (isClosed.get()) {
promise.fail("Consumer is closed");

Check warning on line 57 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L57

Added line #L57 was not covered by tests
}
taskQueue.add(task);
}

Check warning on line 60 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L59-L60

Added lines #L59 - L60 were not covered by tests

private void processTaskQueue() {
while (!isClosed.get() || !taskQueue.isEmpty()) {
try {
taskQueue.take().run();
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for task", e);
}

Check warning on line 68 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L65-L68

Added lines #L65 - L68 were not covered by tests
}
}

Check warning on line 70 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L70

Added line #L70 was not covered by tests

@Override
public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition, OffsetAndMetadata> offset) {
final Promise<Map<TopicPartition, OffsetAndMetadata>> promise = Promise.promise();
addTask(

Check warning on line 75 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L74-L75

Added lines #L74 - L75 were not covered by tests
() -> {
consumer.commitAsync(offset, (offsetMap, exception) -> {

Check warning on line 77 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L77

Added line #L77 was not covered by tests
if (exception != null) {
promise.fail(exception);

Check warning on line 79 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L79

Added line #L79 was not covered by tests
} else {
promise.complete(offsetMap);

Check warning on line 81 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L81

Added line #L81 was not covered by tests
}
});
},

Check warning on line 84 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L83-L84

Added lines #L83 - L84 were not covered by tests
promise);
return promise.future();

Check warning on line 86 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L86

Added line #L86 was not covered by tests
}

@Override
public Future<Void> close() {
final Promise<Void> promise = Promise.promise();
isClosed.set(true);
taskQueue.add(() -> {

Check warning on line 93 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L91-L93

Added lines #L91 - L93 were not covered by tests
try {
consumer.close();
} catch (Exception e) {
promise.fail(e);
}
});

Check warning on line 99 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L95-L99

Added lines #L95 - L99 were not covered by tests

Thread.ofVirtual().start(() -> {

Check warning on line 101 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L101

Added line #L101 was not covered by tests
try {
while (!taskQueue.isEmpty()) {
Thread.sleep(2000L);

Check warning on line 104 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L104

Added line #L104 was not covered by tests
}
taskRunnerThread.interrupt();
taskRunnerThread.join();
promise.complete();
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for taskRunnerThread to finish", e);
promise.fail(e);
}
});
return promise.future();

Check warning on line 114 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L106-L114

Added lines #L106 - L114 were not covered by tests
}

@Override
public Future<Void> pause(Collection<TopicPartition> partitions) {
final Promise<Void> promise = Promise.promise();
addTask(

Check warning on line 120 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L119-L120

Added lines #L119 - L120 were not covered by tests
() -> {
try {
consumer.pause(partitions);
promise.complete();
} catch (Exception e) {
promise.fail(e);
}
},

Check warning on line 128 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L123-L128

Added lines #L123 - L128 were not covered by tests
promise);
return promise.future();

Check warning on line 130 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L130

Added line #L130 was not covered by tests
}

@Override
public Future<ConsumerRecords<K, V>> poll(Duration timeout) {
final Promise<ConsumerRecords<K, V>> promise = Promise.promise();
addTask(

Check warning on line 136 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L135-L136

Added lines #L135 - L136 were not covered by tests
() -> {
try {
ConsumerRecords<K, V> records = consumer.poll(timeout);
promise.complete(records);
} catch (Exception e) {
promise.fail(e);
}
},

Check warning on line 144 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L139-L144

Added lines #L139 - L144 were not covered by tests
promise);
return promise.future();

Check warning on line 146 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L146

Added line #L146 was not covered by tests
}

@Override
public Future<Void> resume(Collection<TopicPartition> partitions) {
final Promise<Void> promise = Promise.promise();
addTask(

Check warning on line 152 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L151-L152

Added lines #L151 - L152 were not covered by tests
() -> {
try {
consumer.resume(partitions);
promise.complete();
} catch (Exception e) {
promise.fail(e);
}
},

Check warning on line 160 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L155-L160

Added lines #L155 - L160 were not covered by tests
promise);
return promise.future();

Check warning on line 162 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L162

Added line #L162 was not covered by tests
}

@Override
public Future<Void> subscribe(Collection<String> topics) {
final Promise<Void> promise = Promise.promise();
addTask(

Check warning on line 168 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L167-L168

Added lines #L167 - L168 were not covered by tests
() -> {
try {
consumer.subscribe(topics);
promise.complete();
} catch (Exception e) {
promise.fail(e);
}
},

Check warning on line 176 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L171-L176

Added lines #L171 - L176 were not covered by tests
promise);
return promise.future();

Check warning on line 178 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L178

Added line #L178 was not covered by tests
}

@Override
public Future<Void> subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
final Promise<Void> promise = Promise.promise();
addTask(

Check warning on line 184 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L183-L184

Added lines #L183 - L184 were not covered by tests
() -> {
try {
consumer.subscribe(topics, listener);
promise.complete();
} catch (Exception e) {
promise.fail(e);
}
},

Check warning on line 192 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L187-L192

Added lines #L187 - L192 were not covered by tests
promise);
return promise.future();

Check warning on line 194 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L194

Added line #L194 was not covered by tests
}

@Override
public Consumer<K, V> unwrap() {
return this.consumer;

Check warning on line 199 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L199

Added line #L199 was not covered by tests
}

@Override
public ReactiveKafkaConsumer<K, V> exceptionHandler(Handler<Throwable> handler) {
this.exceptionHandler = handler;
return this;

Check warning on line 205 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L204-L205

Added lines #L204 - L205 were not covered by tests
}

// functions needed for test
public int getTaskQueueSize() {
return taskQueue.size();

Check warning on line 210 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L210

Added line #L210 was not covered by tests
}

public boolean isTaskRunnerThreadAlive() {
return taskRunnerThread.isAlive();

Check warning on line 214 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java#L214

Added line #L214 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
*/
package dev.knative.eventing.kafka.broker.dispatcherloom;

import dev.knative.eventing.kafka.broker.receiverloom.LoomProducerFactory;
import java.io.IOException;

public class Main {
public static void main(String[] args) {
System.out.println("Hello World!");
public static void main(String[] args) throws IOException {
dev.knative.eventing.kafka.broker.dispatcher.main.Main.start(

Check warning on line 23 in data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/Main.java

View check run for this annotation

Codecov / codecov/patch

data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/Main.java#L23

Added line #L23 was not covered by tests
args, new LoomConsumerFactory<>(), new LoomProducerFactory<>());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2018 Knative Authors ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcherloom;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;

public class LoomConsumerFacortyTest {

private LoomConsumerFactory<String, String> consumerFactory = new LoomConsumerFactory<>();

@Test
public void testCreate() {
// Create Vertx instance
Vertx vertx = Vertx.vertx();

// Create Kafka consumer configuration
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

ReactiveKafkaConsumer<String, String> loomConsumer = consumerFactory.create(vertx, configs);

// Verify that the created consumer is an instance of LoomKafkaConsumer
assertTrue(loomConsumer instanceof LoomKafkaConsumer);
// Verify that the unwrapped KafkaConsumer is the same as the one created manually
Consumer<String, String> unwrappedConsumer = loomConsumer.unwrap();
assertNotNull(unwrappedConsumer);
}
}
Loading
Loading