Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

MODINVSTOR-1283 Modify endpoint for bulk instances upsert with publish events flag #1111

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ramls/bulkUpsertRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
"recordsFileName": {
"description": "File name of entities records",
"type": "string"
},
"publishEvents": {
"description": "A flag that indicates whether domain events should be published.",
"type": "boolean",
"default": true
}
},
"required": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void postInstanceStorageBatchSynchronous(boolean upsert, InstancesPost en
var instances = ObjectConverterUtils.convertObject(entity, Instances.class);

new InstanceService(vertxContext, okapiHeaders)
.createInstances(instances.getInstances(), upsert, true)
.createInstances(instances.getInstances(), upsert, true, true)
.otherwise(cause -> respond500WithTextPlain(cause.getMessage()))
.onComplete(asyncResultHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void postInstanceStorageBatchSynchronousUnsafe(InstancesPost entity, Map<
var instances = ObjectConverterUtils.convertObject(entity, Instances.class);

new InstanceService(vertxContext, okapiHeaders)
.createInstances(instances.getInstances(), true, false)
.createInstances(instances.getInstances(), true, false, true)
.otherwise(cause -> respond500WithTextPlain(cause.getMessage()))
.onComplete(asyncResultHandler);
}
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/folio/services/BulkProcessingContext.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.services;

import org.apache.commons.lang3.StringUtils;
import org.folio.rest.jaxrs.model.BulkUpsertRequest;

/**
* Encapsulates the context for bulk processing of entities from external file.
Expand All @@ -17,13 +18,15 @@ public class BulkProcessingContext {
private final String errorsFilePath;
private final String errorEntitiesFileLocalPath;
private final String errorsFileLocalPath;
private final boolean publishEvents;

public BulkProcessingContext(String entitiesFilePath) {
this.initialFilePath = StringUtils.removeStart(entitiesFilePath, '/');
public BulkProcessingContext(BulkUpsertRequest request) {
this.initialFilePath = StringUtils.removeStart(request.getRecordsFileName(), '/');
this.errorEntitiesFilePath = initialFilePath + FAILED_ENTITIES_FILE_SUFFIX;
this.errorsFilePath = initialFilePath + ERRORS_FILE_SUFFIX;
this.errorEntitiesFileLocalPath = ROOT_FOLDER + errorEntitiesFilePath;
this.errorsFileLocalPath = ROOT_FOLDER + errorsFilePath;
this.publishEvents = request.getPublishEvents();
}

public String getErrorEntitiesFilePath() {
Expand All @@ -42,4 +45,7 @@ public String getErrorsFileLocalPath() {
return errorsFileLocalPath;
}

public boolean isPublishEvents() {
return publishEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ public final class BatchOperationContext<T> {
*/
private final Collection<T> existingRecords;

public BatchOperationContext(Collection<T> recordsToBeCreated, Collection<T> existingRecords) {
private final boolean publishEvents;

public BatchOperationContext(Collection<T> recordsToBeCreated, Collection<T> existingRecords, boolean publishEvents) {
this.recordsToBeCreated = unmodifiableCollection(recordsToBeCreated);
this.existingRecords = unmodifiableCollection(existingRecords);
this.publishEvents = publishEvents;
}

public Collection<T> getRecordsToBeCreated() {
Expand All @@ -23,4 +26,8 @@ public Collection<T> getRecordsToBeCreated() {
public Collection<T> getExistingRecords() {
return existingRecords;
}

public boolean isPublishEvents() {
return publishEvents;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
public final class BatchOperationContextFactory {
private BatchOperationContextFactory() { }

public static <T> Future<BatchOperationContext<T>> buildBatchOperationContext(
boolean upsert, List<T> all, AbstractRepository<T> repository, Function<T, String> idGetter) {
public static <T> Future<BatchOperationContext<T>> buildBatchOperationContext(boolean upsert, List<T> all,
AbstractRepository<T> repository,
Function<T, String> idGetter,
boolean publishEvents) {

if (!upsert) {
return succeededFuture(new BatchOperationContext<>(all, emptyList()));
return succeededFuture(new BatchOperationContext<>(all, emptyList(), publishEvents));
}

return repository.getById(all, idGetter).map(found -> {
final var toBeCreated = all.stream()
.filter(entity -> !found.containsKey(idGetter.apply(entity)))
.collect(toList());

return new BatchOperationContext<>(toBeCreated, found.values());
return new BatchOperationContext<>(toBeCreated, found.values(), publishEvents);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ private Future<List<T>> loadEntities(BulkUpsertRequest bulkRequest) {
}

private Future<BulkUpsertResponse> upsertEntities(List<T> entities, BulkUpsertRequest bulkRequest) {
BulkProcessingContext bulkProcessingContext = new BulkProcessingContext(bulkRequest.getRecordsFileName());
BulkProcessingContext bulkProcessingContext = new BulkProcessingContext(bulkRequest);

return upsert(entities)
return upsert(entities, bulkProcessingContext.isPublishEvents())
.map(v -> new BulkUpsertResponse().withErrorsNumber(0))
.recover(e -> processSequentially(entities, bulkProcessingContext));
}
Expand All @@ -89,7 +89,7 @@ private Future<BulkUpsertResponse> processSequentially(List<T> entities, BulkPro
BulkProcessingErrorFileWriter errorsWriter = new BulkProcessingErrorFileWriter(vertx, bulkContext);

return errorsWriter.initialize()
.compose(v -> processInBatches(entities, entity -> upsert(List.of(entity))
.compose(v -> processInBatches(entities, entity -> upsert(List.of(entity), bulkContext.isPublishEvents())
.recover(e -> handleUpsertFailure(errorsCounter, errorsWriter, entity, e))))
.eventually(errorsWriter::close)
.eventually(() -> uploadErrorsFiles(bulkContext))
Expand Down Expand Up @@ -166,10 +166,11 @@ private Future<Void> uploadErrorsFiles(BulkProcessingContext bulkContext) {
* Performs an upsert operation on specified list of {@code entities}.
* The implementation of the upsert operation depends on the specifics of the {@code <T>} type of entity.
*
* @param entities - a list of entities to be updated or created
* @param entities - a list of entities to be updated or created
* @param publishEvents - a flag that indicates whether domain events should be published
* @return Future of Void, succeeded if the upsert operation is successful, otherwise failed
*/
protected abstract Future<Void> upsert(List<T> entities);
protected abstract Future<Void> upsert(List<T> entities, boolean publishEvents);

/**
* Provides a representation of the given {@code entity} to be written to error file containing entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ protected Future<Void> ensureEntitiesWithNonMarcControlledFieldsData(List<Instan
}

@Override
protected Future<Void> upsert(List<InstanceWrapper> instanceWrappers) {
protected Future<Void> upsert(List<InstanceWrapper> instanceWrappers, boolean publishEvents) {
List<Instance> instances = instanceWrappers.stream().map(InstanceWrapper::instance).toList();

return instanceService.createInstances(instances, APPLY_UPSERT, APPLY_OPTIMISTIC_LOCKING)
return instanceService.createInstances(instances, APPLY_UPSERT, APPLY_OPTIMISTIC_LOCKING, publishEvents)
.compose(response -> {
if (!isCreateSuccessResponse(response)) {
String msg = String.format("Failed to update instances, status: '%s', message: '%s'",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ public Handler<Response> publishCreatedOrUpdated(BatchOperationContext<D> batchO
log.info("Records created {}, records updated {}", batchOperation.getRecordsToBeCreated().size(),
batchOperation.getExistingRecords().size());

publishRecordsCreated(batchOperation.getRecordsToBeCreated()).compose(
notUsed -> publishUpdated(batchOperation.getExistingRecords()));
if (batchOperation.isPublishEvents()) {
publishRecordsCreated(batchOperation.getRecordsToBeCreated()).compose(
notUsed -> publishUpdated(batchOperation.getExistingRecords()));
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public Future<Response> createHoldings(List<HoldingsRecord> holdings, boolean up
.compose(ar -> hridManager.populateHridForHoldings(holdings)
.compose(NotesValidators::refuseHoldingLongNotes)
.compose(result -> buildBatchOperationContext(upsert, holdings,
holdingsRepository, HoldingsRecord::getId))
holdingsRepository, HoldingsRecord::getId, true))
.compose(batchOperation -> postSync(HOLDINGS_RECORD_TABLE, holdings, MAX_ENTITIES,
upsert, optimisticLocking, okapiHeaders, vertxContext, PostHoldingsStorageBatchSynchronousResponse.class)
.onSuccess(domainEventPublisher.publishCreatedOrUpdated(batchOperation))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,15 @@ public Future<Response> createInstance(Instance entity) {
.map(ResponseHandlerUtil::handleHridError);
}

public Future<Response> createInstances(List<Instance> instances, boolean upsert, boolean optimisticLocking) {
public Future<Response> createInstances(List<Instance> instances, boolean upsert, boolean optimisticLocking,
boolean publishEvents) {
final String statusUpdatedDate = generateStatusUpdatedDate();
instances.forEach(instance -> instance.setStatusUpdatedDate(statusUpdatedDate));

return hridManager.populateHridForInstances(instances)
.compose(NotesValidators::refuseInstanceLongNotes)
.compose(notUsed -> buildBatchOperationContext(upsert, instances,
instanceRepository, Instance::getId))
instanceRepository, Instance::getId, publishEvents))
.compose(batchOperation ->
// Can use instances list here directly because the class is stateful
postSync(INSTANCE_TABLE, instances, MAX_ENTITIES, upsert, optimisticLocking, okapiHeaders,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/folio/services/item/ItemService.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Future<Response> createItems(List<Item> items, boolean upsert, boolean op
.compose(NotesValidators::refuseItemLongNotes)
.compose(result -> effectiveValuesService.populateEffectiveValues(items))
.compose(this::populateCirculationNoteId)
.compose(result -> buildBatchOperationContext(upsert, items, itemRepository, Item::getId))
.compose(result -> buildBatchOperationContext(upsert, items, itemRepository, Item::getId, true))
.compose(batchOperation -> postSync(ITEM_TABLE, items, MAX_ENTITIES, upsert, optimisticLocking,
okapiHeaders, vertxContext, PostItemStorageBatchSynchronousResponse.class)
.onSuccess(domainEventService.publishCreatedOrUpdated(batchOperation)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.Assert.assertEquals;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.S3;

import io.vertx.core.json.Json;
Expand Down Expand Up @@ -77,6 +77,69 @@ public class InstanceStorageInstancesBulkApiTest extends TestBaseWithInventoryUt

private final InstanceEventMessageChecks instanceMessageChecks = new InstanceEventMessageChecks(KAFKA_CONSUMER);

@Before
public void setUp() {
// Reset shared state before each test: drop preceding/succeeding-title rows,
// clear remaining tenant data, and discard any Kafka events left over from
// previous tests so message-publication assertions start from a clean slate.
StorageTestSuite.deleteAll(TENANT_ID, PRECEDING_SUCCEEDING_TITLE_TABLE);
clearData();
removeAllEvents();
}

@Test
public void shouldUpdateInstancesWithoutErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
// Bulk upsert with publishEvents = true: expects a clean run and, per the
// shared scenario helper, domain events published for the updated instances.
shouldUpdateInstances(true);
}

@Test
public void shouldUpdateInstancesWithoutErrorsAndDoNotPublishDomainEvents()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
// Same success scenario but with publishEvents = false: the shared helper
// verifies that NO update domain events are published for the instances.
shouldUpdateInstances(false);
}

@Test
public void shouldUpdateInstancesWithErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
// Verifies partial-failure handling: a bulk file containing one invalid record
// yields errorsNumber = 1, produces the "<file>_failedEntities" and
// "<file>_errors" artifacts in S3, and still updates (and publishes an event
// for) the valid record while the invalid one is left untouched.
// given
String expectedErrorRecordsFileName = BULK_FILE_TO_UPLOAD + "_failedEntities";
String expectedErrorsFileName = BULK_FILE_TO_UPLOAD + "_errors";

List<String> instancesIds = extractInstancesIdsFromFile(BULK_INSTANCES_WITH_INVALID_TYPE_PATH);
FileInputStream inputStream = FileUtils.openInputStream(new File(BULK_INSTANCES_WITH_INVALID_TYPE_PATH));
String bulkFilePath = s3Client.write(BULK_FILE_TO_UPLOAD, inputStream);

// Pre-create both instances so the bulk request is an update (upsert) path.
final IndividualResource existingInstance1 = createInstance(buildInstance(instancesIds.get(0), INSTANCE_TITLE_1));
final IndividualResource existingInstance2 = createInstance(buildInstance(instancesIds.get(1), INSTANCE_TITLE_2));

// when
// publishEvents is not set here; events are expected for the successful record.
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest()
.withRecordsFileName(bulkFilePath)
);

// then
assertThat(bulkResponse.getErrorsNumber(), is(1));
assertThat(bulkResponse.getErrorRecordsFileName(), is(expectedErrorRecordsFileName));
assertThat(bulkResponse.getErrorsFileName(), is(expectedErrorsFileName));

// Bucket should contain the source file plus the two error artifacts.
List<String> filesList = s3Client.list(BULK_FILE_TO_UPLOAD);
assertThat(filesList.size(), is(3));
assertThat(filesList, containsInAnyOrder(bulkFilePath, expectedErrorRecordsFileName, expectedErrorsFileName));
List<String> errors = readLinesFromInputStream(s3Client.read(expectedErrorsFileName));
assertThat(errors.size(), is(1));

JsonObject updatedInstance1 = getInstanceById(existingInstance1.getId().toString());

// Only the successfully processed instance produces an "updated" domain event.
instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.noUpdatedMessagePublished(existingInstance2.getId().toString());
}

@Test
public void shouldReturnUnprocessableEntityIfRecordsFileNameIsNotSpecified()
throws ExecutionException, InterruptedException, TimeoutException {
// recordsFileName is a required field of BulkUpsertRequest; posting an empty
// request must be rejected with HTTP 422 (Unprocessable Entity) by validation.
CompletableFuture<Response> future = getClient().post(instancesBulk(), new BulkUpsertRequest(), TENANT_ID);
Response response = future.get(10, SECONDS);
assertThat(response.getStatusCode(), is(HTTP_UNPROCESSABLE_ENTITY.toInt()));
}

@BeforeClass
public static void setUpClass() {
localStackContainer = new LocalStackContainer(DockerImageName.parse("localstack/localstack:s3-latest"))
Expand Down Expand Up @@ -104,21 +167,13 @@ public static void setUpClass() {
s3Client.createBucketIfNotExists();
}

@Before
public void setUp() {
StorageTestSuite.deleteAll(TENANT_ID, PRECEDING_SUCCEEDING_TITLE_TABLE);
clearData();
removeAllEvents();
}

@AfterClass
public static void tearDownClass() {
// Stop the LocalStack (S3) container started in setUpClass to free resources.
localStackContainer.close();
}

@Test
public void shouldUpdateInstancesWithoutErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
private void shouldUpdateInstances(boolean publishEvents)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
// given
List<String> instancesIds = extractInstancesIdsFromFile(BULK_INSTANCES_PATH);
FileInputStream inputStream = FileUtils.openInputStream(new File(BULK_INSTANCES_PATH));
Expand All @@ -135,7 +190,10 @@ public void shouldUpdateInstancesWithoutErrors()
precedingSucceedingTitleClient.create(precedingSucceedingTitle2.getJson());

// when
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest().withRecordsFileName(bulkFilePath));
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest()
.withRecordsFileName(bulkFilePath)
.withPublishEvents(publishEvents)
);

// then
assertThat(bulkResponse.getErrorsNumber(), is(0));
Expand All @@ -154,49 +212,13 @@ public void shouldUpdateInstancesWithoutErrors()
assertThat(titleJson.getString("title"), notNullValue());
});

instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.updatedMessagePublished(existingInstance2.getJson(), updatedInstance2);
}

@Test
public void shouldUpdateInstancesWithErrors()
throws ExecutionException, InterruptedException, TimeoutException, IOException {
// given
String expectedErrorRecordsFileName = BULK_FILE_TO_UPLOAD + "_failedEntities";
String expectedErrorsFileName = BULK_FILE_TO_UPLOAD + "_errors";

List<String> instancesIds = extractInstancesIdsFromFile(BULK_INSTANCES_WITH_INVALID_TYPE_PATH);
FileInputStream inputStream = FileUtils.openInputStream(new File(BULK_INSTANCES_WITH_INVALID_TYPE_PATH));
String bulkFilePath = s3Client.write(BULK_FILE_TO_UPLOAD, inputStream);

final IndividualResource existingInstance1 = createInstance(buildInstance(instancesIds.get(0), INSTANCE_TITLE_1));
final IndividualResource existingInstance2 = createInstance(buildInstance(instancesIds.get(1), INSTANCE_TITLE_2));

// when
BulkUpsertResponse bulkResponse = postInstancesBulk(new BulkUpsertRequest().withRecordsFileName(bulkFilePath));

// then
assertThat(bulkResponse.getErrorsNumber(), is(1));
assertThat(bulkResponse.getErrorRecordsFileName(), is(expectedErrorRecordsFileName));
assertThat(bulkResponse.getErrorsFileName(), is(expectedErrorsFileName));

List<String> filesList = s3Client.list(BULK_FILE_TO_UPLOAD);
assertThat(filesList.size(), is(3));
assertThat(filesList, containsInAnyOrder(bulkFilePath, expectedErrorRecordsFileName, expectedErrorsFileName));
List<String> errors = readLinesFromInputStream(s3Client.read(expectedErrorsFileName));
assertThat(errors.size(), is(1));

JsonObject updatedInstance1 = getInstanceById(existingInstance1.getId().toString());
instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.noUpdatedMessagePublished(existingInstance2.getId().toString());
}

@Test
public void shouldReturnUnprocessableEntityIfRecordsFileNameIsNotSpecified()
throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Response> future = getClient().post(instancesBulk(), new BulkUpsertRequest(), TENANT_ID);
Response response = future.get(10, SECONDS);
assertThat(response.getStatusCode(), is(HTTP_UNPROCESSABLE_ENTITY.toInt()));
if (publishEvents) {
instanceMessageChecks.updatedMessagePublished(existingInstance1.getJson(), updatedInstance1);
instanceMessageChecks.updatedMessagePublished(existingInstance2.getJson(), updatedInstance2);
} else {
instanceMessageChecks.noUpdatedMessagePublished(existingInstance1.getId().toString());
instanceMessageChecks.noUpdatedMessagePublished(existingInstance2.getId().toString());
}
}

private List<String> extractInstancesIdsFromFile(String bulkInstancesFilePath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.folio.rest.jaxrs.model.BulkUpsertRequest;
import org.folio.rest.jaxrs.model.Instance;
import org.folio.services.BulkProcessingContext;
import org.junit.After;
Expand All @@ -34,7 +35,8 @@ public class BulkProcessingErrorFileWriterTest {

@Before
public void setUp() {
bulkContext = new BulkProcessingContext(BULK_INSTANCES_FILE_PATH);
var request = new BulkUpsertRequest().withRecordsFileName(BULK_INSTANCES_FILE_PATH);
bulkContext = new BulkProcessingContext(request);
writer = new BulkProcessingErrorFileWriter(vertx, bulkContext);
}

Expand Down
Loading
Loading