Skip to content

Commit

Permalink
feat(browse-classification): prepare and populate database for classi…
Browse files Browse the repository at this point in the history
…fication browse

Closes: MSEARCH-667
Signed-off-by: psmagin <[email protected]>
  • Loading branch information
psmagin committed Feb 9, 2024
1 parent af3ac0a commit f3a56cb
Show file tree
Hide file tree
Showing 180 changed files with 1,163 additions and 212 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Update LccnProcessor to populate lccn field with only "LCCN" ([MSEARCH-630](https://issues.folio.org/browse/MSEARCH-630))
* Make maximum offset for additional elasticsearch request on browse configurable ([MSEARCH-641](https://issues.folio.org/browse/MSEARCH-641))
* Make system user usage optional ([MSEARCH-631](https://issues.folio.org/browse/MSEARCH-631))
* Prepare and populate database for classification browse ([MSEARCH-667](https://issues.folio.org/browse/MSEARCH-667))


### Bug fixes
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| SEARCH_BY_ALL_FIELDS_ENABLED | false | Specifies if globally search by all field values must be enabled or not (tenant can override this setting) |
| BROWSE_CN_INTERMEDIATE_VALUES_ENABLED | true | Specifies if globally intermediate values (nested instance items) must be populated or not (tenant can override this setting) |
| BROWSE_CN_INTERMEDIATE_REMOVE_DUPLICATES | true | Specifies if globally intermediate duplicate values (fullCallNumber) should be removed or not (Active only with BROWSE_CN_INTERMEDIATE_VALUES_ENABLED) |
| BROWSE_CLASSIFICATIONS_ENABLED | false | Specifies if globally instance classification indexing will be performed |
| SCROLL_QUERY_SIZE | 1000 | The number of records to be loaded by each scroll query. 10_000 is a max value |
| STREAM_ID_RETRY_INTERVAL_MS | 1000 | Specifies time to wait before reattempting query. |
| STREAM_ID_RETRY_ATTEMPTS | 3 | Specifies how many queries attempt to perform after the first one failed. |
Expand Down
34 changes: 2 additions & 32 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
<streamex.version>0.8.2</streamex.version>

<!-- Test dependencies versions -->
<testcontainers.version>1.19.4</testcontainers.version>
<wiremock.version>2.27.2</wiremock.version>
<awaitility.version>4.2.0</awaitility.version>

Expand Down Expand Up @@ -256,34 +255,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand All @@ -300,9 +271,8 @@

<dependency>
<groupId>org.folio</groupId>
<artifactId>folio-service-tools-spring-test</artifactId>
<version>${folio-service-tools.version}</version>
<scope>test</scope>
<artifactId>folio-spring-testing</artifactId>
<version>${folio-spring-support.version}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.folio.search.repository.classification;

import java.util.Objects;
import lombok.Builder;

public record InstanceClassificationEntity(
Id id,
boolean shared
) {

public InstanceClassificationEntity {
Objects.requireNonNull(id);
}

public String type() {
return id().type();
}

public String number() {
return id().number();
}

public String instanceId() {
return id().instanceId();
}

public String tenantId() {
return id().tenantId();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InstanceClassificationEntity that = (InstanceClassificationEntity) o;
return Objects.equals(id, that.id);
}

@Override
public int hashCode() {
return Objects.hash(id);
}

@Builder
public record Id(String type,
String number,
String instanceId,
String tenantId) {
public Id {
Objects.requireNonNull(number);
Objects.requireNonNull(instanceId);
Objects.requireNonNull(tenantId);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.folio.search.repository.classification;

import java.util.List;
import org.folio.search.model.index.InstanceSubResource;

public record InstanceClassificationEntityAgg(
String type,
String number,
List<InstanceSubResource> instances
) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package org.folio.search.repository.classification;

import static org.folio.search.utils.JdbcUtils.getQuestionMarkPlaceholder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.PreparedStatement;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.search.model.index.InstanceSubResource;
import org.folio.search.utils.JdbcUtils;
import org.folio.spring.FolioExecutionContext;
import org.jetbrains.annotations.NotNull;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;

@Log4j2
@Repository
@RequiredArgsConstructor
public class InstanceClassificationJdbcRepository implements InstanceClassificationRepository {

private static final String INSTANCE_CLASSIFICATION_TABLE_NAME = "instance_classification";
private static final String CLASSIFICATION_TYPE_COLUMN = "classification_type";
private static final String CLASSIFICATION_NUMBER_COLUMN = "classification_number";
private static final String TENANT_ID_COLUMN = "tenant_id";
private static final String INSTANCE_ID_COLUMN = "instance_id";
private static final String SHARED_COLUMN = "shared";
private static final String CLASSIFICATION_TYPE_DEFAULT = "<null>";

private static final String SELECT_ALL_SQL = "SELECT * FROM %s;";
private static final String SELECT_ALL_BY_INSTANCE_ID_AGG = """
SELECT
t1.classification_number,
t1.classification_type,
json_agg(json_build_object(
'instanceId', t1.instance_id,
'shared', t1.shared,
'tenantId', t1.tenant_id
)) AS instances
FROM %1$s t1
INNER JOIN %1$s t2 ON t1.classification_number = t2.classification_number
AND t1.classification_type = t2.classification_type
AND t2.instance_id IN (%2$s)
GROUP BY t1.classification_number, t1.classification_type;
""";
private static final String INSERT_SQL = """
INSERT INTO %s (classification_type, classification_number, tenant_id, instance_id, shared)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (classification_type, classification_number, tenant_id, instance_id)
DO UPDATE SET shared = EXCLUDED.shared;
""";
private static final String DELETE_SQL = """
DELETE FROM %s
WHERE classification_type = ? AND classification_number = ? AND tenant_id = ? AND instance_id = ?;
""";
private static final int BATCH_SIZE = 100;
private static final TypeReference<List<InstanceSubResource>> VALUE_TYPE_REF = new TypeReference<>() { };

private final FolioExecutionContext context;
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;

public void saveAll(List<InstanceClassificationEntity> classifications) {
log.debug("saveAll::instance classifications [entities: {}]", classifications);

if (classifications == null || classifications.isEmpty()) {
return;
}

var uniqueEntities = classifications.stream().distinct().toList();

jdbcTemplate.batchUpdate(
INSERT_SQL.formatted(getTableName()),
uniqueEntities,
BATCH_SIZE,
(PreparedStatement ps, InstanceClassificationEntity item) -> {
var id = item.id();
ps.setString(1, itemTypeToDatabaseValue(id));
ps.setString(2, id.number());
ps.setString(3, id.tenantId());
ps.setString(4, id.instanceId());
ps.setBoolean(5, item.shared());
});
}

@Override
public void deleteAll(List<InstanceClassificationEntity> classifications) {
log.debug("deleteAll::instance classifications [entities: {}]", classifications);

if (classifications == null || classifications.isEmpty()) {
return;
}

jdbcTemplate.batchUpdate(
DELETE_SQL.formatted(getTableName()),
classifications,
BATCH_SIZE,
(PreparedStatement ps, InstanceClassificationEntity item) -> {
var id = item.id();
ps.setString(1, itemTypeToDatabaseValue(id));
ps.setString(2, id.number());
ps.setString(3, id.tenantId());
ps.setString(4, id.instanceId());
});

}

@Override
public List<InstanceClassificationEntity> findAll() {
log.debug("findAll::instance classifications");
return jdbcTemplate.query(SELECT_ALL_SQL.formatted(getTableName()), instanceClassificationRowMapper());
}

@Override
public List<InstanceClassificationEntityAgg> findAllByInstanceIds(List<String> instanceIds) {
log.debug("findAllByInstanceIds::instance classifications [instanceIds: {}]", instanceIds);
return jdbcTemplate.query(
SELECT_ALL_BY_INSTANCE_ID_AGG.formatted(getTableName(), getQuestionMarkPlaceholder(instanceIds.size())),
instanceClassificationAggRowMapper(),
instanceIds.toArray());
}

@NotNull
private RowMapper<InstanceClassificationEntity> instanceClassificationRowMapper() {
return (rs, rowNum) -> {
var builder = InstanceClassificationEntity.Id.builder();
var typeVal = rs.getString(CLASSIFICATION_TYPE_COLUMN);
builder.type(CLASSIFICATION_TYPE_DEFAULT.equals(typeVal) ? null : typeVal);
builder.number(rs.getString(CLASSIFICATION_NUMBER_COLUMN));
builder.instanceId(rs.getString(INSTANCE_ID_COLUMN));
builder.tenantId(rs.getString(TENANT_ID_COLUMN));
var shared = rs.getBoolean(SHARED_COLUMN);
return new InstanceClassificationEntity(builder.build(), shared);
};
}

@NotNull
private RowMapper<InstanceClassificationEntityAgg> instanceClassificationAggRowMapper() {
return (rs, rowNum) -> {
var typeVal = rs.getString(CLASSIFICATION_TYPE_COLUMN);
var type = CLASSIFICATION_TYPE_DEFAULT.equals(typeVal) ? null : typeVal;
var number = rs.getString(CLASSIFICATION_NUMBER_COLUMN);
var instancesJson = rs.getString("instances");
List<InstanceSubResource> instanceSubResources;
try {
instanceSubResources = objectMapper.readValue(instancesJson, VALUE_TYPE_REF);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
}
return new InstanceClassificationEntityAgg(type, number, instanceSubResources);
};
}

private String getTableName() {
return JdbcUtils.getFullTableName(context, INSTANCE_CLASSIFICATION_TABLE_NAME);
}

private String itemTypeToDatabaseValue(InstanceClassificationEntity.Id id) {
return id.type() == null ? CLASSIFICATION_TYPE_DEFAULT : id.type();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.folio.search.repository.classification;

import java.util.List;

public interface InstanceClassificationRepository {

void saveAll(List<InstanceClassificationEntity> classifications);

void deleteAll(List<InstanceClassificationEntity> classifications);

List<InstanceClassificationEntity> findAll();

List<InstanceClassificationEntityAgg> findAllByInstanceIds(List<String> instanceId);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.search.service.consortium;

import static java.util.Collections.nCopies;
import static org.folio.search.utils.JdbcUtils.getFullTableName;
import static org.folio.search.utils.JdbcUtils.getQuestionMarkPlaceholder;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -21,7 +22,7 @@
@RequiredArgsConstructor
public class ConsortiumInstanceRepository {

private static final String CONSORTIUM_INSTANCE_TABLE_NAME = "consortium_instance";
static final String CONSORTIUM_INSTANCE_TABLE_NAME = "consortium_instance";
private static final String SELECT_BY_ID_SQL = "SELECT * FROM %s WHERE instance_id IN (%s)";
private static final String DELETE_BY_TENANT_AND_ID_SQL = "DELETE FROM %s WHERE tenant_id = ? AND instance_id = ?;";
private static final String UPSERT_SQL = """
Expand All @@ -40,15 +41,15 @@ ON CONFLICT (tenant_id, instance_id)
public List<ConsortiumInstance> fetch(List<String> instanceIds) {
log.debug("fetch::consortium instances by [ids: {}]", instanceIds);
return jdbcTemplate.query(
SELECT_BY_ID_SQL.formatted(getFullTableName(), String.join(",", nCopies(instanceIds.size(), "?"))),
SELECT_BY_ID_SQL.formatted(getTableName(), getQuestionMarkPlaceholder(instanceIds.size())),
(rs, rowNum) -> toConsortiumInstance(rs),
instanceIds.toArray());
}

public void save(List<ConsortiumInstance> instances) {
log.debug("save::consortium instances [number: {}]", instances.size());
jdbcTemplate.batchUpdate(
UPSERT_SQL.formatted(getFullTableName()),
UPSERT_SQL.formatted(getTableName()),
instances,
100,
(PreparedStatement ps, ConsortiumInstance item) -> {
Expand All @@ -63,7 +64,7 @@ public void save(List<ConsortiumInstance> instances) {
public void delete(Set<ConsortiumInstanceId> instanceIds) {
log.debug("delete::consortium instances [tenant-instanceIds: {}]", instanceIds);
jdbcTemplate.batchUpdate(
DELETE_BY_TENANT_AND_ID_SQL.formatted(getFullTableName()),
DELETE_BY_TENANT_AND_ID_SQL.formatted(getTableName()),
instanceIds,
100,
(PreparedStatement ps, ConsortiumInstanceId id) -> {
Expand All @@ -78,8 +79,7 @@ private ConsortiumInstance toConsortiumInstance(ResultSet rs) throws SQLExceptio
return new ConsortiumInstance(id, rs.getString(JSON_COLUMN));
}

private String getFullTableName() {
var dbSchemaName = context.getFolioModuleMetadata().getDBSchemaName(context.getTenantId());
return dbSchemaName + "." + CONSORTIUM_INSTANCE_TABLE_NAME;
private String getTableName() {
return getFullTableName(context, CONSORTIUM_INSTANCE_TABLE_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private Stream<ResourceEvent> populateResourceEvents(ResourceEvent event) {
.map(ResourceDescription::getIndexingConfiguration)
.map(ResourceIndexingConfiguration::getEventPreProcessor)
.map(eventPreProcessorBeans::get)
.map(eventPreProcessor -> eventPreProcessor.process(event))
.map(eventPreProcessor -> eventPreProcessor.preProcess(event))
.map(Collection::stream)
.orElseGet(() -> Stream.of(event));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void init() {
* @return list with divided authority event objects
*/
@Override
public List<ResourceEvent> process(ResourceEvent event) {
public List<ResourceEvent> preProcess(ResourceEvent event) {
log.debug("process:: by [id: {}, tenant: {}, resourceType: {}]",
event.getId(), event.getTenant(), event.getType());

Expand Down
Loading

0 comments on commit f3a56cb

Please sign in to comment.