From 8edcf9c799b5725b274b7f091a32595b5c4be51a Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Mon, 28 Oct 2024 13:22:35 +0100 Subject: [PATCH 1/3] fix: improve WriteSafeRegistry lock method If the supplier of the lock method fails with an exception, the lock is not released. --- .../neonbee/internal/WriteSafeRegistry.java | 11 ++- .../internal/WriteSafeRegistryTest.java | 95 +++++++++++++++++++ 2 files changed, 101 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java index dcd3a3fb..e8cf2f80 100644 --- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -63,14 +63,15 @@ public Future get(String sharedMapKey) { * @param futureSupplier supplier for the future to be secured by the lock * @return the futureSupplier */ - private Future lock(String sharedMapKey, Supplier> futureSupplier) { + protected Future lock(String sharedMapKey, Supplier> futureSupplier) { logger.debug("Get lock for {}", sharedMapKey); return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> { logger.error("Error acquiring lock for {}", sharedMapKey, throwable); - }).compose(lock -> futureSupplier.get().onComplete(anyResult -> { - logger.debug("Releasing lock for {}", sharedMapKey); - lock.release(); - })); + }).compose(lock -> Future.future(event -> futureSupplier.get().onComplete(event)) + .onComplete(anyResult -> { + logger.debug("Releasing lock for {}", sharedMapKey); + lock.release(); + })); } private Future addValue(String sharedMapKey, Object value) { diff --git a/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java b/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java index c232e95a..25ddeafe 100644 --- a/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java +++ b/src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java @@ -2,12 +2,18 @@ import static com.google.common.truth.Truth.assertThat; +import java.util.concurrent.TimeUnit; + import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import io.vertx.core.Future; import io.vertx.core.Vertx; +import io.vertx.core.impl.NoStackTraceThrowable; import io.vertx.core.json.JsonArray; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.Timeout; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; @@ -58,4 +64,93 @@ void get(Vertx vertx, VertxTestContext context) { context.completeNow(); })).onFailure(context::failNow); } + + @Test + @DisplayName("test lock method") + void lock(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(4); + String lockedname = "test-lock-key1"; + + // execute the lockTest twice to make sure that you can acquire the lock multiple times + lockTest(lockedname, context, registry, checkpoints) + .compose(unused -> lockTest(lockedname, context, registry, checkpoints)); + } + + @Test + // The used timeout for the lock is set to 10 seconds + // @see io.vertx.core.shareddata.impl.SharedDataImpl#DEFAULT_LOCK_TIMEOUT + @Timeout(value = 12, timeUnit = TimeUnit.SECONDS) + @DisplayName("test acquire lock twice") + void acquireLockTwice(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(5); + + String lockedname = "test-lock-key2"; + registry.lock(lockedname, () -> { + checkpoints.flag(); + // try to acquire the lock to make sure that it is locked + return registry.lock(lockedname, () -> { + context.failNow("should not be called because lock cannot be acquired"); + return Future.succeededFuture(); + }) + .onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(NoStackTraceThrowable.class); + assertThat(cause).hasMessageThat().isEqualTo("Timed out waiting to get lock"); + checkpoints.flag(); + })) + .recover(throwable -> Future.succeededFuture()); + }) + .onSuccess(unused -> checkpoints.flag()) + .onFailure(context::failNow) + // execute the lockTest to make sure that you can acquire the lock again + .compose(unused -> lockTest(lockedname, context, registry, checkpoints)); + } + + private static Future lockTest(String lockName, VertxTestContext context, WriteSafeRegistry registry, + Checkpoint checkpoints) { + return registry.lock(lockName, () -> { + checkpoints.flag(); + return Future.succeededFuture(); + }) + .onSuccess(unused -> checkpoints.flag()) + .onFailure(context::failNow); + } + + @Test + @DisplayName("test lock supplier retuning null") + void lockNPE(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(2); + + registry.lock("lockNPE", () -> { + checkpoints.flag(); + return null; + }).onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(NullPointerException.class); + checkpoints.flag(); + })); + } + + @Test + @DisplayName("test lock supplier throws exception") + void lockISE(Vertx vertx, VertxTestContext context) { + WriteSafeRegistry registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME); + + Checkpoint checkpoints = context.checkpoint(2); + + registry.lock("lockISE", () -> { + checkpoints.flag(); + throw new IllegalStateException("Illegal state"); + }).onSuccess(unused -> context.failNow("should not be successful")) + .onFailure(cause -> context.verify(() -> { + assertThat(cause).isInstanceOf(IllegalStateException.class); + checkpoints.flag(); + })); + } } From 2a4ec009cfc3a70cfe70ba561854104eecde7939 Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Wed, 30 Oct 2024 12:22:39 +0100 Subject: [PATCH 2/3] refactor: extract SharedRegistry class --- .../io/neonbee/internal/SharedRegistry.java | 118 ++++++++++++++++++ .../neonbee/internal/WriteSafeRegistry.java | 106 +++++++--------- 2 files changed, 166 insertions(+), 58 deletions(-) create mode 100644 src/main/java/io/neonbee/internal/SharedRegistry.java diff --git a/src/main/java/io/neonbee/internal/SharedRegistry.java b/src/main/java/io/neonbee/internal/SharedRegistry.java new file mode 100644 index 00000000..dd841e38 --- /dev/null +++ b/src/main/java/io/neonbee/internal/SharedRegistry.java @@ -0,0 +1,118 @@ +package io.neonbee.internal; + +import static io.vertx.core.Future.succeededFuture; + +import io.neonbee.NeonBee; +import io.neonbee.logging.LoggingFacade; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; + +/** + * A registry to manage values in the {@link SharedData} shared map. + *

+ * This class is a generic implementation of a registry that can be used to store values in a shared map. + * + * @param the type of the value to store in the shared registry + */ +public class SharedRegistry implements Registry { + + private final LoggingFacade logger = LoggingFacade.create(); + + private final SharedData sharedData; + + private final String registryName; + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + public SharedRegistry(Vertx vertx, String registryName) { + this(registryName, new SharedDataAccessor(vertx, SharedRegistry.class)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + */ + public SharedRegistry(String registryName, SharedData sharedData) { + this.registryName = registryName; + this.sharedData = sharedData; + } + + /** + * Register a value in the async shared map of {@link NeonBee} by key. + * + * @param sharedMapKey the shared map key + * @param value the value to register + * @return the future + */ + @Override + public Future register(String sharedMapKey, T value) { + logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value); + + Future> sharedMap = getSharedMap(); + + return sharedMap.compose(map -> map.get(sharedMapKey)) + .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray()) + .compose(valueArray -> { + if (!valueArray.contains(value)) { + valueArray.add(value); + } + + if (logger.isInfoEnabled()) { + logger.info("Registered verticle {} in shared map.", value); + } + + return sharedMap.compose(map -> map.put(sharedMapKey, valueArray)); + }); + } + + /** + * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. + * + * @param sharedMapKey the shared map key + * @param value the value to unregister + * @return the future + */ + @Override + public Future unregister(String sharedMapKey, T value) { + logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); + + Future> sharedMap = getSharedMap(); + + return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) + .compose(values -> { + if (values == null) { + return succeededFuture(); + } + + if (logger.isInfoEnabled()) { + logger.info("Unregistered verticle {} in shared map.", value); + } + + values.remove(value); + return sharedMap.compose(map -> map.put(sharedMapKey, values)); + }); + } + + @Override + public Future get(String sharedMapKey) { + return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o); + } + + /** + * Shared map that is used as registry. + * + * @return Future to the shared map + */ + public Future> getSharedMap() { + return sharedData.getAsyncMap(registryName); + } +} diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java index e8cf2f80..c9b97964 100644 --- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java +++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java @@ -1,7 +1,5 @@ package io.neonbee.internal; -import static io.vertx.core.Future.succeededFuture; - import java.util.function.Supplier; import io.neonbee.NeonBee; @@ -10,6 +8,7 @@ import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; /** * A registry to manage values in the {@link SharedDataAccessor} shared map. @@ -22,7 +21,9 @@ public class WriteSafeRegistry implements Registry { private final LoggingFacade logger = LoggingFacade.create(); - private final SharedDataAccessor sharedDataAccessor; + private final SharedData sharedData; + + private final Registry sharedRegistry; private final String registryName; @@ -33,8 +34,30 @@ public class WriteSafeRegistry implements Registry { * @param registryName the name of the map registry */ public WriteSafeRegistry(Vertx vertx, String registryName) { + this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + */ + public WriteSafeRegistry(String registryName, SharedData sharedData) { + this(registryName, sharedData, new SharedRegistry<>(registryName, sharedData)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param registryName the name of the map registry + * @param sharedData the shared data + * @param registry the shared registry + */ + WriteSafeRegistry(String registryName, SharedData sharedData, Registry registry) { this.registryName = registryName; - this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass()); + this.sharedData = sharedData; + this.sharedRegistry = registry; } /** @@ -48,12 +71,26 @@ public WriteSafeRegistry(Vertx vertx, String registryName) { public Future register(String sharedMapKey, T value) { logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value); - return lock(sharedMapKey, () -> addValue(sharedMapKey, value)); + return lock(sharedMapKey, () -> sharedRegistry.register(sharedMapKey, value)); } @Override public Future get(String sharedMapKey) { - return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o); + return sharedRegistry.get(sharedMapKey); + } + + /** + * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. + * + * @param sharedMapKey the shared map key + * @param value the value to unregister + * @return the future + */ + @Override + public Future unregister(String sharedMapKey, T value) { + logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); + + return lock(sharedMapKey, () -> sharedRegistry.unregister(sharedMapKey, value)); } /** @@ -65,7 +102,7 @@ public Future get(String sharedMapKey) { */ protected Future lock(String sharedMapKey, Supplier> futureSupplier) { logger.debug("Get lock for {}", sharedMapKey); - return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> { + return sharedData.getLock(sharedMapKey).onFailure(throwable -> { logger.error("Error acquiring lock for {}", sharedMapKey, throwable); }).compose(lock -> Future.future(event -> futureSupplier.get().onComplete(event)) .onComplete(anyResult -> { @@ -74,62 +111,15 @@ protected Future lock(String sharedMapKey, Supplier> futureSu })); } - private Future addValue(String sharedMapKey, Object value) { - Future> sharedMap = getSharedMap(); - - return sharedMap.compose(map -> map.get(sharedMapKey)) - .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray()) - .compose(valueArray -> { - if (!valueArray.contains(value)) { - valueArray.add(value); - } - - if (logger.isInfoEnabled()) { - logger.info("Registered verticle {} in shared map.", value); - } - - return sharedMap.compose(map -> map.put(sharedMapKey, valueArray)); - }); - } - - /** - * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey. - * - * @param sharedMapKey the shared map key - * @param value the value to unregister - * @return the future - */ - @Override - public Future unregister(String sharedMapKey, T value) { - logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value); - - return lock(sharedMapKey, () -> removeValue(sharedMapKey, value)); - } - - private Future removeValue(String sharedMapKey, Object value) { - Future> sharedMap = getSharedMap(); - - return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) - .compose(values -> { - if (values == null) { - return succeededFuture(); - } - - if (logger.isInfoEnabled()) { - logger.info("Unregistered verticle {} in shared map.", value); - } - - values.remove(value); - return sharedMap.compose(map -> map.put(sharedMapKey, values)); - }); - } - /** * Shared map that is used as registry. + *

+ * It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)} + * and {@link WriteSafeRegistry#unregister(String, Object)} methods. * * @return Future to the shared map */ public Future> getSharedMap() { - return sharedDataAccessor.getAsyncMap(registryName); + return sharedData.getAsyncMap(registryName); } } From 51fdcb4c6d1c1ef417c493964285e1d9201b6e45 Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Thu, 31 Oct 2024 14:02:54 +0100 Subject: [PATCH 3/3] feat: clean entity registry Clean entity registry by recreating entries form cluster information --- build.gradle | 9 + .../io/neonbee/internal/SharedRegistry.java | 3 +- .../neonbee/internal/WriteLockRegistry.java | 207 ++++++++++++++++++ .../cluster/entity/ClusterEntityRegistry.java | 112 +++++++--- .../entity/UnregisterEntityVerticlesHook.java | 39 ++-- .../entity/ClusterEntityRegistryTest.java | 2 +- .../entity/UnregisterEntitiesTest.java | 2 +- .../internal/job/RedeployEntitiesJobTest.java | 7 + src/test/resources/logback-test.xml | 11 + 9 files changed, 333 insertions(+), 59 deletions(-) create mode 100644 src/main/java/io/neonbee/internal/WriteLockRegistry.java create mode 100644 src/test/resources/logback-test.xml diff --git a/build.gradle b/build.gradle index 5c6062a2..2f8be79f 100644 --- a/build.gradle +++ b/build.gradle @@ -7,6 +7,7 @@ plugins { id 'jacoco' id 'pmd' id 'checkstyle' + id 'idea' // community plugins id 'com.diffplug.spotless' version '6.25.0' @@ -20,6 +21,14 @@ plugins { id 'team.yi.semantic-gitlog' version '0.6.12' } +idea { + module { + downloadSources = true + downloadJavadoc = true + } +} + + group = 'io.neonbee' version = '0.37.2-SNAPSHOT' mainClassName = 'io.neonbee.Launcher' diff --git a/src/main/java/io/neonbee/internal/SharedRegistry.java b/src/main/java/io/neonbee/internal/SharedRegistry.java index dd841e38..5d9adc37 100644 --- a/src/main/java/io/neonbee/internal/SharedRegistry.java +++ b/src/main/java/io/neonbee/internal/SharedRegistry.java @@ -87,7 +87,8 @@ public Future unregister(String sharedMapKey, T value) { Future> sharedMap = getSharedMap(); - return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) + return sharedMap.compose(map -> map.get(sharedMapKey)) + .map(jsonArray -> (JsonArray) jsonArray) .compose(values -> { if (values == null) { return succeededFuture(); diff --git a/src/main/java/io/neonbee/internal/WriteLockRegistry.java b/src/main/java/io/neonbee/internal/WriteLockRegistry.java new file mode 100644 index 00000000..d86d9c9e --- /dev/null +++ b/src/main/java/io/neonbee/internal/WriteLockRegistry.java @@ -0,0 +1,207 @@ +package io.neonbee.internal; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import io.neonbee.logging.LoggingFacade; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.Lock; +import io.vertx.core.shareddata.SharedData; + +/** + * A {@link WriteSafeRegistry} implementation that uses a lock to lock all write operations to ensure that only one + * operation is executed at a time. + * + * @param the type of data this registry stores + */ +public class WriteLockRegistry implements Registry { + + private static final String LOCK_KEY = WriteLockRegistry.class.getName(); + + private final LoggingFacade logger = LoggingFacade.create(); + + private final Registry registry; + + private final SharedData sharedData; + + private final String registryName; + + private final Vertx vertx; + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + public WriteLockRegistry(Vertx vertx, String registryName) { + this(vertx, registryName, new SharedDataAccessor(vertx, WriteLockRegistry.class)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + * @param sharedData the shared data + */ + public WriteLockRegistry(Vertx vertx, String registryName, SharedData sharedData) { + this.vertx = vertx; + registry = new SharedRegistry<>(registryName, sharedData); + this.registryName = registryName; + this.sharedData = sharedData; + } + + @Override + public Future register(String sharedMapKey, T value) { + return lock(LOCK_KEY) + .compose(lock -> lock.execute(() -> registry.register(sharedMapKey, value))) + .mapEmpty(); + } + + @Override + public Future unregister(String sharedMapKey, T value) { + return lock(LOCK_KEY) + .compose(lock -> lock.execute(() -> registry.unregister(sharedMapKey, value))) + .mapEmpty(); + } + + @Override + public Future get(String key) { + return registry.get(key); + } + + /** + * Method that acquires a lock for the registry and released the lock after the returned TimeOutLock executed the + * futureSupplier or the timeout is reached. + * + * @return the futureSupplier + */ + public Future lock() { + return lock(LOCK_KEY); + } + + /** + * Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed. + * + * @param sharedMapKey the shared map key + * @return the futureSupplier + */ + private Future lock(String sharedMapKey) { + logger.debug("Get lock for {}", sharedMapKey); + return sharedData.getLock(sharedMapKey) + .map(TimeOutLock::new); + } + + /** + * Shared map that is used as registry. + *

+ * It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)} + * and {@link WriteSafeRegistry#unregister(String, Object)} methods. + * + * @return Future to the shared map + */ + public Future> getSharedMap() { + return sharedData.getAsyncMap(registryName); + } + + /** + * A lock that is released after 10 seconds. + */ + public class TimeOutLock implements Lock { + + private static final long DEFAULT_TIME_OUT_DELAY_MS = TimeUnit.SECONDS.toMillis(1000); + + private final UUID id = UUID.randomUUID(); + + private final LoggingFacade logger = LoggingFacade.create(); + + private final Lock lock; + + private final long timerId; + + private final Promise timeOutPromise; + + private final long startTime; + + private TimeOutLock(Lock lock) { + this.lock = lock; + this.timeOutPromise = Promise.promise(); + this.timerId = startTimer(); + this.startTime = System.currentTimeMillis(); + logger.debug("Lock {} created", id); + } + + private long startTimer() { + if (logger.isDebugEnabled()) { + logger.debug("start lock timer {}", id); + } + return vertx.setTimer(DEFAULT_TIME_OUT_DELAY_MS, id -> { + logger.warn("Lock {} timed out after {} ms. Lock is released.", id, + startTime - System.currentTimeMillis()); + stopRelease(); + timeOutPromise + .fail(new TimeoutException("Lock for " + registryName + " timed out. Lock is released.")); + }); + } + + @Override + public void release() { + stopRelease(); + } + + /** + * Execute the futureSupplier and release the lock after the futureSupplier is executed. If the supplied future + * does not complete within the timeout, the lock is released. + * + * @param futureSupplier the futureSupplier + * @param the type of the future + * @return the future + */ + public Future execute(Supplier> futureSupplier) { + if (logger.isDebugEnabled()) { + logger.debug("execute lock {}", id); + } + Future future = futureSupplier.get().onComplete(event -> { + if (logger.isDebugEnabled()) { + logger.debug("completed lock {}, succeeded {}", id, event.succeeded()); + } + stopRelease(); + }); + Future suppliedFuture = Future.future(future::onComplete); + Future timeOutFuture = timeOutPromise.future(); + return Future.all(suppliedFuture, timeOutFuture) + .mapEmpty() + .recover(throwable -> { + if (logger.isDebugEnabled()) { + logger.debug("recover lock {}", id); + } + if (timeOutFuture.failed()) { + return Future.failedFuture(timeOutFuture.cause()); + } else { + return suppliedFuture; + } + }); + } + + private void stopRelease() { + if (logger.isDebugEnabled()) { + logger.debug("Releasing lock {}", id); + } + + vertx.cancelTimer(timerId); + lock.release(); + timeOutPromise.tryComplete(); + + if (logger.isDebugEnabled()) { + logger.debug("lock {} released", id); + } + } + } +} diff --git a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java index 86fae92c..c2fd1c7b 100644 --- a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java +++ b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java @@ -1,15 +1,18 @@ package io.neonbee.internal.cluster.entity; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import io.neonbee.entity.EntityVerticle; import io.neonbee.internal.Registry; +import io.neonbee.internal.WriteLockRegistry; import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.internal.cluster.ClusterHelper; +import io.vertx.codegen.annotations.Nullable; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; @@ -44,7 +47,7 @@ public class ClusterEntityRegistry implements Registry { @VisibleForTesting final WriteSafeRegistry clusteringInformation; - final WriteSafeRegistry entityRegistry; + final WriteLockRegistry entityRegistry; private final Vertx vertx; @@ -55,7 +58,7 @@ public class ClusterEntityRegistry implements Registry { * @param registryName the name of the map registry */ public ClusterEntityRegistry(Vertx vertx, String registryName) { - this.entityRegistry = new WriteSafeRegistry<>(vertx, registryName); + this.entityRegistry = new WriteLockRegistry<>(vertx, registryName); this.clusteringInformation = new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation"); this.vertx = vertx; } @@ -135,40 +138,77 @@ public Future unregisterNode(String clusterNodeId) { // If no entities are registered, return a completed future return Future.succeededFuture(); } - registeredEntities = registeredEntities.copy(); - List> futureList = new ArrayList<>(registeredEntities.size()); - for (Object o : registeredEntities) { - if (shouldRemove(map, o)) { - JsonObject jo = (JsonObject) o; - String entityName = jo.getString(ENTITY_NAME_KEY); - String qualifiedName = jo.getString(QUALIFIED_NAME_KEY); - futureList.add(entityRegistry.unregister(entityName, qualifiedName)); - } - } - return Future.join(futureList).mapEmpty(); - }).compose(cf -> removeClusteringInformation(clusterNodeId)); + + // The clustering information map should look like this: + // + // 6dbe2f47-8e2a-4b41-b019-007b16157f87 -> + // [{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[ERP.Customers]" + // },{ + // "qualifiedName":"unregisterentitiestest/_MarketSalesEntityVerticle-133064210", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_MarketSalesEntityVerticle-133064210", + // "entityName":"entityVerticles[Market.Products]" + // }] + // + // d4f78582-14d4-493f-8a74-03d0e49c566b -> + // [{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[ERP.Customers]" + // }] + + // The entity registry map should look like this: + // + // entityVerticles[Sales.Orders] -> + // [ + // "unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "unregisterentitiestest/_MarketSalesEntityVerticle-133064210" + // ] + // + // entityVerticles[Market.Products] -> + // [ + // "unregisterentitiestest/_MarketSalesEntityVerticle-133064210" + // ] + // entityVerticles[ERP.Customers] -> + // [ + // "unregisterentitiestest/_ErpSalesEntityVerticle-644622197" + // ] + return buildEntryMap(map) + .compose(entryMap -> entityRegistry.lock() + .compose(lock -> lock.execute(() -> createEntityMap(entryMap)))) + .compose(cf -> removeClusteringInformation(clusterNodeId)); + }); } - /** - * Check if the provided object should be removed from the map. - * - * @param map the map - * @param o the object - * @return true if the object should be removed - */ - private boolean shouldRemove(Map map, Object o) { - for (Map.Entry node : map.entrySet()) { - JsonArray ja = (JsonArray) node.getValue(); - // Iterate over the JsonArray to determine if it contains the specified object. - // JsonArray#contains cannot be used directly because the types of objects in the JsonArray may differ from - // those returned by the iterator. This discrepancy occurs because JsonArray.Iter#next automatically wraps - // standard Java types into their corresponding Json types. - for (Object object : ja) { - if (object.equals(o)) { - return false; - } - } - } - return true; + private Future createEntityMap(Map> entryMap) { + return entityRegistry.getSharedMap() + .compose(asyncMap -> asyncMap.clear() + .map(v -> entryMap.entrySet() + .stream() + .map(entry -> asyncMap + .put(entry.getKey(), new JsonArray(entry.getValue()))) + .collect(Collectors.toList())) + .map(Future::all) + .mapEmpty()); + } + + private Future<@Nullable Map>> buildEntryMap(Map map) { + return vertx.executeBlocking(() -> map.values() + .stream() + .map(JsonArray.class::cast) + .flatMap(JsonArray::stream) + .map(JsonObject.class::cast) + .collect(Collectors.toMap( + jo -> jo.getString(ENTITY_NAME_KEY), + jo -> List.of(jo.getString(QUALIFIED_NAME_KEY)), + (l1, l2) -> Stream.of(l1, l2).flatMap(List::stream).toList()))); } } diff --git a/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java b/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java index 6cd129ca..24d47213 100644 --- a/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java +++ b/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java @@ -40,6 +40,25 @@ public void unregisterOnShutdown(NeonBee neonBee, HookContext hookContext, Promi .onFailure(ignoredCause -> LOGGER.error("Failed to unregister models on shutdown")); } + /** + * This method is called when a NeonBee node has left the cluster. + * + * @param neonBee the {@link NeonBee} instance + * @param hookContext the {@link HookContext} + * @param promise {@link Promise} to completed the function. + */ + @Hook(HookType.NODE_LEFT) + public void cleanup(NeonBee neonBee, HookContext hookContext, Promise promise) { + String clusterNodeId = hookContext.get(CLUSTER_NODE_ID); + LOGGER.info("Cleanup qualified names for node {}", clusterNodeId); + if (ClusterHelper.isLeader(neonBee.getVertx())) { + LOGGER.info("Cleaning registered qualified names ..."); + unregister(neonBee, clusterNodeId).onComplete(promise) + .onSuccess(unused -> LOGGER.info("Qualified names successfully cleaned up")) + .onFailure(ignoredCause -> LOGGER.error("Failed to cleanup qualified names")); + } + } + /** * Unregister the entity qualified names for the node by ID. * @@ -69,24 +88,4 @@ public static Future unregister(NeonBee neonBee, String clusterNodeId) { .onFailure(cause -> LOGGER.error("Failed to unregistered entity verticle models for node ID {} ...", clusterNodeId, cause)); } - - /** - * This method is called when a NeonBee node has left the cluster. - * - * @param neonBee the {@link NeonBee} instance - * @param hookContext the {@link HookContext} - * @param promise {@link Promise} to completed the function. - */ - @Hook(HookType.NODE_LEFT) - public void cleanup(NeonBee neonBee, HookContext hookContext, Promise promise) { - String clusterNodeId = hookContext.get(CLUSTER_NODE_ID); - LOGGER.info("Cleanup qualified names for node {}", clusterNodeId); - if (ClusterHelper.isLeader(neonBee.getVertx())) { - LOGGER.info("Cleaning registered qualified names ..."); - unregister(neonBee, clusterNodeId).onComplete(promise) - .onSuccess(unused -> LOGGER.info("Qualified names successfully cleaned up")) - .onFailure(ignoredCause -> LOGGER.error("Failed to cleanup qualified names")); - } - } - } diff --git a/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java b/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java index f6465471..dcd001fe 100644 --- a/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java +++ b/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java @@ -135,7 +135,7 @@ void unregisterNode2(Vertx vertx, VertxTestContext context) { checkpoint.flag(); })).compose(unused -> registry2.unregisterNode(clusterIdNode2)).compose(unused -> registry2.get(KEY)) .onSuccess(ja -> context.verify(() -> { - assertThat(ja).isEmpty(); + assertThat(ja).isNull(); checkpoint.flag(); })).onFailure(context::failNow); } diff --git a/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java b/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java index 7691ee8f..d1abd4b1 100644 --- a/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java +++ b/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java @@ -114,7 +114,7 @@ private void testUnregisteringEntitiesSingleNode(NeonBee web, VertxTestContext t })).compose(unused -> UnregisterEntityVerticlesHook.unregister(web, clusterNodeId)) .compose(unused -> registry.get(sharedEntityMapName(ErpSalesEntityVerticle.FQN_ERP_CUSTOMERS))) .onSuccess(jsonArray -> testContext.verify(() -> { - assertThat(jsonArray).isEqualTo(new JsonArray()); + assertThat(jsonArray).isNull(); checkpoint.flag(); })).compose(unused -> registry.clusteringInformation.get(clusterNodeId)) .onSuccess(object -> testContext.verify(() -> { diff --git a/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java b/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java index 0ccf06de..a29c48b2 100644 --- a/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java +++ b/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java @@ -74,8 +74,15 @@ public static Path createTempDirectory() throws IOException { @NeonBeeDeployable(namespace = NeonBeeDeployable.NEONBEE_NAMESPACE, autoDeploy = false) public static class TestEntityVerticle1 extends EntityVerticle { + + /** + * The fully qualified name of the entity type "ERP.Customers". + */ static final FullQualifiedName FQN_ERP_CUSTOMERS = new FullQualifiedName("ERP", "Customers"); + /** + * The fully qualified name of the entity type "Sales.Orders". + */ static final FullQualifiedName FQN_SALES_ORDERS = new FullQualifiedName("Sales.Orders"); @Override diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml new file mode 100644 index 00000000..88bc9faa --- /dev/null +++ b/src/test/resources/logback-test.xml @@ -0,0 +1,11 @@ + + + + + + + + %d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n + + +