Skip to content

Commit

Permalink
Update to executeBlocking API change
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jul 19, 2023
1 parent 2a2ee80 commit a0e349f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -674,13 +674,13 @@ public Future<Void> close() {

@Override
public Future<Void> close(long timeout) {
return vertx.executeBlocking(prom -> {
return vertx.executeBlocking(() -> {
if (timeout > 0) {
adminClient.close(Duration.ofMillis(timeout));
} else {
adminClient.close();
}
prom.complete();
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ private static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name,
CloseFuture closeFuture = new CloseFuture();
Producer<K, V> s = ((VertxInternal)vertx).createSharedResource("__vertx.shared.kafka.producer", name, closeFuture, cf -> {
Producer<K, V> producer = streamFactory.get().unwrap();
cf.add(completion -> vertx.<Void>executeBlocking(p -> {
cf.add(completion -> vertx.<Void>executeBlocking(() -> {
producer.close();
p.complete();
return null;
}).onComplete(completion));
return producer;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
ProducerTracer.StartedSpan startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage(ctx, record);
int len = this.len(record.value());
this.pending += len;
return ctx.executeBlocking(prom -> {
return ctx.executeBlocking(() -> {
Promise<RecordMetadata> prom = ctx.promise();
try {
this.producer.send(record, (metadata, err) -> {

Expand Down Expand Up @@ -125,7 +126,9 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
}
prom.fail(e);
}
}, taskQueue);
return prom.future();
}, taskQueue)
.compose(f -> f);
}

@Override
Expand Down Expand Up @@ -186,19 +189,15 @@ public KafkaWriteStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
@Override
public Future<List<PartitionInfo>> partitionsFor(String topic) {
ContextInternal ctx = vertx.getOrCreateContext();
return ctx.<List<PartitionInfo>>executeBlocking(prom -> {
prom.complete(
this.producer.partitionsFor(topic)
);
}, taskQueue);
return ctx.executeBlocking(() -> this.producer.partitionsFor(topic), taskQueue);
}

@Override
public Future<Void> flush() {
ContextInternal ctx = vertx.getOrCreateContext();
return ctx.<Void>executeBlocking(prom -> {
return ctx.executeBlocking(() -> {
this.producer.flush();
prom.complete();
return null;
}, taskQueue);
}

Expand All @@ -210,14 +209,13 @@ public Future<Void> close() {
@Override
public Future<Void> close(long timeout) {
ContextInternal ctx = vertx.getOrCreateContext();
Promise<Void> trampolineProm = ctx.promise();
return ctx.executeBlocking(prom -> {
return ctx.executeBlocking(() -> {
if (timeout > 0) {
this.producer.close(Duration.ofMillis(timeout));
} else {
this.producer.close();
}
prom.complete();
return null;
}, taskQueue);
}

Expand All @@ -228,13 +226,9 @@ public Producer<K, V> unwrap() {

Future<Void> executeBlocking(final BlockingStatement statement) {
ContextInternal ctx = vertx.getOrCreateContext();
return ctx.executeBlocking(promise -> {
try {
statement.execute();
promise.complete();
} catch (Exception e) {
promise.fail(e);
}
return ctx.executeBlocking(() -> {
statement.execute();
return null;
}, taskQueue);
}

Expand Down

0 comments on commit a0e349f

Please sign in to comment.