Skip to content

Commit

Permalink
MSEARCH-794: set merge failed status on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
mukhiddin-yusuf committed Aug 15, 2024
1 parent c15d439 commit cc5b5d5
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public List<UUID> fetchInventoryRecordIds(InventoryRecordType recordType, CqlQue
case HOLDING -> fetchHoldings(cqlQuery, offset, limit);
};
} catch (Exception e) {
log.warn("Failed to fetch Inventory records for {}", recordType);
log.warn("Failed to fetch Inventory records for {} : {}", recordType, e.getMessage());
throw new FolioIntegrationException("Failed to fetch inventory records for %s".formatted(recordType.name()), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections4.CollectionUtils;
import org.folio.search.configuration.properties.ReindexConfigurationProperties;
import org.folio.search.exception.FolioIntegrationException;
import org.folio.search.integration.InventoryService;
import org.folio.search.model.client.CqlQuery;
import org.folio.search.model.client.CqlQueryParam;
Expand All @@ -30,14 +31,17 @@ public class ReindexMergeRangeIndexService {

private final Map<ReindexEntityType, MergeRangeRepository> repositories;
private final InventoryService inventoryService;
private final ReindexStatusService statusService;
private final ReindexConfigurationProperties reindexConfig;

public ReindexMergeRangeIndexService(List<MergeRangeRepository> repositories,
InventoryService inventoryService,
ReindexStatusService statusService,
ReindexConfigurationProperties reindexConfig) {
this.repositories = repositories.stream()
.collect(Collectors.toMap(MergeRangeRepository::entityType, Function.identity()));
this.inventoryService = inventoryService;
this.statusService = statusService;
this.reindexConfig = reindexConfig;
}

Expand All @@ -49,13 +53,18 @@ public void deleteAllRangeRecords() {

public void createMergeRanges(String tenantId) {
var repository = repositories.get(ReindexEntityType.INSTANCE);
for (var recordType : InventoryRecordType.values()) {
var recordsCount = inventoryService.fetchInventoryRecordCount(recordType);
var rangeSize = reindexConfig.getMergeRangeSize();
var ranges = constructRecordMergeRanges(recordsCount, rangeSize, recordType, tenantId);
try {
for (var recordType : InventoryRecordType.values()) {
var recordsCount = inventoryService.fetchInventoryRecordCount(recordType);
var rangeSize = reindexConfig.getMergeRangeSize();
var ranges = constructRecordMergeRanges(recordsCount, rangeSize, recordType, tenantId);

log.info("Creating [{} {}] ranges for [tenant: {}]", ranges.size(), recordType, tenantId);
repository.saveMergeRanges(ranges);
log.info("Creating [{} {}] ranges for [tenant: {}]", ranges.size(), recordType, tenantId);
repository.saveMergeRanges(ranges);
}
} catch (FolioIntegrationException e) {
log.warn("Skip creating merge ranges for [tenant: {}]. Exception: {}", tenantId, e);
statusService.updateMergeRangesFailed();
}
}

Expand Down Expand Up @@ -97,15 +106,16 @@ private List<MergeRangeEntity> constructRecordMergeRanges(int recordsCount,

private MergeRangeEntity mergeEntity(UUID id, InventoryRecordType recordType, String tenantId, UUID lowerId,
UUID upperId, Timestamp createdAt) {
ReindexEntityType entityType;
return new MergeRangeEntity(id, asEntityType(recordType), tenantId, lowerId, upperId, createdAt);
}

private ReindexEntityType asEntityType(InventoryRecordType recordType) {
if (recordType == InventoryRecordType.INSTANCE) {
entityType = ReindexEntityType.INSTANCE;
return ReindexEntityType.INSTANCE;
} else if (recordType == InventoryRecordType.HOLDING) {
entityType = ReindexEntityType.HOLDING;
return ReindexEntityType.HOLDING;
} else {
entityType = ReindexEntityType.ITEM;
return ReindexEntityType.ITEM;
}

return new MergeRangeEntity(id, entityType, tenantId, lowerId, upperId, createdAt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private void publishRecordsRange() {
inventoryService.publishReindexRecordsRange(rangeEntity);
} catch (FolioIntegrationException e) {
log.error("Failed to publish records range entity [rangeEntity: {}]. Exception: {}", rangeEntity, e);
statusService.updateMergeRangesFailed(entityType);
statusService.updateMergeRangesFailed();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public void updateMergeRangesStarted(ReindexEntityType entityType, int totalMerg
statusRepository.setMergeReindexStarted(entityType, totalMergeRanges);
}

public void updateMergeRangesFailed(ReindexEntityType entityType) {
statusRepository.setReindexMergeFailed(entityType);
public void updateMergeRangesFailed() {
statusRepository.setReindexMergeFailed(ReindexConstants.MERGE_RANGE_ENTITY_TYPES);
}

private List<ReindexStatusEntity> constructNewStatusRecords(ReindexStatus status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ public class ReindexStatusRepository {
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
""";

private static final String UPDATE_STATUS_SQL = """
private static final String UPDATE_SQL = """
UPDATE %s
SET %s
WHERE entity_type = ?;
""";

private static final String UPDATE_FOR_ENTITIES_SQL = """
UPDATE %s
SET %s
WHERE entity_type in ?;
""";

private final FolioExecutionContext context;
private final JdbcTemplate jdbcTemplate;

Expand All @@ -44,24 +50,24 @@ public void truncate() {

public void setMergeReindexStarted(ReindexEntityType entityType, int totalMergeRanges) {
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = UPDATE_STATUS_SQL.formatted(
var sql = UPDATE_SQL.formatted(
fullTableName, "%s = ?, %s = ?".formatted(TOTAL_MERGE_RANGES_COLUMN, START_TIME_MERGE_COLUMN));

jdbcTemplate.update(sql, totalMergeRanges, Timestamp.from(Instant.now()), entityType.name());
}

public void setReindexUploadFailed(ReindexEntityType entityType) {
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = UPDATE_STATUS_SQL.formatted(fullTableName, "%s = ?".formatted(END_TIME_UPLOAD_COLUMN));
var sql = UPDATE_SQL.formatted(fullTableName, "%s = ?".formatted(END_TIME_UPLOAD_COLUMN));

jdbcTemplate.update(sql, ReindexStatus.UPLOAD_FAILED.name(), Timestamp.from(Instant.now()), entityType.name());
}

public void setReindexMergeFailed(ReindexEntityType entityType) {
public void setReindexMergeFailed(List<ReindexEntityType> entityTypes) {
var fullTableName = getFullTableName(context, REINDEX_STATUS_TABLE);
var sql = UPDATE_STATUS_SQL.formatted(fullTableName, "%s = ?".formatted(END_TIME_MERGE_COLUMN));
var sql = UPDATE_FOR_ENTITIES_SQL.formatted(fullTableName, "%s = ?".formatted(END_TIME_MERGE_COLUMN));

jdbcTemplate.update(sql, ReindexStatus.MERGE_FAILED.name(), Timestamp.from(Instant.now()), entityType.name());
jdbcTemplate.update(sql, ReindexStatus.MERGE_FAILED.name(), Timestamp.from(Instant.now()), entityTypes);
}

public void saveReindexStatusRecords(List<ReindexStatusEntity> statusRecords) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,6 @@ void initFullReindex_negative_abortMergeAndSetFailedStatusWhenPublishingRangesFa
verify(statusService)
.updateMergeRangesStarted(any(ReindexEntityType.class), eq(1));
verify(mergeRangeService).fetchMergeRanges(any(ReindexEntityType.class));
verify(statusService).updateMergeRangesFailed(any(ReindexEntityType.class));
verify(statusService).updateMergeRangesFailed();
}
}

0 comments on commit cc5b5d5

Please sign in to comment.