Skip to content

Commit

Permalink
[WFLY-19067] Allow ordering of ServerActivity execution by registerin…
Browse files Browse the repository at this point in the history
…g them in distinct, ordered, 'execution groups'
  • Loading branch information
bstansberry committed Mar 15, 2024
1 parent 4a98992 commit fa8048a
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,55 @@
/**
* A server activity that may have to finish before the server can shut down gracefully.
*
*
* @author Stuart Douglas
*/
public interface ServerActivity {

/**
* The lowest valid value to return from {@link #getExecutionGroup()}.
*/
@SuppressWarnings("unused")
int LOWEST_EXECUTION_GROUP = 1;
/**
* The default value returned from {@link #getExecutionGroup()}. Implementations should use this
* unless there is a clear reason to use a different value.
*/
int DEFAULT_EXECUTION_GROUP = 5;
/**
* The highest valid value to return from {@link #getExecutionGroup()}.
*/
@SuppressWarnings("unused")
int HIGHEST_EXECUTION_GROUP = 10;

/**
* Returns a value that indicates to which set of {@code ServerActivity} instances
* {@link SuspendController#registerActivity(ServerActivity) registered} with the {@link SuspendController}
* this activity should belong. All {@code ServerActivity} instances with the same execution group value have their
* {@link #preSuspend(ServerActivityCallback) preSuspend}, {@link #suspended(ServerActivityCallback) suspended}
* and {@link #resume() resume} methods invoked separately from activities with different execution group values.
* <p>
* The order in which execution groups will be processed depends on the method being invoked:
* <ul>
* <li>For {@code preSuspend} and {@code suspended}, groups with a lower value are processed before those
* with a higher value.</li>
* <li>For {@code resume}, groups with a higher value are processed before those with a lower value.</li>
* </ul>
* <p>
* There is no guarantee of any ordering of method invocation between activities in the same execution group,
* and they may even be processed concurrently.
* <p>
* Note that {@code preSuspend} is invoked for all activity instances before the overall suspend process proceeds
* to calls to {@code suspended}. The unit of grouping is the individual method invocations, not the overall
* preSuspend/suspended process.
* <p>
* The default implementation of this method returns {@link #DEFAULT_EXECUTION_GROUP}.
*
* @return a value between {@link #LOWEST_EXECUTION_GROUP} and {@link #HIGHEST_EXECUTION_GROUP}, inclusive.
*/
default int getExecutionGroup() {
return DEFAULT_EXECUTION_GROUP;
}

/**
* Invoked before the server is paused. This is the place where pause notifications should
* be sent to external systems such as load balancers to tell them this node is about to go away.
Expand All @@ -29,7 +73,8 @@ public interface ServerActivity {
void suspended(ServerActivityCallback listener);

/**
* Invoked if the suspend or pre-suspened is cancelled
* Invoked if the suspend or pre-suspend is cancelled or if a suspended server
* is resumed.
*/
void resume();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,30 @@
package org.jboss.as.server.suspend;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.function.BiConsumer;

import org.jboss.as.controller.notification.NotificationHandlerRegistry;
import org.jboss.as.server.logging.ServerLogger;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.StartContext;
import org.jboss.msc.service.StartException;
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
import org.wildfly.common.Assert;

/**
* The graceful shutdown controller. This class co-ordinates the graceful shutdown and pause/resume of a
* servers operations.
* <p/>
* <p/>
* In most cases this work is delegated to the request controller subsystem.
* however for workflows that do no correspond directly to a request model a {@link ServerActivity} instance
* however for workflows that do not correspond directly to a request model a {@link ServerActivity} instance
* can be registered directly with this controller.
*
* @author Stuart Douglas
Expand All @@ -37,13 +43,13 @@ public class SuspendController implements Service<SuspendController> {

private State state = State.SUSPENDED;

private final List<ServerActivity> activities = new ArrayList<>();
private final NavigableMap<Integer, List<ServerActivity>> activitiesByGroup = new TreeMap<>();

private final List<OperationListener> operationListeners = new ArrayList<>();

private final InjectedValue<NotificationHandlerRegistry> notificationHandlerRegistry = new InjectedValue<>();

private int outstandingCount;
private int groupsCount;

private boolean startSuspended;

Expand Down Expand Up @@ -75,20 +81,32 @@ public synchronized void suspend(long timeoutMillis) {
for(OperationListener listener: new ArrayList<>(operationListeners)) {
listener.suspendStarted();
}
outstandingCount = activities.size();
if (outstandingCount == 0) {
groupsCount = activitiesByGroup.size();
if (groupsCount == 0) {
handlePause();
} else {
CountingRequestCountCallback cb = new CountingRequestCountCallback(outstandingCount, () -> {
// Set up the logic that will handle the 'suspended' calls when all the preSuspend calls have reported 'done'
CountingRequestCountCallback preSuspendGroupCallBack = new CountingRequestCountCallback(groupsCount, () -> {
state = State.SUSPENDING;
for (ServerActivity activity : activities) {
activity.suspended(SuspendController.this.listener);
}
processGroups(activitiesByGroup.values().iterator(), (executionGroup, cb) -> {
for (ServerActivity activity : executionGroup) {
// TODO considering making this concurrent by passing this call as a task to an executor.
// This would allow each activity a "fair" share of the timeout budget
// Alternatively we could iterate executionGroup in reverse (LIFO) order.
// But the executionGroups themselves already provide an ability for that kind of ordering
activity.suspended(cb);
}
}, SuspendController.this.listener);
});

for (ServerActivity activity : activities) {
activity.preSuspend(cb);
}
// Invoke the preSuspend calls
processGroups(activitiesByGroup.values().iterator(), (executionGroup, cb) -> {
for (ServerActivity activity : executionGroup) {
// TODO see the 'suspended' section comment above re possible concurrent or LIFO execution
activity.preSuspend(cb);
}
}, preSuspendGroupCallBack);

if (timeoutMillis > 0) {
timer = new Timer();
timer.schedule(new TimerTask() {
Expand Down Expand Up @@ -125,32 +143,52 @@ private synchronized void resume(boolean gracefulStart) {
timer.cancel();
timer = null;
}
for(OperationListener listener: new ArrayList<>(operationListeners)) {
for (OperationListener listener : new ArrayList<>(operationListeners)) {
listener.cancelled();
}
for (ServerActivity activity : activities) {
try {
activity.resume();
} catch (Exception e) {
ServerLogger.ROOT_LOGGER.failedToResume(activity, e);
for (List<ServerActivity> executionGroup : activitiesByGroup.descendingMap().values()) {
for (ServerActivity activity : executionGroup) {
try {
activity.resume();
} catch (Exception e) {
ServerLogger.ROOT_LOGGER.failedToResume(activity, e);
}
}
}
state = State.RUNNING;
}

/**
* Registers the given {@link ServerActivity} with this controller
* @param activity the activity. Cannot be {@code null}
* @throws IllegalArgumentException if {@code activity} is {@code null} of if its
* {@link ServerActivity#getExecutionGroup() getExecutionGroup()} method
* returns a value outside of that method's documented legal range.
*/
public synchronized void registerActivity(final ServerActivity activity) {
this.activities.add(activity);
Assert.checkNotNullParam("activity", activity);
Assert.checkMinimumParameter("activity.getExecutionGroup()", ServerActivity.LOWEST_EXECUTION_GROUP, activity.getExecutionGroup());
Assert.checkMaximumParameter("activity.getExecutionGroup()", ServerActivity.HIGHEST_EXECUTION_GROUP, activity.getExecutionGroup());
List<ServerActivity> executionGroup = this.activitiesByGroup.computeIfAbsent(activity.getExecutionGroup(), ArrayList::new);
executionGroup.add(activity);
if(state != State.RUNNING) {
//if the activity is added when we are not running we just immediately suspend it
//this should only happen at boot, so there should be no outstanding requests anyway
// note that this means there is no execution group grouping of these calls.
activity.suspended(() -> {

});
}
}

public synchronized void unRegisterActivity(final ServerActivity activity) {
this.activities.remove(activity);
List<ServerActivity> executionGroup = activitiesByGroup.get(activity.getExecutionGroup());
if (executionGroup != null) {
executionGroup.remove(activity);
if (executionGroup.isEmpty()) {
activitiesByGroup.remove(activity.getExecutionGroup());
}
}
}

@Override
Expand All @@ -169,12 +207,12 @@ public State getState() {
}

private synchronized void activityPaused() {
--outstandingCount;
--groupsCount;
handlePause();
}

private void handlePause() {
if (outstandingCount == 0) {
if (groupsCount == 0) {
state = State.SUSPENDED;
if (timer != null) {
timer.cancel();
Expand Down Expand Up @@ -215,6 +253,22 @@ public InjectedValue<NotificationHandlerRegistry> getNotificationHandlerRegistry
return notificationHandlerRegistry;
}

private void processGroups(Iterator<List<ServerActivity>> iterator,
BiConsumer<List<ServerActivity>, ServerActivityCallback> groupFunction,
ServerActivityCallback groupsCallback) {
// Take the first element from the iterator and apply the groupFunction, with a callback that
// calls this again to take the next element when all activities from the current element are done.
// When no elements are left, tell the groupsCallback we are done.
if (iterator.hasNext()) {
List<ServerActivity> activityList = iterator.next();
CountingRequestCountCallback cb = new CountingRequestCountCallback(activityList.size(), () -> {
processGroups(iterator, groupFunction, groupsCallback);
groupsCallback.done();
});
groupFunction.accept(activityList, cb);
}
}

public enum State {
RUNNING,
PRE_SUSPEND,
Expand Down

0 comments on commit fa8048a

Please sign in to comment.