Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/clean up cluster information #585

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ plugins {
id 'jacoco'
id 'pmd'
id 'checkstyle'
id 'idea'

// community plugins
id 'com.diffplug.spotless' version '6.25.0'
Expand All @@ -20,6 +21,14 @@ plugins {
id 'team.yi.semantic-gitlog' version '0.6.12'
}

idea {
module {
downloadSources = true
downloadJavadoc = true
}
}


group = 'io.neonbee'
version = '0.37.2-SNAPSHOT'
mainClassName = 'io.neonbee.Launcher'
Expand Down
119 changes: 119 additions & 0 deletions src/main/java/io/neonbee/internal/SharedRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.neonbee.internal;

import static io.vertx.core.Future.succeededFuture;

import io.neonbee.NeonBee;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.SharedData;

/**
* A registry to manage values in the {@link SharedData} shared map.
* <p>
* This class is a generic implementation of a registry that can be used to store values in a shared map.
*
* @param <T> the type of the value to store in the shared registry
*/
public class SharedRegistry<T> implements Registry<T> {

private final LoggingFacade logger = LoggingFacade.create();

private final SharedData sharedData;

private final String registryName;

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param registryName the name of the map registry
*/
public SharedRegistry(Vertx vertx, String registryName) {
this(registryName, new SharedDataAccessor(vertx, SharedRegistry.class));
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param registryName the name of the map registry
* @param sharedData the shared data
*/
public SharedRegistry(String registryName, SharedData sharedData) {
this.registryName = registryName;
this.sharedData = sharedData;
}

/**
* Register a value in the async shared map of {@link NeonBee} by key.
*
* @param sharedMapKey the shared map key
* @param value the value to register
* @return the future
*/
@Override
public Future<Void> register(String sharedMapKey, T value) {
logger.info("register value: \"{}\" in shared map: \"{}\"", sharedMapKey, value);

Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey))
.map(valueOrNull -> valueOrNull != null ? (JsonArray) valueOrNull : new JsonArray())
.compose(valueArray -> {
if (!valueArray.contains(value)) {
valueArray.add(value);
}

if (logger.isInfoEnabled()) {
logger.info("Registered verticle {} in shared map.", value);
}

return sharedMap.compose(map -> map.put(sharedMapKey, valueArray));
});
}

/**
* Unregister the value in the {@link NeonBee} async shared map from the sharedMapKey.
*
* @param sharedMapKey the shared map key
* @param value the value to unregister
* @return the future
*/
@Override
public Future<Void> unregister(String sharedMapKey, T value) {
logger.debug("unregister value: \"{}\" from shared map: \"{}\"", sharedMapKey, value);

Future<AsyncMap<String, Object>> sharedMap = getSharedMap();

return sharedMap.compose(map -> map.get(sharedMapKey))
.map(jsonArray -> (JsonArray) jsonArray)
.compose(values -> {
if (values == null) {
return succeededFuture();
}

if (logger.isInfoEnabled()) {
logger.info("Unregistered verticle {} in shared map.", value);
}

values.remove(value);
return sharedMap.compose(map -> map.put(sharedMapKey, values));
});
}

@Override
public Future<JsonArray> get(String sharedMapKey) {
return getSharedMap().compose(map -> map.get(sharedMapKey)).map(o -> (JsonArray) o);
}

/**
* Shared map that is used as registry.
*
* @return Future to the shared map
*/
public Future<AsyncMap<String, Object>> getSharedMap() {
return sharedData.getAsyncMap(registryName);
}
}
207 changes: 207 additions & 0 deletions src/main/java/io/neonbee/internal/WriteLockRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package io.neonbee.internal;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import io.neonbee.logging.LoggingFacade;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.shareddata.SharedData;

/**
* A {@link WriteSafeRegistry} implementation that uses a lock to lock all write operations to ensure that only one
* operation is executed at a time.
*
* @param <T> the type of data this registry stores
*/
public class WriteLockRegistry<T> implements Registry<T> {

private static final String LOCK_KEY = WriteLockRegistry.class.getName();

private final LoggingFacade logger = LoggingFacade.create();

private final Registry<T> registry;

private final SharedData sharedData;

private final String registryName;

private final Vertx vertx;

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param registryName the name of the map registry
*/
public WriteLockRegistry(Vertx vertx, String registryName) {
this(vertx, registryName, new SharedDataAccessor(vertx, WriteLockRegistry.class));
}

/**
* Create a new {@link WriteSafeRegistry}.
*
* @param vertx the {@link Vertx} instance
* @param registryName the name of the map registry
* @param sharedData the shared data
*/
public WriteLockRegistry(Vertx vertx, String registryName, SharedData sharedData) {
this.vertx = vertx;
registry = new SharedRegistry<>(registryName, sharedData);
this.registryName = registryName;
this.sharedData = sharedData;
}

@Override
public Future<Void> register(String sharedMapKey, T value) {
return lock(LOCK_KEY)
.compose(lock -> lock.execute(() -> registry.register(sharedMapKey, value)))
.mapEmpty();
}

@Override
public Future<Void> unregister(String sharedMapKey, T value) {
return lock(LOCK_KEY)
.compose(lock -> lock.execute(() -> registry.unregister(sharedMapKey, value)))
.mapEmpty();
}

@Override
public Future<JsonArray> get(String key) {
return registry.get(key);
}

/**
* Method that acquires a lock for the registry and released the lock after the returned TimeOutLock executed the
* futureSupplier or the timeout is reached.
*
* @return the futureSupplier
*/
public Future<TimeOutLock> lock() {
return lock(LOCK_KEY);
}

/**
* Method that acquires a lock for the sharedMapKey and released the lock after the futureSupplier is executed.
*
* @param sharedMapKey the shared map key
* @return the futureSupplier
*/
private Future<TimeOutLock> lock(String sharedMapKey) {
logger.debug("Get lock for {}", sharedMapKey);
return sharedData.getLock(sharedMapKey)
.map(TimeOutLock::new);
}

/**
* Shared map that is used as registry.
* <p>
* It is not safe to write to the shared map directly. Use the {@link WriteSafeRegistry#register(String, Object)}
* and {@link WriteSafeRegistry#unregister(String, Object)} methods.
*
* @return Future to the shared map
*/
public Future<AsyncMap<String, Object>> getSharedMap() {
return sharedData.getAsyncMap(registryName);
}

/**
* A lock that is released after 10 seconds.
*/
public class TimeOutLock implements Lock {

private static final long DEFAULT_TIME_OUT_DELAY_MS = TimeUnit.SECONDS.toMillis(1000);

private final UUID id = UUID.randomUUID();

private final LoggingFacade logger = LoggingFacade.create();

private final Lock lock;

private final long timerId;

private final Promise<Void> timeOutPromise;

private final long startTime;

private TimeOutLock(Lock lock) {
this.lock = lock;
this.timeOutPromise = Promise.promise();
this.timerId = startTimer();
this.startTime = System.currentTimeMillis();
logger.debug("Lock {} created", id);
}

private long startTimer() {
if (logger.isDebugEnabled()) {
logger.debug("start lock timer {}", id);
}
return vertx.setTimer(DEFAULT_TIME_OUT_DELAY_MS, id -> {
logger.warn("Lock {} timed out after {} ms. Lock is released.", id,
startTime - System.currentTimeMillis());
stopRelease();
timeOutPromise
.fail(new TimeoutException("Lock for " + registryName + " timed out. Lock is released."));
});
}

@Override
public void release() {
stopRelease();
}

/**
* Execute the futureSupplier and release the lock after the futureSupplier is executed. If the supplied future
* does not complete within the timeout, the lock is released.
*
* @param futureSupplier the futureSupplier
* @param <U> the type of the future
* @return the future
*/
public <U> Future<U> execute(Supplier<Future<U>> futureSupplier) {
if (logger.isDebugEnabled()) {
logger.debug("execute lock {}", id);
}
Future<U> future = futureSupplier.get().onComplete(event -> {
if (logger.isDebugEnabled()) {
logger.debug("completed lock {}, succeeded {}", id, event.succeeded());
}
stopRelease();
});
Future<U> suppliedFuture = Future.future(future::onComplete);
Future<Void> timeOutFuture = timeOutPromise.future();
return Future.all(suppliedFuture, timeOutFuture)
.<U>mapEmpty()
.recover(throwable -> {
if (logger.isDebugEnabled()) {
logger.debug("recover lock {}", id);
}
if (timeOutFuture.failed()) {
return Future.failedFuture(timeOutFuture.cause());
} else {
return suppliedFuture;
}
});
}

private void stopRelease() {
if (logger.isDebugEnabled()) {
logger.debug("Releasing lock {}", id);
}

vertx.cancelTimer(timerId);
lock.release();
timeOutPromise.tryComplete();

if (logger.isDebugEnabled()) {
logger.debug("lock {} released", id);
}
}
}
}
Loading
Loading