From 8edcf9c799b5725b274b7f091a32595b5c4be51a Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Mon, 28 Oct 2024 13:22:35 +0100 Subject: [PATCH 1/2] fix: improve WriteSafeRegistry lock method If the supplier of the lock method fails with an exception, the lock is not released. --- .../neonbee/internal/WriteSafeRegistry.java | 11 ++- .../internal/WriteSafeRegistryTest.java | 95 +++++++++++++++++++ 2 files changed, 101 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java index dcd3a3fb..e8cf2f80 100644 --- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -63,14 +63,15 @@ public Future get(String sharedMapKey) { * @param futureSupplier supplier for the future to be secured by the lock * @return the futureSupplier */ - private Future lock(String sharedMapKey, Supplier> futureSupplier) { + protected Future lock(String sharedMapKey, Supplier> futureSupplier) { logger.debug("Get lock for {}", sharedMapKey); return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> { logger.error("Error acquiring lock for {}", sharedMapKey, throwable); - }).compose(lock -> futureSupplier.get().onComplete(anyResult -> { - logger.debug("Releasing lock for {}", sharedMapKey); - lock.release(); - })); + }).compose(lock -> Future.future(event -> futureSupplier.get().onComplete(event)) + .onComplete(anyResult -> { + logger.debug("Releasing lock for {}", sharedMapKey); + lock.release(); + })); } private Future addValue(String sharedMapKey, Object value) { diff --git a/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java b/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java index c232e95a..25ddeafe 100644 --- a/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java +++ b/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java @@ -2,12 +2,18 @@ import static com.google.common.truth.Truth.assertThat; +import java.util.concurrent.TimeUnit; + import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import io.vertx.core.Future; import io.vertx.core.Vertx; +import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.core.json.JsonArray; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.Timeout; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; @@ -58,4 +64,93 @@ void get(Vertx vertx, VertxTestContext context) { context.completeNow(); })).onFailure(context::failNow); } + + @Test + @DisplayName("test lock method") + void lock(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(4); + String lockedname = "test-lock-key1"; + + // execute the lockTest twice to make sure that you can acquire the lock multiple times + lockTest(lockedname, context, registry, checkpoints) + .compose(unused -> lockTest(lockedname, context, registry, checkpoints)); + } + + @Test + // The used timeout for the lock is set to 10 seconds + // @see io.vertx.core.shareddata.impl.SharedDataImpl#DEFAULT_LOCK_TIMEOUT + @Timeout(value = 12, timeUnit = TimeUnit.SECONDS) + @DisplayName("test acquire lock twice") + void acquireLockTwice(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(5); + + String lockedname = "test-lock-key2"; + registry.lock(lockedname, () -> { + checkpoints.flag(); + // try to acquire the lock to make sure that it is locked + return registry.lock(lockedname, () -> { + context.failNow("should not be called because lock cannot be acquired"); + return Future.succeededFuture(); + }) + .onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(NoStackTraceThrowable.class); + assertThat(cause).hasMessageThat().isEqualTo("Timed out waiting to get lock"); + checkpoints.flag(); + })) + .recover(throwable -> Future.succeededFuture()); + }) + .onSuccess(unused -> checkpoints.flag()) + .onFailure(context::failNow) + // execute the lockTest to make sure that you can acquire the lock again + .compose(unused -> lockTest(lockedname, context, registry, checkpoints)); + } + + private static Future lockTest(String lockName, VertxTestContext context, WriteSafeRegistry registry, + Checkpoint checkpoints) { + return registry.lock(lockName, () -> { + checkpoints.flag(); + return Future.succeededFuture(); + }) + .onSuccess(unused -> checkpoints.flag()) + .onFailure(context::failNow); + } + + @Test + @DisplayName("test lock supplier retuning null") + void lockNPE(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(2); + + registry.lock("lockNPE", () -> { + checkpoints.flag(); + return null; + }).onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(NullPointerException.class); + checkpoints.flag(); + })); + } + + @Test + @DisplayName("test lock supplier throws exception") + void lockISE(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(2); + + registry.lock("lockISE", () -> { + checkpoints.flag(); + throw new IllegalStateException("Illegal state"); + }).onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(IllegalStateException.class); + checkpoints.flag(); + })); + } } From 2a4ec009cfc3a70cfe70ba561854104eecde7939 Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Wed, 30 Oct 2024 12:22:39 +0100 Subject: [PATCH 2/2] refactor: extract SharedRegistry class --- .../io/neonbee/internal/SharedRegistry.java | 118 ++++++++++++++++++ .../neonbee/internal/WriteSafeRegistry.java | 106 +++++++--------- 2 files changed, 166 insertions(+), 58 deletions(-) create mode 100644 src/main/java/io/neonbee/internal/SharedRegistry.java diff --git a/src/main/java/io/neonbee/internal/SharedRegistry.java b/src/main/java/io/neonbee/internal/SharedRegistry.java new file mode 100644 index 00000000..dd841e38 --- /dev/null +++ b/src/main/java/io/neonbee/internal/SharedRegistry.java @@ -0,0 +1,118 @@ +package io.neonbee.internal; + +import static io.vertx.core.Future.succeededFuture; + +import io.neonbee.NeonBee; +import io.neonbee.logging.LoggingFacade; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; + +/** + * A registry to manage values in the {@link SharedData} shared map. + *

+ * This class is a generic implementation of a registry that can be used to store values in a shared map. + * + * @param the type of the value to store in the shared registry + */ +public class SharedRegistry implements Registry { + + private final LoggingFacade logger = LoggingFacade.create(); + + private final SharedData sharedData; + + private final String registryName; + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + public SharedRegistry(Vertx vertx, String registryName) { + this(registryName, new SharedDataAccessor(vertx, SharedRegistry.class)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + */ + public SharedRegistry(String registryName, SharedData sharedData) { + this.registryName = registryName; + this.sharedData = sharedData; + } + + /** + * Register a value in the async shared map of {@link NeonBee} by key. + * + * @param sharedMapKey the shared map key + * @param value the value to register + * @return the future + */ + @Override + public Future register(String sharedMapKey, T value) { + logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value); + + Future> sharedMap = getSharedMap(); + + return sharedMap.compose(map -> map.get(sharedMapKey)) + .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray()) + .compose(valueArray -> { + if (!valueArray.contains(value)) { + valueArray.add(value); + } + + if (logger.isInfoEnabled()) { + logger.info("Registered verticle {} in shared map.", value); + } + + return sharedMap.compose(map -> map.put(sharedMapKey, valueArray)); + }); + } + + /** + * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. + * + * @param sharedMapKey the shared map key + * @param value the value to unregister + * @return the future + */ + @Override + public Future unregister(String sharedMapKey, T value) { + logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); + + Future> sharedMap = getSharedMap(); + + return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) + .compose(values -> { + if (values == null) { + return succeededFuture(); + } + + if (logger.isInfoEnabled()) { + logger.info("Unregistered verticle {} in shared map.", value); + } + + values.remove(value); + return sharedMap.compose(map -> map.put(sharedMapKey, values)); + }); + } + + @Override + public Future get(String sharedMapKey) { + return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o); + } + + /** + * Shared map that is used as registry. + * + * @return Future to the shared map + */ + public Future> getSharedMap() { + return sharedData.getAsyncMap(registryName); + } +} diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java index e8cf2f80..c9b97964 100644 --- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -1,7 +1,5 @@ package io.neonbee.internal; -import static io.vertx.core.Future.succeededFuture; - import java.util.function.Supplier; import io.neonbee.NeonBee; @@ -10,6 +8,7 @@ import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; /** * A registry to manage values in the {@link SharedDataAccessor} shared map. @@ -22,7 +21,9 @@ public class WriteSafeRegistry implements Registry { private final LoggingFacade logger = LoggingFacade.create(); - private final SharedDataAccessor sharedDataAccessor; + private final SharedData sharedData; + + private final Registry sharedRegistry; private final String registryName; @@ -33,8 +34,30 @@ public class WriteSafeRegistry implements Registry { * @param registryName the name of the map registry */ public WriteSafeRegistry(Vertx vertx, String registryName) { + this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + */ + public WriteSafeRegistry(String registryName, SharedData sharedData) { + this(registryName, sharedData, new SharedRegistry<>(registryName, sharedData)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + * @param registry the shared registry + */ + WriteSafeRegistry(String registryName, SharedData sharedData, Registry registry) { this.registryName = registryName; - this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass()); + this.sharedData = sharedData; + this.sharedRegistry = registry; } /** @@ -48,12 +71,26 @@ public WriteSafeRegistry(Vertx vertx, String registryName) { public Future register(String sharedMapKey, T value) { logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value); - return lock(sharedMapKey, () -> addValue(sharedMapKey, value)); + return lock(sharedMapKey, () -> sharedRegistry.register(sharedMapKey, value)); } @Override public Future get(String sharedMapKey) { - return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o); + return sharedRegistry.get(sharedMapKey); + } + + /** + * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. + * + * @param sharedMapKey the shared map key + * @param value the value to unregister + * @return the future + */ + @Override + public Future unregister(String sharedMapKey, T value) { + logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); + + return lock(sharedMapKey, () -> sharedRegistry.unregister(sharedMapKey, value)); } /** @@ -65,7 +102,7 @@ public Future get(String sharedMapKey) { */ protected Future lock(String sharedMapKey, Supplier> futureSupplier) { logger.debug("Get lock for {}", sharedMapKey); - return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> { + return sharedData.getLock(sharedMapKey).onFailure(throwable -> { logger.error("Error acquiring lock for {}", sharedMapKey, throwable); }).compose(lock -> Future.future(event -> futureSupplier.get().onComplete(event)) .onComplete(anyResult -> { @@ -74,62 +111,15 @@ protected Future lock(String sharedMapKey, Supplier> futureSu })); } - private Future addValue(String sharedMapKey, Object value) { - Future> sharedMap = getSharedMap(); - - return sharedMap.compose(map -> map.get(sharedMapKey)) - .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray()) - .compose(valueArray -> { - if (!valueArray.contains(value)) { - valueArray.add(value); - } - - if (logger.isInfoEnabled()) { - logger.info("Registered verticle {} in shared map.", value); - } - - return sharedMap.compose(map -> map.put(sharedMapKey, valueArray)); - }); - } - - /** - * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. - * - * @param sharedMapKey the shared map key - * @param value the value to unregister - * @return the future - */ - @Override - public Future unregister(String sharedMapKey, T value) { - logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); - - return lock(sharedMapKey, () -> removeValue(sharedMapKey, value)); - } - - private Future removeValue(String sharedMapKey, Object value) { - Future> sharedMap = getSharedMap(); - - return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) - .compose(values -> { - if (values == null) { - return succeededFuture(); - } - - if (logger.isInfoEnabled()) { - logger.info("Unregistered verticle {} in shared map.", value); - } - - values.remove(value); - return sharedMap.compose(map -> map.put(sharedMapKey, values)); - }); - } - /** * Shared map that is used as registry. + *

+ * It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)} + * and {@link WriteSafeRegistry#unregister(String, Object)} methods. * * @return Future to the shared map */ public Future> getSharedMap() { - return sharedDataAccessor.getAsyncMap(registryName); + return sharedData.getAsyncMap(registryName); } }