Skip to content

Commit

Permalink
feat: add timeout for deployments
Browse files Browse the repository at this point in the history
  • Loading branch information
kristian committed Nov 17, 2023
1 parent 3593b49 commit d843b10
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 72 deletions.
30 changes: 30 additions & 0 deletions src/generated/java/io/neonbee/config/NeonBeeConfigConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ public class NeonBeeConfigConverter {
static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBeeConfig obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "deploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setDeploymentTimeout(((Number) member.getValue()).intValue());
}
break;
case "eventBusCodecs":
if (member.getValue() instanceof JsonObject) {
java.util.Map<String, java.lang.String> map = new java.util.LinkedHashMap<>();
Expand Down Expand Up @@ -62,6 +67,16 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBee
obj.setMicrometerRegistries(list);
}
break;
case "modelsDeploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setModelsDeploymentTimeout(((Number) member.getValue()).intValue());
}
break;
case "moduleDeploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setModuleDeploymentTimeout(((Number) member.getValue()).intValue());
}
break;
case "platformClasses":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
Expand All @@ -82,6 +97,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NeonBee
obj.setTrackingDataHandlingStrategy((String) member.getValue());
}
break;
case "verticleDeploymentTimeout":
if (member.getValue() instanceof Number) {
obj.setVerticleDeploymentTimeout(((Number) member.getValue()).intValue());
}
break;
}
}
}
Expand All @@ -91,6 +111,7 @@ static void toJson(NeonBeeConfig obj, JsonObject json) {
}

static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
json.put("deploymentTimeout", obj.getDeploymentTimeout());
if (obj.getEventBusCodecs() != null) {
JsonObject map = new JsonObject();
obj.getEventBusCodecs().forEach((key, value) -> map.put(key, value));
Expand All @@ -109,6 +130,12 @@ static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
obj.getMicrometerRegistries().forEach(item -> array.add(item.toJson()));
json.put("micrometerRegistries", array);
}
if (obj.getModelsDeploymentTimeout() != null) {
json.put("modelsDeploymentTimeout", obj.getModelsDeploymentTimeout());
}
if (obj.getModuleDeploymentTimeout() != null) {
json.put("moduleDeploymentTimeout", obj.getModuleDeploymentTimeout());
}
if (obj.getPlatformClasses() != null) {
JsonArray array = new JsonArray();
obj.getPlatformClasses().forEach(item -> array.add(item));
Expand All @@ -120,5 +147,8 @@ static void toJson(NeonBeeConfig obj, java.util.Map<String, Object> json) {
if (obj.getTrackingDataHandlingStrategy() != null) {
json.put("trackingDataHandlingStrategy", obj.getTrackingDataHandlingStrategy());
}
if (obj.getVerticleDeploymentTimeout() != null) {
json.put("verticleDeploymentTimeout", obj.getVerticleDeploymentTimeout());
}
}
}
129 changes: 129 additions & 0 deletions src/main/java/io/neonbee/config/NeonBeeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import static io.neonbee.internal.helper.ConfigHelper.notFound;
import static io.neonbee.internal.helper.ConfigHelper.readConfig;
import static io.neonbee.internal.helper.ConfigHelper.rephraseConfigNames;
import static io.neonbee.internal.helper.StringHelper.EMPTY;
import static io.vertx.core.Future.future;
import static io.vertx.core.Future.succeededFuture;

import java.lang.reflect.InvocationTargetException;
import java.nio.file.Path;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import com.fasterxml.jackson.core.StreamReadConstraints;
Expand Down Expand Up @@ -42,6 +45,11 @@ public class NeonBeeConfig {
*/
public static final int DEFAULT_EVENT_BUS_TIMEOUT = 30;

/**
* The default timeout for a deployment to finish.
*/
public static final int DEFAULT_DEPLOYMENT_TIMEOUT = 30;

/**
* The default tracking data handling strategy.
*/
Expand All @@ -57,6 +65,14 @@ public class NeonBeeConfig {

private int eventBusTimeout = DEFAULT_EVENT_BUS_TIMEOUT;

private int deploymentTimeout = DEFAULT_DEPLOYMENT_TIMEOUT;

private Integer modelsDeploymentTimeout;

private Integer moduleDeploymentTimeout;

private Integer verticleDeploymentTimeout;

private Map<String, String> eventBusCodecs = Map.of();

private String trackingDataHandlingStrategy = DEFAULT_TRACKING_DATA_HANDLING_STRATEGY;
Expand Down Expand Up @@ -231,6 +247,119 @@ public NeonBeeConfig setEventBusTimeout(int eventBusTimeout) {
return this;
}

/**
* Returns the general deployment timeout for an individual deployment of any type in seconds. If unset / equal or
* smaller than 0, no timeout applies to the deployment.
*
* @return the deployment timeout in seconds
*/
public int getDeploymentTimeout() {
return deploymentTimeout;
}

/**
* Returns the deployment timeout for a given deployment type, or the general {@link #getDeploymentTimeout()} if the
* deployment timeout for a given type is unset / equal or smaller than zero or an unrecognized type / {@code null}
* was passed.
*
* @param deploymentType the type of the deployment, e.g. modules, module, verticle or {@code null}
* @return the individual or general deployment timeout in seconds
*/
public int getDeploymentTimeout(String deploymentType) {
switch (Optional.ofNullable(deploymentType).map(type -> type.toLowerCase(Locale.ROOT)).orElse(EMPTY)) {
case "models":
return getModelsDeploymentTimeout();
case "module":
return getModuleDeploymentTimeout();
case "verticle":
return getVerticleDeploymentTimeout();
default:
return getDeploymentTimeout();
}
}

/**
* Set the general deployment timeout for an individual deployment of any type in seconds. If equal or smaller than
* 0, no timeout applies to the deployment.
*
* @param deploymentTimeout the deployment timeout in seconds
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
@Fluent
public NeonBeeConfig setDeploymentTimeout(int deploymentTimeout) {
this.deploymentTimeout = deploymentTimeout;
return this;
}

/**
* Returns the deployment timeout of an individual models deployment in seconds. If unset the general
* {@link #getDeploymentTimeout()} is returned.
*
* @return the deployment timeout in seconds
*/
public Integer getModelsDeploymentTimeout() {
return modelsDeploymentTimeout != null ? modelsDeploymentTimeout : getDeploymentTimeout();
}

/**
* Set the deployment timeout of an individual models deployment in seconds. If equal or smaller than 0, no timeout
* applies to the deployment. If set to {@code null} the general {@link #getDeploymentTimeout()} applies.
*
* @param modelsDeploymentTimeout the deployment timeout in seconds or {@code null}
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
@Fluent
public NeonBeeConfig setModelsDeploymentTimeout(Integer modelsDeploymentTimeout) {
this.modelsDeploymentTimeout = modelsDeploymentTimeout;
return this;
}

/**
* Returns the deployment timeout of an individual module deployment in seconds. If unset the general
* {@link #getDeploymentTimeout()} is returned.
*
* @return the deployment timeout in seconds
*/
public Integer getModuleDeploymentTimeout() {
return moduleDeploymentTimeout != null ? moduleDeploymentTimeout : getDeploymentTimeout();
}

/**
* Set the deployment timeout of an individual module deployment in seconds. If equal or smaller than 0, no timeout
* applies to the deployment. If set to {@code null} the general {@link #getDeploymentTimeout()} applies.
*
* @param moduleDeploymentTimeout the deployment timeout in seconds or {@code null}
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
@Fluent
public NeonBeeConfig setModuleDeploymentTimeout(Integer moduleDeploymentTimeout) {
this.moduleDeploymentTimeout = moduleDeploymentTimeout;
return this;
}

/**
* Returns the deployment timeout of an individual verticle deployment in seconds. If unset the general
* {@link #getDeploymentTimeout()} is returned.
*
* @return the deployment timeout in seconds
*/
public Integer getVerticleDeploymentTimeout() {
return verticleDeploymentTimeout != null ? verticleDeploymentTimeout : getDeploymentTimeout();
}

/**
* Set the deployment timeout of an individual verticle deployment in seconds. If equal or smaller than 0, no
* timeout applies to the deployment. If set to {@code null} the general {@link #getDeploymentTimeout()} applies.
*
* @param verticleDeploymentTimeout the deployment timeout in seconds or {@code null}
* @return the {@linkplain NeonBeeConfig} for fluent use
*/
@Fluent
public NeonBeeConfig setVerticleDeploymentTimeout(Integer verticleDeploymentTimeout) {
this.verticleDeploymentTimeout = verticleDeploymentTimeout;
return this;
}

/**
* Gets a list of default codecs to register on the event bus.
* <p>
Expand Down
20 changes: 19 additions & 1 deletion src/main/java/io/neonbee/internal/deploy/PendingDeployment.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.FutureInternal;
import io.vertx.core.impl.future.Listener;
Expand All @@ -26,7 +28,23 @@ public abstract class PendingDeployment extends Deployment implements FutureInte

LOGGER.info("Started deployment of {} ...", deployable);

this.deployFuture = deployFuture.map(deploymentId -> {
Future<String> timeoutFuture = deployFuture;
int timeout = neonBee.getConfig().getDeploymentTimeout(deployable.getType());
if (timeout > 0) {
Vertx vertx = neonBee.getVertx();
Promise<String> timeoutPromise = Promise.promise();
// fail the promise after the timeout is expired
long timerId = vertx.setTimer(TimeUnit.SECONDS.toMillis(timeout), nothing -> {
timeoutPromise.fail("Deployment timed-out after " + timeout + " seconds");
});
// in case the deployment finished, it completes the promise and we can cancel the timer
deployFuture.onComplete(timeoutPromise).onComplete(deploymentId -> {
vertx.cancelTimer(timerId);
});
timeoutFuture = timeoutPromise.future();
}

this.deployFuture = timeoutFuture.map(deploymentId -> {
// in case a deployment doesn't want to specify a own deployment ID, generate one based on the hash code of
// the pending deployment (thus all deployables, might just return an empty future)
return deploymentId != null ? deploymentId : super.getDeploymentId();
Expand Down
27 changes: 15 additions & 12 deletions src/test/java/io/neonbee/internal/deploy/DeployableModelsTest.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package io.neonbee.internal.deploy;

import static com.google.common.truth.Truth.assertThat;
import static io.neonbee.NeonBeeMockHelper.defaultVertxMock;
import static io.neonbee.NeonBeeMockHelper.registerNeonBeeMock;
import static io.neonbee.internal.deploy.DeploymentTest.newNeonBeeMockForDeployment;
import static io.vertx.core.Future.failedFuture;
import static io.vertx.core.Future.succeededFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -58,13 +57,12 @@ void testDeployUndeploy() throws NoSuchFieldException, IllegalAccessException {
Map.of("okay", new JsonObject().put("namespace", "test").toBuffer().getBytes()), Map.of());
DeployableModels deployable = new DeployableModels(definition);

Vertx vertxMock = defaultVertxMock();
NeonBee neonBee = registerNeonBeeMock(vertxMock, new NeonBeeOptions.Mutable().setIgnoreClassPath(true));
NeonBee neonBeeMock = newNeonBeeMockForDeployment(new NeonBeeOptions.Mutable().setIgnoreClassPath(true));

PendingDeployment deployment = deployable.deploy(neonBee);
PendingDeployment deployment = deployable.deploy(neonBeeMock);
assertThat(deployment.succeeded()).isTrue();
Set<EntityModelDefinition> definitions =
ReflectionHelper.getValueOfPrivateField(neonBee.getModelManager(), "externalModelDefinitions");
ReflectionHelper.getValueOfPrivateField(neonBeeMock.getModelManager(), "externalModelDefinitions");
assertThat(definitions).contains(definition);

assertThat(deployment.undeploy().succeeded()).isTrue();
Expand All @@ -77,11 +75,11 @@ void testDeployFailed() {
EntityModelDefinition definition = new EntityModelDefinition(Map.of(), Map.of());
DeployableModels deployable = new DeployableModels(definition);

Vertx vertxMock = defaultVertxMock();
NeonBee neonBeeMock = newNeonBeeMockForDeployment(new NeonBeeOptions.Mutable().setIgnoreClassPath(true));
Vertx vertxMock = neonBeeMock.getVertx();
when(vertxMock.fileSystem().readDir(any())).thenReturn(failedFuture("any failure"));
NeonBee neonBee = registerNeonBeeMock(vertxMock, new NeonBeeOptions.Mutable().setIgnoreClassPath(true));

PendingDeployment deployment = deployable.deploy(neonBee);
PendingDeployment deployment = deployable.deploy(neonBeeMock);
assertThat(deployment.failed()).isTrue();
assertThat(deployment.cause()).hasMessageThat().isEqualTo("any failure");
assertThat(deployment.undeploy().succeeded()).isTrue();
Expand All @@ -90,7 +88,8 @@ void testDeployFailed() {
@Test
@DisplayName("test read model payloads")
void testReadModelPayloads() {
Vertx vertxMock = defaultVertxMock();
NeonBee neonBeeMock = newNeonBeeMockForDeployment();
Vertx vertxMock = neonBeeMock.getVertx();

ClassLoader classLoaderMock = mock(ClassLoader.class);
when(classLoaderMock.getResourceAsStream(any())).thenAnswer(invocation -> {
Expand All @@ -106,7 +105,8 @@ void testReadModelPayloads() {
@Test
@DisplayName("test scan class path")
void testScanClassPath() {
Vertx vertxMock = defaultVertxMock();
NeonBee neonBeeMock = newNeonBeeMockForDeployment();
Vertx vertxMock = neonBeeMock.getVertx();

ClassPathScanner classPathScannerMock = mock(ClassPathScanner.class);
when(classPathScannerMock.scanManifestFiles(any(), any())).thenReturn(succeededFuture(List.of("entry")));
Expand All @@ -129,8 +129,11 @@ void testScanClassPath() {
@Test
@DisplayName("test from JAR")
void testFromJar() throws IOException {
NeonBee neonBeeMock = newNeonBeeMockForDeployment();
Vertx vertxMock = neonBeeMock.getVertx();

NeonBeeModuleJar moduleJar = NeonBeeModuleJar.create("testmodule").withModels().build();
Future<DeployableModels> deployable = DeployableModels.fromJar(defaultVertxMock(), moduleJar.writeToTempPath());
Future<DeployableModels> deployable = DeployableModels.fromJar(vertxMock, moduleJar.writeToTempPath());
assertThat(deployable.succeeded()).isTrue();
assertThat(deployable.result().modelDefinition.getCSNModelDefinitions())
.comparingValuesUsing(Correspondence.<byte[], byte[]>from(Arrays::equals, "is not equal to"))
Expand Down
Loading

0 comments on commit d843b10

Please sign in to comment.