Skip to content

Commit

Permalink
MODINVSTOR-1283 Modify endpoint for bulk instances upsert with publis…
Browse files Browse the repository at this point in the history
…h events flag
  • Loading branch information
psmagin committed Nov 5, 2024
1 parent 358f43f commit 37b5420
Show file tree
Hide file tree
Showing 15 changed files with 129 additions and 79 deletions.
5 changes: 5 additions & 0 deletions ramls/bulkUpsertRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
"recordsFileName": {
"description": "File name of entities records",
"type": "string"
},
"publishEvents": {
"description": "A flag that indicates whether domain events should be published.",
"type": "boolean",
"default": true
}
},
"required": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void postInstanceStorageBatchSynchronous(boolean upsert, InstancesPost en
var instances = ObjectConverterUtils.convertObject(entity, Instances.class);

new InstanceService(vertxContext, okapiHeaders)
.createInstances(instances.getInstances(), upsert, true)
.createInstances(instances.getInstances(), upsert, true, true)
.otherwise(cause -> respond500WithTextPlain(cause.getMessage()))
.onComplete(asyncResultHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void postInstanceStorageBatchSynchronousUnsafe(InstancesPost entity, Map<
var instances = ObjectConverterUtils.convertObject(entity, Instances.class);

new InstanceService(vertxContext, okapiHeaders)
.createInstances(instances.getInstances(), true, false)
.createInstances(instances.getInstances(), true, false, true)
.otherwise(cause -> respond500WithTextPlain(cause.getMessage()))
.onComplete(asyncResultHandler);
}
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/folio/services/BulkProcessingContext.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.services;

import org.apache.commons.lang3.StringUtils;
import org.folio.rest.jaxrs.model.BulkUpsertRequest;

/**
* Encapsulates the context for bulk processing of entities from external file.
Expand All @@ -17,13 +18,15 @@ public class BulkProcessingContext {
private final String errorsFilePath;
private final String errorEntitiesFileLocalPath;
private final String errorsFileLocalPath;
private final boolean publishEvents;

public BulkProcessingContext(String entitiesFilePath) {
this.initialFilePath = StringUtils.removeStart(entitiesFilePath, '/');
public BulkProcessingContext(BulkUpsertRequest request) {
this.initialFilePath = StringUtils.removeStart(request.getRecordsFileName(), '/');
this.errorEntitiesFilePath = initialFilePath + FAILED_ENTITIES_FILE_SUFFIX;
this.errorsFilePath = initialFilePath + ERRORS_FILE_SUFFIX;
this.errorEntitiesFileLocalPath = ROOT_FOLDER + errorEntitiesFilePath;
this.errorsFileLocalPath = ROOT_FOLDER + errorsFilePath;
this.publishEvents = request.getPublishEvents();
}

public String getErrorEntitiesFilePath() {
Expand All @@ -42,4 +45,7 @@ public String getErrorsFileLocalPath() {
return errorsFileLocalPath;
}

public boolean isPublishEvents() {
return publishEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ public final class BatchOperationContext<T> {
*/
private final Collection<T> existingRecords;

public BatchOperationContext(Collection<T> recordsToBeCreated, Collection<T> existingRecords) {
private final boolean publishEvents;

public BatchOperationContext(Collection<T> recordsToBeCreated, Collection<T> existingRecords, boolean publishEvents) {
this.recordsToBeCreated = unmodifiableCollection(recordsToBeCreated);
this.existingRecords = unmodifiableCollection(existingRecords);
this.publishEvents = publishEvents;
}

public Collection<T> getRecordsToBeCreated() {
Expand All @@ -23,4 +26,8 @@ public Collection<T> getRecordsToBeCreated() {
public Collection<T> getExistingRecords() {
return existingRecords;
}

public boolean isPublishEvents() {
return publishEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
public final class BatchOperationContextFactory {
private BatchOperationContextFactory() { }

public static <T> Future<BatchOperationContext<T>> buildBatchOperationContext(
boolean upsert, List<T> all, AbstractRepository<T> repository, Function<T, String> idGetter) {
public static <T> Future<BatchOperationContext<T>> buildBatchOperationContext(boolean upsert, List<T> all,
AbstractRepository<T> repository,
Function<T, String> idGetter,
boolean publishEvents) {

if (!upsert) {
return succeededFuture(new BatchOperationContext<>(all, emptyList()));
return succeededFuture(new BatchOperationContext<>(all, emptyList(), publishEvents));
}

return repository.getById(all, idGetter).map(found -> {
final var toBeCreated = all.stream()
.filter(entity -> !found.containsKey(idGetter.apply(entity)))
.collect(toList());

return new BatchOperationContext<>(toBeCreated, found.values());
return new BatchOperationContext<>(toBeCreated, found.values(), publishEvents);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ private Future<List<T>> loadEntities(BulkUpsertRequest bulkRequest) {
}

private Future<BulkUpsertResponse> upsertEntities(List<T> entities, BulkUpsertRequest bulkRequest) {
BulkProcessingContext bulkProcessingContext = new BulkProcessingContext(bulkRequest.getRecordsFileName());
BulkProcessingContext bulkProcessingContext = new BulkProcessingContext(bulkRequest);

return upsert(entities)
return upsert(entities, bulkProcessingContext.isPublishEvents())
.map(v -> new BulkUpsertResponse().withErrorsNumber(0))
.recover(e -> processSequentially(entities, bulkProcessingContext));
}
Expand All @@ -89,7 +89,7 @@ private Future<BulkUpsertResponse> processSequentially(List<T> entities, BulkPro
BulkProcessingErrorFileWriter errorsWriter = new BulkProcessingErrorFileWriter(vertx, bulkContext);

return errorsWriter.initialize()
.compose(v -> processInBatches(entities, entity -> upsert(List.of(entity))
.compose(v -> processInBatches(entities, entity -> upsert(List.of(entity), bulkContext.isPublishEvents())
.recover(e -> handleUpsertFailure(errorsCounter, errorsWriter, entity, e))))
.eventually(errorsWriter::close)
.eventually(() -> uploadErrorsFiles(bulkContext))
Expand Down Expand Up @@ -166,10 +166,11 @@ private Future<Void> uploadErrorsFiles(BulkProcessingContext bulkContext) {
* Performs an upsert operation on specified list of {@code entities}.
* The implementation of the upsert operation depends on the specifics of the {@code <T>} type of entity.
*
* @param entities - a list of entities to be updated or created
* @param entities - a list of entities to be updated or created
* @param publishEvents - a flag that indicates whether domain events should be published
* @return Future of Void, succeeded if the upsert operation is successful, otherwise failed
*/
protected abstract Future<Void> upsert(List<T> entities);
protected abstract Future<Void> upsert(List<T> entities, boolean publishEvents);

/**
* Provides a representation of the given {@code entity} to be written to error file containing entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ protected Future<Void> ensureEntitiesWithNonMarcControlledFieldsData(List<Instan
}

@Override
protected Future<Void> upsert(List<InstanceWrapper> instanceWrappers) {
protected Future<Void> upsert(List<InstanceWrapper> instanceWrappers, boolean publishEvents) {
List<Instance> instances = instanceWrappers.stream().map(InstanceWrapper::instance).toList();

return instanceService.createInstances(instances, APPLY_UPSERT, APPLY_OPTIMISTIC_LOCKING)
return instanceService.createInstances(instances, APPLY_UPSERT, APPLY_OPTIMISTIC_LOCKING, publishEvents)
.compose(response -> {
if (!isCreateSuccessResponse(response)) {
String msg = String.format("Failed to update instances, status: '%s', message: '%s'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ public Handler<Response> publishCreatedOrUpdated(BatchOperationContext<D> batchO
log.info("Records created {}, records updated {}", batchOperation.getRecordsToBeCreated().size(),
batchOperation.getExistingRecords().size());

publishRecordsCreated(batchOperation.getRecordsToBeCreated()).compose(
notUsed -> publishUpdated(batchOperation.getExistingRecords()));
if (batchOperation.isPublishEvents()) {
publishRecordsCreated(batchOperation.getRecordsToBeCreated()).compose(
notUsed -> publishUpdated(batchOperation.getExistingRecords()));
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean up
.compose(ar -> hridManager.populateHridForHoldings(holdings)
.compose(NotesValidators::refuseHoldingLongNotes)
.compose(result -> buildBatchOperationContext(upsert, holdings,
holdingsRepository, HoldingsRecord::getId))
holdingsRepository, HoldingsRecord::getId, true))
.compose(batchOperation -> postSync(HOLDINGS_RECORD_TABLE, holdings, MAX_ENTITIES,
upsert, optimisticLocking, okapiHeaders, vertxContext, PostHoldingsStorageBatchSynchronousResponse.class)
.onSuccess(domainEventPublisher.publishCreatedOrUpdated(batchOperation))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,15 @@ public Future<Response> createInstance(Instance entity) {
.map(ResponseHandlerUtil::handleHridError);
}

public Future<Response> createInstances(List<Instance> instances, boolean upsert, boolean optimisticLocking) {
public Future<Response> createInstances(List<Instance> instances, boolean upsert, boolean optimisticLocking,
boolean publishEvents) {
final String statusUpdatedDate = generateStatusUpdatedDate();
instances.forEach(instance -> instance.setStatusUpdatedDate(statusUpdatedDate));

return hridManager.populateHridForInstances(instances)
.compose(NotesValidators::refuseInstanceLongNotes)
.compose(notUsed -> buildBatchOperationContext(upsert, instances,
instanceRepository, Instance::getId))
instanceRepository, Instance::getId, publishEvents))
.compose(batchOperation ->
// Can use instances list here directly because the class is stateful
postSync(INSTANCE_TABLE, instances, MAX_ENTITIES, upsert, optimisticLocking, okapiHeaders,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/folio/services/item/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Future<Response> createItems(List<Item> items, boolean upsert, boolean op
.compose(NotesValidators::refuseItemLongNotes)
.compose(result -> effectiveValuesService.populateEffectiveValues(items))
.compose(this::populateCirculationNoteId)
.compose(result -> buildBatchOperationContext(upsert, items, itemRepository, Item::getId))
.compose(result -> buildBatchOperationContext(upsert, items, itemRepository, Item::getId, true))
.compose(batchOperation -> postSync(ITEM_TABLE, items, MAX_ENTITIES, upsert, optimisticLocking,
okapiHeaders, vertxContext, PostItemStorageBatchSynchronousResponse.class)
.onSuccess(domainEventService.publishCreatedOrUpdated(batchOperation)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.Assert.assertEquals;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;

import io.vertx.core.json.Json;
Expand Down Expand Up @@ -77,6 +77,69 @@ public class InstanceStorageInstancesBulkApiTest extends TestBaseWithInventoryUt

private final InstanceEventMessageChecks instanceMessageChecks = new InstanceEventMessageChecks(KAFKA_CONSUMER);

@Before
public void setUp() {
StorageTestSuite.deleteAll(TENANT_ID, PRECEDING_SUCCEEDING_TITLE_TABLE);
clearData();
removeAllEvents();
}

@Test
public void shouldUpdateInstancesWithoutErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
shouldUpdateInstances(true);
}

@Test
public void shouldUpdateInstancesWithoutErrorsAndDoNotPublishDomainEvents()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
shouldUpdateInstances(false);
}

@Test
public void shouldUpdateInstancesWithErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
// given
String expectedErrorRecordsFileName = BULK_FILE_TO_UPLOAD + "_failedEntities";
String expectedErrorsFileName = BULK_FILE_TO_UPLOAD + "_errors";

List<String> instancesIds = extractInstancesIdsFromFile(BULK_INSTANCES_WITH_INVALID_TYPE_PATH);
FileInputStream inputStream = FileUtils.openInputStream(new File(BULK_INSTANCES_WITH_INVALID_TYPE_PATH));
String bulkFilePath = s3Client.write(BULK_FILE_TO_UPLOAD, inputStream);

final IndividualResource existingInstance1 = createInstance(buildInstance(instancesIds.get(0), INSTANCE_TITLE_1));
final IndividualResource existingInstance2 = createInstance(buildInstance(instancesIds.get(1), INSTANCE_TITLE_2));

// when
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest()
.withRecordsFileName(bulkFilePath)
);

// then
assertThat(bulkResponse.getErrorsNumber(), is(1));
assertThat(bulkResponse.getErrorRecordsFileName(), is(expectedErrorRecordsFileName));
assertThat(bulkResponse.getErrorsFileName(), is(expectedErrorsFileName));

List<String> filesList = s3Client.list(BULK_FILE_TO_UPLOAD);
assertThat(filesList.size(), is(3));
assertThat(filesList, containsInAnyOrder(bulkFilePath, expectedErrorRecordsFileName, expectedErrorsFileName));
List<String> errors = readLinesFromInputStream(s3Client.read(expectedErrorsFileName));
assertThat(errors.size(), is(1));

JsonObject updatedInstance1 = getInstanceById(existingInstance1.getId().toString());

instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.noUpdatedMessagePublished(existingInstance2.getId().toString());
}

@Test
public void shouldReturnUnprocessableEntityIfRecordsFileNameIsNotSpecified()
throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Response> future = getClient().post(instancesBulk(), new BulkUpsertRequest(), TENANT_ID);
Response response = future.get(10, SECONDS);
assertThat(response.getStatusCode(), is(HTTP_UNPROCESSABLE_ENTITY.toInt()));
}

@BeforeClass
public static void setUpClass() {
localStackContainer = new LocalStackContainer(DockerImageName.parse("localstack/localstack:s3-latest"))
Expand Down Expand Up @@ -104,21 +167,13 @@ public static void setUpClass() {
s3Client.createBucketIfNotExists();
}

@Before
public void setUp() {
StorageTestSuite.deleteAll(TENANT_ID, PRECEDING_SUCCEEDING_TITLE_TABLE);
clearData();
removeAllEvents();
}

@AfterClass
public static void tearDownClass() {
localStackContainer.close();
}

@Test
public void shouldUpdateInstancesWithoutErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
private void shouldUpdateInstances(boolean publishEvents)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
// given
List<String> instancesIds = extractInstancesIdsFromFile(BULK_INSTANCES_PATH);
FileInputStream inputStream = FileUtils.openInputStream(new File(BULK_INSTANCES_PATH));
Expand All @@ -135,7 +190,10 @@ public void shouldUpdateInstancesWithoutErrors()
precedingSucceedingTitleClient.create(precedingSucceedingTitle2.getJson());

// when
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest().withRecordsFileName(bulkFilePath));
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest()
.withRecordsFileName(bulkFilePath)
.withPublishEvents(publishEvents)
);

// then
assertThat(bulkResponse.getErrorsNumber(), is(0));
Expand All @@ -154,49 +212,13 @@ public void shouldUpdateInstancesWithoutErrors()
assertThat(titleJson.getString("title"), notNullValue());
});

instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.updatedMessagePublished(existingInstance2.getJson(), updatedInstance2);
}

@Test
public void shouldUpdateInstancesWithErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
// given
String expectedErrorRecordsFileName = BULK_FILE_TO_UPLOAD + "_failedEntities";
String expectedErrorsFileName = BULK_FILE_TO_UPLOAD + "_errors";

List<String> instancesIds = extractInstancesIdsFromFile(BULK_INSTANCES_WITH_INVALID_TYPE_PATH);
FileInputStream inputStream = FileUtils.openInputStream(new File(BULK_INSTANCES_WITH_INVALID_TYPE_PATH));
String bulkFilePath = s3Client.write(BULK_FILE_TO_UPLOAD, inputStream);

final IndividualResource existingInstance1 = createInstance(buildInstance(instancesIds.get(0), INSTANCE_TITLE_1));
final IndividualResource existingInstance2 = createInstance(buildInstance(instancesIds.get(1), INSTANCE_TITLE_2));

// when
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest().withRecordsFileName(bulkFilePath));

// then
assertThat(bulkResponse.getErrorsNumber(), is(1));
assertThat(bulkResponse.getErrorRecordsFileName(), is(expectedErrorRecordsFileName));
assertThat(bulkResponse.getErrorsFileName(), is(expectedErrorsFileName));

List<String> filesList = s3Client.list(BULK_FILE_TO_UPLOAD);
assertThat(filesList.size(), is(3));
assertThat(filesList, containsInAnyOrder(bulkFilePath, expectedErrorRecordsFileName, expectedErrorsFileName));
List<String> errors = readLinesFromInputStream(s3Client.read(expectedErrorsFileName));
assertThat(errors.size(), is(1));

JsonObject updatedInstance1 = getInstanceById(existingInstance1.getId().toString());
instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.noUpdatedMessagePublished(existingInstance2.getId().toString());
}

@Test
public void shouldReturnUnprocessableEntityIfRecordsFileNameIsNotSpecified()
throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Response> future = getClient().post(instancesBulk(), new BulkUpsertRequest(), TENANT_ID);
Response response = future.get(10, SECONDS);
assertThat(response.getStatusCode(), is(HTTP_UNPROCESSABLE_ENTITY.toInt()));
if (publishEvents) {
instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.updatedMessagePublished(existingInstance2.getJson(), updatedInstance2);
} else {
instanceMessageChecks.noUpdatedMessagePublished(existingInstance1.getId().toString());
instanceMessageChecks.noUpdatedMessagePublished(existingInstance2.getId().toString());
}
}

private List<String> extractInstancesIdsFromFile(String bulkInstancesFilePath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.folio.rest.jaxrs.model.BulkUpsertRequest;
import org.folio.rest.jaxrs.model.Instance;
import org.folio.services.BulkProcessingContext;
import org.junit.After;
Expand All @@ -34,7 +35,8 @@ public class BulkProcessingErrorFileWriterTest {

@Before
public void setUp() {
bulkContext = new BulkProcessingContext(BULK_INSTANCES_FILE_PATH);
var request = new BulkUpsertRequest().withRecordsFileName(BULK_INSTANCES_FILE_PATH);
bulkContext = new BulkProcessingContext(request);
writer = new BulkProcessingErrorFileWriter(vertx, bulkContext);
}

Expand Down
Loading

0 comments on commit 37b5420

Please sign in to comment.