Skip to content

Commit

Permalink
Merge branch 'master' into ecs-tlr-feature
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-barannyk authored Nov 8, 2024
2 parents 8deee0d + 0dbe3be commit 676d236
Show file tree
Hide file tree
Showing 22 changed files with 248 additions and 108 deletions.
27 changes: 26 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,29 @@
## v28.0.0 In progress
## v28.1.0 YYYY-mm-DD
### Breaking changes
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))

### New APIs versions
* Provides `API_NAME vX.Y`
* Requires `API_NAME vX.Y`

### Features
* Modify endpoint for bulk instances upsert with publish events flag ([MODINVSTOR-1283](https://folio-org.atlassian.net/browse/MODINVSTOR-1283))
* Change Kafka event publishing keys for holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281))

### Bug fixes
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))

### Tech Dept
* Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))

### Dependencies
* Bump `LIB_NAME` from `OLD_VERSION` to `NEW_VERSION`
* Add `LIB_NAME VERSION`
* Remove `LIB_NAME`

---

## v28.0.0 2024-11-01
### Breaking changes
* Migrate "publicationPeriod" data to the Dates object and remove it from the Instance schema ([MODINVSTOR-1232](https://folio-org.atlassian.net/browse/MODINVSTOR-1232))
* Delete deprecated `shelf-locations` API ([MODINVSTOR-1183](https://folio-org.atlassian.net/browse/MODINVSTOR-1183))
Expand Down
5 changes: 5 additions & 0 deletions ramls/bulkUpsertRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
"recordsFileName": {
"description": "File name of entities records",
"type": "string"
},
"publishEvents": {
"description": "A flag that indicates whether domain events should be published.",
"type": "boolean",
"default": true
}
},
"required": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void postInstanceStorageBatchSynchronous(boolean upsert, InstancesPost en
var instances = ObjectConverterUtils.convertObject(entity, Instances.class);

new InstanceService(vertxContext, okapiHeaders)
.createInstances(instances.getInstances(), upsert, true)
.createInstances(instances.getInstances(), upsert, true, true)
.otherwise(cause -> respond500WithTextPlain(cause.getMessage()))
.onComplete(asyncResultHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void postInstanceStorageBatchSynchronousUnsafe(InstancesPost entity, Map<
var instances = ObjectConverterUtils.convertObject(entity, Instances.class);

new InstanceService(vertxContext, okapiHeaders)
.createInstances(instances.getInstances(), true, false)
.createInstances(instances.getInstances(), true, false, true)
.otherwise(cause -> respond500WithTextPlain(cause.getMessage()))
.onComplete(asyncResultHandler);
}
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/folio/services/BulkProcessingContext.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.services;

import org.apache.commons.lang3.StringUtils;
import org.folio.rest.jaxrs.model.BulkUpsertRequest;

/**
* Encapsulates the context for bulk processing of entities from external file.
Expand All @@ -17,13 +18,15 @@ public class BulkProcessingContext {
private final String errorsFilePath;
private final String errorEntitiesFileLocalPath;
private final String errorsFileLocalPath;
private final boolean publishEvents;

public BulkProcessingContext(String entitiesFilePath) {
this.initialFilePath = StringUtils.removeStart(entitiesFilePath, '/');
public BulkProcessingContext(BulkUpsertRequest request) {
this.initialFilePath = StringUtils.removeStart(request.getRecordsFileName(), '/');
this.errorEntitiesFilePath = initialFilePath + FAILED_ENTITIES_FILE_SUFFIX;
this.errorsFilePath = initialFilePath + ERRORS_FILE_SUFFIX;
this.errorEntitiesFileLocalPath = ROOT_FOLDER + errorEntitiesFilePath;
this.errorsFileLocalPath = ROOT_FOLDER + errorsFilePath;
this.publishEvents = request.getPublishEvents();
}

public String getErrorEntitiesFilePath() {
Expand All @@ -42,4 +45,7 @@ public String getErrorsFileLocalPath() {
return errorsFileLocalPath;
}

public boolean isPublishEvents() {
return publishEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ public final class BatchOperationContext<T> {
*/
private final Collection<T> existingRecords;

public BatchOperationContext(Collection<T> recordsToBeCreated, Collection<T> existingRecords) {
private final boolean publishEvents;

public BatchOperationContext(Collection<T> recordsToBeCreated, Collection<T> existingRecords, boolean publishEvents) {
this.recordsToBeCreated = unmodifiableCollection(recordsToBeCreated);
this.existingRecords = unmodifiableCollection(existingRecords);
this.publishEvents = publishEvents;
}

public Collection<T> getRecordsToBeCreated() {
Expand All @@ -23,4 +26,8 @@ public Collection<T> getRecordsToBeCreated() {
public Collection<T> getExistingRecords() {
return existingRecords;
}

public boolean isPublishEvents() {
return publishEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
public final class BatchOperationContextFactory {
private BatchOperationContextFactory() { }

public static <T> Future<BatchOperationContext<T>> buildBatchOperationContext(
boolean upsert, List<T> all, AbstractRepository<T> repository, Function<T, String> idGetter) {
public static <T> Future<BatchOperationContext<T>> buildBatchOperationContext(boolean upsert, List<T> all,
AbstractRepository<T> repository,
Function<T, String> idGetter,
boolean publishEvents) {

if (!upsert) {
return succeededFuture(new BatchOperationContext<>(all, emptyList()));
return succeededFuture(new BatchOperationContext<>(all, emptyList(), publishEvents));
}

return repository.getById(all, idGetter).map(found -> {
final var toBeCreated = all.stream()
.filter(entity -> !found.containsKey(idGetter.apply(entity)))
.collect(toList());

return new BatchOperationContext<>(toBeCreated, found.values());
return new BatchOperationContext<>(toBeCreated, found.values(), publishEvents);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ private Future<List<T>> loadEntities(BulkUpsertRequest bulkRequest) {
}

private Future<BulkUpsertResponse> upsertEntities(List<T> entities, BulkUpsertRequest bulkRequest) {
BulkProcessingContext bulkProcessingContext = new BulkProcessingContext(bulkRequest.getRecordsFileName());
BulkProcessingContext bulkProcessingContext = new BulkProcessingContext(bulkRequest);

return upsert(entities)
return upsert(entities, bulkProcessingContext.isPublishEvents())
.map(v -> new BulkUpsertResponse().withErrorsNumber(0))
.recover(e -> processSequentially(entities, bulkProcessingContext));
}
Expand All @@ -89,7 +89,7 @@ private Future<BulkUpsertResponse> processSequentially(List<T> entities, BulkPro
BulkProcessingErrorFileWriter errorsWriter = new BulkProcessingErrorFileWriter(vertx, bulkContext);

return errorsWriter.initialize()
.compose(v -> processInBatches(entities, entity -> upsert(List.of(entity))
.compose(v -> processInBatches(entities, entity -> upsert(List.of(entity), bulkContext.isPublishEvents())
.recover(e -> handleUpsertFailure(errorsCounter, errorsWriter, entity, e))))
.eventually(errorsWriter::close)
.eventually(() -> uploadErrorsFiles(bulkContext))
Expand Down Expand Up @@ -166,10 +166,11 @@ private Future<Void> uploadErrorsFiles(BulkProcessingContext bulkContext) {
* Performs an upsert operation on specified list of {@code entities}.
* The implementation of the upsert operation depends on the specifics of the {@code <T>} type of entity.
*
* @param entities - a list of entities to be updated or created
* @param entities - a list of entities to be updated or created
* @param publishEvents - a flag that indicates whether domain events should be published
* @return Future of Void, succeeded if the upsert operation is successful, otherwise failed
*/
protected abstract Future<Void> upsert(List<T> entities);
protected abstract Future<Void> upsert(List<T> entities, boolean publishEvents);

/**
* Provides a representation of the given {@code entity} to be written to error file containing entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ protected Future<Void> ensureEntitiesWithNonMarcControlledFieldsData(List<Instan
}

@Override
protected Future<Void> upsert(List<InstanceWrapper> instanceWrappers) {
protected Future<Void> upsert(List<InstanceWrapper> instanceWrappers, boolean publishEvents) {
List<Instance> instances = instanceWrappers.stream().map(InstanceWrapper::instance).toList();

return instanceService.createInstances(instances, APPLY_UPSERT, APPLY_OPTIMISTIC_LOCKING)
return instanceService.createInstances(instances, APPLY_UPSERT, APPLY_OPTIMISTIC_LOCKING, publishEvents)
.compose(response -> {
if (!isCreateSuccessResponse(response)) {
String msg = String.format("Failed to update instances, status: '%s', message: '%s'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static io.vertx.core.Future.succeededFuture;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.rest.support.ResponseUtil.isCreateSuccessResponse;
Expand Down Expand Up @@ -67,8 +66,10 @@ public Handler<Response> publishCreatedOrUpdated(BatchOperationContext<D> batchO
log.info("Records created {}, records updated {}", batchOperation.getRecordsToBeCreated().size(),
batchOperation.getExistingRecords().size());

publishRecordsCreated(batchOperation.getRecordsToBeCreated()).compose(
notUsed -> publishUpdated(batchOperation.getExistingRecords()));
if (batchOperation.isPublishEvents()) {
publishRecordsCreated(batchOperation.getRecordsToBeCreated()).compose(
notUsed -> publishUpdated(batchOperation.getExistingRecords()));
}
};
}

Expand Down Expand Up @@ -131,20 +132,20 @@ protected List<Triple<String, E, E>> mapOldRecordsToNew(List<Pair<String, D>> ol
var oldRecordPair = idToOldRecordPairMap.get(getId(newRecordPair.getValue()));
return triple(newRecordPair.getKey(), convertDomainToEvent(oldRecordPair.getKey(), oldRecordPair.getValue()),
convertDomainToEvent(newRecordPair.getKey(), newRecordPair.getValue()));
}).collect(toList());
}).toList();
}

private Future<Void> publishRecordsCreated(Collection<D> records) {
return convertDomainsToEvents(records).compose(domainEventService::publishRecordsCreated);
}

private Future<List<Pair<String, E>>> convertDomainsToEvents(Collection<D> domains) {
protected Future<List<Pair<String, E>>> convertDomainsToEvents(Collection<D> domains) {
return getRecordIds(domains).map(pairs -> pairs.stream()
.map(pair -> pair(pair.getKey(), convertDomainToEvent(pair.getKey(), pair.getValue())))
.collect(toList()));
.toList());
}

private Future<List<Triple<String, E, E>>> convertDomainsToEvents(Collection<D> newRecords,
protected Future<List<Triple<String, E, E>>> convertDomainsToEvents(Collection<D> newRecords,
Collection<D> oldRecords) {

return getRecordIds(oldRecords).compose(oldRecordsInstanceIds -> getRecordIds(newRecords).map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected Future<List<Pair<String, HoldingsRecord>>> getRecordIds(
Collection<HoldingsRecord> holdingsRecords) {

return succeededFuture(holdingsRecords.stream()
.map(hr -> pair(hr.getInstanceId(), hr))
.map(hr -> pair(hr.getId(), hr))
.toList());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package org.folio.services.domainevent;

import static io.vertx.core.Future.succeededFuture;
import static java.util.stream.Collectors.toMap;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.InventoryKafkaTopic.ITEM;
import static org.folio.InventoryKafkaTopic.REINDEX_RECORDS;
import static org.folio.rest.support.ResponseUtil.isDeleteSuccessResponse;
import static org.folio.rest.tools.utils.TenantTool.tenantId;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
Expand All @@ -20,6 +24,7 @@
import org.folio.rest.jaxrs.model.HoldingsRecord;
import org.folio.rest.jaxrs.model.Item;
import org.folio.rest.jaxrs.model.PublishReindexRecords;
import org.folio.rest.support.CollectionUtil;

public class ItemDomainEventPublisher extends AbstractDomainEventPublisher<Item, ItemWithInstanceId> {
private static final Logger log = getLogger(ItemDomainEventPublisher.class);
Expand All @@ -42,7 +47,7 @@ public Future<Void> publishUpdated(Item newItem, Item oldItem, HoldingsRecord ne
ItemWithInstanceId oldItemWithId = new ItemWithInstanceId(oldItem, oldHoldings.getInstanceId());
ItemWithInstanceId newItemWithId = new ItemWithInstanceId(newItem, newHoldings.getInstanceId());

return domainEventService.publishRecordUpdated(newHoldings.getInstanceId(), oldItemWithId, newItemWithId);
return domainEventService.publishRecordUpdated(newItem.getId(), oldItemWithId, newItemWithId);
}

public Future<Void> publishUpdated(HoldingsRecord oldHoldings, HoldingsRecord newHoldings, List<Item> oldItems) {
Expand All @@ -67,9 +72,18 @@ public Future<Void> publishReindexItems(String key, List<Map<String, Object>> it
}

@Override
public void publishRemoved(String instanceId, String itemRaw) {
String instanceIdAndItemRaw = "{\"instanceId\":\"" + instanceId + "\"," + itemRaw.substring(1);
domainEventService.publishRecordRemoved(instanceId, instanceIdAndItemRaw);
public Handler<Response> publishRemoved(Item removedRecord) {
return response -> {
if (!isDeleteSuccessResponse(response)) {
log.warn("Item record removal failed, no event will be sent");
return;
}
getRecordIds(List.of(removedRecord))
.map(CollectionUtil::getFirst)
.map(Pair::getKey)
.compose(instanceId -> domainEventService.publishRecordRemoved(
removedRecord.getId(), convertDomainToEvent(instanceId, removedRecord)));
};
}

@Override
Expand All @@ -85,11 +99,33 @@ protected ItemWithInstanceId convertDomainToEvent(String instanceId, Item item)
return new ItemWithInstanceId(item, instanceId);
}

@Override
protected List<Triple<String, ItemWithInstanceId, ItemWithInstanceId>> mapOldRecordsToNew(
List<Pair<String, Item>> oldRecords, List<Pair<String, Item>> newRecords) {

var idToOldRecordPairMap = oldRecords.stream().collect(toMap(pair -> getId(pair.getValue()), pair -> pair));

return newRecords.stream().map(newRecordPair -> {
var oldRecordPair = idToOldRecordPairMap.get(getId(newRecordPair.getValue()));
return triple(newRecordPair.getValue().getId(), convertDomainToEvent(
oldRecordPair.getKey(), oldRecordPair.getValue()),
convertDomainToEvent(newRecordPair.getKey(), newRecordPair.getValue()));
}).toList();
}

@Override
protected String getId(Item item) {
return item.getId();
}

@Override
protected Future<List<Pair<String, ItemWithInstanceId>>> convertDomainsToEvents(Collection<Item> domains) {
return getRecordIds(domains)
.map(pairs -> pairs.stream()
.map(pair -> pair(pair.getValue().getId(), convertDomainToEvent(pair.getKey(), pair.getValue())))
.toList());
}

private List<Triple<String, ItemWithInstanceId, ItemWithInstanceId>> mapOldItemsToNew(
HoldingsRecord oldHoldings, HoldingsRecord newHoldings, Collection<Item> oldItems, Collection<Item> newItems) {

Expand Down
18 changes: 15 additions & 3 deletions src/main/java/org/folio/services/holding/HoldingsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import static org.folio.services.batch.BatchOperationContextFactory.buildBatchOperationContext;
import static org.folio.validator.HridValidators.refuseWhenHridChanged;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
Expand All @@ -29,6 +31,7 @@
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.folio.dbschema.ObjectMapperTool;
import org.folio.persist.HoldingsRepository;
import org.folio.persist.InstanceRepository;
import org.folio.rest.jaxrs.model.HoldingsRecord;
Expand All @@ -51,6 +54,8 @@

public class HoldingsService {
private static final Logger log = getLogger(HoldingsService.class);
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper();

private final Context vertxContext;
private final Map<String, String> okapiHeaders;
private final PostgresClient postgresClient;
Expand Down Expand Up @@ -146,8 +151,15 @@ public Future<Response> deleteHoldings(String cql) {
// https://sonarcloud.io/organizations/folio-org/rules?open=java%3AS1602&rule_key=java%3AS1602
return holdingsRepository.delete(cql)
.onSuccess(rowSet -> vertxContext.runOnContext(runLater ->
rowSet.iterator().forEachRemaining(row ->
domainEventPublisher.publishRemoved(row.getString(0), row.getString(1))
rowSet.iterator().forEachRemaining(row -> {
try {
var holdingId = OBJECT_MAPPER.readTree(row.getString(1)).get("id").textValue();
domainEventPublisher.publishRemoved(holdingId, row.getString(1));
} catch (JsonProcessingException ex) {
log.error(String.format("deleteHoldings:: Failed to parse json : %s", ex.getMessage()), ex);
throw new IllegalArgumentException(ex.getCause());
}
}
)
))
.map(Response.noContent().build());
Expand All @@ -168,7 +180,7 @@ public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean up
.compose(ar -> hridManager.populateHridForHoldings(holdings)
.compose(NotesValidators::refuseHoldingLongNotes)
.compose(result -> buildBatchOperationContext(upsert, holdings,
holdingsRepository, HoldingsRecord::getId))
holdingsRepository, HoldingsRecord::getId, true))
.compose(batchOperation -> postSync(HOLDINGS_RECORD_TABLE, holdings, MAX_ENTITIES,
upsert, optimisticLocking, okapiHeaders, vertxContext, PostHoldingsStorageBatchSynchronousResponse.class)
.onSuccess(domainEventPublisher.publishCreatedOrUpdated(batchOperation))))
Expand Down
Loading

0 comments on commit 676d236

Please sign in to comment.