Skip to content

Commit

Permalink
[MSEARCH-771]. Implement Indexing of Institutions from Kafka (#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
BKadirkhodjaev authored Jun 19, 2024
1 parent acb43ac commit d094df4
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 14 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Allow Unified List of Inventory Locations in a Consortium to be fetched by member tenants ([MSEARCH-660](https://folio-org.atlassian.net/browse/MSEARCH-660))
* Implement Indexing of Campuses from Kafka ([MSEARCH-770](https://issues.folio.org/browse/MSEARCH-770))
* Extend response with additional Location fields for Inventory Locations in a Consortium endpoint ([MSEARCH-775](https://folio-org.atlassian.net/browse/MSEARCH-775))
* Implement Indexing of Institutions from Kafka ([MSEARCH-771](https://issues.folio.org/browse/MSEARCH-771))

### Bug fixes
* Do not delete kafka topics if collection topic is enabled ([MSEARCH-725](https://folio-org.atlassian.net/browse/MSEARCH-725))
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| KAFKA_CONTRIBUTORS_TOPIC_PARTITIONS | 50 | Amount of partitions for `search.instance-contributor` topic. |
| KAFKA_CONTRIBUTORS_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.instance-contributor` topic. |
| KAFKA_CONSORTIUM_INSTANCE_CONCURRENCY | 2 | Custom number of kafka concurrent threads for consortium.instance message consuming. |
| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location & inventory.campus message consuming. |
| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location, inventory.campus and inventory.institution message consuming. |
| KAFKA_BIBFRAME_CONCURRENCY | 1 | Custom number of kafka concurrent threads for bibframe message consuming. |
| KAFKA_CONSORTIUM_INSTANCE_TOPIC_PARTITIONS | 50 | Amount of partitions for `search.consortium.instance` topic. |
| KAFKA_CONSORTIUM_INSTANCE_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.consortium.instance` topic. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
@Component
public class ResourceEventBatchInterceptor implements BatchInterceptor<String, ResourceEvent> {

private static final Map<String, String> TOPIC_TO_RESOURCE_MAP = Map.of(
"inventory.instance", SearchUtils.INSTANCE_RESOURCE,
"inventory.holdings-record", SearchUtils.INSTANCE_RESOURCE,
"inventory.item", SearchUtils.INSTANCE_RESOURCE,
"inventory.bound-with", SearchUtils.INSTANCE_RESOURCE,
"authorities.authority", SearchUtils.AUTHORITY_RESOURCE,
"search.instance-contributor", SearchUtils.CONTRIBUTOR_RESOURCE,
"search.instance-subject", SearchUtils.INSTANCE_SUBJECT_RESOURCE,
"inventory.classification-type", SearchUtils.CLASSIFICATION_TYPE_RESOURCE,
"inventory.location", SearchUtils.LOCATION_RESOURCE,
"inventory.campus", SearchUtils.CAMPUS_RESOURCE
private static final Map<String, String> TOPIC_TO_RESOURCE_MAP = Map.ofEntries(
Map.entry("inventory.instance", SearchUtils.INSTANCE_RESOURCE),
Map.entry("inventory.holdings-record", SearchUtils.INSTANCE_RESOURCE),
Map.entry("inventory.item", SearchUtils.INSTANCE_RESOURCE),
Map.entry("inventory.bound-with", SearchUtils.INSTANCE_RESOURCE),
Map.entry("authorities.authority", SearchUtils.AUTHORITY_RESOURCE),
Map.entry("search.instance-contributor", SearchUtils.CONTRIBUTOR_RESOURCE),
Map.entry("search.instance-subject", SearchUtils.INSTANCE_SUBJECT_RESOURCE),
Map.entry("inventory.classification-type", SearchUtils.CLASSIFICATION_TYPE_RESOURCE),
Map.entry("inventory.location", SearchUtils.LOCATION_RESOURCE),
Map.entry("inventory.campus", SearchUtils.CAMPUS_RESOURCE),
Map.entry("inventory.institution", SearchUtils.INSTITUTION_RESOURCE)
);

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.folio.search.model.dto.locationunit;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import lombok.With;
import lombok.extern.jackson.Jacksonized;

/**
 * Institution object as it arrives from external channels.
 *
 * <p>Holds the three string attributes {@code id}, {@code name} and {@code code}. Properties that
 * are empty are left out of the serialized JSON (see {@link JsonInclude.Include#NON_EMPTY}).
 * Instances are created through the generated builder; {@code withXxx} copies are available
 * through {@link With}.
 */
@Data
@Builder
@Jacksonized
@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class InstitutionDto {

  /** Institution identifier, mapped to the JSON property {@code id}. */
  @JsonProperty("id")
  private String id;

  /** Institution name, mapped to the JSON property {@code name}. */
  @JsonProperty("name")
  private String name;

  /** Institution code, mapped to the JSON property {@code code}. */
  @JsonProperty("code")
  private String code;

}
1 change: 1 addition & 0 deletions src/main/java/org/folio/search/utils/SearchUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class SearchUtils {
public static final String CLASSIFICATION_TYPE_RESOURCE = "classification-type";
public static final String BIBFRAME_RESOURCE = "bibframe";
public static final String CAMPUS_RESOURCE = "campus";
public static final String INSTITUTION_RESOURCE = "institution";

public static final String ID_FIELD = "id";
public static final String SOURCE_FIELD = "source";
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ folio:
group-id: ${folio.environment}-mod-search-classification-type-group
location:
concurrency: ${KAFKA_LOCATION_CONCURRENCY:1}
topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus)
topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus|institution)
group-id: ${folio.environment}-mod-search-location-type-group
bibframe:
concurrency: ${KAFKA_BIBFRAME_CONCURRENCY:1}
Expand Down
21 changes: 21 additions & 0 deletions src/main/resources/elasticsearch/index/institution.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"index": {
"number_of_shards": 4,
"number_of_replicas": 2,
"refresh_interval": "1s",
"codec": "best_compression",
"mapping.total_fields.limit": 1000
},
"analysis": {
"normalizer": {
"keyword_lowercase": {
"filter": [
"lowercase",
"icu_folding"
],
"type": "custom"
}
},
"tokenizers": {}
}
}
24 changes: 24 additions & 0 deletions src/main/resources/model/institution.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "institution",
"eventBodyJavaClass": "org.folio.search.model.dto.locationunit.InstitutionDto",
"reindexSupported": false,
"fields": {
"id": {
"index": "keyword_lowercase",
"showInResponse": [ "search" ]
},
"tenantId": {
"index": "keyword_lowercase",
"showInResponse": [ "search" ],
"isTenant": true
},
"name": {
"index": "keyword_lowercase",
"showInResponse": [ "search" ]
},
"code": {
"index": "keyword_lowercase",
"showInResponse": [ "search" ]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.folio.search.controller;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.ONE_MINUTE;
import static org.awaitility.Durations.ONE_SECOND;
import static org.folio.search.domain.dto.ResourceEventType.CREATE;
import static org.folio.search.domain.dto.ResourceEventType.DELETE;
import static org.folio.search.domain.dto.ResourceEventType.DELETE_ALL;
import static org.folio.search.domain.dto.ResourceEventType.UPDATE;
import static org.folio.search.utils.SearchUtils.INSTITUTION_RESOURCE;
import static org.folio.search.utils.SearchUtils.getIndexName;
import static org.folio.search.utils.TestConstants.CENTRAL_TENANT_ID;
import static org.folio.search.utils.TestConstants.MEMBER_TENANT_ID;
import static org.folio.search.utils.TestConstants.inventoryInstitutionTopic;
import static org.folio.search.utils.TestUtils.kafkaResourceEvent;
import static org.folio.search.utils.TestUtils.randomId;
import static org.folio.search.utils.TestUtils.toMap;
import static org.opensearch.index.query.QueryBuilders.boolQuery;
import static org.opensearch.search.builder.SearchSourceBuilder.searchSource;

import java.io.IOException;
import java.util.Objects;
import lombok.extern.log4j.Log4j2;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.model.dto.locationunit.InstitutionDto;
import org.folio.search.support.base.BaseConsortiumIntegrationTest;
import org.folio.spring.testing.type.IntegrationTest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.index.query.QueryBuilders;

/**
 * Integration tests for indexing of institution events received from Kafka when a central and a
 * member tenant are both set up.
 *
 * <p>Every test publishes {@link ResourceEvent}s to the per-tenant institution topic and then
 * polls the institution index of the central tenant until the expected number of documents is
 * visible. NOTE(review): documents of both tenants are counted in the central tenant's index,
 * presumably because it is the shared consortium index - confirm against the base test class.
 */
@Log4j2
@IntegrationTest
class InstitutionsIndexingConsortiumIT extends BaseConsortiumIntegrationTest {

  /** Enables the central and the member tenant once before all tests. */
  @BeforeAll
  static void prepare() {
    setUpTenant(CENTRAL_TENANT_ID);
    setUpTenant(MEMBER_TENANT_ID);
  }

  /** Removes both tenants once all tests have finished. */
  @AfterAll
  static void cleanUp() {
    removeTenant(CENTRAL_TENANT_ID);
    removeTenant(MEMBER_TENANT_ID);
  }

  /**
   * Cleans the institution index of the central tenant after each test so that document counts
   * asserted by one test are not affected by the previous one.
   *
   * @throws IOException if the index clean-up request fails
   */
  @AfterEach
  void tearDown() throws IOException {
    cleanUpIndex(INSTITUTION_RESOURCE, CENTRAL_TENANT_ID);
  }

  /** A CREATE event adds one document; a following DELETE event removes it again. */
  @Test
  void shouldIndexAndRemoveInstitution() {
    var institution = institution();
    var createEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(institution), null);
    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), createEvent);

    awaitAssertInstitutionCount(1);

    var deleteEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, DELETE, null, toMap(institution));
    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), deleteEvent);

    awaitAssertInstitutionCount(0);
  }

  /** An UPDATE event keeps a single document and makes the new name searchable. */
  @Test
  void shouldIndexAndUpdateInstitution() {
    var institution = institution();
    var createEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(institution), null);
    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), createEvent);

    awaitAssertInstitutionCount(1);

    var institutionUpdated = institution.withName("nameUpdated");
    var updateEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, UPDATE, toMap(institutionUpdated), toMap(institution));
    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), updateEvent);

    awaitAssertInstitutionCountAfterUpdate(1, institutionUpdated);
  }

  /** The same institution created by two tenants must result in two separate documents. */
  @Test
  void shouldIndexSameInstitutionFromDifferentTenantsAsSeparateDocs() {
    var institution = institution();
    var createCentralEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(institution), null);
    var createMemberEvent = kafkaResourceEvent(MEMBER_TENANT_ID, CREATE, toMap(institution), null);
    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), createCentralEvent);
    // Each tenant's event goes to that tenant's own topic, so both topics are exercised.
    kafkaTemplate.send(inventoryInstitutionTopic(MEMBER_TENANT_ID), createMemberEvent);

    awaitAssertInstitutionCount(2);
  }

  /** A DELETE_ALL event of the member tenant removes only the member tenant's document. */
  @Test
  void shouldRemoveAllDocumentsByTenantIdOnDeleteAllEvent() {
    var institution = institution();
    var createCentralEvent = kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, toMap(institution), null);
    var createMemberEvent = kafkaResourceEvent(MEMBER_TENANT_ID, CREATE, toMap(institution), null);
    kafkaTemplate.send(inventoryInstitutionTopic(CENTRAL_TENANT_ID), createCentralEvent);
    // Published to the member topic, consistent with the DELETE_ALL event sent below.
    kafkaTemplate.send(inventoryInstitutionTopic(MEMBER_TENANT_ID), createMemberEvent);

    // Wait for both documents first: the events travel on different topics, so the delete-all
    // must not be sent before the member document has been indexed.
    awaitAssertInstitutionCount(2);

    var deleteAllMemberEvent = new ResourceEvent().type(DELETE_ALL).tenant(MEMBER_TENANT_ID);
    kafkaTemplate.send(inventoryInstitutionTopic(MEMBER_TENANT_ID), deleteAllMemberEvent);

    awaitAssertInstitutionCount(1);
  }

  /**
   * Creates an institution with a random id and the fixed name {@code "name"} and code
   * {@code "code"}.
   *
   * @return a new institution fixture
   */
  private static InstitutionDto institution() {
    return InstitutionDto.builder().id(randomId())
      .name("name")
      .code("code")
      .build();
  }

  /**
   * Polls once per second, for at most one minute, until the number of documents in the central
   * tenant's institution index equals {@code expected}.
   *
   * @param expected the expected document count
   */
  public static void awaitAssertInstitutionCount(int expected) {
    await().atMost(ONE_MINUTE).pollInterval(ONE_SECOND).untilAsserted(() -> {
      var totalHits = countIndexDocument(INSTITUTION_RESOURCE, CENTRAL_TENANT_ID);

      assertThat(totalHits).isEqualTo(expected);
    });
  }

  /**
   * Polls once per second, for at most one minute, until a search in the central tenant's
   * institution index for documents matching both the id and the name of
   * {@code institutionUpdated} reports {@code expected} total hits.
   *
   * @param expected           the expected number of matching documents
   * @param institutionUpdated the institution whose id and name must both match
   */
  public static void awaitAssertInstitutionCountAfterUpdate(int expected, InstitutionDto institutionUpdated) {
    await().atMost(ONE_MINUTE).pollInterval(ONE_SECOND).untilAsserted(() -> {
      var idQuery = QueryBuilders.matchQuery("id", institutionUpdated.getId());
      var nameQuery = QueryBuilders.matchQuery("name", institutionUpdated.getName());

      // Only the total hit count is asserted, so a single returned hit is enough.
      var searchRequest = new SearchRequest()
        .source(searchSource().query(boolQuery().must(idQuery).must(nameQuery))
          .trackTotalHits(true).from(0).size(1))
        .indices(getIndexName(INSTITUTION_RESOURCE, CENTRAL_TENANT_ID));
      var searchResponse = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
      var hitCount = Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value;

      assertThat(hitCount).isEqualTo(expected);
    });
  }
}
5 changes: 5 additions & 0 deletions src/test/java/org/folio/search/utils/TestConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class TestConstants {
public static final String CONSORTIUM_INSTANCE_TOPIC = "search.consortium.instance";
public static final String BIBFRAME_TOPIC = "search.bibframe";
public static final String CAMPUS_TOPIC = "inventory.campus";
public static final String INSTITUTION_TOPIC = "inventory.institution";

public static final String LOCAL_CN_TYPE = "6fd29f52-5c9c-44d0-b529-e9c5eb3a0aba";
public static final String FOLIO_CN_TYPE = "6e4d7565-b277-4dfa-8b7d-fbf306d9d0cd";
Expand Down Expand Up @@ -129,6 +130,10 @@ public static String inventoryCampusTopic(String tenantId) {
return getTopicName(tenantId, CAMPUS_TOPIC);
}

/**
 * Resolves the name of the inventory institution topic for the given tenant.
 *
 * @param tenantId the tenant the topic belongs to
 * @return the value produced by {@code getTopicName} for {@code INSTITUTION_TOPIC}
 */
public static String inventoryInstitutionTopic(String tenantId) {
  final String institutionTopicName = getTopicName(tenantId, INSTITUTION_TOPIC);
  return institutionTopicName;
}

/**
 * Builds the instance index name for a tenant in the form
 * {@code <ENV>_<INSTANCE_RESOURCE>_<tenantId>}.
 *
 * @param tenantId the tenant identifier appended as the last segment
 * @return the three segments joined with underscores
 */
public static String indexName(String tenantId) {
  final String separator = "_";
  return ENV + separator + INSTANCE_RESOURCE + separator + tenantId;
}
Expand Down
5 changes: 4 additions & 1 deletion src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ folio:
- name: inventory.campus
numPartitions: 1
replicationFactor: 1
- name: inventory.institution
numPartitions: 1
replicationFactor: 1
listener:
events:
concurrency: 2
Expand Down Expand Up @@ -143,7 +146,7 @@ folio:
group-id: ${folio.environment}-mod-search-classification-type-group
location:
concurrency: 1
topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus)
topic-pattern: (${folio.environment}\.)(.*\.)inventory\.(location|campus|institution)
group-id: ${folio.environment}-mod-search-location-type-group
bibframe:
concurrency: ${KAFKA_BIBFRAME_CONCURRENCY:1}
Expand Down

0 comments on commit d094df4

Please sign in to comment.