From ee50998d86a003dc1214bfcdb091cf90bf0dfaf6 Mon Sep 17 00:00:00 2001 From: BKadirkhodjaev Date: Wed, 19 Jun 2024 17:31:08 +0500 Subject: [PATCH] [MSEARCH-771]. Add models, event processing, indexing, tests, README.md, NEWS.md --- NEWS.md | 1 + README.md | 2 +- .../ResourceEventBatchInterceptor.java | 23 +-- .../dto/locationunit/InstitutionDto.java | 27 ++++ .../org/folio/search/utils/SearchUtils.java | 1 + src/main/resources/application.yml | 2 +- .../elasticsearch/index/institution.json | 21 +++ src/main/resources/model/institution.json | 24 +++ .../InstitutionsIndexingConsortiumIT.java | 144 ++++++++++++++++++ .../org/folio/search/utils/TestConstants.java | 5 + src/test/resources/application.yml | 5 +- 11 files changed, 241 insertions(+), 14 deletions(-) create mode 100644 src/main/java/org/folio/search/model/dto/locationunit/InstitutionDto.java create mode 100644 src/main/resources/elasticsearch/index/institution.json create mode 100644 src/main/resources/model/institution.json create mode 100644 src/test/java/org/folio/search/controller/InstitutionsIndexingConsortiumIT.java diff --git a/NEWS.md b/NEWS.md index 19bb87f46..bd220d361 100644 --- a/NEWS.md +++ b/NEWS.md @@ -22,6 +22,7 @@ * Allow Unified List of Inventory Locations in a Consortium to be fetched by member tenants ([MSEARCH-660](https://folio-org.atlassian.net/browse/MSEARCH-660)) * Implement Indexing of Campuses from Kafka ([MSEARCH-770](https://issues.folio.org/browse/MSEARCH-770)) * Extend response with additional Location fields for Inventory Locations in a Consortium endpoint ([MSEARCH-775](https://folio-org.atlassian.net/browse/MSEARCH-775)) +* Implement Indexing of Institutions from Kafka ([MSEARCH-771](https://issues.folio.org/browse/MSEARCH-771)) ### Bug fixes * Do not delete kafka topics if collection topic is enabled ([MSEARCH-725](https://folio-org.atlassian.net/browse/MSEARCH-725)) diff --git a/README.md b/README.md index 8cc084a80..346a80dcd 100644 --- a/README.md +++ b/README.md @@ 
-243,7 +243,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l | KAFKA_CONTRIBUTORS_TOPIC_PARTITIONS | 50 | Amount of partitions for `search.instance-contributor` topic. | | KAFKA_CONTRIBUTORS_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.instance-contributor` topic. | | KAFKA_CONSORTIUM_INSTANCE_CONCURRENCY | 2 | Custom number of kafka concurrent threads for consortium.instance message consuming. | -| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location & inventory.campus message consuming. | +| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location, inventory.campus, and inventory.institution message consuming. | | KAFKA_BIBFRAME_CONCURRENCY | 1 | Custom number of kafka concurrent threads for bibframe message consuming. | | KAFKA_CONSORTIUM_INSTANCE_TOPIC_PARTITIONS | 50 | Amount of partitions for `search.consortium.instance` topic. | | KAFKA_CONSORTIUM_INSTANCE_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.consortium.instance` topic. 
| diff --git a/src/main/java/org/folio/search/integration/interceptor/ResourceEventBatchInterceptor.java b/src/main/java/org/folio/search/integration/interceptor/ResourceEventBatchInterceptor.java index db4a93215..770fd696f 100644 --- a/src/main/java/org/folio/search/integration/interceptor/ResourceEventBatchInterceptor.java +++ b/src/main/java/org/folio/search/integration/interceptor/ResourceEventBatchInterceptor.java @@ -13,17 +13,18 @@ @Component public class ResourceEventBatchInterceptor implements BatchInterceptor { - private static final Map TOPIC_TO_RESOURCE_MAP = Map.of( - "inventory.instance", SearchUtils.INSTANCE_RESOURCE, - "inventory.holdings-record", SearchUtils.INSTANCE_RESOURCE, - "inventory.item", SearchUtils.INSTANCE_RESOURCE, - "inventory.bound-with", SearchUtils.INSTANCE_RESOURCE, - "authorities.authority", SearchUtils.AUTHORITY_RESOURCE, - "search.instance-contributor", SearchUtils.CONTRIBUTOR_RESOURCE, - "search.instance-subject", SearchUtils.INSTANCE_SUBJECT_RESOURCE, - "inventory.classification-type", SearchUtils.CLASSIFICATION_TYPE_RESOURCE, - "inventory.location", SearchUtils.LOCATION_RESOURCE, - "inventory.campus", SearchUtils.CAMPUS_RESOURCE + private static final Map TOPIC_TO_RESOURCE_MAP = Map.ofEntries( + Map.entry("inventory.instance", SearchUtils.INSTANCE_RESOURCE), + Map.entry("inventory.holdings-record", SearchUtils.INSTANCE_RESOURCE), + Map.entry("inventory.item", SearchUtils.INSTANCE_RESOURCE), + Map.entry("inventory.bound-with", SearchUtils.INSTANCE_RESOURCE), + Map.entry("authorities.authority", SearchUtils.AUTHORITY_RESOURCE), + Map.entry("search.instance-contributor", SearchUtils.CONTRIBUTOR_RESOURCE), + Map.entry("search.instance-subject", SearchUtils.INSTANCE_SUBJECT_RESOURCE), + Map.entry("inventory.classification-type", SearchUtils.CLASSIFICATION_TYPE_RESOURCE), + Map.entry("inventory.location", SearchUtils.LOCATION_RESOURCE), + Map.entry("inventory.campus", SearchUtils.CAMPUS_RESOURCE), + 
Map.entry("inventory.institution", SearchUtils.INSTITUTION_RESOURCE) ); @Override diff --git a/src/main/java/org/folio/search/model/dto/locationunit/InstitutionDto.java b/src/main/java/org/folio/search/model/dto/locationunit/InstitutionDto.java new file mode 100644 index 000000000..272310d3e --- /dev/null +++ b/src/main/java/org/folio/search/model/dto/locationunit/InstitutionDto.java @@ -0,0 +1,27 @@ +package org.folio.search.model.dto.locationunit; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Builder; +import lombok.Data; +import lombok.With; +import lombok.extern.jackson.Jacksonized; + +/** + * Describes the Institution (id, name, code) that comes from external channels; empty values are omitted from JSON. + */ +@Data +@With +@Builder +@Jacksonized +@JsonInclude(JsonInclude.Include.NON_EMPTY) +public class InstitutionDto { + + @JsonProperty("id") + private String id; + @JsonProperty("name") + private String name; + @JsonProperty("code") + private String code; + +} diff --git a/src/main/java/org/folio/search/utils/SearchUtils.java b/src/main/java/org/folio/search/utils/SearchUtils.java index 2f44b7822..377f34a4a 100644 --- a/src/main/java/org/folio/search/utils/SearchUtils.java +++ b/src/main/java/org/folio/search/utils/SearchUtils.java @@ -40,6 +40,7 @@ public class SearchUtils { public static final String CLASSIFICATION_TYPE_RESOURCE = "classification-type"; public static final String BIBFRAME_RESOURCE = "bibframe"; public static final String CAMPUS_RESOURCE = "campus"; + public static final String INSTITUTION_RESOURCE = "institution"; public static final String ID_FIELD = "id"; public static final String SOURCE_FIELD = "source"; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e79d2c6df..20a078194 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -133,7 +133,7 @@ folio: group-id: ${folio.environment}-mod-search-classification-type-group location: 
concurrency: ${KAFKA_LOCATION_CONCURRENCY:1} - topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus) + topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus|institution) group-id: ${folio.environment}-mod-search-location-type-group bibframe: concurrency: ${KAFKA_BIBFRAME_CONCURRENCY:1} diff --git a/src/main/resources/elasticsearch/index/institution.json b/src/main/resources/elasticsearch/index/institution.json new file mode 100644 index 000000000..4c1e18baf --- /dev/null +++ b/src/main/resources/elasticsearch/index/institution.json @@ -0,0 +1,21 @@ +{ + "index": { + "number_of_shards": 4, + "number_of_replicas": 2, + "refresh_interval": "1s", + "codec": "best_compression", + "mapping.total_fields.limit": 1000 + }, + "analysis": { + "normalizer": { + "keyword_lowercase": { + "filter": [ + "lowercase", + "icu_folding" + ], + "type": "custom" + } + }, + "tokenizers": {} + } +} diff --git a/src/main/resources/model/institution.json b/src/main/resources/model/institution.json new file mode 100644 index 000000000..148b6d756 --- /dev/null +++ b/src/main/resources/model/institution.json @@ -0,0 +1,24 @@ +{ + "name": "institution", + "eventBodyJavaClass": "org.folio.search.model.dto.locationunit.InstitutionDto", + "reindexSupported": false, + "fields": { + "id": { + "index": "keyword_lowercase", + "showInResponse": [ "search" ] + }, + "tenantId": { + "index": "keyword_lowercase", + "showInResponse": [ "search" ], + "isTenant": true + }, + "name": { + "index": "keyword_lowercase", + "showInResponse": [ "search" ] + }, + "code": { + "index": "keyword_lowercase", + "showInResponse": [ "search" ] + } + } +} diff --git a/src/test/java/org/folio/search/controller/InstitutionsIndexingConsortiumIT.java b/src/test/java/org/folio/search/controller/InstitutionsIndexingConsortiumIT.java new file mode 100644 index 000000000..0924a9946 --- /dev/null +++ b/src/test/java/org/folio/search/controller/InstitutionsIndexingConsortiumIT.java @@ -0,0 +1,165 @@
+package org.folio.search.controller;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Durations.ONE_MINUTE;
+import static org.awaitility.Durations.ONE_SECOND;
+import static org.folio.search.domain.dto.ResourceEventType.CREATE;
+import static org.folio.search.domain.dto.ResourceEventType.DELETE;
+import static org.folio.search.domain.dto.ResourceEventType.DELETE_ALL;
+import static org.folio.search.domain.dto.ResourceEventType.UPDATE;
+import static org.folio.search.utils.SearchUtils.INSTITUTION_RESOURCE;
+import static org.folio.search.utils.SearchUtils.getIndexName;
+import static org.folio.search.utils.TestConstants.CENTRAL_TENANT_ID;
+import static org.folio.search.utils.TestConstants.MEMBER_TENANT_ID;
+import static org.folio.search.utils.TestConstants.inventoryInstitutionTopic;
+import static org.folio.search.utils.TestUtils.kafkaResourceEvent;
+import static org.folio.search.utils.TestUtils.randomId;
+import static org.folio.search.utils.TestUtils.toMap;
+import static org.opensearch.index.query.QueryBuilders.boolQuery;
+import static org.opensearch.search.builder.SearchSourceBuilder.searchSource;
+
+import java.io.IOException;
+import java.util.Objects;
+import lombok.extern.log4j.Log4j2;
+import org.folio.search.domain.dto.ResourceEvent;
+import org.folio.search.model.dto.locationunit.InstitutionDto;
+import org.folio.search.support.base.BaseConsortiumIntegrationTest;
+import org.folio.spring.testing.type.IntegrationTest;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.index.query.QueryBuilders;
+
+/**
+ * Integration tests that send institution events to Kafka for central and member tenants and check the index.
+ *
+ * <p>Document counts are always read for the institution resource of the central tenant.
+ */
+@Log4j2
+@IntegrationTest
+class InstitutionsIndexingConsortiumIT extends BaseConsortiumIntegrationTest {
+
+  @BeforeAll
+  static void prepare() {
+    setUpTenant(CENTRAL_TENANT_ID);
+    setUpTenant(MEMBER_TENANT_ID);
+  }
+
+  @AfterAll
+  static void cleanUp() {
+    removeTenant(CENTRAL_TENANT_ID);
+    removeTenant(MEMBER_TENANT_ID);
+  }
+
+  @AfterEach
+  void tearDown() throws IOException {
+    cleanUpIndex(INSTITUTION_RESOURCE, CENTRAL_TENANT_ID);
+  }
+
+  @Test
+  void shouldIndexAndRemoveInstitution() {
+    var institution = institution();
+    var createEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(institution), null);
+    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), createEvent);
+
+    awaitAssertInstitutionCount(1);
+
+    var deleteEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, DELETE, null, toMap(institution));
+    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), deleteEvent);
+
+    awaitAssertInstitutionCount(0);
+  }
+
+  @Test
+  void shouldIndexAndUpdateInstitution() {
+    var institution = institution();
+    var createEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(institution), null);
+    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), createEvent);
+
+    awaitAssertInstitutionCount(1);
+
+    var institutionUpdated = institution.withName("nameUpdated");
+    var updateEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, UPDATE, toMap(institutionUpdated), toMap(institution));
+    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), updateEvent);
+
+    awaitAssertInstitutionCountAfterUpdate(1, institutionUpdated);
+  }
+
+  @Test
+  void shouldIndexSameInstitutionFromDifferentTenantsAsSeparateDocs() {
+    var institution = institution();
+    var createCentralEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(institution), null);
+    var createMemberEvent = kafkaResourceEvent(MEMBER_TENANT_ID, CREATE, toMap(institution), null);
+    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), createCentralEvent);
+    // each event is published to the topic of the tenant it was built for
+    kafkaTemplate.send(inventoryInstitutionTopic(MEMBER_TENANT_ID), createMemberEvent);
+
+    awaitAssertInstitutionCount(2);
+  }
+
+  @Test
+  void shouldRemoveAllDocumentsByTenantIdOnDeleteAllEvent() {
+    var institution = institution();
+    var createCentralEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(institution), null);
+    var createMemberEvent = kafkaResourceEvent(MEMBER_TENANT_ID, CREATE, toMap(institution), null);
+    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), createCentralEvent);
+    // member events (create and delete-all) both go through the member tenant's topic
+    kafkaTemplate.send(inventoryInstitutionTopic(MEMBER_TENANT_ID), createMemberEvent);
+
+    awaitAssertInstitutionCount(2);
+
+    var deleteAllMemberEvent = new ResourceEvent().type(DELETE_ALL).tenant(MEMBER_TENANT_ID);
+    kafkaTemplate.send(inventoryInstitutionTopic(MEMBER_TENANT_ID), deleteAllMemberEvent);
+
+    awaitAssertInstitutionCount(1);
+  }
+
+  /** Builds an institution with a random id and the fixed name "name" and code "code". */
+  private static InstitutionDto institution() {
+    return InstitutionDto.builder().id(randomId())
+      .name("name")
+      .code("code")
+      .build();
+  }
+
+  /**
+   * Polls every second, for at most one minute, until {@code countIndexDocument} for the institution resource of
+   * the central tenant equals the expected value.
+   *
+   * @param expected expected document count
+   */
+  public static void awaitAssertInstitutionCount(int expected) {
+    await().atMost(ONE_MINUTE).pollInterval(ONE_SECOND).untilAsserted(() -> {
+      var totalHits = countIndexDocument(INSTITUTION_RESOURCE, CENTRAL_TENANT_ID);
+
+      assertThat(totalHits).isEqualTo(expected);
+    });
+  }
+
+  /**
+   * Polls every second, for at most one minute, until a search that must match both the id and the name of the
+   * given institution returns the expected total hit count in the central tenant's institution index.
+   *
+   * @param expected expected total number of hits
+   * @param institutionUpdated institution whose id and name are used as match criteria
+   */
+  public static void awaitAssertInstitutionCountAfterUpdate(int expected, InstitutionDto institutionUpdated) {
+    await().atMost(ONE_MINUTE).pollInterval(ONE_SECOND).untilAsserted(() -> {
+      var idQuery = QueryBuilders.matchQuery("id", institutionUpdated.getId());
+      var nameQuery = QueryBuilders.matchQuery("name", institutionUpdated.getName());
+
+      var searchRequest = new SearchRequest()
+        .source(searchSource().query(boolQuery().must(idQuery).must(nameQuery))
+          .trackTotalHits(true).from(0).size(1))
+        .indices(getIndexName(INSTITUTION_RESOURCE, CENTRAL_TENANT_ID));
+      var searchResponse = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
+      var hitCount = Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value;
+
+      assertThat(hitCount).isEqualTo(expected);
+    });
+  }
+}
diff --git 
a/src/test/java/org/folio/search/utils/TestConstants.java b/src/test/java/org/folio/search/utils/TestConstants.java index dc608ee28..af37169f0 100644 --- a/src/test/java/org/folio/search/utils/TestConstants.java +++ b/src/test/java/org/folio/search/utils/TestConstants.java @@ -39,6 +39,7 @@ public class TestConstants { public static final String CONSORTIUM_INSTANCE_TOPIC = "search.consortium.instance"; public static final String BIBFRAME_TOPIC = "search.bibframe"; public static final String CAMPUS_TOPIC = "inventory.campus"; + public static final String INSTITUTION_TOPIC = "inventory.institution"; public static final String LOCAL_CN_TYPE = "6fd29f52-5c9c-44d0-b529-e9c5eb3a0aba"; public static final String FOLIO_CN_TYPE = "6e4d7565-b277-4dfa-8b7d-fbf306d9d0cd"; @@ -129,6 +130,10 @@ public static String inventoryCampusTopic(String tenantId) { return getTopicName(tenantId, CAMPUS_TOPIC); } + public static String inventoryInstitutionTopic(String tenantId) { + return getTopicName(tenantId, INSTITUTION_TOPIC); /* inventory.institution topic name for the given tenant */ + } + public static String indexName(String tenantId) { return String.join("_", ENV, INSTANCE_RESOURCE, tenantId); } diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index e04acca15..cd2c3ca25 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -116,6 +116,9 @@ folio: - name: inventory.campus numPartitions: 1 replicationFactor: 1 + - name: inventory.institution + numPartitions: 1 + replicationFactor: 1 listener: events: concurrency: 2 @@ -143,7 +146,7 @@ folio: group-id: ${folio.environment}-mod-search-classification-type-group location: concurrency: 1 - topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus) + topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus|institution) group-id: ${folio.environment}-mod-search-location-type-group bibframe: concurrency: ${KAFKA_BIBFRAME_CONCURRENCY:1}