Skip to content

Commit

Permalink
[Enhancement kbss-cvut/termit-ui#535] Add maintenance endpoint that a…
Browse files Browse the repository at this point in the history
…llows an administrator to clear the long-running tasks queue.

All non-running throttled futures are cancelled and non-running tasks are removed from the long-running task registry.
  • Loading branch information
lukaskabc committed Dec 10, 2024
1 parent 5bc2cc8 commit 103d558
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package cz.cvut.kbss.termit.event;

import org.springframework.context.ApplicationEvent;

/**
* Indicates that the long-running task queue should be cleared.
*/
public class ClearLongRunningTaskQueueEvent extends ApplicationEvent {
public ClearLongRunningTaskQueueEvent(Object source) {
super(source);
}
}
11 changes: 11 additions & 0 deletions src/main/java/cz/cvut/kbss/termit/rest/AdminController.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,15 @@ public void invalidateCaches() {
LOG.debug("Cache invalidation request received from client.");
adminBean.invalidateCaches();
}

@Operation(security = {@SecurityRequirement(name = "bearer-key")},
description = "Clears the queue of long-running tasks.")
@ApiResponse(responseCode = "204", description = "Long-running tasks queue cleared.")
@PreAuthorize("hasRole('" + SecurityConstants.ROLE_ADMIN + "')")
@DeleteMapping("/long-running-tasks")
@ResponseStatus(HttpStatus.NO_CONTENT)
public void clearLongRunningTasksQueue() {
LOG.debug("Long-running task queue clearing request received from client.");
adminBean.clearLongRunningTasksQueue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package cz.cvut.kbss.termit.service.jmx;

import cz.cvut.kbss.termit.event.ClearLongRunningTaskQueueEvent;
import cz.cvut.kbss.termit.event.EvictCacheEvent;
import cz.cvut.kbss.termit.event.RefreshLastModifiedEvent;
import cz.cvut.kbss.termit.rest.dto.HealthInfo;
Expand Down Expand Up @@ -67,6 +68,12 @@ public void invalidateCaches() {
eventPublisher.publishEvent(new RefreshLastModifiedEvent(this));
}

@ManagedOperation(description = "Clears the queue of long-running tasks.")
public void clearLongRunningTasksQueue() {
LOG.info("Clearing long-running tasks queue...");
eventPublisher.publishEvent(new ClearLongRunningTaskQueueEvent(this));
}

@ManagedOperation(description = "Sends test email to the specified address.")
public void sendTestEmail(String address) {
final Message message = Message.to(address).subject("TermIt Test Email")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package cz.cvut.kbss.termit.util.longrunning;

import cz.cvut.kbss.termit.event.ClearLongRunningTaskQueueEvent;
import cz.cvut.kbss.termit.event.LongRunningTaskChangedEvent;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class LongRunningTasksRegistry {
Expand Down Expand Up @@ -38,13 +43,39 @@ public void onTaskChanged(@Nonnull final LongRunningTask task) {
eventPublisher.publishEvent(new LongRunningTaskChangedEvent(this, status));
}

@Order(Ordered.LOWEST_PRECEDENCE)
@EventListener(ClearLongRunningTaskQueueEvent.class)
public void onClearLongRunningTaskQueueEvent() {
AtomicInteger count = new AtomicInteger();
LOG.info("Clearing long running task registry...");

registry.entrySet().removeIf(entry -> {
if (!entry.getValue().isRunning()) {
count.incrementAndGet();
return true;
}
return false;
});
performCleanup();

if (count.get() > 0) {
LOG.warn("Cleared {} non-running tasks from the registry", count.get());
} else {
LOG.info("Long running task registry cleared.");
}
}

private void handleTaskChanged(@Nonnull final LongRunningTask task) {
if(task.isDone()) {
registry.remove(task.getUuid());
} else {
registry.put(task.getUuid(), task);
}

performCleanup();
}

private void performCleanup() {
// perform cleanup
registry.forEach((key, value) -> {
if (value.isDone()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cz.cvut.kbss.termit.util.throttle;

import cz.cvut.kbss.termit.TermItApplication;
import cz.cvut.kbss.termit.event.ClearLongRunningTaskQueueEvent;
import cz.cvut.kbss.termit.exception.TermItException;
import cz.cvut.kbss.termit.exception.ThrottleAspectException;
import cz.cvut.kbss.termit.util.Configuration;
Expand All @@ -18,6 +19,8 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.context.annotation.Scope;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.EvaluationException;
Expand All @@ -39,6 +42,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand Down Expand Up @@ -182,6 +186,51 @@ private static StandardEvaluationContext makeDefaultContext() {
return standardEvaluationContext;
}

/**
* Prevents accepting new tasks by synchronization
* and cancels all scheduled tasks.
*/
@Order(Ordered.HIGHEST_PRECEDENCE)
@EventListener(ClearLongRunningTaskQueueEvent.class)
public void onClearLongRunningTaskQueueEvent() {
synchronized (throttledFutures) { // synchronize in the filed declaration order
synchronized (lastRun) {
synchronized (scheduledFutures) {
LOG.info("Clearing throttled tasks...");

long count = 0;
Iterator<Map.Entry<Identifier, ThrottledFuture<Object>>> throttledIt =
throttledFutures.entrySet().iterator();

while(throttledIt.hasNext()) {
final Map.Entry<Identifier, ThrottledFuture<Object>> entry = throttledIt.next();
final ThrottledFuture<Object> future = entry.getValue();
final Identifier identifier = entry.getKey();
if(future.isRunning() || future.isDone()) continue;

// cancel the throttled future
future.cancel(false);
// cancel the scheduled future
Optional.ofNullable(scheduledFutures.get(identifier))
.ifPresent(scheduled -> {
scheduled.cancel(false);
if (scheduled.isCancelled()) {
scheduledFutures.remove(identifier);
}
});
if (future.isCancelled()) {
throttledIt.remove();
}
count++;
notifyTaskChanged(future);
}
clearOldFutures();
LOG.info("Cancelled {} pending throttled tasks", count);
}
}
}
}

/**
* @return future or null
* @throws TermItException when the target method throws
Expand Down

0 comments on commit 103d558

Please sign in to comment.