Skip to content

Commit

Permalink
Remove usage of Handler/AsyncResult idiom.
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Sep 12, 2024
1 parent bd4b102 commit a453f68
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
*/
package io.vertx.kafka.client.common.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.*;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;

Expand All @@ -34,9 +31,9 @@ public class CloseHandler {

private Closeable closeable;
private Runnable closeableHookCleanup;
private final BiConsumer<Long, Handler<AsyncResult<Void>>> close;
private final BiConsumer<Long, Completable<Void>> close;

public CloseHandler(BiConsumer<Long, Handler<AsyncResult<Void>>> close) {
public CloseHandler(BiConsumer<Long, Completable<Void>> close) {
this.close = close;
}

Expand Down Expand Up @@ -80,15 +77,15 @@ public synchronized void unregisterCloseHook() {

public void close() {
unregisterCloseHook();
close.accept(0L, ar -> {});
close.accept(0L, (res, err) -> {});
}

public void close(Handler<AsyncResult<Void>> completionHandler) {
public void close(Completable<Void> completionHandler) {
unregisterCloseHook();
close.accept(0L, completionHandler);
}

public void close(long timeout, Handler<AsyncResult<Void>> completionHandler) {
public void close(long timeout, Completable<Void> completionHandler) {
unregisterCloseHook();
close.accept(timeout, completionHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@

package io.vertx.kafka.client.consumer.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.*;
import io.vertx.core.internal.ContextInternal;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.common.impl.Helper;
Expand Down Expand Up @@ -111,24 +106,24 @@ public KafkaReadStreamImpl(Vertx vertx, Consumer<K, V> consumer, KafkaClientOpti
this.tracer = ConsumerTracer.create(ctxInt.tracer(), options);
}

private <T> void start(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) {
private <T> void start(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Completable<T> handler) {
this.worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "vert.x-kafka-consumer-thread-" + threadCount.getAndIncrement()));
this.submitTaskWhenStarted(task, handler);
}

private <T> void submitTaskWhenStarted(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Handler<AsyncResult<T>> handler) {
private <T> void submitTaskWhenStarted(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task, Completable<T> handler) {
if (worker == null) {
throw new IllegalStateException();
}
this.worker.submit(() -> {
Promise<T> future = null;
if (handler != null) {
future = Promise.promise();
future.future().onComplete(event-> {
future.future().onComplete((res, err)-> {
// When we've executed the task on the worker thread,
// run the callback on the eventloop thread
this.context.runOnContext(v-> {
handler.handle(event);
handler.complete(res, err);
});
});
}
Expand Down Expand Up @@ -264,7 +259,7 @@ protected <T> Future<T> submitTask2(java.util.function.BiConsumer<Consumer<K, V>
}

protected <T> void submitTask(java.util.function.BiConsumer<Consumer<K, V>, Promise<T>> task,
Handler<AsyncResult<T>> handler) {
Completable<T> handler) {
if (this.closed.compareAndSet(true, false)) {
this.start(task, handler);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,13 @@ public void testProducerFailureDrain(TestContext ctx) throws Exception {

private void testProducerDrain(TestContext ctx, RuntimeException failure) throws Exception {
TestProducer mock = new TestProducer();
KafkaWriteStream<String, String> producer = ProducerTest.producer(Vertx.vertx(), mock);
Vertx vertx = Vertx.vertx();
if (failure != null) {
vertx.exceptionHandler(err -> {
// Ignore
});
}
KafkaWriteStream<String, String> producer = ProducerTest.producer(vertx, mock);
int sent = 0;
while (!producer.writeQueueFull()) {
producer.write(new ProducerRecord<>("the_topic", 0, 0L, "abc", "def"));
Expand Down

0 comments on commit a453f68

Please sign in to comment.