From fa8048a0a4dd63f97e148c79cf037c9808a172c5 Mon Sep 17 00:00:00 2001 From: Brian Stansberry Date: Mon, 26 Feb 2024 15:45:35 -0600 Subject: [PATCH] [WFLY-19067] Allow ordering of ServerActivity execution by registering them in distinct, ordered, 'execution groups' --- .../as/server/suspend/ServerActivity.java | 49 +++++++++- .../as/server/suspend/SuspendController.java | 98 ++++++++++++++----- 2 files changed, 123 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/jboss/as/server/suspend/ServerActivity.java b/server/src/main/java/org/jboss/as/server/suspend/ServerActivity.java index 08fbacf1592..ca3dbbe5fe9 100644 --- a/server/src/main/java/org/jboss/as/server/suspend/ServerActivity.java +++ b/server/src/main/java/org/jboss/as/server/suspend/ServerActivity.java @@ -8,11 +8,55 @@ /** * A server activity that may have to finish before the server can shut down gracefully. * - * * @author Stuart Douglas */ public interface ServerActivity { + /** + * The lowest valid value to return from {@link #getExecutionGroup()}. + */ + @SuppressWarnings("unused") + int LOWEST_EXECUTION_GROUP = 1; + /** + * The default value returned from {@link #getExecutionGroup()}. Implementations should use this + * unless there is a clear reason to use a different value. + */ + int DEFAULT_EXECUTION_GROUP = 5; + /** + * The highest valid value to return from {@link #getExecutionGroup()}. + */ + @SuppressWarnings("unused") + int HIGHEST_EXECUTION_GROUP = 10; + + /** + * Returns a value that indicates to which set of {@code ServerActivity} instances + * {@link SuspendController#registerActivity(ServerActivity) registered} with the {@link SuspendController} + * this activity should belong. All {@code ServerActivity} instances with the same execution group value have their + * {@link #preSuspend(ServerActivityCallback) preSuspend}, {@link #suspended(ServerActivityCallback) suspended} + * and {@link #resume() resume} methods invoked separately from activities with different execution group values. + *

+ * The order in which execution groups will be processed depends on the method being invoked: + *

+ *

+ * There is no guarantee of any ordering of method invocation between activities in the same execution group, + * and they may even be processed concurrently. + *

+ * Note that {@code preSuspend} is invoked for all activity instances before the overall suspend process proceeds + * to calls to {@code suspended}. The unit of grouping is the individual method invocations, not the overall + * preSuspend/suspended process. + *

+ * The default implementation of this method returns {@link #DEFAULT_EXECUTION_GROUP}. + * + * @return a value between {@link #LOWEST_EXECUTION_GROUP} and {@link #HIGHEST_EXECUTION_GROUP}, inclusive. + */ + default int getExecutionGroup() { + return DEFAULT_EXECUTION_GROUP; + } + /** * Invoked before the server is paused. This is the place where pause notifications should * be sent to external systems such as load balancers to tell them this node is about to go away. @@ -29,7 +73,8 @@ public interface ServerActivity { void suspended(ServerActivityCallback listener); /** - * Invoked if the suspend or pre-suspened is cancelled + * Invoked if the suspend or pre-suspend is cancelled or if a suspended server + * is resumed. */ void resume(); diff --git a/server/src/main/java/org/jboss/as/server/suspend/SuspendController.java b/server/src/main/java/org/jboss/as/server/suspend/SuspendController.java index 894cda7c4af..f13a5789eef 100644 --- a/server/src/main/java/org/jboss/as/server/suspend/SuspendController.java +++ b/server/src/main/java/org/jboss/as/server/suspend/SuspendController.java @@ -6,9 +6,14 @@ package org.jboss.as.server.suspend; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.NavigableMap; import java.util.Timer; import java.util.TimerTask; +import java.util.TreeMap; +import java.util.function.BiConsumer; + import org.jboss.as.controller.notification.NotificationHandlerRegistry; import org.jboss.as.server.logging.ServerLogger; import org.jboss.msc.service.Service; @@ -16,6 +21,7 @@ import org.jboss.msc.service.StartException; import org.jboss.msc.service.StopContext; import org.jboss.msc.value.InjectedValue; +import org.wildfly.common.Assert; /** * The graceful shutdown controller. This class co-ordinates the graceful shutdown and pause/resume of a @@ -23,7 +29,7 @@ *

*

* In most cases this work is delegated to the request controller subsystem. - * however for workflows that do no correspond directly to a request model a {@link ServerActivity} instance + * however for workflows that do not correspond directly to a request model a {@link ServerActivity} instance * can be registered directly with this controller. * * @author Stuart Douglas @@ -37,13 +43,13 @@ public class SuspendController implements Service { private State state = State.SUSPENDED; - private final List activities = new ArrayList<>(); + private final NavigableMap> activitiesByGroup = new TreeMap<>(); private final List operationListeners = new ArrayList<>(); private final InjectedValue notificationHandlerRegistry = new InjectedValue<>(); - private int outstandingCount; + private int groupsCount; private boolean startSuspended; @@ -75,20 +81,32 @@ public synchronized void suspend(long timeoutMillis) { for(OperationListener listener: new ArrayList<>(operationListeners)) { listener.suspendStarted(); } - outstandingCount = activities.size(); - if (outstandingCount == 0) { + groupsCount = activitiesByGroup.size(); + if (groupsCount == 0) { handlePause(); } else { - CountingRequestCountCallback cb = new CountingRequestCountCallback(outstandingCount, () -> { + // Set up the logic that will handle the 'suspended' calls when all the preSuspend calls have reported 'done' + CountingRequestCountCallback preSuspendGroupCallBack = new CountingRequestCountCallback(groupsCount, () -> { state = State.SUSPENDING; - for (ServerActivity activity : activities) { - activity.suspended(SuspendController.this.listener); - } + processGroups(activitiesByGroup.values().iterator(), (executionGroup, cb) -> { + for (ServerActivity activity : executionGroup) { + // TODO considering making this concurrent by passing this call as a task to an executor. + // This would allow each activity a "fair" share of the timeout budget + // Alternatively we could iterate executionGroup in reverse (LIFO) order. + // But the executionGroups themselves already provide an ability for that kind of ordering + activity.suspended(cb); + } + }, SuspendController.this.listener); }); - for (ServerActivity activity : activities) { - activity.preSuspend(cb); - } + // Invoke the preSuspend calls + processGroups(activitiesByGroup.values().iterator(), (executionGroup, cb) -> { + for (ServerActivity activity : executionGroup) { + // TODO see the 'suspended' section comment above re possible concurrent or LIFO execution + activity.preSuspend(cb); + } + }, preSuspendGroupCallBack); + if (timeoutMillis > 0) { timer = new Timer(); timer.schedule(new TimerTask() { @@ -125,24 +143,38 @@ private synchronized void resume(boolean gracefulStart) { timer.cancel(); timer = null; } - for(OperationListener listener: new ArrayList<>(operationListeners)) { + for (OperationListener listener : new ArrayList<>(operationListeners)) { listener.cancelled(); } - for (ServerActivity activity : activities) { - try { - activity.resume(); - } catch (Exception e) { - ServerLogger.ROOT_LOGGER.failedToResume(activity, e); + for (List executionGroup : activitiesByGroup.descendingMap().values()) { + for (ServerActivity activity : executionGroup) { + try { + activity.resume(); + } catch (Exception e) { + ServerLogger.ROOT_LOGGER.failedToResume(activity, e); + } } } state = State.RUNNING; } + /** + * Registers the given {@link ServerActivity} with this controller + * @param activity the activity. Cannot be {@code null} + * @throws IllegalArgumentException if {@code activity} is {@code null} of if its + * {@link ServerActivity#getExecutionGroup() getExecutionGroup()} method + * returns a value outside of that method's documented legal range. + */ public synchronized void registerActivity(final ServerActivity activity) { - this.activities.add(activity); + Assert.checkNotNullParam("activity", activity); + Assert.checkMinimumParameter("activity.getExecutionGroup()", ServerActivity.LOWEST_EXECUTION_GROUP, activity.getExecutionGroup()); + Assert.checkMaximumParameter("activity.getExecutionGroup()", ServerActivity.HIGHEST_EXECUTION_GROUP, activity.getExecutionGroup()); + List executionGroup = this.activitiesByGroup.computeIfAbsent(activity.getExecutionGroup(), ArrayList::new); + executionGroup.add(activity); if(state != State.RUNNING) { //if the activity is added when we are not running we just immediately suspend it //this should only happen at boot, so there should be no outstanding requests anyway + // note that this means there is no execution group grouping of these calls. activity.suspended(() -> { }); @@ -150,7 +182,13 @@ public synchronized void registerActivity(final ServerActivity activity) { } public synchronized void unRegisterActivity(final ServerActivity activity) { - this.activities.remove(activity); + List executionGroup = activitiesByGroup.get(activity.getExecutionGroup()); + if (executionGroup != null) { + executionGroup.remove(activity); + if (executionGroup.isEmpty()) { + activitiesByGroup.remove(activity.getExecutionGroup()); + } + } } @Override @@ -169,12 +207,12 @@ public State getState() { } private synchronized void activityPaused() { - --outstandingCount; + --groupsCount; handlePause(); } private void handlePause() { - if (outstandingCount == 0) { + if (groupsCount == 0) { state = State.SUSPENDED; if (timer != null) { timer.cancel(); @@ -215,6 +253,22 @@ public InjectedValue getNotificationHandlerRegistry return notificationHandlerRegistry; } + private void processGroups(Iterator> iterator, + BiConsumer, ServerActivityCallback> groupFunction, + ServerActivityCallback groupsCallback) { + // Take the first element from the iterator and apply the groupFunction, with a callback that + // calls this again to take the next element when all activities from the current element are done. + // When no elements are left, tell the groupsCallback we are done. + if (iterator.hasNext()) { + List activityList = iterator.next(); + CountingRequestCountCallback cb = new CountingRequestCountCallback(activityList.size(), () -> { + processGroups(iterator, groupFunction, groupsCallback); + groupsCallback.done(); + }); + groupFunction.accept(activityList, cb); + } + } + public enum State { RUNNING, PRE_SUSPEND,