Skip to content

Commit

Permalink
fix(search-ids): keep right context in resource-id thread (#580)
Browse files Browse the repository at this point in the history
Closes: MSEARCH-754

(cherry picked from commit ae73fa6)
  • Loading branch information
psmagin committed May 10, 2024
1 parent c6ae986 commit ae76bcf
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 30 deletions.
8 changes: 7 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v3.2.5 2024-05-10
### Bug fixes
* Keep right context in resource-id thread ([MSEARCH-754](https://folio-org.atlassian.net/browse/MSEARCH-754))

---

## v3.2.4 2024-05-03
### Bug fixes
* Fix title count when on central tenant ([MSEARCH-745](https://issues.folio.org/browse/MSEARCH-745))
Expand All @@ -12,7 +18,7 @@

## v3.2.2 2024-04-10
### Bug fixes
* Do not delete kafka topics if tenant collection topic feature is enabled ([MSEARCH-725](https://issues.folio.org/browse/MSEARCH-725))
* Do not delete kafka topics if collection topic is enabled ([MSEARCH-725](https://folio-org.atlassian.net/browse/MSEARCH-725))

---

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/folio/search/configuration/AsyncConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.concurrent.Executor;
import lombok.RequiredArgsConstructor;
import org.folio.search.configuration.properties.StreamIdsProperties;
import org.folio.spring.scope.FolioExecutionScopeExecutionContextManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
Expand All @@ -15,13 +16,14 @@ public class AsyncConfig {

private final StreamIdsProperties streamIdsProperties;

@Bean
public Executor taskExecutor() {
@Bean("streamIdsExecutor")
public Executor streamIdsExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(streamIdsProperties.getCorePoolSize());
executor.setMaxPoolSize(streamIdsProperties.getMaxPoolSize());
executor.setQueueCapacity(streamIdsProperties.getQueueCapacity());
executor.setThreadNamePrefix("StreamResourceIds-");
executor.setTaskDecorator(FolioExecutionScopeExecutionContextManager::getRunnableWithCurrentFolioContext);
executor.initialize();
return executor;
}
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/org/folio/search/service/ResourceIdService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.folio.search.repository.ResourceIdsJobRepository;
import org.folio.search.repository.ResourceIdsTemporaryRepository;
import org.folio.search.repository.SearchRepository;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.spring.FolioExecutionContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -41,7 +41,7 @@ public class ResourceIdService {
private final CqlSearchQueryConverter queryConverter;
private final ResourceIdsJobRepository jobRepository;
private final ResourceIdsTemporaryRepository idsTemporaryRepository;
private final SystemUserScopedExecutionService executionService;
private final FolioExecutionContext folioExecutionContext;

/**
* Returns resource ids for passed cql query in text type.
Expand Down Expand Up @@ -140,11 +140,10 @@ protected OutputStreamWriter createOutputStreamWriter(OutputStream outputStream)
private void streamResourceIds(CqlResourceIdsRequest request, Consumer<List<String>> idsConsumer) {
log.info("streamResourceIds:: by [query: {}, resource: {}]", request.getQuery(), request.getResource());

var searchSource = executionService.executeSystemUserScoped(request.getTenantId(),
() -> queryConverter.convertForConsortia(request.getQuery(), request.getResource())
.size(streamIdsProperties.getScrollQuerySize())
.fetchSource(new String[] {request.getSourceFieldPath()}, null)
.sort(fieldSort("_doc")));
var searchSource = queryConverter.convertForConsortia(request.getQuery(), request.getResource())
.size(streamIdsProperties.getScrollQuerySize())
.fetchSource(new String[] {request.getSourceFieldPath()}, null)
.sort(fieldSort("_doc"));

searchRepository.streamResourceIds(request, searchSource, idsConsumer);
}
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/org/folio/search/service/ResourceIdsJobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import java.security.SecureRandom;
import java.util.Date;
import java.util.concurrent.Executor;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.RandomStringUtils;
import org.folio.search.converter.ResourceIdsJobMapper;
import org.folio.search.domain.dto.ResourceIdsJob;
import org.folio.search.model.streamids.ResourceIdsJobEntity;
import org.folio.search.model.types.StreamJobStatus;
import org.folio.search.repository.ResourceIdsJobRepository;
import org.folio.search.service.consortium.ConsortiumTenantExecutor;
Expand All @@ -21,6 +23,7 @@ public class ResourceIdsJobService {
private final ResourceIdsJobRepository jobRepository;
private final ResourceIdsJobMapper resourceIdsJobMapper;
private final ResourceIdService resourceIdService;
private final Executor streamIdsExecutor;

public ResourceIdsJob getJobById(String id) {
var jobEntity = consortiumTenantExecutor.execute(() -> jobRepository.getReferenceById(id));
Expand All @@ -35,12 +38,17 @@ public ResourceIdsJob createStreamJob(ResourceIdsJob job, String tenantId) {
entity.setTemporaryTableName(generateTemporaryTableName());

log.info("Attempts to create streamJob by [resourceIdsJob: {}]", entity);
var savedJob = consortiumTenantExecutor.execute(() -> jobRepository.save(entity));
var savedJob = consortiumTenantExecutor.execute(() -> saveAndRun(entity, tenantId));

consortiumTenantExecutor.runAsync(() -> resourceIdService.streamResourceIdsForJob(savedJob, tenantId));
return resourceIdsJobMapper.convert(savedJob);
}

private ResourceIdsJobEntity saveAndRun(ResourceIdsJobEntity entity, String tenantId) {
var job = jobRepository.save(entity);
streamIdsExecutor.execute(() -> resourceIdService.streamResourceIdsForJob(job, tenantId));
return job;
}

private String generateTemporaryTableName() {
return RandomStringUtils
.random(32, 0, 0, true, false, null, new SecureRandom())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import lombok.extern.log4j.Log4j2;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Log4j2
Expand Down Expand Up @@ -39,9 +38,4 @@ public void run(Runnable operation) {
});
}

@Async
public void runAsync(Runnable operation) {
run(operation);
}

}
12 changes: 0 additions & 12 deletions src/test/java/org/folio/search/service/ResourceIdServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.opensearch.index.query.QueryBuilders.termQuery;
Expand All @@ -21,7 +20,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import org.folio.search.configuration.properties.StreamIdsProperties;
import org.folio.search.cql.CqlSearchQueryConverter;
Expand All @@ -33,9 +31,7 @@
import org.folio.search.model.types.StreamJobStatus;
import org.folio.search.repository.ResourceIdsJobRepository;
import org.folio.search.repository.SearchRepository;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.spring.testing.type.UnitTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -65,17 +61,9 @@ class ResourceIdServiceTest {
private StreamIdsProperties properties;
@Mock
private ResourceIdsJobRepository jobRepository;
@Mock
private SystemUserScopedExecutionService executionService;
@Spy
private final ObjectMapper objectMapper = OBJECT_MAPPER;

@BeforeEach
void setUp() {
lenient().when(executionService.executeSystemUserScoped(any(), any()))
.thenAnswer(invocation -> ((Callable<?>) invocation.getArgument(1)).call());
}

@Test
void streamResourceIds() throws IOException {
when(queryConverter.convertForConsortia(TEST_QUERY, RESOURCE_NAME)).thenReturn(searchSource());
Expand Down

0 comments on commit ae76bcf

Please sign in to comment.