From 2a548529956825d53565207033f05afffaeb37fe Mon Sep 17 00:00:00 2001 From: Michael Halberstadt Date: Fri, 25 Oct 2024 12:19:05 +0200 Subject: [PATCH] fix: entity unregister node When the `unregisterNode` method is called for another NeonBee node, and the executing node has an `EntityVerticle` that was also registered by the other node, the `EntityVerticle` was incorrectly unregistered for the executing node. This fix ensures that the `EntityVerticle` remains registered for the executing node. --- .../cluster/entity/ClusterEntityRegistry.java | 70 +-- .../entity/UnregisterEntitiesTest.java | 442 +++++++++++++++++- .../internal/job/RedeployEntitiesJobTest.java | 14 +- 3 files changed, 472 insertions(+), 54 deletions(-) diff --git a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java index 5056a376..86fae92c 100644 --- a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java +++ b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java @@ -44,9 +44,9 @@ public class ClusterEntityRegistry implements Registry { @VisibleForTesting final WriteSafeRegistry clusteringInformation; - private final Vertx vertx; + final WriteSafeRegistry entityRegistry; - private final WriteSafeRegistry entityRegistry; + private final Vertx vertx; /** * Create a new instance of {@link ClusterEntityRegistry}. @@ -76,8 +76,11 @@ public Future register(String sharedMapKey, String value) { @Override public Future unregister(String sharedMapKey, String value) { - return Future.all(entityRegistry.unregister(sharedMapKey, value), clusteringInformation - .unregister(getClusterNodeId(), clusterRegistrationInformation(sharedMapKey, value))) + return Future.all( + entityRegistry.unregister(sharedMapKey, value), + clusteringInformation.unregister( + getClusterNodeId(), + clusterRegistrationInformation(sharedMapKey, value))) .mapEmpty(); } @@ -123,32 +126,47 @@ Future removeClusteringInformation(String clusterNodeId) { * @return the future */ public Future unregisterNode(String clusterNodeId) { - return clusteringInformation.getSharedMap().compose(AsyncMap::entries).compose(map -> { - JsonArray registeredEntities = (JsonArray) map.remove(clusterNodeId); - - if (registeredEntities == null) { - // If no entities are registered, return a completed future - return Future.succeededFuture(); - } - registeredEntities = registeredEntities.copy(); - List> futureList = new ArrayList<>(registeredEntities.size()); - for (Object o : registeredEntities) { - if (remove(map, o)) { - JsonObject jo = (JsonObject) o; - String entityName = jo.getString(ENTITY_NAME_KEY); - String qualifiedName = jo.getString(QUALIFIED_NAME_KEY); - futureList.add(unregister(entityName, qualifiedName)); - } - } - return Future.join(futureList).mapEmpty(); - }).compose(cf -> removeClusteringInformation(clusterNodeId)); + return clusteringInformation.getSharedMap() + .compose(AsyncMap::entries) + .compose(map -> { + JsonArray registeredEntities = (JsonArray) map.remove(clusterNodeId); + + if (registeredEntities == null) { + // If no entities are registered, return a completed future + return Future.succeededFuture(); + } + registeredEntities = registeredEntities.copy(); + List> futureList = new ArrayList<>(registeredEntities.size()); + for (Object o : registeredEntities) { + if (shouldRemove(map, o)) { + JsonObject jo = (JsonObject) o; + String entityName = jo.getString(ENTITY_NAME_KEY); + String qualifiedName = jo.getString(QUALIFIED_NAME_KEY); + futureList.add(entityRegistry.unregister(entityName, qualifiedName)); + } + } + return Future.join(futureList).mapEmpty(); + }).compose(cf -> removeClusteringInformation(clusterNodeId)); } - private boolean remove(Map map, Object o) { + /** + * Check if the provided object should be removed from the map. + * + * @param map the map + * @param o the object + * @return true if the object should be removed + */ + private boolean shouldRemove(Map map, Object o) { for (Map.Entry node : map.entrySet()) { JsonArray ja = (JsonArray) node.getValue(); - if (ja.contains(o)) { - return false; + // Iterate over the JsonArray to determine if it contains the specified object. + // JsonArray#contains cannot be used directly because the types of objects in the JsonArray may differ from + // those returned by the iterator. This discrepancy occurs because JsonArray.Iter#next automatically wraps + // standard Java types into their corresponding Json types. + for (Object object : ja) { + if (object.equals(o)) { + return false; + } } } return true; diff --git a/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java b/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java index cbf759af..7691ee8f 100644 --- a/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java +++ b/src/test/java/io/neonbee/internal/cluster/entity/UnregisterEntitiesTest.java @@ -4,56 +4,87 @@ import static io.neonbee.NeonBeeInstanceConfiguration.ClusterManager.HAZELCAST; import static io.neonbee.NeonBeeInstanceConfiguration.ClusterManager.INFINISPAN; import static io.neonbee.NeonBeeProfile.WEB; +import static io.neonbee.test.base.NeonBeeTestBase.LONG_RUNNING_TEST; import static io.vertx.core.Future.succeededFuture; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; import org.apache.olingo.commons.api.edm.FullQualifiedName; import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Isolated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; import io.neonbee.NeonBee; +import io.neonbee.NeonBeeDeployable; import io.neonbee.NeonBeeExtension; import io.neonbee.NeonBeeInstanceConfiguration; +import io.neonbee.data.DataVerticle; import io.neonbee.entity.EntityVerticle; import io.neonbee.internal.cluster.ClusterHelper; import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Timer; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.ext.cluster.infinispan.InfinispanClusterManager; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.Timeout; import io.vertx.junit5.VertxTestContext; +@Tag(LONG_RUNNING_TEST) @ExtendWith({ NeonBeeExtension.class }) +@Isolated class UnregisterEntitiesTest { static final String SHARED_ENTITY_MAP_NAME = "entityVerticles[%s]"; + private static final Logger LOGGER = LoggerFactory.getLogger(UnregisterEntitiesTest.class); + + public static final String TEST_NAMESPACE = "unregisterentitiestest"; + @Test - @DisplayName("test unregistering entity models (Infinispan cluster)) ") - void testInfinispanUnregisteringEntities(@NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, - clusterManager = INFINISPAN) NeonBee web, VertxTestContext testContext) { - testUnregisteringEntities(web, testContext); + @DisplayName("test unregistering entity models in a single node Infinispan cluster") + void testInfinispanUnregisteringEntitiesSingleNode( + @NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, + clusterManager = INFINISPAN) NeonBee web, + VertxTestContext testContext) { + testUnregisteringEntitiesSingleNode(web, testContext); } @Test - @DisplayName("test unregistering entity models (Hazelcast cluster)") - void testHazelcastUnregisteringEntities(@NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, - clusterManager = HAZELCAST) NeonBee web, VertxTestContext testContext) { - testUnregisteringEntities(web, testContext); + @DisplayName("test unregistering entity models in a single node Hazelcast cluster") + void testHazelcastUnregisteringEntitiesSingleNode( + @NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, + clusterManager = HAZELCAST) NeonBee web, + VertxTestContext testContext) { + testUnregisteringEntitiesSingleNode(web, testContext); } - private void testUnregisteringEntities(NeonBee web, VertxTestContext testContext) { + private void testUnregisteringEntitiesSingleNode(NeonBee web, VertxTestContext testContext) { assertThat(isClustered(web)).isTrue(); Vertx vertx = web.getVertx(); String clusterNodeId = ClusterHelper.getClusterNodeId(vertx); + Checkpoint checkpoint = testContext.checkpoint(5); + ClusterEntityRegistry registry = (ClusterEntityRegistry) web.getEntityRegistry(); - EntityVerticleUnregisterImpl entityVerticle = new EntityVerticleUnregisterImpl(); + ErpSalesEntityVerticle entityVerticle = new ErpSalesEntityVerticle(); vertx.deployVerticle(entityVerticle) .compose(unused -> registry.clusteringInformation.get(ClusterHelper.getClusterNodeId(web.getVertx()))) .onSuccess(jsonArray -> testContext.verify(() -> { @@ -65,42 +96,372 @@ private void testUnregisteringEntities(NeonBee web, VertxTestContext testContext assertThat(jsonObjectList.get(0)) .isEqualTo(JsonObject.of("qualifiedName", entityVerticle.getQualifiedName(), "entityName", - sharedEntityMapName(EntityVerticleUnregisterImpl.FQN_ERP_CUSTOMERS))); + sharedEntityMapName(ErpSalesEntityVerticle.FQN_ERP_CUSTOMERS))); assertThat(jsonObjectList.get(1)) .isEqualTo(JsonObject.of("qualifiedName", entityVerticle.getQualifiedName(), "entityName", - sharedEntityMapName(EntityVerticleUnregisterImpl.FQN_SALES_ORDERS))); + sharedEntityMapName(ErpSalesEntityVerticle.FQN_SALES_ORDERS))); + checkpoint.flag(); })) .compose(unused -> EntityVerticle.getVerticlesForEntityType(vertx, - EntityVerticleUnregisterImpl.FQN_ERP_CUSTOMERS)) + ErpSalesEntityVerticle.FQN_ERP_CUSTOMERS)) .compose(list1 -> EntityVerticle - .getVerticlesForEntityType(vertx, EntityVerticleUnregisterImpl.FQN_SALES_ORDERS).map(list2 -> { - return ImmutableList.builder().addAll(list1).addAll(list2).build(); - })) + .getVerticlesForEntityType(vertx, ErpSalesEntityVerticle.FQN_SALES_ORDERS) + .map(list2 -> ImmutableList.builder().addAll(list1).addAll(list2).build())) .onSuccess(list -> testContext.verify(() -> { assertThat(list).hasSize(2); + checkpoint.flag(); })).compose(unused -> UnregisterEntityVerticlesHook.unregister(web, clusterNodeId)) - .compose(unused -> registry.get(sharedEntityMapName(EntityVerticleUnregisterImpl.FQN_ERP_CUSTOMERS))) + .compose(unused -> registry.get(sharedEntityMapName(ErpSalesEntityVerticle.FQN_ERP_CUSTOMERS))) .onSuccess(jsonArray -> testContext.verify(() -> { assertThat(jsonArray).isEqualTo(new JsonArray()); + checkpoint.flag(); })).compose(unused -> registry.clusteringInformation.get(clusterNodeId)) .onSuccess(object -> testContext.verify(() -> { assertThat(object).isNull(); - testContext.completeNow(); + checkpoint.flag(); })) .compose(unused -> EntityVerticle.getVerticlesForEntityType(vertx, - EntityVerticleUnregisterImpl.FQN_ERP_CUSTOMERS)) + ErpSalesEntityVerticle.FQN_ERP_CUSTOMERS)) .compose(list1 -> EntityVerticle - .getVerticlesForEntityType(vertx, EntityVerticleUnregisterImpl.FQN_SALES_ORDERS).map(list2 -> { - return ImmutableList.builder().addAll(list1).addAll(list2).build(); - })) + .getVerticlesForEntityType(vertx, ErpSalesEntityVerticle.FQN_SALES_ORDERS) + .map(list2 -> ImmutableList.builder().addAll(list1).addAll(list2).build())) .onSuccess(list -> testContext.verify(() -> { assertThat(list).isEmpty(); + checkpoint.flag(); })) + .onFailure(testContext::failNow); + } + + @Test + @DisplayName("test unregistering entity models in a multi node Infinispan cluster") + void testInfinispanUnregisteringEntitiesMultiNode( + @NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, + clusterManager = INFINISPAN) NeonBee neonBee1, + @NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, + clusterManager = INFINISPAN) NeonBee neonBee2, + VertxTestContext testContext) { + waitForClusterToForm(neonBee1.getVertx(), () -> infinispanMembersJoined(neonBee1.getVertx(), 2)) + .onSuccess(unused -> testUnregisteringEntitiesMultiNode(neonBee1, neonBee2, testContext)) + .onFailure(testContext::failNow); + } + + /** + * Check if the number of Hazelcast members has joined the cluster. + * + * @param vertx the vertx instance + * @param membershipSize the expected number of members + * @return true if the number of members has joined the cluster + */ + boolean infinispanMembersJoined(Vertx vertx, int membershipSize) { + return ClusterHelper.getInfinispanClusterManager(vertx) + .map(InfinispanClusterManager::getNodes) + .map(List::size) + .map(size -> size == membershipSize) + .orElse(Boolean.FALSE); + } + @Test + @Timeout(value = 2, timeUnit = TimeUnit.MINUTES) + @DisplayName("test unregistering entity models in a multi node Hazelcast cluster") + void testHazelcastUnregisteringEntitiesMultiNode( + @NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, + clusterManager = HAZELCAST, clusterConfigFile = "hazelcast-localtcp.xml") NeonBee neonBee1, + @NeonBeeInstanceConfiguration(activeProfiles = WEB, clustered = true, + clusterManager = HAZELCAST, clusterConfigFile = "hazelcast-localtcp.xml") NeonBee neonBee2, + VertxTestContext testContext) { + waitForClusterToForm(neonBee1.getVertx(), () -> hazelcastMembersJoined(neonBee1.getVertx(), 2)) + .onSuccess(unused -> testUnregisteringEntitiesMultiNode(neonBee1, neonBee2, testContext)) .onFailure(testContext::failNow); } + /** + * Wait for the Hazelcast cluster to form. + * + * @param vertx the vertx instance + * @return a future that completes when the cluster has formed + */ + private Future waitForClusterToForm(Vertx vertx, BooleanSupplier cluserFormed) { + long startTime = System.currentTimeMillis(); + Promise promise = Promise.promise(); + Timer timer = vertx.timer(1, TimeUnit.MINUTES); + vertx.setPeriodic(100, id -> { // Check every 100 ms + if (cluserFormed.getAsBoolean()) { + vertx.cancelTimer(id); + LOGGER.info("The cluster has formed after {} ms.", System.currentTimeMillis() - startTime); + promise.complete(); + } else if (timer.isComplete()) { // fail after ~60 seconds + vertx.cancelTimer(id); + promise.fail( + new IllegalStateException("Cluster did not form in time. (Waited for " + + (System.currentTimeMillis() - startTime) + " ms).")); + } else { + LOGGER.info("The cluster has not yet formed after {} ms.", System.currentTimeMillis() - startTime); + } + }); + return promise.future(); + } + + /** + * Check if the number of Hazelcast members has joined the cluster. + * + * @param vertx the vertx instance + * @param membershipSize the expected number of members + * @return true if the number of members has joined the cluster + */ + boolean hazelcastMembersJoined(Vertx vertx, int membershipSize) { + return ClusterHelper.getHazelcastClusterManager(vertx) + .map(clusterManager -> clusterManager.getHazelcastInstance().getCluster().getMembers() + .size() == membershipSize) + .orElse(Boolean.FALSE); + } + + private void testUnregisteringEntitiesMultiNode(NeonBee node1, NeonBee node2, VertxTestContext testContext) { + assertThat(isClustered(node1)).isTrue(); + + Checkpoint checkpoint = testContext.checkpoint(3); + ClusterEntityRegistry n1Registry = (ClusterEntityRegistry) node1.getEntityRegistry(); + + ErpSalesEntityVerticle erpSales = new ErpSalesEntityVerticle(); + ErpSalesEntityVerticle erpSales2 = new ErpSalesEntityVerticle(); + MarketSalesEntityVerticle makedSales = new MarketSalesEntityVerticle(); + + String clusterNode1Id = ClusterHelper.getClusterNodeId(node1.getVertx()); + String clusterNode2Id = ClusterHelper.getClusterNodeId(node2.getVertx()); + LOGGER.info("NeonBee ID: {}; Node 1 ID: {}", node1.getNodeId(), clusterNode1Id); + LOGGER.info("NeonBee ID: {}; Node 2 ID: {}", node2.getNodeId(), clusterNode2Id); + + // 1. deploy the same EntitiyVerticle on two NeonBee nodes. + Future.all( + node1.getVertx().deployVerticle(erpSales), + node1.getVertx().deployVerticle(makedSales), + node2.getVertx().deployVerticle(erpSales2)) + .compose(event -> printMaps(n1Registry, "After EntityVerticles deployed")) + + // 2. verify the content of ClusterEntityRegistry#clusteringInformation and + // ClusterEntityRegistry#entityRegistry + .compose(unused -> { + Future> clusterInfoEntries = + n1Registry.clusteringInformation.getSharedMap().compose(AsyncMap::entries); + Future> entityRegistryEntries = + n1Registry.entityRegistry.getSharedMap().compose(AsyncMap::entries); + + return Future.all(clusterInfoEntries, entityRegistryEntries) + .onSuccess(compositeFuture -> testContext.verify(() -> { + // The clustering information map should look like this: + // + // 6dbe2f47-8e2a-4b41-b019-007b16157f87 -> + // [{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[ERP.Customers]" + // },{ + // "qualifiedName":"unregisterentitiestest/_MarketSalesEntityVerticle-133064210", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_MarketSalesEntityVerticle-133064210", + // "entityName":"entityVerticles[Market.Products]" + // }] + // + // d4f78582-14d4-493f-8a74-03d0e49c566b -> + // [{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[ERP.Customers]" + // }] + verifyClusterInformation( + clusterInfoEntries.result(), + Map.of( + clusterNode1Id, List.of(erpSales, makedSales), + clusterNode2Id, List.of(erpSales))); + + // The entity registry map should look like this: + // + // entityVerticles[Sales.Orders] -> + // [ + // "unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "unregisterentitiestest/_MarketSalesEntityVerticle-133064210" + // ] + // + // entityVerticles[Market.Products] -> + // [ + // "unregisterentitiestest/_MarketSalesEntityVerticle-133064210" + // ] + // entityVerticles[ERP.Customers] -> + // [ + // "unregisterentitiestest/_ErpSalesEntityVerticle-644622197" + // ] + verifyEntityRegistry(entityRegistryEntries.result(), Set.of(erpSales, makedSales)); + checkpoint.flag(); + })); + }) + + // 3. unregister node2 executed by node1 + .compose(unused -> n1Registry.unregisterNode(ClusterHelper.getClusterNodeId(node2.getVertx()))) + .compose(event -> printMaps(n1Registry, "After unregister node 2")) + + // 4. verify the content of ClusterEntityRegistry#clusteringInformation and + // ClusterEntityRegistry#entityRegistry + .compose(unused -> { + Future> clusterInfoEntries = + n1Registry.clusteringInformation.getSharedMap().compose(AsyncMap::entries); + Future> entityRegistryEntries = + n1Registry.entityRegistry.getSharedMap().compose(AsyncMap::entries); + + return Future.all(clusterInfoEntries, entityRegistryEntries) + .onSuccess(compositeFuture -> testContext.verify(() -> { + // The clustering information map should look like this: + // + // 6dbe2f47-8e2a-4b41-b019-007b16157f87 -> + // [{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "entityName":"entityVerticles[ERP.Customers]" + // },{ + // "qualifiedName":"unregisterentitiestest/_MarketSalesEntityVerticle-133064210", + // "entityName":"entityVerticles[Sales.Orders]" + // },{ + // "qualifiedName":"unregisterentitiestest/_MarketSalesEntityVerticle-133064210", + // "entityName":"entityVerticles[Market.Products]" + // }] + verifyClusterInformation( + clusterInfoEntries.result(), + Map.of(clusterNode1Id, List.of(erpSales, makedSales))); + + // The entity registry map should look like this: + // + // entityVerticles[Sales.Orders] -> + // [ + // "unregisterentitiestest/_ErpSalesEntityVerticle-644622197", + // "unregisterentitiestest/_MarketSalesEntityVerticle-133064210" + // ] + // entityVerticles[Market.Products] -> + // [ + // "unregisterentitiestest/_MarketSalesEntityVerticle-133064210" + // ] + // entityVerticles[ERP.Customers] -> + // [ + // "unregisterentitiestest/_ErpSalesEntityVerticle-644622197" + // ] + verifyEntityRegistry(entityRegistryEntries.result(), Set.of(erpSales, makedSales)); + checkpoint.flag(); + })); + }) + + // 5. unregister node1 + .compose(unused -> n1Registry.unregisterNode(ClusterHelper.getClusterNodeId(node1.getVertx()))) + .compose(event -> printMaps(n1Registry, "After unregister node 1")) + + // 6. verify the content of ClusterEntityRegistry#clusteringInformation and + // ClusterEntityRegistry#entityRegistry + .compose(unused -> { + Future> clusterInfoEntries = + n1Registry.clusteringInformation.getSharedMap().compose(AsyncMap::entries); + Future> entityRegistryEntries = + n1Registry.entityRegistry.getSharedMap().compose(AsyncMap::entries); + + return Future.all(clusterInfoEntries, entityRegistryEntries) + .onSuccess(compositeFuture -> testContext.verify(() -> { + assertThat(clusterInfoEntries.result()).isEmpty(); + + // The entity registry map should look like this: + // entityVerticles[Sales.Orders] -> [] + // entityVerticles[Market.Products] -> [] + // entityVerticles[ERP.Customers] -> [] + long entityVerticlesCount = entityRegistryEntries.result().values() + .stream() + .map(JsonArray.class::cast) + .flatMap(JsonArray::stream) + .count(); + assertThat(entityVerticlesCount).isEqualTo(0); + checkpoint.flag(); + })); + }) + .onFailure(testContext::failNow); + } + + private void verifyEntityRegistry(Map entityRegistryEntries, + Set deployedEntityVerticles) { + + Map> deployedEntityRegistryMap = new HashMap<>(); + for (EntityVerticle entityVerticle : deployedEntityVerticles) { + for (FullQualifiedName entityTypeName : entityVerticle.entityTypeNames().result()) { + String sharedEntityMapName = sharedEntityMapName(entityTypeName); + + String qualifiedName = DataVerticle.createQualifiedName( + TEST_NAMESPACE, + EntityVerticle.getName(entityVerticle.getClass())); + + Set qualifiedNames = + deployedEntityRegistryMap.computeIfAbsent(sharedEntityMapName, k -> new HashSet<>()); + qualifiedNames.add(qualifiedName); + } + } + + assertThat(entityRegistryEntries).hasSize(deployedEntityRegistryMap.size()); + + for (Map.Entry> entry : deployedEntityRegistryMap.entrySet()) { + String entityName = entry.getKey(); + Set qualifiedNames = entry.getValue(); + JsonArray jsonArray = (JsonArray) entityRegistryEntries.get(entityName); + assertThat(jsonArray).containsExactly(qualifiedNames.toArray()); + } + } + + private void verifyClusterInformation(Map clusteringInformationMap, + Map> deployedEntityVerticles) { + assertThat(clusteringInformationMap).hasSize(deployedEntityVerticles.size()); + assertThat(clusteringInformationMap.keySet()).containsExactly(deployedEntityVerticles.keySet().toArray()); + + for (Map.Entry> e : deployedEntityVerticles.entrySet()) { + String nodeId = e.getKey(); + List entityVerticles = e.getValue(); + Object o = clusteringInformationMap.get(nodeId); + assertThat(o).isInstanceOf(JsonArray.class); + JsonArray jsonArray = (JsonArray) o; + + int entityNamesCount = entityVerticles.stream() + .map(EntityVerticle::entityTypeNames) + .map(Future::result) + .map(Set::size) + .reduce(0, Integer::sum); + assertThat(jsonArray.size()).isEqualTo(entityNamesCount); + + Map> deployedEntityVerticleMap = entityVerticles.stream() + .collect(Collectors.toMap( + entityVerticle -> DataVerticle.createQualifiedName( + TEST_NAMESPACE, + EntityVerticle.getName(entityVerticle.getClass())), + entityVerticle -> entityVerticle.entityTypeNames() + .result() + .stream() + .map(UnregisterEntitiesTest::sharedEntityMapName) + .collect(Collectors.toSet()))); + + Map> clusterInfromationEntityVerticleMap = jsonArray.stream() + .map(JsonObject.class::cast) + .collect(Collectors.groupingBy( + jsonObject -> jsonObject.getString(ClusterEntityRegistry.QUALIFIED_NAME_KEY), + Collectors.mapping( + jsonObject -> jsonObject.getString(ClusterEntityRegistry.ENTITY_NAME_KEY), + Collectors.toSet()))); + + assertThat(clusterInfromationEntityVerticleMap.keySet()) + .containsExactly(deployedEntityVerticleMap.keySet().toArray()); + for (Map.Entry> entry : deployedEntityVerticleMap.entrySet()) { + assertThat(clusterInfromationEntityVerticleMap).containsKey(entry.getKey()); + Set entityNames = clusterInfromationEntityVerticleMap.get(entry.getKey()); + assertThat(entityNames).containsExactly(entry.getValue().toArray()); + } + } + } + static String sharedEntityMapName(FullQualifiedName entityTypeName) { return String.format(SHARED_ENTITY_MAP_NAME, entityTypeName.getFullQualifiedNameAsString()); } @@ -109,7 +470,30 @@ private boolean isClustered(NeonBee neonBee) { return ClusterHelper.getClusterManager(neonBee.getVertx()).isPresent(); } - public static class EntityVerticleUnregisterImpl extends EntityVerticle { + /** + * Print the content of the Clustering Information and Entity Registry map. + * + * @param registry the cluster entity registry + * @param identifier an identifier for the printed content + * @return a future that completes when the content of the shared maps has been printed + */ + private Future> printMaps(ClusterEntityRegistry registry, String identifier) { + return registry.clusteringInformation.getSharedMap() + .compose(AsyncMap::entries) + .onSuccess(entires -> { + LOGGER.info("##### Clustering Information {}:", identifier); + entires.forEach((k, v) -> LOGGER.info("##### {}: {} -> {}", identifier, k, v)); + }) + .compose(unused -> registry.entityRegistry.getSharedMap()) + .compose(AsyncMap::entries) + .onSuccess(entires -> { + LOGGER.info("##### Entity Registry {}:", identifier); + entires.forEach((k, v) -> LOGGER.info("##### {}: {} -> {} ", identifier, k, v)); + }); + } + + @NeonBeeDeployable(namespace = TEST_NAMESPACE, autoDeploy = false) + public static class ErpSalesEntityVerticle extends EntityVerticle { static final FullQualifiedName FQN_ERP_CUSTOMERS = new FullQualifiedName("ERP", "Customers"); static final FullQualifiedName FQN_SALES_ORDERS = new FullQualifiedName("Sales", "Orders"); @@ -119,4 +503,16 @@ public Future> entityTypeNames() { return succeededFuture(Set.of(FQN_ERP_CUSTOMERS, FQN_SALES_ORDERS)); } } + + @NeonBeeDeployable(namespace = TEST_NAMESPACE, autoDeploy = false) + public static class MarketSalesEntityVerticle extends EntityVerticle { + static final FullQualifiedName FQN_TEST_PRODUCTS = new FullQualifiedName("Market", "Products"); + + static final FullQualifiedName FQN_SALES_ORDERS = new FullQualifiedName("Sales", "Orders"); + + @Override + public Future> entityTypeNames() { + return succeededFuture(Set.of(FQN_TEST_PRODUCTS, FQN_SALES_ORDERS)); + } + } } diff --git a/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java b/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java index 96294cd0..0ccf06de 100644 --- a/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java +++ b/src/test/java/io/neonbee/internal/job/RedeployEntitiesJobTest.java @@ -116,10 +116,10 @@ public Future execute(DataContext context) { return clusterEntityRegistry .getClusteringInformation(ClusterHelper.getClusterNodeId(vertx)); }) - .onSuccess(event -> testContext.verify(() -> { - assertThat(event).isNotNull(); - assertThat(event.size()).isEqualTo(6); - Set deployedSet = event.stream() + .onSuccess(clusteringInformation -> testContext.verify(() -> { + assertThat(clusteringInformation).isNotNull(); + assertThat(clusteringInformation.size()).isEqualTo(6); + Set deployedSet = clusteringInformation.stream() .map(o -> (JsonObject) o) .map(jo -> jo.getString(ClusterEntityRegistry.QUALIFIED_NAME_KEY)) .map(s -> s.replaceAll("-\\d*", "")) @@ -136,7 +136,11 @@ public Future execute(DataContext context) { @Override boolean filterByAutoDeployAndProfiles(Class verticleClass, Collection activeProfiles) { - return true; + return Set.of( + TestEntityVerticle1.class, + TestEntityVerticle2.class, + TestEntityVerticle3.class) + .contains(verticleClass); } }