Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/registry #584

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions src/main/java/io/neonbee/internal/SharedRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.neonbee.internal;

import static io.vertx.core.Future.succeededFuture;

import io.neonbee.NeonBee;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.SharedData;

/**
* A registry to manage values in the {@link SharedData} shared map.
* <p>
* This class is a generic implementation of a registry that can be used to store values in a shared map.
*
* @param <T> the type of the value to store in the shared registry
*/
public class SharedRegistry<T> implements Registry<T> {

private final LoggingFacade logger = LoggingFacade.create();

private final SharedData sharedData;

private final String registryName;

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param registryName the name of the map registry
*/
public SharedRegistry(Vertx vertx, String registryName) {
this(registryName, new SharedDataAccessor(vertx, SharedRegistry.class));
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param registryName the name of the map registry
* @param sharedData the shared data
*/
public SharedRegistry(String registryName, SharedData sharedData) {
this.registryName = registryName;
this.sharedData = sharedData;
}

/**
* Register a value in the async shared map of {@link NeonBee} by key.
*
* @param sharedMapKey the shared map key
* @param value the value to register
* @return the future
*/
@Override
public Future<Void> register(String sharedMapKey, T value) {
logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value);

Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey))
.map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray())
.compose(valueArray -> {
if (!valueArray.contains(value)) {
valueArray.add(value);
}

if (logger.isInfoEnabled()) {
logger.info("Registered verticle {} in shared map.", value);
}

return sharedMap.compose(map -> map.put(sharedMapKey, valueArray));
});
}

/**
* Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey.
*
* @param sharedMapKey the shared map key
* @param value the value to unregister
* @return the future
*/
@Override
public Future<Void> unregister(String sharedMapKey, T value) {
logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);

Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray)
.compose(values -> {
if (values == null) {
return succeededFuture();
}

if (logger.isInfoEnabled()) {
logger.info("Unregistered verticle {} in shared map.", value);
}

values.remove(value);
return sharedMap.compose(map -> map.put(sharedMapKey, values));
});
}

@Override
public Future<JsonArray> get(String sharedMapKey) {
return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o);
}

/**
* Shared map that is used as registry.
*
* @return Future to the shared map
*/
public Future<AsyncMap<String, Object>> getSharedMap() {
return sharedData.getAsyncMap(registryName);
}
}
109 changes: 50 additions & 59 deletions src/main/java/io/neonbee/internal/WriteSafeRegistry.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.neonbee.internal;

import static io.vertx.core.Future.succeededFuture;

import java.util.function.Supplier;

import io.neonbee.NeonBee;
Expand All @@ -10,6 +8,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.SharedData;

/**
* A registry to manage values in the {@link SharedDataAccessor} shared map.
Expand All @@ -22,7 +21,9 @@ public class WriteSafeRegistry<T> implements Registry<T> {

private final LoggingFacade logger = LoggingFacade.create();

private final SharedDataAccessor sharedDataAccessor;
private final SharedData sharedData;

private final Registry<T> sharedRegistry;

private final String registryName;

Expand All @@ -33,8 +34,30 @@ public class WriteSafeRegistry<T> implements Registry<T> {
* @param registryName the name of the map registry
*/
public WriteSafeRegistry(Vertx vertx, String registryName) {
this(registryName, new SharedDataAccessor(vertx, WriteSafeRegistry.class));
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param registryName the name of the map registry
* @param sharedData the shared data
*/
public WriteSafeRegistry(String registryName, SharedData sharedData) {
this(registryName, sharedData, new SharedRegistry<>(registryName, sharedData));
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param registryName the name of the map registry
* @param sharedData the shared data
* @param registry the shared registry
*/
WriteSafeRegistry(String registryName, SharedData sharedData, Registry<T> registry) {
this.registryName = registryName;
this.sharedDataAccessor = new SharedDataAccessor(vertx, this.getClass());
this.sharedData = sharedData;
this.sharedRegistry = registry;
}

/**
Expand All @@ -48,47 +71,12 @@ public WriteSafeRegistry(Vertx vertx, String registryName) {
public Future<Void> register(String sharedMapKey, T value) {
logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value);

return lock(sharedMapKey, () -> addValue(sharedMapKey, value));
return lock(sharedMapKey, () -> sharedRegistry.register(sharedMapKey, value));
}

@Override
public Future<JsonArray> get(String sharedMapKey) {
return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o);
}

/**
* Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed.
*
* @param sharedMapKey the shared map key
* @param futureSupplier supplier for the future to be secured by the lock
* @return the futureSupplier
*/
private Future<Void> lock(String sharedMapKey, Supplier<Future<Void>> futureSupplier) {
logger.debug("Get lock for {}", sharedMapKey);
return sharedDataAccessor.getLock(sharedMapKey).onFailure(throwable -> {
logger.error("Error acquiring lock for {}", sharedMapKey, throwable);
}).compose(lock -> futureSupplier.get().onComplete(anyResult -> {
logger.debug("Releasing lock for {}", sharedMapKey);
lock.release();
}));
}

private Future<Void> addValue(String sharedMapKey, Object value) {
Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey))
.map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray())
.compose(valueArray -> {
if (!valueArray.contains(value)) {
valueArray.add(value);
}

if (logger.isInfoEnabled()) {
logger.info("Registered verticle {} in shared map.", value);
}

return sharedMap.compose(map -> map.put(sharedMapKey, valueArray));
});
return sharedRegistry.get(sharedMapKey);
}

/**
Expand All @@ -102,33 +90,36 @@ private Future<Void> addValue(String sharedMapKey, Object value) {
public Future<Void> unregister(String sharedMapKey, T value) {
logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);

return lock(sharedMapKey, () -> removeValue(sharedMapKey, value));
return lock(sharedMapKey, () -> sharedRegistry.unregister(sharedMapKey, value));
}

private Future<Void> removeValue(String sharedMapKey, Object value) {
Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey)).map(jsonArray -> (JsonArray) jsonArray)
.compose(values -> {
if (values == null) {
return succeededFuture();
}

if (logger.isInfoEnabled()) {
logger.info("Unregistered verticle {} in shared map.", value);
}

values.remove(value);
return sharedMap.compose(map -> map.put(sharedMapKey, values));
});
/**
* Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed.
*
* @param sharedMapKey the shared map key
* @param futureSupplier supplier for the future to be secured by the lock
* @return the futureSupplier
*/
protected Future<Void> lock(String sharedMapKey, Supplier<Future<Void>> futureSupplier) {
logger.debug("Get lock for {}", sharedMapKey);
return sharedData.getLock(sharedMapKey).onFailure(throwable -> {
logger.error("Error acquiring lock for {}", sharedMapKey, throwable);
}).compose(lock -> Future.<Void>future(event -> futureSupplier.get().onComplete(event))
.onComplete(anyResult -> {
logger.debug("Releasing lock for {}", sharedMapKey);
lock.release();
}));
}

/**
* Shared map that is used as registry.
* <p>
* It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)}
* and {@link WriteSafeRegistry#unregister(String, Object)} methods.
*
* @return Future to the shared map
*/
public Future<AsyncMap<String, Object>> getSharedMap() {
return sharedDataAccessor.getAsyncMap(registryName);
return sharedData.getAsyncMap(registryName);
}
}
95 changes: 95 additions & 0 deletions src/test/java/io/neonbee/internal/WriteSafeRegistryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

import static com.google.common.truth.Truth.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.json.JsonArray;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.Timeout;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;

Expand Down Expand Up @@ -58,4 +64,93 @@ void get(Vertx vertx, VertxTestContext context) {
context.completeNow();
})).onFailure(context::failNow);
}

@Test
@DisplayName("test lock method")
void lock(Vertx vertx, VertxTestContext context) {
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);

Checkpoint checkpoints = context.checkpoint(4);
String lockedname = "test-lock-key1";

// execute the lockTest twice to make sure that you can acquire the lock multiple times
lockTest(lockedname, context, registry, checkpoints)
.compose(unused -> lockTest(lockedname, context, registry, checkpoints));
}

@Test
// The used timeout for the lock is set to 10 seconds
// @see io.vertx.core.shareddata.impl.SharedDataImpl#DEFAULT_LOCK_TIMEOUT
@Timeout(value = 12, timeUnit = TimeUnit.SECONDS)
@DisplayName("test acquire lock twice")
void acquireLockTwice(Vertx vertx, VertxTestContext context) {
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);

Checkpoint checkpoints = context.checkpoint(5);

String lockedname = "test-lock-key2";
registry.lock(lockedname, () -> {
checkpoints.flag();
// try to acquire the lock to make sure that it is locked
return registry.lock(lockedname, () -> {
context.failNow("should not be called because lock cannot be acquired");
return Future.succeededFuture();
})
.onSuccess(unused -> context.failNow("should not be successful"))
.onFailure(cause -> context.verify(() -> {
assertThat(cause).isInstanceOf(NoStackTraceThrowable.class);
assertThat(cause).hasMessageThat().isEqualTo("Timed out waiting to get lock");
checkpoints.flag();
}))
.recover(throwable -> Future.succeededFuture());
})
.onSuccess(unused -> checkpoints.flag())
.onFailure(context::failNow)
// execute the lockTest to make sure that you can acquire the lock again
.compose(unused -> lockTest(lockedname, context, registry, checkpoints));
}

private static Future<Void> lockTest(String lockName, VertxTestContext context, WriteSafeRegistry<String> registry,
Checkpoint checkpoints) {
return registry.lock(lockName, () -> {
checkpoints.flag();
return Future.succeededFuture();
})
.onSuccess(unused -> checkpoints.flag())
.onFailure(context::failNow);
}

@Test
@DisplayName("test lock supplier retuning null")
void lockNPE(Vertx vertx, VertxTestContext context) {
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);

Checkpoint checkpoints = context.checkpoint(2);

registry.lock("lockNPE", () -> {
checkpoints.flag();
return null;
}).onSuccess(unused -> context.failNow("should not be successful"))
.onFailure(cause -> context.verify(() -> {
assertThat(cause).isInstanceOf(NullPointerException.class);
checkpoints.flag();
}));
}

@Test
@DisplayName("test lock supplier throws exception")
void lockISE(Vertx vertx, VertxTestContext context) {
WriteSafeRegistry<String> registry = new WriteSafeRegistry<>(vertx, REGISTRY_NAME);

Checkpoint checkpoints = context.checkpoint(2);

registry.lock("lockISE", () -> {
checkpoints.flag();
throw new IllegalStateException("Illegal state");
}).onSuccess(unused -> context.failNow("should not be successful"))
.onFailure(cause -> context.verify(() -> {
assertThat(cause).isInstanceOf(IllegalStateException.class);
checkpoints.flag();
}));
}
}
Loading