diff --git a/NEWS.md b/NEWS.md index 57335a8a0..7bbf7810e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,9 @@ +## v3.2.5 2024-05-10 +### Bug fixes +* Keep right context in resource-id thread ([MSEARCH-754](https://folio-org.atlassian.net/browse/MSEARCH-754)) + +--- + ## v3.2.4 2024-05-03 ### Bug fixes * Fix title count when on central tenant ([MSEARCH-745](https://issues.folio.org/browse/MSEARCH-745)) @@ -12,7 +18,7 @@ ## v3.2.2 2024-04-10 ### Bug fixes -* Do not delete kafka topics if tenant collection topic feature is enabled ([MSEARCH-725](https://issues.folio.org/browse/MSEARCH-725)) +* Do not delete kafka topics if collection topic is enabled ([MSEARCH-725](https://folio-org.atlassian.net/browse/MSEARCH-725)) --- diff --git a/src/main/java/org/folio/search/configuration/AsyncConfig.java b/src/main/java/org/folio/search/configuration/AsyncConfig.java index 1b2c2ad5a..42f21748f 100644 --- a/src/main/java/org/folio/search/configuration/AsyncConfig.java +++ b/src/main/java/org/folio/search/configuration/AsyncConfig.java @@ -3,6 +3,7 @@ import java.util.concurrent.Executor; import lombok.RequiredArgsConstructor; import org.folio.search.configuration.properties.StreamIdsProperties; +import org.folio.spring.scope.FolioExecutionScopeExecutionContextManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; @@ -15,13 +16,14 @@ public class AsyncConfig { private final StreamIdsProperties streamIdsProperties; - @Bean - public Executor taskExecutor() { + @Bean("streamIdsExecutor") + public Executor streamIdsExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(streamIdsProperties.getCorePoolSize()); executor.setMaxPoolSize(streamIdsProperties.getMaxPoolSize()); executor.setQueueCapacity(streamIdsProperties.getQueueCapacity()); executor.setThreadNamePrefix("StreamResourceIds-"); + executor.setTaskDecorator(FolioExecutionScopeExecutionContextManager::getRunnableWithCurrentFolioContext); executor.initialize(); return executor; } diff --git a/src/main/java/org/folio/search/service/ResourceIdService.java b/src/main/java/org/folio/search/service/ResourceIdService.java index 52789dc43..426197cc0 100644 --- a/src/main/java/org/folio/search/service/ResourceIdService.java +++ b/src/main/java/org/folio/search/service/ResourceIdService.java @@ -26,7 +26,7 @@ import org.folio.search.repository.ResourceIdsJobRepository; import org.folio.search.repository.ResourceIdsTemporaryRepository; import org.folio.search.repository.SearchRepository; -import org.folio.spring.service.SystemUserScopedExecutionService; +import org.folio.spring.FolioExecutionContext; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -41,7 +41,7 @@ public class ResourceIdService { private final CqlSearchQueryConverter queryConverter; private final ResourceIdsJobRepository jobRepository; private final ResourceIdsTemporaryRepository idsTemporaryRepository; - private final SystemUserScopedExecutionService executionService; + private final FolioExecutionContext folioExecutionContext; /** * Returns resource ids for passed cql query in text type. @@ -140,11 +140,10 @@ protected OutputStreamWriter createOutputStreamWriter(OutputStream outputStream) private void streamResourceIds(CqlResourceIdsRequest request, Consumer> idsConsumer) { log.info("streamResourceIds:: by [query: {}, resource: {}]", request.getQuery(), request.getResource()); - var searchSource = executionService.executeSystemUserScoped(request.getTenantId(), - () -> queryConverter.convertForConsortia(request.getQuery(), request.getResource()) - .size(streamIdsProperties.getScrollQuerySize()) - .fetchSource(new String[] {request.getSourceFieldPath()}, null) - .sort(fieldSort("_doc"))); + var searchSource = queryConverter.convertForConsortia(request.getQuery(), request.getResource()) + .size(streamIdsProperties.getScrollQuerySize()) + .fetchSource(new String[] {request.getSourceFieldPath()}, null) + .sort(fieldSort("_doc")); searchRepository.streamResourceIds(request, searchSource, idsConsumer); } diff --git a/src/main/java/org/folio/search/service/ResourceIdsJobService.java b/src/main/java/org/folio/search/service/ResourceIdsJobService.java index 956863a4c..4b848d00f 100644 --- a/src/main/java/org/folio/search/service/ResourceIdsJobService.java +++ b/src/main/java/org/folio/search/service/ResourceIdsJobService.java @@ -2,11 +2,13 @@ import java.security.SecureRandom; import java.util.Date; +import java.util.concurrent.Executor; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.RandomStringUtils; import org.folio.search.converter.ResourceIdsJobMapper; import org.folio.search.domain.dto.ResourceIdsJob; +import org.folio.search.model.streamids.ResourceIdsJobEntity; import org.folio.search.model.types.StreamJobStatus; import org.folio.search.repository.ResourceIdsJobRepository; import org.folio.search.service.consortium.ConsortiumTenantExecutor; @@ -21,6 +23,7 @@ public class ResourceIdsJobService { private final ResourceIdsJobRepository jobRepository; private final ResourceIdsJobMapper resourceIdsJobMapper; private final ResourceIdService resourceIdService; + private final Executor streamIdsExecutor; public ResourceIdsJob getJobById(String id) { var jobEntity = consortiumTenantExecutor.execute(() -> jobRepository.getReferenceById(id)); @@ -35,12 +38,17 @@ public ResourceIdsJob createStreamJob(ResourceIdsJob job, String tenantId) { entity.setTemporaryTableName(generateTemporaryTableName()); log.info("Attempts to create streamJob by [resourceIdsJob: {}]", entity); - var savedJob = consortiumTenantExecutor.execute(() -> jobRepository.save(entity)); + var savedJob = consortiumTenantExecutor.execute(() -> saveAndRun(entity, tenantId)); - consortiumTenantExecutor.runAsync(() -> resourceIdService.streamResourceIdsForJob(savedJob, tenantId)); return resourceIdsJobMapper.convert(savedJob); } + private ResourceIdsJobEntity saveAndRun(ResourceIdsJobEntity entity, String tenantId) { + var job = jobRepository.save(entity); + streamIdsExecutor.execute(() -> resourceIdService.streamResourceIdsForJob(job, tenantId)); + return job; + } + private String generateTemporaryTableName() { return RandomStringUtils .random(32, 0, 0, true, false, null, new SecureRandom()) diff --git a/src/main/java/org/folio/search/service/consortium/ConsortiumTenantExecutor.java b/src/main/java/org/folio/search/service/consortium/ConsortiumTenantExecutor.java index 7987af748..60493e4a3 100644 --- a/src/main/java/org/folio/search/service/consortium/ConsortiumTenantExecutor.java +++ b/src/main/java/org/folio/search/service/consortium/ConsortiumTenantExecutor.java @@ -5,7 +5,6 @@ import lombok.extern.log4j.Log4j2; import org.folio.spring.FolioExecutionContext; import org.folio.spring.service.SystemUserScopedExecutionService; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @Log4j2 @@ -39,9 +38,4 @@ public void run(Runnable operation) { }); } - @Async - public void runAsync(Runnable operation) { - run(operation); - } - } diff --git a/src/test/java/org/folio/search/service/ResourceIdServiceTest.java b/src/test/java/org/folio/search/service/ResourceIdServiceTest.java index 4a3c9fb52..64ea07a71 100644 --- a/src/test/java/org/folio/search/service/ResourceIdServiceTest.java +++ b/src/test/java/org/folio/search/service/ResourceIdServiceTest.java @@ -12,7 +12,6 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.opensearch.index.query.QueryBuilders.termQuery; @@ -21,7 +20,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; -import java.util.concurrent.Callable; import java.util.function.Consumer; import org.folio.search.configuration.properties.StreamIdsProperties; import org.folio.search.cql.CqlSearchQueryConverter; @@ -33,9 +31,7 @@ import org.folio.search.model.types.StreamJobStatus; import org.folio.search.repository.ResourceIdsJobRepository; import org.folio.search.repository.SearchRepository; -import org.folio.spring.service.SystemUserScopedExecutionService; import org.folio.spring.testing.type.UnitTest; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -65,17 +61,9 @@ class ResourceIdServiceTest { private StreamIdsProperties properties; @Mock private ResourceIdsJobRepository jobRepository; - @Mock - private SystemUserScopedExecutionService executionService; @Spy private final ObjectMapper objectMapper = OBJECT_MAPPER; - @BeforeEach - void setUp() { - lenient().when(executionService.executeSystemUserScoped(any(), any())) - .thenAnswer(invocation -> ((Callable) invocation.getArgument(1)).call()); - } - @Test void streamResourceIds() throws IOException { when(queryConverter.convertForConsortia(TEST_QUERY, RESOURCE_NAME)).thenReturn(searchSource());