From 51fdcb4c6d1c1ef417c493964285e1d9201b6e45 Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Thu, 31 Oct 2024 14:02:54 +0100 Subject: [PATCH] feat: clean entity registry Clean entity registry by recreating entries form cluster information --- build.gradle | 9 + .../io/neonbee/internal/SharedRegistry.java | 3 +- .../neonbee/internal/WriteLockRegistry.java | 207 ++++++++++++++++++ .../cluster/entity/ClusterEntityRegistry.java | 112 +++++++--- .../entity/UnregisterEntityVerticlesHook.java | 39 ++-- .../entity/ClusterEntityRegistryTest.java | 2 +- .../entity/UnregisterEntitiesTest.java | 2 +- .../internal/job/RedeployEntitiesJobTest.java | 7 + src/test/resources/logback-test.xml | 11 + 9 files changed, 333 insertions(+), 59 deletions(-) create mode 100644 src/main/java/io/neonbee/internal/WriteLockRegistry.java create mode 100644 src/test/resources/logback-test.xml diff --git a/build.gradle b/build.gradle index 5c6062a2..2f8be79f 100644 --- a/build.gradle +++ b/build.gradle @@ -7,6 +7,7 @@ plugins { id 'jacoco' id 'pmd' id 'checkstyle' + id 'idea' // community plugins id 'com.diffplug.spotless' version '6.25.0' @@ -20,6 +21,14 @@ plugins { id 'team.yi.semantic-gitlog' version '0.6.12' } +idea { + module { + downloadSources = true + downloadJavadoc = true + } +} + + group = 'io.neonbee' version = '0.37.2-SNAPSHOT' mainClassName = 'io.neonbee.Launcher' diff --git a/src/main/java/io/neonbee/internal/SharedRegistry.java b/src/main/java/io/neonbee/internal/SharedRegistry.java index dd841e38..5d9adc37 100644 --- a/src/main/java/io/neonbee/internal/SharedRegistry.java +++ b/src/main/java/io/neonbee/internal/SharedRegistry.java @@ -87,7 +87,8 @@ public Future unregister(String sharedMapKey, T value) { Future> sharedMap = getSharedMap(); - return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray) + return sharedMap.compose(map -> map.get(sharedMapKey)) + .map(jsonArray -> (JsonArray) jsonArray) .compose(values -> { if (values == null) { return succeededFuture(); diff --git a/src/main/java/io/neonbee/internal/WriteLockRegistry.java b/src/main/java/io/neonbee/internal/WriteLockRegistry.java new file mode 100644 index 00000000..d86d9c9e --- /dev/null +++ b/src/main/java/io/neonbee/internal/WriteLockRegistry.java @@ -0,0 +1,207 @@ +package io.neonbee.internal; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import io.neonbee.logging.LoggingFacade; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.Lock; +import io.vertx.core.shareddata.SharedData; + +/** + * A {@link WriteSafeRegistry} implementation that uses a lock to lock all write operations to ensure that only one + * operation is executed at a time. + * + * @param the type of data this registry stores + */ +public class WriteLockRegistry implements Registry { + + private static final String LOCK_KEY = WriteLockRegistry.class.getName(); + + private final LoggingFacade logger = LoggingFacade.create(); + + private final Registry registry; + + private final SharedData sharedData; + + private final String registryName; + + private final Vertx vertx; + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + */ + public WriteLockRegistry(Vertx vertx, String registryName) { + this(vertx, registryName, new SharedDataAccessor(vertx, WriteLockRegistry.class)); + } + + /** + * Create a new {@link WriteSafeRegistry}. + * + * @param vertx the {@link Vertx} instance + * @param registryName the name of the map registry + * @param sharedData the shared data + */ + public WriteLockRegistry(Vertx vertx, String registryName, SharedData sharedData) { + this.vertx = vertx; + registry = new SharedRegistry<>(registryName, sharedData); + this.registryName = registryName; + this.sharedData = sharedData; + } + + @Override + public Future register(String sharedMapKey, T value) { + return lock(LOCK_KEY) + .compose(lock -> lock.execute(() -> registry.register(sharedMapKey, value))) + .mapEmpty(); + } + + @Override + public Future unregister(String sharedMapKey, T value) { + return lock(LOCK_KEY) + .compose(lock -> lock.execute(() -> registry.unregister(sharedMapKey, value))) + .mapEmpty(); + } + + @Override + public Future get(String key) { + return registry.get(key); + } + + /** + * Method that acquires a lock for the registry and released the lock after the returned TimeOutLock executed the + * futureSupplier or the timeout is reached. + * + * @return the futureSupplier + */ + public Future lock() { + return lock(LOCK_KEY); + } + + /** + * Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed. + * + * @param sharedMapKey the shared map key + * @return the futureSupplier + */ + private Future lock(String sharedMapKey) { + logger.debug("Get lock for {}", sharedMapKey); + return sharedData.getLock(sharedMapKey) + .map(TimeOutLock::new); + } + + /** + * Shared map that is used as registry. + *

+ * It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)} + * and {@link WriteSafeRegistry#unregister(String, Object)} methods. + * + * @return Future to the shared map + */ + public Future> getSharedMap() { + return sharedData.getAsyncMap(registryName); + } + + /** + * A lock that is released after 10 seconds. + */ + public class TimeOutLock implements Lock { + + private static final long DEFAULT_TIME_OUT_DELAY_MS = TimeUnit.SECONDS.toMillis(1000); + + private final UUID id = UUID.randomUUID(); + + private final LoggingFacade logger = LoggingFacade.create(); + + private final Lock lock; + + private final long timerId; + + private final Promise timeOutPromise; + + private final long startTime; + + private TimeOutLock(Lock lock) { + this.lock = lock; + this.timeOutPromise = Promise.promise(); + this.timerId = startTimer(); + this.startTime = System.currentTimeMillis(); + logger.debug("Lock {} created", id); + } + + private long startTimer() { + if (logger.isDebugEnabled()) { + logger.debug("start lock timer {}", id); + } + return vertx.setTimer(DEFAULT_TIME_OUT_DELAY_MS, id -> { + logger.warn("Lock {} timed out after {} ms. Lock is released.", id, + startTime - System.currentTimeMillis()); + stopRelease(); + timeOutPromise + .fail(new TimeoutException("Lock for " + registryName + " timed out. Lock is released.")); + }); + } + + @Override + public void release() { + stopRelease(); + } + + /** + * Execute the futureSupplier and release the lock after the futureSupplier is executed. If the supplied future + * does not complete within the timeout, the lock is released. + * + * @param futureSupplier the futureSupplier + * @param the type of the future + * @return the future + */ + public Future execute(Supplier> futureSupplier) { + if (logger.isDebugEnabled()) { + logger.debug("execute lock {}", id); + } + Future future = futureSupplier.get().onComplete(event -> { + if (logger.isDebugEnabled()) { + logger.debug("completed lock {}, succeeded {}", id, event.succeeded()); + } + stopRelease(); + }); + Future suppliedFuture = Future.future(future::onComplete); + Future timeOutFuture = timeOutPromise.future(); + return Future.all(suppliedFuture, timeOutFuture) + .mapEmpty() + .recover(throwable -> { + if (logger.isDebugEnabled()) { + logger.debug("recover lock {}", id); + } + if (timeOutFuture.failed()) { + return Future.failedFuture(timeOutFuture.cause()); + } else { + return suppliedFuture; + } + }); + } + + private void stopRelease() { + if (logger.isDebugEnabled()) { + logger.debug("Releasing lock {}", id); + } + + vertx.cancelTimer(timerId); + lock.release(); + timeOutPromise.tryComplete(); + + if (logger.isDebugEnabled()) { + logger.debug("lock {} released", id); + } + } + } +} diff --git a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java index 86fae92c..c2fd1c7b 100644 --- a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java +++ b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java @@ -1,15 +1,18 @@ package io.neonbee.internal.cluster.entity; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import io.neonbee.entity.EntityVerticle; import io.neonbee.internal.Registry; +import io.neonbee.internal.WriteLockRegistry; import io.neonbee.internal.WriteSafeRegistry; import io.neonbee.internal.cluster.ClusterHelper; +import io.vertx.codegen.annotations.Nullable; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; @@ -44,7 +47,7 @@ public class ClusterEntityRegistry implements Registry { @VisibleForTesting final WriteSafeRegistry clusteringInformation; - final WriteSafeRegistry entityRegistry; + final WriteLockRegistry entityRegistry; private final Vertx vertx; @@ -55,7 +58,7 @@ public class ClusterEntityRegistry implements Registry { * @param registryName the name of the map registry */ public ClusterEntityRegistry(Vertx vertx, String registryName) { - this.entityRegistry = new WriteSafeRegistry<>(vertx, registryName); + this.entityRegistry = new WriteLockRegistry<>(vertx, registryName); this.clusteringInformation = new WriteSafeRegistry<>(vertx, registryName + "#ClusteringInformation"); this.vertx = vertx; } @@ -135,40 +138,77 @@ public Future unregisterNode(String clusterNodeId) { // If no entities are registered, return a completed future return Future.succeededFuture(); } - registeredEntities = registeredEntities.copy(); - List> futureList = new ArrayList<>(registeredEntities.size()); - for (Object o : registeredEntities) { - if (shouldRemove(map, o)) { - JsonObject jo = (JsonObject) o; - String entityName = jo.getString(ENTITY_NAME_KEY); - String qualifiedName = jo.getString(QUALIFIED_NAME_KEY); - futureList.add(entityRegistry.unregister(entityName, qualifiedName)); - } - } - return Future.join(futureList).mapEmpty(); - }).compose(cf -> removeClusteringInformation(clusterNodeId)); + + // The clustering information map should look like this: + // + // 6dbe2f47-8e2a-4b41-b019-007b16157f87 -> + // [{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[ERP.Customers]" + // },{ + // "qualifiedName":"unregisterentitiestest/_MarketSalesEntityVerticle-133064210", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_MarketSalesEntityVerticle-133064210", + // "entityName":"entityVerticles[Market.Products]" + // }] + // + // d4f78582-14d4-493f-8a74-03d0e49c566b -> + // [{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[ERP.Customers]" + // }] + + // The entity registry map should look like this: + // + // entityVerticles[Sales.Orders] -> + // [ + // "unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "unregisterentitiestest/_MarketSalesEntityVerticle-133064210" + // ] + // + // entityVerticles[Market.Products] -> + // [ + // "unregisterentitiestest/_MarketSalesEntityVerticle-133064210" + // ] + // entityVerticles[ERP.Customers] -> + // [ + // "unregisterentitiestest/_ErpSalesEntityVerticle-644622197" + // ] + return buildEntryMap(map) + .compose(entryMap -> entityRegistry.lock() + .compose(lock -> lock.execute(() -> createEntityMap(entryMap)))) + .compose(cf -> removeClusteringInformation(clusterNodeId)); + }); } - /** - * Check if the provided object should be removed from the map. - * - * @param map the map - * @param o the object - * @return true if the object should be removed - */ - private boolean shouldRemove(Map map, Object o) { - for (Map.Entry node : map.entrySet()) { - JsonArray ja = (JsonArray) node.getValue(); - // Iterate over the JsonArray to determine if it contains the specified object. - // JsonArray#contains cannot be used directly because the types of objects in the JsonArray may differ from - // those returned by the iterator. This discrepancy occurs because JsonArray.Iter#next automatically wraps - // standard Java types into their corresponding Json types. - for (Object object : ja) { - if (object.equals(o)) { - return false; - } - } - } - return true; + private Future createEntityMap(Map> entryMap) { + return entityRegistry.getSharedMap() + .compose(asyncMap -> asyncMap.clear() + .map(v -> entryMap.entrySet() + .stream() + .map(entry -> asyncMap + .put(entry.getKey(), new JsonArray(entry.getValue()))) + .collect(Collectors.toList())) + .map(Future::all) + .mapEmpty()); + } + + private Future<@Nullable Map>> buildEntryMap(Map map) { + return vertx.executeBlocking(() -> map.values() + .stream() + .map(JsonArray.class::cast) + .flatMap(JsonArray::stream) + .map(JsonObject.class::cast) + .collect(Collectors.toMap( + jo -> jo.getString(ENTITY_NAME_KEY), + jo -> List.of(jo.getString(QUALIFIED_NAME_KEY)), + (l1, l2) -> Stream.of(l1, l2).flatMap(List::stream).toList()))); } } diff --git a/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java b/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java index 6cd129ca..24d47213 100644 --- a/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java +++ b/src/main/java/io/neonbee/internal/cluster/entity/UnregisterEntityVerticlesHook.java @@ -40,6 +40,25 @@ public void unregisterOnShutdown(NeonBee neonBee, HookContext hookContext, Promi .onFailure(ignoredCause -> LOGGER.error("Failed to unregister models on shutdown")); } + /** + * This method is called when a NeonBee node has left the cluster. + * + * @param neonBee the {@link NeonBee} instance + * @param hookContext the {@link HookContext} + * @param promise {@link Promise} to completed the function. + */ + @Hook(HookType.NODE_LEFT) + public void cleanup(NeonBee neonBee, HookContext hookContext, Promise promise) { + String clusterNodeId = hookContext.get(CLUSTER_NODE_ID); + LOGGER.info("Cleanup qualified names for node {}", clusterNodeId); + if (ClusterHelper.isLeader(neonBee.getVertx())) { + LOGGER.info("Cleaning registered qualified names ..."); + unregister(neonBee, clusterNodeId).onComplete(promise) + .onSuccess(unused -> LOGGER.info("Qualified names successfully cleaned up")) + .onFailure(ignoredCause -> LOGGER.error("Failed to cleanup qualified names")); + } + } + /** * Unregister the entity qualified names for the node by ID. * @@ -69,24 +88,4 @@ public static Future unregister(NeonBee neonBee, String clusterNodeId) { .onFailure(cause -> LOGGER.error("Failed to unregistered entity verticle models for node ID {} ...", clusterNodeId, cause)); } - - /** - * This method is called when a NeonBee node has left the cluster. - * - * @param neonBee the {@link NeonBee} instance - * @param hookContext the {@link HookContext} - * @param promise {@link Promise} to completed the function. - */ - @Hook(HookType.NODE_LEFT) - public void cleanup(NeonBee neonBee, HookContext hookContext, Promise promise) { - String clusterNodeId = hookContext.get(CLUSTER_NODE_ID); - LOGGER.info("Cleanup qualified names for node {}", clusterNodeId); - if (ClusterHelper.isLeader(neonBee.getVertx())) { - LOGGER.info("Cleaning registered qualified names ..."); - unregister(neonBee, clusterNodeId).onComplete(promise) - .onSuccess(unused -> LOGGER.info("Qualified names successfully cleaned up")) - .onFailure(ignoredCause -> LOGGER.error("Failed to cleanup qualified names")); - } - } - } diff --git a/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java b/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java index f6465471..dcd001fe 100644 --- a/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java +++ b/src/test/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistryTest.java @@ -135,7 +135,7 @@ void unregisterNode2(Vertx vertx, VertxTestContext context) { checkpoint.flag(); })).compose(unused -> registry2.unregisterNode(clusterIdNode2)).compose(unused -> registry2.get(KEY)) .onSuccess(ja -> context.verify(() -> { - assertThat(ja).isEmpty(); + assertThat(ja).isNull(); checkpoint.flag(); })).onFailure(context::failNow); } diff --git a/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java b/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java index 7691ee8f..d1abd4b1 100644 --- a/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java +++ b/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java @@ -114,7 +114,7 @@ private void testUnregisteringEntitiesSingleNode(NeonBee web, VertxTestContext t })).compose(unused -> UnregisterEntityVerticlesHook.unregister(web, clusterNodeId)) .compose(unused -> registry.get(sharedEntityMapName(ErpSalesEntityVerticle.FQN_ERP_CUSTOMERS))) .onSuccess(jsonArray -> testContext.verify(() -> { - assertThat(jsonArray).isEqualTo(new JsonArray()); + assertThat(jsonArray).isNull(); checkpoint.flag(); })).compose(unused -> registry.clusteringInformation.get(clusterNodeId)) .onSuccess(object -> testContext.verify(() -> { diff --git a/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java b/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java index 0ccf06de..a29c48b2 100644 --- a/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java +++ b/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java @@ -74,8 +74,15 @@ public static Path createTempDirectory() throws IOException { @NeonBeeDeployable(namespace = NeonBeeDeployable.NEONBEE_NAMESPACE, autoDeploy = false) public static class TestEntityVerticle1 extends EntityVerticle { + + /** + * The fully qualified name of the entity type "ERP.Customers". + */ static final FullQualifiedName FQN_ERP_CUSTOMERS = new FullQualifiedName("ERP", "Customers"); + /** + * The fully qualified name of the entity type "Sales.Orders". + */ static final FullQualifiedName FQN_SALES_ORDERS = new FullQualifiedName("Sales.Orders"); @Override diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml new file mode 100644 index 00000000..88bc9faa --- /dev/null +++ b/src/test/resources/logback-test.xml @@ -0,0 +1,11 @@ + + + + + + + + %d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n + + +