Skip to content

Commit

Permalink
[PR #295] move throttle constants to configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
lukaskabc committed Sep 12, 2024
1 parent 5974704 commit 6df9653
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 38 deletions.
37 changes: 37 additions & 0 deletions src/main/java/cz/cvut/kbss/termit/util/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package cz.cvut.kbss.termit.util;

import cz.cvut.kbss.termit.model.acl.AccessLevel;
import cz.cvut.kbss.termit.util.throttle.ThrottleAspect;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Future;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand Down Expand Up @@ -63,6 +65,25 @@ public class Configuration {
*/
@Min(1)
private Integer asyncThreadCount = Runtime.getRuntime().availableProcessors();

/**
* The amount of time in which calls of throttled methods
* should be merged.
* The value must be positive ({@code > 0}).
* @configurationdoc.default 10 seconds
* @see cz.cvut.kbss.termit.util.throttle.Throttle
* @see cz.cvut.kbss.termit.util.throttle.ThrottleAspect
*/
private Duration throttleThreshold = Duration.ofSeconds(10);

/**
* After how much time, should objects with completed futures be discarded.
* The value must be positive ({@code > 0}).
* @configurationdoc.default 1 minute
* @see ThrottleAspect#clearOldFutures()
*/
private Duration throttleDiscardThreshold = Duration.ofMinutes(1);

@Valid
private Persistence persistence = new Persistence();
@Valid
Expand Down Expand Up @@ -278,6 +299,22 @@ public void setTemplate(Template template) {
this.template = template;
}

public Duration getThrottleThreshold() {
return throttleThreshold;
}

public void setThrottleThreshold(Duration throttleThreshold) {
this.throttleThreshold = throttleThreshold;
}

public Duration getThrottleDiscardThreshold() {
return throttleDiscardThreshold;
}

public void setThrottleDiscardThreshold(Duration throttleDiscardThreshold) {
this.throttleDiscardThreshold = throttleDiscardThreshold;
}

@Validated
public static class Persistence {
/**
Expand Down
16 changes: 0 additions & 16 deletions src/main/java/cz/cvut/kbss/termit/util/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,6 @@ public class Constants {
*/
public static final String REST_MAPPING_PATH = "/rest";

/**
* The amount of time in which calls of methods
* with {@link cz.cvut.kbss.termit.util.throttle.Throttle} annotation
* should be merged.
*
* @see cz.cvut.kbss.termit.util.throttle.Throttle
* @see cz.cvut.kbss.termit.util.throttle.ThrottleAspect
*/
public static final Duration THROTTLE_THRESHOLD = Duration.ofSeconds(10);

/**
* After how much time, should complete futures be discarded.
* @see ThrottleAspect#clearOldFutures()
*/
public static final Duration THROTTLE_DISCARD_THRESHOLD = Duration.ofMinutes(1);

/**
* Default page size.
* <p>
Expand Down
28 changes: 15 additions & 13 deletions src/main/java/cz/cvut/kbss/termit/util/throttle/ThrottleAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import cz.cvut.kbss.termit.TermItApplication;
import cz.cvut.kbss.termit.exception.TermItException;
import cz.cvut.kbss.termit.exception.ThrottleAspectException;
import cz.cvut.kbss.termit.util.Constants;
import cz.cvut.kbss.termit.util.Configuration;
import cz.cvut.kbss.termit.util.Pair;
import cz.cvut.kbss.termit.util.longrunning.LongRunningTaskScheduler;
import cz.cvut.kbss.termit.util.longrunning.LongRunningTasksRegistry;
Expand Down Expand Up @@ -53,8 +53,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static cz.cvut.kbss.termit.util.Constants.THROTTLE_DISCARD_THRESHOLD;
import static cz.cvut.kbss.termit.util.Constants.THROTTLE_THRESHOLD;
import static org.springframework.beans.factory.config.ConfigurableBeanFactory.SCOPE_SINGLETON;

/**
Expand Down Expand Up @@ -131,13 +129,16 @@ public class ThrottleAspect extends LongRunningTaskScheduler {
*/
private final AtomicReference<Instant> lastClear;

private final Configuration configuration;

@Autowired
public ThrottleAspect(@Qualifier("longRunningTaskScheduler") TaskScheduler taskScheduler,
SynchronousTransactionExecutor transactionExecutor,
LongRunningTasksRegistry longRunningTasksRegistry) {
LongRunningTasksRegistry longRunningTasksRegistry, Configuration configuration) {
super(longRunningTasksRegistry);
this.taskScheduler = taskScheduler;
this.transactionExecutor = transactionExecutor;
this.configuration = configuration;
throttledFutures = new HashMap<>();
lastRun = new HashMap<>();
scheduledFutures = new TreeMap<>();
Expand All @@ -153,14 +154,15 @@ protected ThrottleAspect(Map<Identifier, ThrottledFuture<Object>> throttledFutur
Map<Identifier, Instant> lastRun,
NavigableMap<Identifier, Future<Object>> scheduledFutures, TaskScheduler taskScheduler,
Clock clock, SynchronousTransactionExecutor transactionExecutor,
LongRunningTasksRegistry longRunningTasksRegistry) {
LongRunningTasksRegistry longRunningTasksRegistry, Configuration configuration) {
super(longRunningTasksRegistry);
this.throttledFutures = throttledFutures;
this.lastRun = lastRun;
this.scheduledFutures = scheduledFutures;
this.taskScheduler = taskScheduler;
this.clock = clock;
this.transactionExecutor = transactionExecutor;
this.configuration = configuration;
standardEvaluationContext = makeDefaultContext();
lastClear = new AtomicReference<>(Instant.now(clock));
}
Expand Down Expand Up @@ -231,7 +233,7 @@ private static StandardEvaluationContext makeDefaultContext() {
}
}

// if there is a scheduled task and this throttled instance was executed in the last THROTTLE_THRESHOLD
// if there is a scheduled task and this throttled instance was executed in the last configuration.getThrottleThreshold()
// cancel the scheduled task
// -> the execution is further delayed
Future<Object> oldScheduledFuture = scheduledFutures.get(identifier);
Expand Down Expand Up @@ -423,13 +425,13 @@ private Runnable createRunnableToSchedule(ThrottledFuture<?> throttledFuture, Id

/**
* Discards futures from {@link #throttledFutures}, {@link #lastRun} and {@link #scheduledFutures} maps.
* <p>Every completed future for which a {@link Constants#THROTTLE_DISCARD_THRESHOLD} expired is discarded.</p>
* <p>Every completed future for which a {@link Configuration#throttleDiscardThreshold throttleDiscardThreshold} expired is discarded.</p>
* @see #isThresholdExpired(Identifier)
*/
private void clearOldFutures() {
// if the last clear was performed less than a threshold ago, skip it for now
Instant last = lastClear.get();
if (last.isAfter(Instant.now(clock).minus(THROTTLE_THRESHOLD).minus(THROTTLE_DISCARD_THRESHOLD))) {
if (last.isAfter(Instant.now(clock).minus(configuration.getThrottleThreshold()).minus(configuration.getThrottleDiscardThreshold()))) {
return;
}
if (!lastClear.compareAndSet(last, Instant.now(clock))) {
Expand All @@ -442,7 +444,7 @@ private void clearOldFutures() {
.stream())
.flatMap(s -> s).distinct().toList() // ensures safe modification of maps
.forEach(identifier -> {
if (isThresholdExpiredByMoreThan(identifier, THROTTLE_DISCARD_THRESHOLD)) {
if (isThresholdExpiredByMoreThan(identifier, configuration.getThrottleDiscardThreshold())) {
Optional.ofNullable(throttledFutures.get(identifier)).ifPresent(throttled -> {
if (throttled.isDone()) {
throttledFutures.remove(identifier);
Expand All @@ -465,23 +467,23 @@ private void clearOldFutures() {
* @param identifier of the task
* @param duration to add to the throttle threshold
* @return Whether the last time when a task with specified {@code identifier} run
* is older than ({@link Constants#THROTTLE_THRESHOLD} + {@code duration})
* is older than ({@link Configuration#throttleThreshold throttleThreshold} + {@code duration})
*/
private boolean isThresholdExpiredByMoreThan(Identifier identifier, Duration duration) {
return lastRun.getOrDefault(identifier, Instant.MAX).isBefore(Instant.now(clock).minus(THROTTLE_THRESHOLD).minus(duration));
return lastRun.getOrDefault(identifier, Instant.MAX).isBefore(Instant.now(clock).minus(configuration.getThrottleThreshold()).minus(duration));
}

/**
* @return Whether the time when the identifier last run is older than the threshold,
* true when the task had never run
*/
private boolean isThresholdExpired(Identifier identifier) {
return lastRun.getOrDefault(identifier, Instant.EPOCH).isBefore(Instant.now(clock).minus(THROTTLE_THRESHOLD));
return lastRun.getOrDefault(identifier, Instant.EPOCH).isBefore(Instant.now(clock).minus(configuration.getThrottleThreshold()));
}

@SuppressWarnings("unchecked")
private void schedule(Identifier identifier, Runnable task, boolean immediately) {
Instant startTime = Instant.now(clock).plus(THROTTLE_THRESHOLD);
Instant startTime = Instant.now(clock).plus(configuration.getThrottleThreshold());
if (immediately) {
startTime = Instant.now(clock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.vladsch.flexmark.util.collection.OrderedMap;
import cz.cvut.kbss.termit.exception.TermItException;
import cz.cvut.kbss.termit.exception.ThrottleAspectException;
import cz.cvut.kbss.termit.util.Configuration;
import cz.cvut.kbss.termit.util.longrunning.LongRunningTasksRegistry;
import org.aspectj.lang.ProceedingJoinPoint;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -33,8 +34,6 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import static cz.cvut.kbss.termit.util.Constants.THROTTLE_DISCARD_THRESHOLD;
import static cz.cvut.kbss.termit.util.Constants.THROTTLE_THRESHOLD;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -56,6 +55,7 @@

class ThrottleAspectTest {
private static final long THREAD_JOIN_TIMEOUT_MILLIS = 60 * 1000;
private final Configuration configuration = new Configuration();

/**
* Throttled futures from {@link #sut}
Expand Down Expand Up @@ -206,7 +206,7 @@ void beforeEach() throws Throwable {
transactionExecutor = spy(SynchronousTransactionExecutor.class);
longRunningTasksRegistry = mock(LongRunningTasksRegistry.class);

sut = new ThrottleAspect(throttledFutures, lastRun, scheduledFutures, taskScheduler, mockedClock, transactionExecutor, longRunningTasksRegistry);
sut = new ThrottleAspect(throttledFutures, lastRun, scheduledFutures, taskScheduler, mockedClock, transactionExecutor, longRunningTasksRegistry, configuration);
}

/**
Expand All @@ -221,13 +221,13 @@ void addSecond() {
}

void skipThreshold() {
clock = Clock.fixed(clock.instant().plus(THROTTLE_THRESHOLD), ZoneId.of("UTC"));
clock = Clock.fixed(clock.instant().plus(configuration.getThrottleThreshold()), ZoneId.of("UTC"));
}

void skipDiscardThreshold() {
clock = Clock.fixed(clock.instant()
.plus(THROTTLE_DISCARD_THRESHOLD)
.plus(THROTTLE_THRESHOLD)
.plus(configuration.getThrottleDiscardThreshold())
.plus(configuration.getThrottleThreshold())
.plusSeconds(1),
ZoneId.of("UTC"));
}
Expand Down Expand Up @@ -618,8 +618,8 @@ void taskFromMethodAnnotatedWithTransactionalIsExecutedWithTransactionExecutor()

/**
* When a task is executed, all three maps are cleared from
* entries older than {@link cz.cvut.kbss.termit.util.Constants#THROTTLE_DISCARD_THRESHOLD THROTTLE_DISCARD_THRESHOLD}
* plus {@link cz.cvut.kbss.termit.util.Constants#THROTTLE_THRESHOLD THROTTLE_THRESHOLD}
* entries older than {@link Configuration#throttleDiscardThreshold throttleDiscardThreshold}
* plus {@link Configuration#throttleThreshold throttleThreshold}
*/
@Test
void allMapsAreClearedAfterDiscardThreshold() throws Throwable {
Expand Down Expand Up @@ -819,7 +819,7 @@ void aspectDownNotThrowsOnEmptyGroup() {

@Test
void aspectConstructsFromAutowiredConstructor() {
assertDoesNotThrow(() -> new ThrottleAspect(taskScheduler, transactionExecutor, longRunningTasksRegistry));
assertDoesNotThrow(() -> new ThrottleAspect(taskScheduler, transactionExecutor, longRunningTasksRegistry, configuration));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cz.cvut.kbss.termit.util.throttle;

import cz.cvut.kbss.termit.util.Configuration;
import org.mockito.Mockito;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -27,4 +28,9 @@ public ThreadPoolTaskScheduler longRunningTaskScheduler() {
public ThrottledService throttledService() {
return new ThrottledService();
}

@Bean
public Configuration configuration() {
return new Configuration();
}
}

0 comments on commit 6df9653

Please sign in to comment.