Skip to content

Commit

Permalink
WFCORE-7077 SuspendController relies on blocking ServiceActivity beha…
Browse files Browse the repository at this point in the history
…vior for ordering
  • Loading branch information
pferraro committed Nov 30, 2024
1 parent a62fb1c commit 7b60c05
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ public CompletionStage<Void> suspend(ServerSuspendContext context) {
}
CompletableFuture<Void> result = new CompletableFuture<>();
// Prepare activity groups in priority order, i.e. first -> last
this.phaseStage(this.activityGroups, SuspendableActivity::prepare, context, Functions.discardingBiConsumer()).whenComplete((ignored, prepareException) -> {
phaseStage(this.activityGroups, SuspendableActivity::prepare, context, Functions.discardingBiConsumer()).whenComplete((ignored, prepareException) -> {
if (prepareException != null) {
result.completeExceptionally(prepareException);
} else {
this.state = State.SUSPENDING;
// Suspend activity groups in priority order, i.e. first -> last order
this.phaseStage(this.activityGroups, SuspendableActivity::suspend, context, Functions.discardingBiConsumer()).whenComplete((ignore, suspendException) -> {
phaseStage(this.activityGroups, SuspendableActivity::suspend, context, Functions.discardingBiConsumer()).whenComplete((ignore, suspendException) -> {
if (suspendException != null) {
result.completeExceptionally(suspendException);
} else {
Expand All @@ -118,7 +118,7 @@ public CompletionStage<Void> resume(ServerResumeContext context) {
return SuspendableActivity.COMPLETED;
}
// Resume activity groups in reverse priority order, i.e. last -> first
CompletionStage<Void> resumeStage = this.phaseStage(this::resumeIterator, SuspendableActivity::resume, context, ServerLogger.ROOT_LOGGER::failedToResume);
CompletionStage<Void> resumeStage = phaseStage(this::resumeIterator, SuspendableActivity::resume, context, ServerLogger.ROOT_LOGGER::failedToResume);
resumeStage.whenComplete((ignore, exception) -> {
if (exception == null) {
this.state = State.RUNNING;
Expand All @@ -143,56 +143,57 @@ private Iterator<List<SuspendableActivity>> resumeIterator() {
* @param exceptionHandler handles exceptions thrown by the phase function
* @return a completion stage for this phase of the suspend/resume process
*/
private <C> CompletionStage<Void> phaseStage(Iterable<List<SuspendableActivity>> activityGroups, BiFunction<SuspendableActivity, C, CompletionStage<Void>> phase, C context, BiConsumer<SuspendableActivity, Throwable> exceptionHandler) {
private static <C> CompletionStage<Void> phaseStage(Iterable<List<SuspendableActivity>> activityGroups, BiFunction<SuspendableActivity, C, CompletionStage<Void>> phase, C context, BiConsumer<SuspendableActivity, Throwable> exceptionHandler) {
// Final stage will complete after all activity for all groups has completed
CompletableFuture<Void> result = new CompletableFuture<>();
// Counter used to determine when to complete final stage
AtomicInteger counter = new AtomicInteger(this.activityGroups.size());
// Iterate over activity groups (in the order dictated by the caller)
Iterator<List<SuspendableActivity>> groups = activityGroups.iterator();
BiConsumer<Void, Throwable> groupCompleter = new BiConsumer<>() {
@Override
public void accept(Void ignore, Throwable exception) {
if (exception != null) {
result.completeExceptionally(exception);
} else if (counter.decrementAndGet() == 0) {
} else if (!groups.hasNext()) {
// No more groups
result.complete(null);
}
}
};
// Iterate over activity groups (in the order dictated by the caller)
for (List<SuspendableActivity> group : activityGroups) {
List<SuspendableActivity> activities = List.copyOf(group);
if (activities.isEmpty()) {
// There are no activities for this group, complete immediately
groupCompleter.accept(null, null);
} else {
// Stage for this group will complete after all activity stages complete
CompletableFuture<Void> groupStage = new CompletableFuture<>();
groupStage.whenComplete(groupCompleter);
// Counter used to determine when to complete group stage
AtomicInteger groupCounter = new AtomicInteger(activities.size());
for (SuspendableActivity activity : activities) {
BiConsumer<Void, Throwable> activityCompleter = new BiConsumer<>() {
@Override
public void accept(Void ignore, Throwable exception) {
if (exception != null) {
try {
exceptionHandler.accept(activity, exception);
} finally {
groupStage.completeExceptionally(exception);
} else {
// Create stage for next group
List<SuspendableActivity> activities = List.copyOf(groups.next());
CompletableFuture<Void> groupStage = new CompletableFuture<>();
groupStage.whenComplete(this);
if (activities.isEmpty()) {
// No activities, complete immediately
groupStage.complete(null);
} else {
// Counter used to determine when to complete group stage
AtomicInteger groupCounter = new AtomicInteger(activities.size());
for (SuspendableActivity activity : activities) {
BiConsumer<Void, Throwable> activityCompleter = new BiConsumer<>() {
@Override
public void accept(Void ignore, Throwable exception) {
if (exception != null) {
try {
exceptionHandler.accept(activity, exception);
} finally {
groupStage.completeExceptionally(exception);
}
} else if (groupCounter.decrementAndGet() == 0) {
// All activities of group have completed
groupStage.complete(null);
}
}
} else if (groupCounter.decrementAndGet() == 0) {
groupStage.complete(null);
};
try {
phase.apply(activity, context).whenComplete(activityCompleter);
} catch (Throwable e) {
activityCompleter.accept(null, e);
}
}
};
try {
phase.apply(activity, context).whenComplete(activityCompleter);
} catch (Throwable e) {
activityCompleter.accept(null, e);
}
}
}
}
};
groupCompleter.accept(null, null);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

import org.jboss.as.server.suspend.SuspendableActivityRegistry.SuspendPriority;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -41,7 +44,7 @@ private void serverActivityCallbackOrderTest(CounterActivity... activities) {

NavigableSet<CounterActivity> activitySet = new TreeSet<>();
for (CounterActivity activity : activities) {
testee.registerActivity(activity);
testee.registerActivity(activity, SuspendPriority.of(activity.executionGroup));
activitySet.add(activity);
}

Expand All @@ -56,7 +59,7 @@ private void serverActivityCallbackOrderTest(CounterActivity... activities) {
// Randomly unregister some activities
for (CounterActivity activity : activities) {
if (Math.random() < 0.5) {
testee.unRegisterActivity(activity);
testee.unregisterActivity(activity);
assertTrue(activitySet.remove(activity));
}
}
Expand Down Expand Up @@ -130,7 +133,7 @@ private void orderCheck(NavigableSet<CounterActivity> activities) {
}
}

private static class CounterActivity implements ServerActivity, Comparable<CounterActivity> {
private static class CounterActivity implements SuspendableActivity, Comparable<CounterActivity> {
private static final AtomicInteger invocationCounter = new AtomicInteger();

private static final CounterActivity ONE = new CounterActivity(1,1);
Expand All @@ -152,25 +155,30 @@ private CounterActivity(int id, int executionGroup) {
}

@Override
public int getExecutionGroup() {
return executionGroup;
public CompletionStage<Void> prepare(ServerSuspendContext context) {
return CompletableFuture.runAsync(this::preSuspend);
}

@Override
public void preSuspend(ServerActivityCallback listener) {
preSuspend = invocationCounter.getAndIncrement();
listener.done();
public CompletionStage<Void> suspend(ServerSuspendContext context) {
return CompletableFuture.runAsync(this::suspended);
}

@Override
public void suspended(ServerActivityCallback listener) {
suspended = invocationCounter.getAndIncrement();
listener.done();
public CompletionStage<Void> resume(ServerResumeContext context) {
return CompletableFuture.runAsync(this::resume);
}

@Override
public void resume() {
resume = invocationCounter.getAndIncrement();
private void preSuspend() {
this.preSuspend = invocationCounter.getAndIncrement();
}

private void suspended() {
this.suspended = invocationCounter.getAndIncrement();
}

private void resume() {
this.resume = invocationCounter.getAndIncrement();
}

@Override
Expand Down

0 comments on commit 7b60c05

Please sign in to comment.