diff --git a/src/main/java/io/vertx/kafka/admin/impl/KafkaAdminClientImpl.java b/src/main/java/io/vertx/kafka/admin/impl/KafkaAdminClientImpl.java index b5d06d7f..7fa32970 100644 --- a/src/main/java/io/vertx/kafka/admin/impl/KafkaAdminClientImpl.java +++ b/src/main/java/io/vertx/kafka/admin/impl/KafkaAdminClientImpl.java @@ -674,13 +674,13 @@ public Future close() { @Override public Future close(long timeout) { - return vertx.executeBlocking(prom -> { + return vertx.executeBlocking(() -> { if (timeout > 0) { adminClient.close(Duration.ofMillis(timeout)); } else { adminClient.close(); } - prom.complete(); + return null; }); } } diff --git a/src/main/java/io/vertx/kafka/client/producer/impl/KafkaProducerImpl.java b/src/main/java/io/vertx/kafka/client/producer/impl/KafkaProducerImpl.java index 4080b00f..5917b5eb 100644 --- a/src/main/java/io/vertx/kafka/client/producer/impl/KafkaProducerImpl.java +++ b/src/main/java/io/vertx/kafka/client/producer/impl/KafkaProducerImpl.java @@ -84,9 +84,9 @@ private static KafkaProducer createShared(Vertx vertx, String name, CloseFuture closeFuture = new CloseFuture(); Producer s = ((VertxInternal)vertx).createSharedResource("__vertx.shared.kafka.producer", name, closeFuture, cf -> { Producer producer = streamFactory.get().unwrap(); - cf.add(completion -> vertx.executeBlocking(p -> { + cf.add(completion -> vertx.executeBlocking(() -> { producer.close(); - p.complete(); + return null; }).onComplete(completion)); return producer; }); diff --git a/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java b/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java index 559af232..2214d371 100644 --- a/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java +++ b/src/main/java/io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.java @@ -74,7 +74,8 @@ public Future send(ProducerRecord record) { ProducerTracer.StartedSpan startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage(ctx, record); int len = this.len(record.value()); this.pending += len; - return ctx.executeBlocking(prom -> { + return ctx.executeBlocking(() -> { + Promise prom = ctx.promise(); try { this.producer.send(record, (metadata, err) -> { @@ -125,7 +126,9 @@ public Future send(ProducerRecord record) { } prom.fail(e); } - }, taskQueue); + return prom.future(); + }, taskQueue) + .compose(f -> f); } @Override @@ -186,19 +189,15 @@ public KafkaWriteStreamImpl exceptionHandler(Handler handler) { @Override public Future> partitionsFor(String topic) { ContextInternal ctx = vertx.getOrCreateContext(); - return ctx.>executeBlocking(prom -> { - prom.complete( - this.producer.partitionsFor(topic) - ); - }, taskQueue); + return ctx.executeBlocking(() -> this.producer.partitionsFor(topic), taskQueue); } @Override public Future flush() { ContextInternal ctx = vertx.getOrCreateContext(); - return ctx.executeBlocking(prom -> { + return ctx.executeBlocking(() -> { this.producer.flush(); - prom.complete(); + return null; }, taskQueue); } @@ -210,14 +209,13 @@ public Future close() { @Override public Future close(long timeout) { ContextInternal ctx = vertx.getOrCreateContext(); - Promise trampolineProm = ctx.promise(); - return ctx.executeBlocking(prom -> { + return ctx.executeBlocking(() -> { if (timeout > 0) { this.producer.close(Duration.ofMillis(timeout)); } else { this.producer.close(); } - prom.complete(); + return null; }, taskQueue); } @@ -228,13 +226,9 @@ public Producer unwrap() { Future executeBlocking(final BlockingStatement statement) { ContextInternal ctx = vertx.getOrCreateContext(); - return ctx.executeBlocking(promise -> { - try { - statement.execute(); - promise.complete(); - } catch (Exception e) { - promise.fail(e); - } + return ctx.executeBlocking(() -> { + statement.execute(); + return null; }, taskQueue); }