Skip to content

Commit

Permalink
Delete pending endpoint (teamdigitale#84)
Browse files Browse the repository at this point in the history
* endpoint per la cancellazione dei run "appesi"

* endpoint per la cancellazione dei run appesi
  • Loading branch information
gnespolino authored Dec 22, 2023
1 parent 2c6fb68 commit efcd226
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,29 @@ public class SimpleHarvestRepositoryProcessor {
private final NdcEventPublisher ndcEventPublisher;
private final List<String> locks = new ArrayList<>();

private void setThreadName(String repoId, String revision, String status) {
Thread.currentThread().setName(THREAD_PREFIX + repoId + "-" + revision + "-" + status);
}
private boolean harvestingInProgress = false;

public List<String> getAllRunningHarvests() {
public static List<String> getAllRunningHarvestThreadNames() {
return ThreadUtils.getAllThreads().stream()
.map(Thread::getName)
.filter(name -> startsWith(name, THREAD_PREFIX))
.filter(name -> endsWith(name, "RUNNING"))
.collect(Collectors.toList());
}

public boolean isHarvestingInProgress() {
return harvestingInProgress;
}

private void setThreadName(String runId, String repoId, String revision, String status) {
Thread.currentThread().setName(THREAD_PREFIX + "|" + runId + "|" + repoId + "|" + revision + "|" + status);
}

@Async
public void execute(String runId, Repository repository, String correlationId, String revision, boolean force, String currentUserLogin) {
this.harvestingInProgress = true;
try {
setThreadName(repository.getId(), revision, "RUNNING");
setThreadName(runId, repository.getId(), revision, "RUNNING");
publishHarvesterStartedEvent(repository, correlationId, revision, runId, currentUserLogin);

synchronized (locks) {
Expand All @@ -64,7 +71,7 @@ public void execute(String runId, Repository repository, String correlationId, S
String.format("Harvesting for repo %s is already running",
repository.getUrl())),
currentUserLogin);
setThreadName(repository.getId(), revision, "IDLE");
setThreadName(runId, repository.getId(), revision, "IDLE");
return;
}
locks.add(repository.getId() + revision);
Expand All @@ -79,7 +86,7 @@ public void execute(String runId, Repository repository, String correlationId, S
harvesterService.harvest(repository, revision);

publishHarvesterSuccessfulEvent(repository, correlationId, revision, runId, currentUserLogin);
setThreadName(repository.getId(), revision, "IDLE");
setThreadName(runId, repository.getId(), revision, "IDLE");
} catch (HarvesterAlreadyExecuted e) {
publishHarvesterFailedEvent(repository, correlationId, revision, runId, HarvesterRun.Status.UNCHANGED, e, currentUserLogin);
} catch (HarvesterAlreadyInProgress e) {
Expand All @@ -89,7 +96,8 @@ public void execute(String runId, Repository repository, String correlationId, S
log.error("Unable to process {}", repository.getUrl(), e);
}
removeLock(repository, revision);
setThreadName(repository.getId(), revision, "IDLE");
setThreadName(runId, repository.getId(), revision, "IDLE");
this.harvestingInProgress = false;
}

private void removeLock(Repository repository, String revision) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
Expand Down Expand Up @@ -38,6 +39,16 @@ public List<HarvesterRun> getAllRuns() {
return harvesterRunService.getAllRuns();
}

@GetMapping("jobs/harvest/running")
public List<RunningInstance> getAllRunningInstance() {
return harvesterRunService.getAllRunningInstances();
}

@DeleteMapping("jobs/harvest/run")
public void deletePendingRuns() {
harvesterRunService.deletePendingRuns();
}

@PostMapping(value = "jobs/harvest", params = "repositoryId")
public JobExecutionResponse harvestRepositories(
@RequestParam("repositoryId") String repositoryId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package it.gov.innovazione.ndc.controller;

import it.gov.innovazione.ndc.model.harvester.HarvesterRun;
import lombok.Builder;
import lombok.Data;

@Builder
@Data
public class RunningInstance {
private final String threadName;
private final HarvesterRun harvesterRun;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package it.gov.innovazione.ndc.harvester.service;

import it.gov.innovazione.ndc.controller.RunningInstance;
import it.gov.innovazione.ndc.model.harvester.HarvesterRun;
import it.gov.innovazione.ndc.model.harvester.Repository;
import lombok.RequiredArgsConstructor;
Expand All @@ -12,9 +13,14 @@
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static it.gov.innovazione.ndc.config.SimpleHarvestRepositoryProcessor.getAllRunningHarvestThreadNames;
import static org.apache.commons.lang3.StringUtils.contains;
import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase;
import static org.apache.commons.lang3.StringUtils.equalsIgnoreCase;
import static org.apache.commons.lang3.StringUtils.startsWithIgnoreCase;

Expand All @@ -24,6 +30,7 @@
public class HarvesterRunService {

private final JdbcTemplate jdbcTemplate;

private static final Long HARVESTING_RECENT_DAYS = 30L;

public int saveHarvesterRun(HarvesterRun harvesterRun) {
Expand Down Expand Up @@ -148,4 +155,63 @@ private Instant getInstant(ResultSet rs, String column) {
}
}

public void deletePendingRuns() {
getAllRuns()
.stream()
.filter(harvesterRun -> harvesterRun.getStatus() == HarvesterRun.Status.RUNNING)
.forEach(this::deleteIfNecessary);
}

private void deleteIfNecessary(HarvesterRun harvesterRun) {
if (getAllRunningHarvestThreadNames()
.stream()
.noneMatch(threadName -> matchRunning(harvesterRun, threadName))) {
delete(harvesterRun);
}
}

private boolean matchRunning(HarvesterRun harvesterRun, String threadName) {
return contains(threadName, harvesterRun.getRepositoryId())
&& contains(threadName, harvesterRun.getRevision())
&& contains(threadName, harvesterRun.getId())
&& endsWithIgnoreCase(threadName, "RUNNING");
}

private void delete(HarvesterRun harvesterRun) {
String query = "DELETE FROM HARVESTER_RUN WHERE ID = ?";
jdbcTemplate.update(query, harvesterRun.getId());
}

public List<RunningInstance> getAllRunningInstances() {
List<HarvesterRun> allRuns = getAllRuns();
return getAllRunningHarvestThreadNames()
.stream()
.map(threadName -> asRunningInstance(allRuns, threadName))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

private RunningInstance asRunningInstance(List<HarvesterRun> allRuns, String threadName) {
HarvesterRun harvesterRun = findHarvesterRun(allRuns, threadName);
if (harvesterRun != null) {
return RunningInstance.builder()
.threadName(threadName)
.harvesterRun(findHarvesterRun(allRuns, threadName))
.build();
}
return null;
}

private HarvesterRun findHarvesterRun(List<HarvesterRun> allRuns, String threadName) {
try {
String[] split = threadName.split("\\|");
String runId = split[1];
return allRuns.stream()
.filter(harvesterRun -> harvesterRun.getId().equals(runId))
.findFirst()
.orElse(null);
} catch (Exception e) {
return null;
}
}
}

0 comments on commit efcd226

Please sign in to comment.