diff --git a/build.gradle b/build.gradle
index 5c6062a2..2f8be79f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -7,6 +7,7 @@ plugins {
id 'jacoco'
id 'pmd'
id 'checkstyle'
+ id 'idea'
// community plugins
id 'com.diffplug.spotless' version '6.25.0'
@@ -20,6 +21,14 @@ plugins {
id 'team.yi.semantic-gitlog' version '0.6.12'
}
+idea {
+ module {
+ downloadSources = true
+ downloadJavadoc = true
+ }
+}
+
+
group = 'io.neonbee'
version = '0.37.2-SNAPSHOT'
mainClassName = 'io.neonbee.Launcher'
diff --git a/src/main/java/io/neonbee/internal/SharedRegistry.java b/src/main/java/io/neonbee/internal/SharedRegistry.java
new file mode 100644
index 00000000..5d9adc37
--- /dev/null
+++ b/src/main/java/io/neonbee/internal/SharedRegistry.java
@@ -0,0 +1,119 @@
+package io.neonbee.internal;
+
+import static io.vertx.core.Future.succeededFuture;
+
+import io.neonbee.NeonBee;
+import io.neonbee.logging.LoggingFacade;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.shareddata.AsyncMap;
+import io.vertx.core.shareddata.SharedData;
+
+/**
+ * A registry to manage values in the {@link SharedData} shared map.
+ *
+ * This class is a generic implementation of a registry that can be used to store values in a shared map.
+ *
+ * @param the type of the value to store in the shared registry
+ */
+public class SharedRegistry implements Registry {
+
+ private final LoggingFacade logger = LoggingFacade.create();
+
+ private final SharedData sharedData;
+
+ private final String registryName;
+
+ /**
+ * Create a new {@link WriteSafeRegistry}.
+ *
+ * @param vertx the {@link Vertx} instance
+ * @param registryName the name of the map registry
+ */
+ public SharedRegistry(Vertx vertx, String registryName) {
+ this(registryName, new SharedDataAccessor(vertx, SharedRegistry.class));
+ }
+
+ /**
+ * Create a new {@link WriteSafeRegistry}.
+ *
+ * @param registryName the name of the map registry
+ * @param sharedData the shared data
+ */
+ public SharedRegistry(String registryName, SharedData sharedData) {
+ this.registryName = registryName;
+ this.sharedData = sharedData;
+ }
+
+ /**
+ * Register a value in the async shared map of {@link NeonBee} by key.
+ *
+ * @param sharedMapKey the shared map key
+ * @param value the value to register
+ * @return the future
+ */
+ @Override
+ public Future register(String sharedMapKey, T value) {
+ logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value);
+
+ Future> sharedMap = getSharedMap();
+
+ return sharedMap.compose(map -> map.get(sharedMapKey))
+ .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray())
+ .compose(valueArray -> {
+ if (!valueArray.contains(value)) {
+ valueArray.add(value);
+ }
+
+ if (logger.isInfoEnabled()) {
+ logger.info("Registered verticle {} in shared map.", value);
+ }
+
+ return sharedMap.compose(map -> map.put(sharedMapKey, valueArray));
+ });
+ }
+
+ /**
+ * Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey.
+ *
+ * @param sharedMapKey the shared map key
+ * @param value the value to unregister
+ * @return the future
+ */
+ @Override
+ public Future unregister(String sharedMapKey, T value) {
+ logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);
+
+ Future> sharedMap = getSharedMap();
+
+ return sharedMap.compose(map -> map.get(sharedMapKey))
+ .map(jsonArray -> (JsonArray) jsonArray)
+ .compose(values -> {
+ if (values == null) {
+ return succeededFuture();
+ }
+
+ if (logger.isInfoEnabled()) {
+ logger.info("Unregistered verticle {} in shared map.", value);
+ }
+
+ values.remove(value);
+ return sharedMap.compose(map -> map.put(sharedMapKey, values));
+ });
+ }
+
+ @Override
+ public Future get(String sharedMapKey) {
+ return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o);
+ }
+
+ /**
+ * Shared map that is used as registry.
+ *
+ * @return Future to the shared map
+ */
+ public Future> getSharedMap() {
+ return sharedData.getAsyncMap(registryName);
+ }
+}
diff --git a/src/main/java/io/neonbee/internal/WriteLockRegistry.java b/src/main/java/io/neonbee/internal/WriteLockRegistry.java
new file mode 100644
index 00000000..d86d9c9e
--- /dev/null
+++ b/src/main/java/io/neonbee/internal/WriteLockRegistry.java
@@ -0,0 +1,207 @@
+package io.neonbee.internal;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import io.neonbee.logging.LoggingFacade;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.shareddata.AsyncMap;
+import io.vertx.core.shareddata.Lock;
+import io.vertx.core.shareddata.SharedData;
+
+/**
+ * A {@link WriteSafeRegistry} implementation that uses a lock to lock all write operations to ensure that only one
+ * operation is executed at a time.
+ *
+ * @param the type of data this registry stores
+ */
+public class WriteLockRegistry implements Registry {
+
+ private static final String LOCK_KEY = WriteLockRegistry.class.getName();
+
+ private final LoggingFacade logger = LoggingFacade.create();
+
+ private final Registry registry;
+
+ private final SharedData sharedData;
+
+ private final String registryName;
+
+ private final Vertx vertx;
+
+ /**
+ * Create a new {@link WriteSafeRegistry}.
+ *
+ * @param vertx the {@link Vertx} instance
+ * @param registryName the name of the map registry
+ */
+ public WriteLockRegistry(Vertx vertx, String registryName) {
+ this(vertx, registryName, new SharedDataAccessor(vertx, WriteLockRegistry.class));
+ }
+
+ /**
+ * Create a new {@link WriteSafeRegistry}.
+ *
+ * @param vertx the {@link Vertx} instance
+ * @param registryName the name of the map registry
+ * @param sharedData the shared data
+ */
+ public WriteLockRegistry(Vertx vertx, String registryName, SharedData sharedData) {
+ this.vertx = vertx;
+ registry = new SharedRegistry<>(registryName, sharedData);
+ this.registryName = registryName;
+ this.sharedData = sharedData;
+ }
+
+ @Override
+ public Future register(String sharedMapKey, T value) {
+ return lock(LOCK_KEY)
+ .compose(lock -> lock.execute(() -> registry.register(sharedMapKey, value)))
+ .mapEmpty();
+ }
+
+ @Override
+ public Future unregister(String sharedMapKey, T value) {
+ return lock(LOCK_KEY)
+ .compose(lock -> lock.execute(() -> registry.unregister(sharedMapKey, value)))
+ .mapEmpty();
+ }
+
+ @Override
+ public Future get(String key) {
+ return registry.get(key);
+ }
+
+ /**
+ * Method that acquires a lock for the registry and released the lock after the returned TimeOutLock executed the
+ * futureSupplier or the timeout is reached.
+ *
+ * @return the futureSupplier
+ */
+ public Future lock() {
+ return lock(LOCK_KEY);
+ }
+
+ /**
+ * Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed.
+ *
+ * @param sharedMapKey the shared map key
+ * @return the futureSupplier
+ */
+ private Future lock(String sharedMapKey) {
+ logger.debug("Get lock for {}", sharedMapKey);
+ return sharedData.getLock(sharedMapKey)
+ .map(TimeOutLock::new);
+ }
+
+ /**
+ * Shared map that is used as registry.
+ *
+ * It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)}
+ * and {@link WriteSafeRegistry#unregister(String, Object)} methods.
+ *
+ * @return Future to the shared map
+ */
+ public Future> getSharedMap() {
+ return sharedData.getAsyncMap(registryName);
+ }
+
+ /**
+ * A lock that is released after 10 seconds.
+ */
+ public class TimeOutLock implements Lock {
+
+ private static final long DEFAULT_TIME_OUT_DELAY_MS = TimeUnit.SECONDS.toMillis(1000);
+
+ private final UUID id = UUID.randomUUID();
+
+ private final LoggingFacade logger = LoggingFacade.create();
+
+ private final Lock lock;
+
+ private final long timerId;
+
+ private final Promise timeOutPromise;
+
+ private final long startTime;
+
+ private TimeOutLock(Lock lock) {
+ this.lock = lock;
+ this.timeOutPromise = Promise.promise();
+ this.timerId = startTimer();
+ this.startTime = System.currentTimeMillis();
+ logger.debug("Lock {} created", id);
+ }
+
+ private long startTimer() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("start lock timer {}", id);
+ }
+ return vertx.setTimer(DEFAULT_TIME_OUT_DELAY_MS, id -> {
+ logger.warn("Lock {} timed out after {} ms. Lock is released.", id,
+ startTime - System.currentTimeMillis());
+ stopRelease();
+ timeOutPromise
+ .fail(new TimeoutException("Lock for " + registryName + " timed out. Lock is released."));
+ });
+ }
+
+ @Override
+ public void release() {
+ stopRelease();
+ }
+
+ /**
+ * Execute the futureSupplier and release the lock after the futureSupplier is executed. If the supplied future
+ * does not complete within the timeout, the lock is released.
+ *
+ * @param futureSupplier the futureSupplier
+ * @param the type of the future
+ * @return the future
+ */
+ public Future execute(Supplier> futureSupplier) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("execute lock {}", id);
+ }
+ Future future = futureSupplier.get().onComplete(event -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("completed lock {}, succeeded {}", id, event.succeeded());
+ }
+ stopRelease();
+ });
+ Future suppliedFuture = Future.future(future::onComplete);
+ Future timeOutFuture = timeOutPromise.future();
+ return Future.all(suppliedFuture, timeOutFuture)
+ .mapEmpty()
+ .recover(throwable -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("recover lock {}", id);
+ }
+ if (timeOutFuture.failed()) {
+ return Future.failedFuture(timeOutFuture.cause());
+ } else {
+ return suppliedFuture;
+ }
+ });
+ }
+
+ private void stopRelease() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Releasing lock {}", id);
+ }
+
+ vertx.cancelTimer(timerId);
+ lock.release();
+ timeOutPromise.tryComplete();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("lock {} released", id);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java
index dcd3a3fb..c9b97964 100644
--- a/src/main/java/io/neonbee/internal/WriteSafeRegistry.java
+++ b/src/main/java/io/neonbee/internal/WriteSafeRegistry.java
@@ -1,7 +1,5 @@
package io.neonbee.internal;
-import static io.vertx.core.Future.succeededFuture;
-
import java.util.function.Supplier;
import io.neonbee.NeonBee;
@@ -10,6 +8,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.shareddata.AsyncMap;
+import io.vertx.core.shareddata.SharedData;
/**
* A registry to manage values in the {@link SharedDataAccessor} shared map.
@@ -22,7 +21,9 @@ public class WriteSafeRegistry implements Registry {
private final LoggingFacade logger = LoggingFacade.create();
- private final SharedDataAccessor sharedDataAccessor;
+ private final SharedData sharedData;
+
+ private final Registry sharedRegistry;
private final String registryName;
@@ -33,8 +34,30 @@ public class WriteSafeRegistry implements Registry {
* @param registryName the name of the map registry
*/
public WriteSafeRegistry(Vertx vertx, String registryName) {
+ this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class));
+ }
+
+ /**
+ * Create a new {@link WriteSafeRegistry}.
+ *
+ * @param registryName the name of the map registry
+ * @param sharedData the shared data
+ */
+ public WriteSafeRegistry(String registryName, SharedData sharedData) {
+ this(registryName, sharedData, new SharedRegistry<>(registryName, sharedData));
+ }
+
+ /**
+ * Create a new {@link WriteSafeRegistry}.
+ *
+ * @param registryName the name of the map registry
+ * @param sharedData the shared data
+ * @param registry the shared registry
+ */
+ WriteSafeRegistry(String registryName, SharedData sharedData, Registry registry) {
this.registryName = registryName;
- this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass());
+ this.sharedData = sharedData;
+ this.sharedRegistry = registry;
}
/**
@@ -48,47 +71,12 @@ public WriteSafeRegistry(Vertx vertx, String registryName) {
public Future register(String sharedMapKey, T value) {
logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value);
- return lock(sharedMapKey, () -> addValue(sharedMapKey, value));
+ return lock(sharedMapKey, () -> sharedRegistry.register(sharedMapKey, value));
}
@Override
public Future get(String sharedMapKey) {
- return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o);
- }
-
- /**
- * Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed.
- *
- * @param sharedMapKey the shared map key
- * @param futureSupplier supplier for the future to be secured by the lock
- * @return the futureSupplier
- */
- private Future lock(String sharedMapKey, Supplier> futureSupplier) {
- logger.debug("Get lock for {}", sharedMapKey);
- return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> {
- logger.error("Error acquiring lock for {}", sharedMapKey, throwable);
- }).compose(lock -> futureSupplier.get().onComplete(anyResult -> {
- logger.debug("Releasing lock for {}", sharedMapKey);
- lock.release();
- }));
- }
-
- private Future addValue(String sharedMapKey, Object value) {
- Future> sharedMap = getSharedMap();
-
- return sharedMap.compose(map -> map.get(sharedMapKey))
- .map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray())
- .compose(valueArray -> {
- if (!valueArray.contains(value)) {
- valueArray.add(value);
- }
-
- if (logger.isInfoEnabled()) {
- logger.info("Registered verticle {} in shared map.", value);
- }
-
- return sharedMap.compose(map -> map.put(sharedMapKey, valueArray));
- });
+ return sharedRegistry.get(sharedMapKey);
}
/**
@@ -102,33 +90,36 @@ private Future addValue(String sharedMapKey, Object value) {
public Future unregister(String sharedMapKey, T value) {
logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);
- return lock(sharedMapKey, () -> removeValue(sharedMapKey, value));
+ return lock(sharedMapKey, () -> sharedRegistry.unregister(sharedMapKey, value));
}
- private Future removeValue(String sharedMapKey, Object value) {
- Future> sharedMap = getSharedMap();
-
- return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray)
- .compose(values -> {
- if (values == null) {
- return succeededFuture();
- }
-
- if (logger.isInfoEnabled()) {
- logger.info("Unregistered verticle {} in shared map.", value);
- }
-
- values.remove(value);
- return sharedMap.compose(map -> map.put(sharedMapKey, values));
- });
+ /**
+ * Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed.
+ *
+ * @param sharedMapKey the shared map key
+ * @param futureSupplier supplier for the future to be secured by the lock
+ * @return the futureSupplier
+ */
+ protected Future lock(String sharedMapKey, Supplier> futureSupplier) {
+ logger.debug("Get lock for {}", sharedMapKey);
+ return sharedData.getLock(sharedMapKey).onFailure(throwable -> {
+ logger.error("Error acquiring lock for {}", sharedMapKey, throwable);
+ }).compose(lock -> Future.future(event -> futureSupplier.get().onComplete(event))
+ .onComplete(anyResult -> {
+ logger.debug("Releasing lock for {}", sharedMapKey);
+ lock.release();
+ }));
}
/**
* Shared map that is used as registry.
+ *
+ * It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)}
+ * and {@link WriteSafeRegistry#unregister(String, Object)} methods.
*
* @return Future to the shared map
*/
public Future> getSharedMap() {
- return sharedDataAccessor.getAsyncMap(registryName);
+ return sharedData.getAsyncMap(registryName);
}
}
diff --git a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java
index 86fae92c..c2fd1c7b 100644
--- a/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java
+++ b/src/main/java/io/neonbee/internal/cluster/entity/ClusterEntityRegistry.java
@@ -1,15 +1,18 @@
package io.neonbee.internal.cluster.entity;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import io.neonbee.entity.EntityVerticle;
import io.neonbee.internal.Registry;
+import io.neonbee.internal.WriteLockRegistry;
import io.neonbee.internal.WriteSafeRegistry;
import io.neonbee.internal.cluster.ClusterHelper;
+import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
@@ -44,7 +47,7 @@ public class ClusterEntityRegistry implements Registry {
@VisibleForTesting
final WriteSafeRegistry clusteringInformation;
- final WriteSafeRegistry