Skip to content

Commit

Permalink
Merge pull request #548 from folio-org/cancel-query-experimentation
Browse files Browse the repository at this point in the history
[MODFQMMGR-510] Cancel queries in db
  • Loading branch information
bvsharp authored Dec 11, 2024
2 parents 9050c41 + a3d98b1 commit fb81183
Show file tree
Hide file tree
Showing 14 changed files with 479 additions and 349 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.folio.fqm.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

@Configuration
public class ScheduledExecutorServiceConfig {
@Bean(destroyMethod = "shutdown")
public ScheduledExecutorService scheduledExecutorService() {
return Executors.newScheduledThreadPool(1);
}
}
34 changes: 17 additions & 17 deletions src/main/java/org/folio/fqm/repository/EntityTypeRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,23 @@ public Stream<EntityType> getEntityTypeDefinitions(Collection<UUID> entityTypeId
Map<UUID, EntityType> entityTypes = entityTypeCache.get(tenantId != null ? tenantId : executionContext.getTenantId(), tenantIdKey -> {
String tableName = "".equals(tenantIdKey) ? TABLE_NAME : tenantIdKey + "_mod_fqm_manager." + TABLE_NAME;
Field<String> definitionField = field(DEFINITION_FIELD_NAME, String.class);
Map<String, EntityType> rawEntityTypes = readerJooqContext
.select(definitionField)
.from(table(tableName))
.fetch(definitionField)
.stream()
.map(this::unmarshallEntityType)
.collect(Collectors.toMap(EntityType::getId, Function.identity()));

return rawEntityTypes.values().stream()
.map(entityType -> {
String customFieldsEntityTypeId = entityType.getCustomFieldEntityTypeId();
if (customFieldsEntityTypeId != null) {
entityType.getColumns().addAll(fetchColumnNamesForCustomFields(customFieldsEntityTypeId, entityType, rawEntityTypes));
}
return entityType;
})
.collect(Collectors.toMap(entityType -> UUID.fromString(entityType.getId()), Function.identity()));
Map<String, EntityType> rawEntityTypes = readerJooqContext
.select(definitionField)
.from(table(tableName))
.fetch(definitionField)
.stream()
.map(this::unmarshallEntityType)
.collect(Collectors.toMap(EntityType::getId, Function.identity()));

return rawEntityTypes.values().stream()
.map(entityType -> {
String customFieldsEntityTypeId = entityType.getCustomFieldEntityTypeId();
if (customFieldsEntityTypeId != null) {
entityType.getColumns().addAll(fetchColumnNamesForCustomFields(customFieldsEntityTypeId, entityType, rawEntityTypes));
}
return entityType;
})
.collect(Collectors.toMap(entityType -> UUID.fromString(entityType.getId()), Function.identity()));
}
);

Expand Down
78 changes: 63 additions & 15 deletions src/main/java/org/folio/fqm/repository/IdStreamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

Expand All @@ -44,23 +46,26 @@
@Log4j2
public class IdStreamer {

private static final long QUERY_CANCELLATION_CHECK_SECONDS = 30;

@Qualifier("readerJooqContext")
private final DSLContext jooqContext;
private final EntityTypeFlatteningService entityTypeFlatteningService;
private final CrossTenantQueryService crossTenantQueryService;
private final FolioExecutionContext executionContext;
private final QueryRepository queryRepository;
private final QueryResultsRepository queryResultsRepository;
private final ScheduledExecutorService executorService;

/**
* Executes the given Fql Query and stream the result Ids back.
*/
public void streamIdsInBatch(EntityType entityType,
boolean sortResults,
Fql fql,
int batchSize,
int maxQuerySize,
UUID queryId) {
boolean sortResults,
Fql fql,
int batchSize,
int maxQuerySize,
UUID queryId) {
boolean ecsEnabled = crossTenantQueryService.ecsEnabled();
List<String> tenantsToQuery = crossTenantQueryService.getTenantsToQuery(entityType);
this.streamIdsInBatch(entityType, sortResults, fql, batchSize, maxQuerySize, queryId, tenantsToQuery, ecsEnabled);
Expand All @@ -87,9 +92,9 @@ public List<List<String>> getSortedIds(String derivedTableName,
}

private void streamIdsInBatch(EntityType entityType,
boolean sortResults,
Fql fql, int batchSize,
int maxQuerySize, UUID queryId, List<String> tenantsToQuery, boolean ecsEnabled) {
boolean sortResults,
Fql fql, int batchSize,
int maxQuerySize, UUID queryId, List<String> tenantsToQuery, boolean ecsEnabled) {
UUID entityTypeId = UUID.fromString(entityType.getId());
log.debug("List of tenants to query: {}", tenantsToQuery);
Field<String[]> idValueGetter = EntityTypeUtils.getResultIdValueGetter(entityType);
Expand All @@ -112,7 +117,8 @@ private void streamIdsInBatch(EntityType entityType,
innerJoinClause,
whereClause,
sortResults,
batchSize
batchSize,
queryId
);
if (fullQuery == null) {
fullQuery = (Select<Record1<String[]>>) innerQuery;
Expand All @@ -126,6 +132,8 @@ private void streamIdsInBatch(EntityType entityType,
fullQuery.fetchSize(batchSize);
}

monitorQueryCancellation(queryId);

try (
Cursor<Record1<String[]>> idsCursor = fullQuery.fetchLazy();
Stream<String[]> idStream = idsCursor
Expand All @@ -137,9 +145,7 @@ private void streamIdsInBatch(EntityType entityType,
) {
var total = new AtomicInteger();
idsStream.map(ids -> new IdsWithCancelCallback(ids, idsStream::close))
.forEach(idsWithCancelCallback -> {
handleBatch(queryId, idsWithCancelCallback, maxQuerySize, total);
});
.forEach(idsWithCancelCallback -> handleBatch(queryId, idsWithCancelCallback, maxQuerySize, total));
}
}

Expand All @@ -162,7 +168,47 @@ void handleBatch(UUID queryId, IdsWithCancelCallback idsWithCancelCallback, Inte
}
}

private ResultQuery<Record1<String[]>> buildQuery(EntityType entityType, Field<String[]> idValueGetter, String finalJoinClause, Condition sqlWhereClause, boolean sortResults, int batchSize) {
void monitorQueryCancellation(UUID queryId) {
Runnable cancellationMonitor = new Runnable() {
@Override
public void run() {
try {
log.debug("Checking query cancellation for query {}", queryId);
QueryStatus queryStatus = queryRepository
.getQuery(queryId, false)
.orElseThrow(() -> new QueryNotFoundException(queryId))
.status();
if (queryStatus == QueryStatus.CANCELLED) {
cancelQuery(queryId);
} else if (queryStatus == QueryStatus.IN_PROGRESS) {
// Reschedule the cancellation monitor if query is still in progress
executorService.schedule(this, QUERY_CANCELLATION_CHECK_SECONDS, TimeUnit.SECONDS);
}
} catch (Exception e) {
log.error("Unexpected error occurred while cancelling query: {}", e.getMessage(), e);
}
}
};
executorService.schedule(cancellationMonitor, 0, TimeUnit.SECONDS);
}

void cancelQuery(UUID queryId) {
log.info("Query {} has been marked as cancelled. Cancelling query in database.", queryId);
String querySearchText = "%Query ID: " + queryId + "%";
List<Integer> pids = jooqContext
.select(field("pid", Integer.class))
.from(table("pg_stat_activity"))
.where(field("state").eq("active"))
.and(field("query").like(querySearchText))
.fetchInto(Integer.class);
for (int pid : pids) {
log.debug("PID for the executing query: {}", pid);
jooqContext.execute("SELECT pg_cancel_backend(?)", pid);
}
}

private ResultQuery<Record1<String[]>> buildQuery(EntityType entityType, Field<String[]> idValueGetter, String finalJoinClause, Condition sqlWhereClause, boolean sortResults, int batchSize, UUID queryId) {
String hint = "/* Query ID: " + queryId + " */";
if (!isEmpty(entityType.getGroupByFields())) {
Field<?>[] groupByFields = entityType
.getColumns()
Expand All @@ -171,16 +217,18 @@ private ResultQuery<Record1<String[]>> buildQuery(EntityType entityType, Field<S
.map(col -> col.getFilterValueGetter() == null ? col.getValueGetter() : col.getFilterValueGetter())
.map(DSL::field)
.toArray(Field[]::new);
return jooqContext.dsl()
return jooqContext
.select(field(idValueGetter))
.hint(hint)
.from(finalJoinClause)
.where(sqlWhereClause)
.groupBy(groupByFields)
.orderBy(EntityTypeUtils.getSortFields(entityType, sortResults))
.fetchSize(batchSize);
} else {
return jooqContext.dsl()
return jooqContext
.select(field(idValueGetter))
.hint(hint)
.from(finalJoinClause)
.where(sqlWhereClause)
.orderBy(EntityTypeUtils.getSortFields(entityType, sortResults))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void updateQuery(UUID queryId, QueryStatus queryStatus, OffsetDateTime en
.execute();
}

@Cacheable(value="queryCache", condition="#useCache==true")
@Cacheable(value = "queryCache", condition = "#useCache==true")
public Optional<Query> getQuery(UUID queryId, boolean useCache) {
return Optional.ofNullable(jooqContext.select()
.from(table(QUERY_DETAILS_TABLE))
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/org/folio/fqm/service/QueryExecutionService.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.folio.fqm.service;

import java.time.OffsetDateTime;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.fqm.domain.Query;
import org.folio.fqm.domain.QueryStatus;
import org.folio.fqm.exception.QueryNotFoundException;
import org.folio.fqm.model.FqlQueryWithContext;
import org.folio.fqm.repository.QueryRepository;
import org.folio.querytool.domain.dto.EntityType;
Expand Down Expand Up @@ -42,7 +44,18 @@ public void executeQueryAsync(Query query, EntityType entityType, int maxQuerySi
query
);
} catch (Exception exception) {
handleFailure(query, exception);
try {
// Check query status. Only mark query as failed if query is not already cancelled
QueryStatus queryStatus = queryRepository
.getQuery(query.queryId(), false)
.orElseThrow(() -> new QueryNotFoundException(query.queryId()))
.status();
if (queryStatus != QueryStatus.CANCELLED) {
handleFailure(query, exception);
}
} catch (QueryNotFoundException e) {
handleFailure(query, e);
}
}
}

Expand Down
Loading

0 comments on commit fb81183

Please sign in to comment.