Skip to content

Commit

Permalink
Try Vert.x 5
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj committed Dec 19, 2024
1 parent 35af6b7 commit ab2223f
Show file tree
Hide file tree
Showing 18 changed files with 109 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
import io.strimzi.certs.OpenSslCertManager;
import io.strimzi.operator.cluster.leaderelection.LeaderElectionManager;
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderFactory;
Expand Down Expand Up @@ -178,21 +178,20 @@ static CompositeFuture deployClusterOperatorVerticles(Vertx vertx, KubernetesCli
kafkaBridgeAssemblyOperator,
kafkaRebalanceAssemblyOperator,
resourceOperatorSupplier);
vertx.deployVerticle(operator,
res -> {
if (res.succeeded()) {
shutdownHook.register(() -> ShutdownHook.undeployVertxVerticle(vertx, res.result(), SHUTDOWN_TIMEOUT));
vertx.deployVerticle(operator).onComplete(res -> {
if (res.succeeded()) {
shutdownHook.register(() -> ShutdownHook.undeployVertxVerticle(vertx, res.result(), SHUTDOWN_TIMEOUT));

if (config.getCustomResourceSelector() != null) {
LOGGER.info("Cluster Operator verticle started in namespace {} with label selector {}", namespace, config.getCustomResourceSelector());
} else {
LOGGER.info("Cluster Operator verticle started in namespace {} without label selector", namespace);
}
if (config.getCustomResourceSelector() != null) {
LOGGER.info("Cluster Operator verticle started in namespace {} with label selector {}", namespace, config.getCustomResourceSelector());
} else {
LOGGER.error("Cluster Operator verticle in namespace {} failed to start", namespace, res.cause());
LOGGER.info("Cluster Operator verticle started in namespace {} without label selector", namespace);
}
prom.handle(res);
});
} else {
LOGGER.error("Cluster Operator verticle in namespace {} failed to start", namespace, res.cause());
}
prom.handle(res);
});
}
return Future.join(futures);
}
Expand Down Expand Up @@ -271,7 +270,7 @@ private static Future<HttpServer> startHealthServer(Vertx vertx, MetricsProvider
.end(metrics.scrape());
}
})
.listen(HEALTH_SERVER_PORT, ar -> {
.listen(HEALTH_SERVER_PORT).onComplete(ar -> {
if (ar.succeeded()) {
LOGGER.info("Health and metrics server is ready on port {})", HEALTH_SERVER_PORT);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static void shutdownVertx(Vertx vertx, long timeoutMs) {

CountDownLatch latch = new CountDownLatch(1);

vertx.close(ar -> {
vertx.close().onComplete(ar -> {
if (!ar.succeeded()) {
LOGGER.error("Vert.x close failed", ar.cause());
}
Expand Down Expand Up @@ -89,7 +89,7 @@ public static void shutdownVertx(Vertx vertx, long timeoutMs) {
public static void undeployVertxVerticle(Vertx vertx, String verticleId, long timeoutMs) {
LOGGER.info("Shutting down Vert.x verticle {}", verticleId);
CountDownLatch latch = new CountDownLatch(1);
vertx.undeploy(verticleId, ar -> {
vertx.undeploy(verticleId).onComplete(ar -> {
if (!ar.succeeded()) {
LOGGER.error("Vert.x verticle failed to undeploy", ar.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ protected final <T> Future<T> withLock(Reconciliation reconciliation, long lockT
String name = reconciliation.name();
final String lockName = getLockName(namespace, name);
LOGGER.debugCr(reconciliation, "Try to acquire lock {}", lockName);
vertx.sharedData().getLockWithTimeout(lockName, lockTimeoutMs, res -> {
vertx.sharedData().getLockWithTimeout(lockName, lockTimeoutMs).onComplete(res -> {
if (res.succeeded()) {
LOGGER.debugCr(reconciliation, "Lock {} acquired", lockName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,53 +173,52 @@ private Future<String> zookeeperLeader(Reconciliation reconciliation, Set<String
* Returns whether the given pod is the zookeeper leader.
*/
protected Future<Boolean> isLeader(Reconciliation reconciliation, String podName, NetClientOptions netClientOptions) {

Promise<Boolean> promise = Promise.promise();
String host = host(reconciliation, podName);
int port = port(podName);
LOGGER.debugCr(reconciliation, "Connecting to zookeeper on {}:{}", host, port);
vertx.createNetClient(netClientOptions)
.connect(port, host, ar -> {
if (ar.failed()) {
LOGGER.warnCr(reconciliation, "ZK {}:{}: failed to connect to zookeeper:", host, port, ar.cause().getMessage());
promise.fail(ar.cause());
} else {
LOGGER.debugCr(reconciliation, "ZK {}:{}: connected", host, port);
NetSocket socket = ar.result();
socket.exceptionHandler(ex -> {
if (!promise.tryFail(ex)) {
LOGGER.debugCr(reconciliation, "ZK {}:{}: Ignoring error, since leader status of pod {} is already known: {}",
host, port, podName, ex);
}
});
StringBuilder sb = new StringBuilder();
// We could use socket idle timeout, but this times out even if the server just responds
// very slowly
long timerId = vertx.setTimer(10_000, tid -> {
LOGGER.debugCr(reconciliation, "ZK {}:{}: Timeout waiting for Zookeeper {} to close socket",
host, port, socket.remoteAddress());
socket.close();
});
socket.closeHandler(v -> {
vertx.cancelTimer(timerId);
Matcher matcher = LEADER_MODE_PATTERN.matcher(sb);
boolean isLeader = matcher.find();
LOGGER.debugCr(reconciliation, "ZK {}:{}: {} leader", host, port, isLeader ? "is" : "is not");
if (!promise.tryComplete(isLeader)) {
LOGGER.debugCr(reconciliation, "ZK {}:{}: Ignoring leader result: Future is already complete",
host, port);
}
});
LOGGER.debugCr(reconciliation, "ZK {}:{}: upgrading to TLS", host, port);
socket.handler(buffer -> {
LOGGER.traceCr(reconciliation, "buffer: {}", buffer);
sb.append(buffer.toString());
});
LOGGER.debugCr(reconciliation, "ZK {}:{}: sending stat", host, port);
socket.write("stat");
}

});
.connect(port, host)
.onComplete(ar -> {
if (ar.failed()) {
LOGGER.warnCr(reconciliation, "ZK {}:{}: failed to connect to zookeeper:", host, port, ar.cause().getMessage());
promise.fail(ar.cause());
} else {
LOGGER.debugCr(reconciliation, "ZK {}:{}: connected", host, port);
NetSocket socket = ar.result();
socket.exceptionHandler(ex -> {
if (!promise.tryFail(ex)) {
LOGGER.debugCr(reconciliation, "ZK {}:{}: Ignoring error, since leader status of pod {} is already known: {}",
host, port, podName, ex);
}
});
StringBuilder sb = new StringBuilder();
// We could use socket idle timeout, but this times out even if the server just responds
// very slowly
long timerId = vertx.setTimer(10_000, tid -> {
LOGGER.debugCr(reconciliation, "ZK {}:{}: Timeout waiting for Zookeeper {} to close socket",
host, port, socket.remoteAddress());
socket.close();
});
socket.closeHandler(v -> {
vertx.cancelTimer(timerId);
Matcher matcher = LEADER_MODE_PATTERN.matcher(sb);
boolean isLeader = matcher.find();
LOGGER.debugCr(reconciliation, "ZK {}:{}: {} leader", host, port, isLeader ? "is" : "is not");
if (!promise.tryComplete(isLeader)) {
LOGGER.debugCr(reconciliation, "ZK {}:{}: Ignoring leader result: Future is already complete",
host, port);
}
});
LOGGER.debugCr(reconciliation, "ZK {}:{}: upgrading to TLS", host, port);
socket.handler(buffer -> {
LOGGER.traceCr(reconciliation, "buffer: {}", buffer);
sb.append(buffer.toString());
});
LOGGER.debugCr(reconciliation, "ZK {}:{}: sending stat", host, port);
socket.write("stat");
}
});

return promise.future().recover(error -> {
LOGGER.debugOp("ZK {}:{}: Error trying to determine leader ({}) => not leader", host, port, error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private void startStop(VertxTestContext context, String namespaces, boolean podS
.onComplete(context.succeeding(v -> context.verify(() -> {
assertThat("A verticle per namespace", VERTX.deploymentIDs(), hasSize(namespaceList.size()));
for (String deploymentId: VERTX.deploymentIDs()) {
VERTX.undeploy(deploymentId, asyncResult -> {
VERTX.undeploy(deploymentId).onComplete(asyncResult -> {
if (asyncResult.failed()) {
LOGGER.error("Failed to undeploy {}", deploymentId);
context.failNow(asyncResult.cause());
Expand Down Expand Up @@ -271,7 +271,7 @@ private void startStopAllNamespaces(VertxTestContext context, String namespaces,
.onComplete(context.succeeding(v -> context.verify(() -> {
assertThat("A verticle per namespace", VERTX.deploymentIDs(), hasSize(1));
for (String deploymentId: VERTX.deploymentIDs()) {
VERTX.undeploy(deploymentId, asyncResult -> {
VERTX.undeploy(deploymentId).onComplete(asyncResult -> {
if (asyncResult.failed()) {
LOGGER.error("Failed to undeploy {}", deploymentId);
context.failNow(asyncResult.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,14 @@ void startFailingMockApi(Vertx vertx) throws InterruptedException, ExecutionExce
server = httpServer.listen(0).toCompletionStage().toCompletableFuture().get();
}


@AfterEach()
void teardown() throws ExecutionException, InterruptedException {
if (server == null) {
return;
}

Promise<Void> serverStopped = Promise.promise();
server.close(x -> serverStopped.complete());
server.close().onComplete(x -> serverStopped.complete());
serverStopped.future().toCompletionStage().toCompletableFuture().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ public void testReconcileAll(VertxTestContext context) {

Promise<Void> reconcileAllPromise = Promise.promise();
((ReconcileAllMockOperator) operator).setResources(resources);
operator.reconcileAll("test", "my-namespace", reconcileAllPromise);
operator.reconcileAll("test", "my-namespace", reconcileAllPromise::handle);

Checkpoint async = context.checkpoint();
reconcileAllPromise.future().onComplete(context.succeeding(v -> context.verify(() -> {
Expand Down Expand Up @@ -416,7 +416,7 @@ public void testReconcileAllOverMultipleNamespaces(VertxTestContext context) {

Promise<Void> reconcileAllPromise = Promise.promise();
((ReconcileAllMockOperator) operator).setResources(resources);
operator.reconcileAll("test", "*", reconcileAllPromise);
operator.reconcileAll("test", "*", reconcileAllPromise::handle);

Checkpoint async = context.checkpoint();
reconcileAllPromise.future()
Expand Down Expand Up @@ -449,7 +449,7 @@ public void testReconcileAllOverMultipleNamespaces(VertxTestContext context) {
// Reconcile again with resource in my-namespace2 deleted
Promise<Void> secondReconcileAllPromise = Promise.promise();
((ReconcileAllMockOperator) operator).setResources(updatedResources);
operator.reconcileAll("test", "*", secondReconcileAllPromise);
operator.reconcileAll("test", "*", secondReconcileAllPromise::handle);

return secondReconcileAllPromise.future();
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand All @@ -44,6 +45,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

@Disabled
@ExtendWith(VertxExtension.class)
public class ZookeeperLeaderFinderTest {

Expand Down Expand Up @@ -117,7 +119,7 @@ class FakeZk {

public void stop() {
CountDownLatch countDownLatch = new CountDownLatch(1);
netServer.close(closeResult -> countDownLatch.countDown());
netServer.close().onComplete(closeResult -> countDownLatch.countDown());
try {
countDownLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -151,7 +153,7 @@ public Future<Integer> start() {
}
});
})
.listen(ar -> {
.listen().onComplete(ar -> {
if (ar.succeeded()) {
promise.complete(ar.result().actualPort());
} else {
Expand Down
3 changes: 3 additions & 0 deletions docker-images/operator/scripts/launch_java.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ function get_gc_opts {

export MALLOC_ARENA_MAX=2

# Workaround for Netty bug on systems with less than 2 CPUs
JAVA_OPTS="${JAVA_OPTS} -Dio.netty.allocator.centralQueueCapacity=2"

# Make sure that we use /dev/urandom
JAVA_OPTS="${JAVA_OPTS} -Djava.security.egd=file:/dev/./urandom"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
package io.strimzi.operator.common.http;

import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
import io.strimzi.operator.common.MetricsProvider;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -163,7 +163,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
response.setContentType("text/plain; version=0.0.4");
response.setCharacterEncoding(StandardCharsets.UTF_8.toString());
response.setStatus(HttpServletResponse.SC_OK);
prometheusMeterRegistry.scrape(response.getWriter());
prometheusMeterRegistry.scrape(response.getOutputStream());
} else {
response.setContentType("text/plain");
response.setStatus(HttpServletResponse.SC_NOT_IMPLEMENTED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheusmetrics.PrometheusConfig;
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
import io.strimzi.operator.common.MetricsProvider;
import io.strimzi.operator.common.MicrometerMetricsProvider;
import io.strimzi.test.TestUtils;
Expand Down
14 changes: 10 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@
<fasterxml.jackson-annotations.version>2.16.2</fasterxml.jackson-annotations.version>
<fasterxml.jackson-datatype.version>2.16.2</fasterxml.jackson-datatype.version>
<fasterxml.jackson-jaxrs.version>2.16.2</fasterxml.jackson-jaxrs.version>
<vertx.version>4.5.11</vertx.version>
<vertx-junit5.version>4.5.11</vertx-junit5.version>
<vertx.version>5.0.0.CR3</vertx.version>
<vertx-junit5.version>5.0.0.CR3</vertx-junit5.version>
<kafka.version>3.9.0</kafka.version>
<yammer-metrics.version>2.2.0</yammer-metrics.version>
<zookeeper.version>3.8.4</zookeeper.version>
Expand All @@ -150,8 +150,8 @@
<jetty.version>9.4.56.v20240826</jetty.version>
<javax-servlet.version>3.1.0</javax-servlet.version>
<strimzi-oauth.version>0.15.0</strimzi-oauth.version>
<netty.version>4.1.115.Final</netty.version>
<micrometer.version>1.12.3</micrometer.version>
<netty.version>4.2.0.RC1</netty.version>
<micrometer.version>1.14.2</micrometer.version>
<jayway-jsonpath.version>2.9.0</jayway-jsonpath.version>
<registry.version>1.3.2.Final</registry.version>
<commons-codec.version>1.13</commons-codec.version>
Expand Down Expand Up @@ -820,6 +820,12 @@
<version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.strimzi.systemtest.utils.kubeUtils.objects.PersistentVolumeClaimUtils;
import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
import io.strimzi.test.TestUtils;
import io.vertx.core.cli.annotations.Description;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -179,8 +178,8 @@ void testManualTriggeringRollingUpdate() {
}

// This test is affected by https://github.com/strimzi/strimzi-kafka-operator/issues/3913 so it needs longer operation timeout set in CO
@Description("Test for checking that overriding of bootstrap server, triggers the rolling update and verifying that" +
" new bootstrap DNS is appended inside certificate in subject alternative names property.")
//@Description("Test for checking that overriding of bootstrap server, triggers the rolling update and verifying that" +
// " new bootstrap DNS is appended inside certificate in subject alternative names property.")
@ParallelNamespaceTest
@Tag(ROLLING_UPDATE)
void testTriggerRollingUpdateAfterOverrideBootstrap() throws CertificateException {
Expand Down
Loading

0 comments on commit ab2223f

Please sign in to comment.