diff --git a/NEWS.md b/NEWS.md
index 605624b01..e126c59c8 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -9,6 +9,10 @@
 ### Features
 * Modify endpoint for bulk instances upsert with publish events flag ([MODINVSTOR-1283](https://folio-org.atlassian.net/browse/MODINVSTOR-1283))
 * Change Kafka event publishing keys for holdings and items ([MODINVSTOR-1281](https://folio-org.atlassian.net/browse/MODINVSTOR-1281))
+* Merge custom ECS TLR feature branch into master ([MODINVSTOR-1262](https://folio-org.atlassian.net/browse/MODINVSTOR-1262))
+* Service points synchronization: implement processors ([MODINVSTOR-1246](https://folio-org.atlassian.net/browse/MODINVSTOR-1246))
+* Service points synchronization: create a verticle ([MODINVSTOR-1245](https://folio-org.atlassian.net/browse/MODINVSTOR-1245))
+* Do not return routing service points by default ([MODINVSTOR-1219](https://folio-org.atlassian.net/browse/MODINVSTOR-1219))

 ### Bug fixes
 * Description ([ISSUE](https://folio-org.atlassian.net/browse/ISSUE))
@@ -52,6 +56,7 @@
 ### Features
 * Add floating collection flag in location schema ([MODINVSTOR-1250](https://issues.folio.org/browse/MODINVSTOR-1250))
 * Implement domain event production for location create/update/delete ([MODINVSTOR-1181](https://issues.folio.org/browse/MODINVSTOR-1181))
+* Add a new boolean field ecsRequestRouting to the service point schema ([MODINVSTOR-1179](https://issues.folio.org/browse/MODINVSTOR-1179))
 * Implement domain event production for library create/update/delete ([MODINVSTOR-1216](https://issues.folio.org/browse/MODINVSTOR-1216))
 * Implement domain event production for campus create/update/delete ([MODINVSTOR-1217](https://issues.folio.org/browse/MODINVSTOR-1217))
 * Implement domain event production for institution create/update/delete ([MODINVSTOR-1218](https://issues.folio.org/browse/MODINVSTOR-1218))
diff --git a/README.MD b/README.MD
index 2aeee5343..a4b8f3a63 100644
--- a/README.MD
+++ b/README.MD
@@ -186,6 +186,7 @@ These environment variables configure the module interaction with S3-compatible
 * `S3_ACCESS_KEY_ID`
 * `S3_SECRET_ACCESS_KEY`
 * `S3_IS_AWS` (default value - `false`)
+* `ECS_TLR_FEATURE_ENABLED` (default value - `false`)

 # Local Deployment using Docker
diff --git a/descriptors/ModuleDescriptor-template.json b/descriptors/ModuleDescriptor-template.json
index 14dd4ccad..07d017494 100755
--- a/descriptors/ModuleDescriptor-template.json
+++ b/descriptors/ModuleDescriptor-template.json
@@ -1120,28 +1120,33 @@
     },
     {
       "id": "service-points",
-      "version": "3.3",
+      "version": "3.4",
       "handlers": [
         {
           "methods": ["GET"],
           "pathPattern": "/service-points",
-          "permissionsRequired": ["inventory-storage.service-points.collection.get"]
+          "permissionsRequired": ["inventory-storage.service-points.collection.get"],
+          "modulePermissions": ["user-tenants.collection.get"]
         },
         {
           "methods": ["GET"],
           "pathPattern": "/service-points/{id}",
-          "permissionsRequired": ["inventory-storage.service-points.item.get"]
+          "permissionsRequired": ["inventory-storage.service-points.item.get"],
+          "modulePermissions": ["user-tenants.collection.get"]
         },
         {
           "methods": ["POST"],
           "pathPattern": "/service-points",
-          "permissionsRequired": ["inventory-storage.service-points.item.post"]
+          "permissionsRequired": ["inventory-storage.service-points.item.post"],
+          "modulePermissions": ["user-tenants.collection.get"]
         },
         {
           "methods": ["PUT"],
           "pathPattern": "/service-points/{id}",
-          "permissionsRequired": ["inventory-storage.service-points.item.put"]
+          "permissionsRequired": ["inventory-storage.service-points.item.put"],
+          "modulePermissions": ["user-tenants.collection.get"]
         },
         {
           "methods": ["DELETE"],
           "pathPattern": "/service-points/{id}",
-          "permissionsRequired": ["inventory-storage.service-points.item.delete"]
+          "permissionsRequired": ["inventory-storage.service-points.item.delete"],
+          "modulePermissions": ["user-tenants.collection.get"]
         }
       ]
     },
@@ -2900,7 +2905,8 @@
         { "name": "S3_BUCKET", "value": "marc-migrations" },
         { "name": "S3_ACCESS_KEY_ID", "value": "" },
         { "name": "S3_SECRET_ACCESS_KEY", "value": "" },
-        { "name": "S3_IS_AWS", "value": "true" }
+        { "name": "S3_IS_AWS", "value": "true" },
+        { "name": "ECS_TLR_FEATURE_ENABLED", "value": "false"}
       ]
     }
   }
diff --git a/pom.xml b/pom.xml
index 8ddbb6658..51bd97fde 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
     5.5.0
     4.2.2
     3.26.3
+    <system-stubs-junit4.version>2.1.7</system-stubs-junit4.version>
    3.13.0
    3.6.0

@@ -273,6 +274,12 @@
      log4j-slf4j2-impl
      test
+    <dependency>
+      <groupId>uk.org.webcompere</groupId>
+      <artifactId>system-stubs-junit4</artifactId>
+      <version>${system-stubs-junit4.version}</version>
+      <scope>test</scope>
+    </dependency>

diff --git a/ramls/service-point.raml b/ramls/service-point.raml
index 7f04a4ddd..4037caf71 100644
--- a/ramls/service-point.raml
+++ b/ramls/service-point.raml
@@ -1,6 +1,6 @@
 #%RAML 1.0
 title: Service Points API
-version: v3.3
+version: v3.4
 protocols: [ HTTP, HTTPS ]
 baseUri: http://localhost

@@ -34,6 +34,12 @@ resourceTypes:
       searchable: { description: "with valid searchable fields", example: "name=aaa"},
       pageable
     ]
+    queryParameters:
+      includeRoutingServicePoints:
+        description: "Should ECS request routing service points be included in the response"
+        default: false
+        required: false
+        type: boolean
     description: Return a list of service points
   post:
     description: Create a new service point
diff --git a/ramls/servicepoint.json b/ramls/servicepoint.json
index 1590139dc..292538d8c 100644
--- a/ramls/servicepoint.json
+++ b/ramls/servicepoint.json
@@ -72,6 +72,11 @@
         ]
       }
     },
+    "ecsRequestRouting": {
+      "type": "boolean",
+      "description": "Indicates a service point used for the ECS functionality",
+      "default": false
+    },
     "metadata": {
       "type": "object",
       "$ref": "raml-util/schemas/metadata.schema",
diff --git a/src/main/java/org/folio/rest/impl/InitApiImpl.java b/src/main/java/org/folio/rest/impl/InitApiImpl.java
index 134fec4e9..bfa05628d 100644
--- a/src/main/java/org/folio/rest/impl/InitApiImpl.java
+++ b/src/main/java/org/folio/rest/impl/InitApiImpl.java
@@ -6,11 +6,13 @@
 import io.vertx.core.Future;
 import io.vertx.core.Handler;
 import io.vertx.core.Promise;
+import io.vertx.core.ThreadingModel;
 import io.vertx.core.Vertx;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.folio.rest.resource.interfaces.InitAPI;
 import org.folio.services.caches.ConsortiumDataCache;
+import org.folio.services.consortium.ServicePointSynchronizationVerticle;
 import org.folio.services.consortium.ShadowInstanceSynchronizationVerticle;
 import org.folio.services.consortium.SynchronizationVerticle;
 import org.folio.services.migration.async.AsyncMigrationConsumerVerticle;
@@ -25,6 +27,7 @@ public void init(Vertx vertx, Context context, Handler> han
     initAsyncMigrationVerticle(vertx)
       .compose(v -> initShadowInstanceSynchronizationVerticle(vertx, getConsortiumDataCache(context)))
       .compose(v -> initSynchronizationVerticle(vertx, getConsortiumDataCache(context)))
+      .compose(v -> initServicePointSynchronizationVerticle(vertx, getConsortiumDataCache(context)))
       .map(true)
       .onComplete(handler);
   }
@@ -76,6 +79,22 @@ private Future initSynchronizationVerticle(Vertx vertx, ConsortiumDataCa
       .mapEmpty();
   }

+  private Future initServicePointSynchronizationVerticle(Vertx vertx,
+    ConsortiumDataCache consortiumDataCache) {
+
+    DeploymentOptions options = new DeploymentOptions()
+      .setThreadingModel(ThreadingModel.WORKER)
+      .setInstances(1);
+
+    return vertx.deployVerticle(() -> new ServicePointSynchronizationVerticle(consortiumDataCache),
+        options)
+      .onSuccess(v -> log.info("initServicePointSynchronizationVerticle:: "
+        + "ServicePointSynchronizationVerticle was successfully started"))
+      .onFailure(e -> log.error("initServicePointSynchronizationVerticle:: "
+        + "ServicePointSynchronizationVerticle failed to start", e))
+      .mapEmpty();
+  }
+
   private void initConsortiumDataCache(Vertx vertx, Context context) {
     ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, vertx.createHttpClient());
     context.put(ConsortiumDataCache.class.getName(), consortiumDataCache);
diff --git a/src/main/java/org/folio/rest/impl/ServicePointApi.java b/src/main/java/org/folio/rest/impl/ServicePointApi.java
index 661b7d704..fd4ca47d1 100644
--- a/src/main/java/org/folio/rest/impl/ServicePointApi.java
+++ b/src/main/java/org/folio/rest/impl/ServicePointApi.java
@@ -2,6 +2,7 @@
 import static io.vertx.core.Future.succeededFuture;
 import static java.lang.Boolean.TRUE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.folio.rest.support.EndpointFailureHandler.handleFailure;

 import io.vertx.core.AsyncResult;
@@ -31,15 +32,18 @@ public class ServicePointApi implements org.folio.rest.jaxrs.resource.ServicePoi
     "Hold shelf expiry period must be specified when service point can be used for pickup.";
   public static final String SERVICE_POINT_CREATE_ERR_MSG_WITHOUT_BEING_PICKUP_LOC =
     "Hold shelf expiry period cannot be specified when service point cannot be used for pickup";
+  private static final String ECS_ROUTING_QUERY_FILTER = "cql.allRecords=1 NOT ecsRequestRouting=true";

   private static final Logger logger = LogManager.getLogger();

   @Validate
   @Override
-  public void getServicePoints(String query, String totalRecords, int offset, int limit,
+  public void getServicePoints(boolean includeRoutingServicePoints, String query,
+    String totalRecords, int offset, int limit,
     Map<String, String> okapiHeaders, Handler<AsyncResult<Response>> asyncResultHandler,
     Context vertxContext) {

+    query = updateGetServicePointsQuery(query, includeRoutingServicePoints);
     PgUtil.get(SERVICE_POINT_TABLE, Servicepoint.class, Servicepoints.class, query, offset, limit,
       okapiHeaders, vertxContext, GetServicePointsResponse.class, asyncResultHandler);
   }
@@ -67,11 +71,11 @@ public void postServicePoints(Servicepoint entity,
         id = UUID.randomUUID().toString();
         entity.setId(id);
       }
-      String tenantId = getTenant(okapiHeaders);
-      PostgresClient pgClient = getPgClient(vertxContext, tenantId);
-      pgClient.save(SERVICE_POINT_TABLE, id, entity, saveReply -> {
-        if (saveReply.failed()) {
-          String message = logAndSaveError(saveReply.cause());
+      new ServicePointService(vertxContext, okapiHeaders)
+        .createServicePoint(id, entity)
+        .onSuccess(response -> asyncResultHandler.handle(succeededFuture(response)))
+        .onFailure(throwable -> {
+          String message = logAndSaveError(throwable);
           if (isDuplicate(message)) {
             asyncResultHandler.handle(Future.succeededFuture(
               PostServicePointsResponse.respond422WithApplicationJson(
@@ -82,15 +86,7 @@ public void postServicePoints(Servicepoint entity,
             PostServicePointsResponse.respond500WithTextPlain(
               getErrorResponse(message))));
           }
-        } else {
-          String ret = saveReply.result();
-          entity.setId(ret);
-
asyncResultHandler.handle(Future.succeededFuture( - PostServicePointsResponse - .respond201WithApplicationJson(entity, - PostServicePointsResponse.headersFor201().withLocation(LOCATION_PREFIX + ret)))); - } - }); + }); } catch (Exception e) { String message = logAndSaveError(e); asyncResultHandler.handle(Future.succeededFuture( @@ -244,7 +240,7 @@ private boolean isDuplicate(String errorMessage) { "duplicate key value violates unique constraint"); } - private String validateServicePoint(Servicepoint svcpt) { + public static String validateServicePoint(Servicepoint svcpt) { HoldShelfExpiryPeriod holdShelfExpiryPeriod = svcpt.getHoldShelfExpiryPeriod(); boolean pickupLocation = svcpt.getPickupLocation() == null ? Boolean.FALSE : svcpt.getPickupLocation(); @@ -261,4 +257,19 @@ private Future checkServicePointInUse() { return Future.succeededFuture(false); } + private static String updateGetServicePointsQuery(String query, boolean includeRoutingServicePoints) { + if (includeRoutingServicePoints) { + return query; + } + + logger.debug("updateGetServicePointsQuery:: original query: {}", query); + String newQuery = ECS_ROUTING_QUERY_FILTER; + if (isNotBlank(query)) { + newQuery += " and " + query; + } + logger.debug("updateGetServicePointsQuery:: updated query: {}", newQuery); + + return newQuery; + } + } diff --git a/src/main/java/org/folio/services/consortium/ServicePointSynchronizationVerticle.java b/src/main/java/org/folio/services/consortium/ServicePointSynchronizationVerticle.java new file mode 100644 index 000000000..7753ce861 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/ServicePointSynchronizationVerticle.java @@ -0,0 +1,111 @@ +package org.folio.services.consortium; + +import static org.folio.rest.tools.utils.ModuleName.getModuleName; +import static org.folio.rest.tools.utils.ModuleName.getModuleVersion; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_CREATED; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_DELETED; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_UPDATED; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpClient; +import java.util.ArrayList; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.kafka.AsyncRecordHandler; +import org.folio.kafka.GlobalLoadSensor; +import org.folio.kafka.KafkaConfig; +import org.folio.kafka.KafkaConsumerWrapper; +import org.folio.kafka.SubscriptionDefinition; +import org.folio.kafka.services.KafkaEnvironmentProperties; +import org.folio.kafka.services.KafkaTopic; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.handler.ServicePointSynchronizationCreateHandler; +import org.folio.services.consortium.handler.ServicePointSynchronizationDeleteHandler; +import org.folio.services.consortium.handler.ServicePointSynchronizationUpdateHandler; +import org.folio.services.domainevent.ServicePointEventType; + +public class ServicePointSynchronizationVerticle extends AbstractVerticle { + + private static final Logger log = LogManager.getLogger(ServicePointSynchronizationVerticle.class); + private static final String TENANT_PATTERN = "\\w{1,}"; + private static final String MODULE_ID = getModuleId(); + private static final int DEFAULT_LOAD_LIMIT = 5; + private final ConsortiumDataCache consortiumDataCache; + + private final List> consumers 
= new ArrayList<>(); + + public ServicePointSynchronizationVerticle(final ConsortiumDataCache consortiumDataCache) { + this.consortiumDataCache = consortiumDataCache; + } + + @Override + public void start(Promise startPromise) throws Exception { + var httpClient = vertx.createHttpClient(); + + createConsumers(httpClient) + .onSuccess(v -> log.info("start:: verticle started")) + .onFailure(t -> log.error("start:: verticle start failed", t)) + .onComplete(startPromise); + } + + private Future createConsumers(HttpClient httpClient) { + final var config = getKafkaConfig(); + + return createEventConsumer(SERVICE_POINT_CREATED, config, + new ServicePointSynchronizationCreateHandler(consortiumDataCache, httpClient, vertx)) + .compose(r -> createEventConsumer(SERVICE_POINT_UPDATED, config, + new ServicePointSynchronizationUpdateHandler(consortiumDataCache, httpClient, vertx))) + .compose(r -> createEventConsumer(SERVICE_POINT_DELETED, config, + new ServicePointSynchronizationDeleteHandler(consortiumDataCache, httpClient, vertx))) + .mapEmpty(); + } + + private Future> createEventConsumer( + ServicePointEventType eventType, KafkaConfig kafkaConfig, + AsyncRecordHandler handler) { + + var subscriptionDefinition = SubscriptionDefinition.builder() + .eventType(eventType.name()) + .subscriptionPattern(buildSubscriptionPattern(eventType.getKafkaTopic(), kafkaConfig)) + .build(); + + return createConsumer(kafkaConfig, subscriptionDefinition, handler); + } + + private Future> createConsumer(KafkaConfig kafkaConfig, + SubscriptionDefinition subscriptionDefinition, + AsyncRecordHandler recordHandler) { + + var consumer = KafkaConsumerWrapper.builder() + .context(context) + .vertx(vertx) + .kafkaConfig(kafkaConfig) + .loadLimit(DEFAULT_LOAD_LIMIT) + .globalLoadSensor(new GlobalLoadSensor()) + .subscriptionDefinition(subscriptionDefinition) + .build(); + + return consumer.start(recordHandler, MODULE_ID) + .onSuccess(v -> consumers.add(consumer)) + .map(consumer); + } + + private static String buildSubscriptionPattern(KafkaTopic kafkaTopic, KafkaConfig kafkaConfig) { + return kafkaTopic.fullTopicName(kafkaConfig, TENANT_PATTERN); + } + + private static String getModuleId() { + return getModuleName().replace("_", "-") + "-" + getModuleVersion(); + } + + private KafkaConfig getKafkaConfig() { + return KafkaConfig.builder() + .envId(KafkaEnvironmentProperties.environment()) + .kafkaHost(KafkaEnvironmentProperties.host()) + .kafkaPort(KafkaEnvironmentProperties.port()) + .build(); + } +} diff --git a/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationCreateHandler.java b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationCreateHandler.java new file mode 100644 index 000000000..f38b118fd --- /dev/null +++ b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationCreateHandler.java @@ -0,0 +1,26 @@ +package org.folio.services.consortium.handler; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.processor.ServicePointSynchronizationCreateEventProcessor; +import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor; +import org.folio.services.domainevent.DomainEvent; + +public class ServicePointSynchronizationCreateHandler extends ServicePointSynchronizationHandler { + + public ServicePointSynchronizationCreateHandler(ConsortiumDataCache 
consortiumDataCache, + HttpClient httpClient, Vertx vertx) { + + super(consortiumDataCache, httpClient, vertx); + } + + @Override + protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor( + DomainEvent domainEvent) { + + return new ServicePointSynchronizationCreateEventProcessor(domainEvent); + } + +} diff --git a/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationDeleteHandler.java b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationDeleteHandler.java new file mode 100644 index 000000000..6493526a6 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationDeleteHandler.java @@ -0,0 +1,26 @@ +package org.folio.services.consortium.handler; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.processor.ServicePointSynchronizationDeleteEventProcessor; +import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor; +import org.folio.services.domainevent.DomainEvent; + +public class ServicePointSynchronizationDeleteHandler extends ServicePointSynchronizationHandler { + + public ServicePointSynchronizationDeleteHandler(ConsortiumDataCache consortiumDataCache, + HttpClient httpClient, Vertx vertx) { + + super(consortiumDataCache, httpClient, vertx); + } + + @Override + protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor( + DomainEvent domainEvent) { + + return new ServicePointSynchronizationDeleteEventProcessor(domainEvent); + } + +} diff --git a/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationHandler.java b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationHandler.java new file mode 100644 index 000000000..00b5dc08e --- /dev/null +++ b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationHandler.java @@ -0,0 +1,73 @@ +package org.folio.services.consortium.handler; + +import static io.vertx.core.Future.succeededFuture; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.json.Json; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import java.util.Optional; +import org.apache.commons.collections4.map.CaseInsensitiveMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.kafka.AsyncRecordHandler; +import org.folio.kafka.KafkaHeaderUtils; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.caches.ConsortiumData; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.SynchronizationContext; +import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor; +import org.folio.services.domainevent.DomainEvent; + +public abstract class ServicePointSynchronizationHandler + implements AsyncRecordHandler { + + private static final Logger log = LogManager.getLogger( + ServicePointSynchronizationHandler.class); + + private final ConsortiumDataCache consortiumDataCache; + private final HttpClient httpClient; + private final Vertx vertx; + + protected ServicePointSynchronizationHandler(ConsortiumDataCache consortiumDataCache, + HttpClient httpClient, Vertx vertx) { + + this.consortiumDataCache = consortiumDataCache; + this.httpClient = httpClient; + 
this.vertx = vertx; + } + + @Override + public Future handle(KafkaConsumerRecord kafkaConsumerRecord) { + log.info("handle:: Processing event {}", kafkaConsumerRecord::topic); + var headers = new CaseInsensitiveMap<>(KafkaHeaderUtils.kafkaHeadersToMap( + kafkaConsumerRecord.headers())); + return consortiumDataCache.getConsortiumData(headers) + .compose(consortiumData -> processConsortiumData(kafkaConsumerRecord, consortiumData, + headers)); + } + + private Future processConsortiumData( + KafkaConsumerRecord kafkaConsumerRecord, + Optional consortiumData, CaseInsensitiveMap headers) { + + log.info("processConsortiumData:: {}", consortiumData); + return consortiumData.map(data -> processConsortiumDataByEvent(data, kafkaConsumerRecord, + headers)).orElseGet(() -> succeededFuture(kafkaConsumerRecord.key())); + } + + private Future processConsortiumDataByEvent(ConsortiumData data, + KafkaConsumerRecord kafkaConsumerRecord, + CaseInsensitiveMap headers) { + + var event = Json.decodeValue(kafkaConsumerRecord.value(), DomainEvent.class); + var servicePointSynchronizationProcessor = getServicePointSynchronizationProcessor(event); + return servicePointSynchronizationProcessor.process(kafkaConsumerRecord.key(), + new SynchronizationContext(data, headers, vertx, httpClient)); + } + + protected abstract ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor( + DomainEvent domainEvent); + +} diff --git a/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationUpdateHandler.java b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationUpdateHandler.java new file mode 100644 index 000000000..ce24dd4c4 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/handler/ServicePointSynchronizationUpdateHandler.java @@ -0,0 +1,26 @@ +package org.folio.services.consortium.handler; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.services.caches.ConsortiumDataCache; +import org.folio.services.consortium.processor.ServicePointSynchronizationEventProcessor; +import org.folio.services.consortium.processor.ServicePointSynchronizationUpdateEventProcessor; +import org.folio.services.domainevent.DomainEvent; + +public class ServicePointSynchronizationUpdateHandler extends ServicePointSynchronizationHandler { + + public ServicePointSynchronizationUpdateHandler(ConsortiumDataCache consortiumDataCache, + HttpClient httpClient, Vertx vertx) { + + super(consortiumDataCache, httpClient, vertx); + } + + @Override + protected ServicePointSynchronizationEventProcessor getServicePointSynchronizationProcessor( + DomainEvent domainEvent) { + + return new ServicePointSynchronizationUpdateEventProcessor(domainEvent); + } + +} diff --git a/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationCreateEventProcessor.java b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationCreateEventProcessor.java new file mode 100644 index 000000000..679756890 --- /dev/null +++ b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationCreateEventProcessor.java @@ -0,0 +1,59 @@ +package org.folio.services.consortium.processor; + +import static io.vertx.core.Future.failedFuture; + +import io.vertx.core.Future; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.rest.impl.ServicePointApi; +import org.folio.rest.jaxrs.model.Servicepoint; +import 
org.folio.rest.persist.PostgresClient;
+import org.folio.services.domainevent.DomainEvent;
+import org.folio.services.domainevent.ServicePointEventType;
+import org.folio.services.servicepoint.ServicePointService;
+
+public class ServicePointSynchronizationCreateEventProcessor
+  extends ServicePointSynchronizationEventProcessor {
+
+  private static final Logger log = LogManager.getLogger(
+    ServicePointSynchronizationCreateEventProcessor.class);
+
+  public ServicePointSynchronizationCreateEventProcessor(DomainEvent<Servicepoint> domainEvent) {
+    super(ServicePointEventType.SERVICE_POINT_CREATED, domainEvent);
+  }
+
+  @Override
+  protected Future<String> processEvent(ServicePointService servicePointService, String servicePointId) {
+    try {
+      Servicepoint servicePoint = PostgresClient.pojo2JsonObject(domainEvent.getNewEntity())
+        .mapTo(Servicepoint.class);
+
+      return servicePointService.createServicePoint(servicePointId, servicePoint)
+        .map(servicePointId);
+    } catch (Exception e) {
+      log.error("processEvent:: failed due to {}", e.getMessage(), e);
+      return failedFuture(e);
+    }
+  }
+
+  @Override
+  protected boolean validateEventEntity() {
+    try {
+      Servicepoint servicePoint = PostgresClient.pojo2JsonObject(domainEvent.getNewEntity())
+        .mapTo(Servicepoint.class);
+      if (servicePoint == null) {
+        log.warn("validateEventEntity:: failed to find new service point entity");
+        return false;
+      }
+      String validationMessage = ServicePointApi.validateServicePoint(servicePoint);
+      if (validationMessage != null) {
+        log.warn("validateEventEntity:: {}", validationMessage);
+        return false;
+      }
+      return true;
+    } catch (Exception e) {
+      log.error("validateEventEntity:: failed due to {}", e.getMessage(), e);
+    }
+    return false;
+  }
+}
diff --git a/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationDeleteEventProcessor.java b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationDeleteEventProcessor.java
new file mode 100644
index 000000000..dbebbe36f
--- /dev/null
+++ b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationDeleteEventProcessor.java
@@ -0,0 +1,42 @@
+package org.folio.services.consortium.processor;
+
+import io.vertx.core.Future;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.rest.jaxrs.model.Servicepoint;
+import org.folio.rest.persist.PostgresClient;
+import org.folio.services.domainevent.DomainEvent;
+import org.folio.services.domainevent.ServicePointEventType;
+import org.folio.services.servicepoint.ServicePointService;
+
+public class ServicePointSynchronizationDeleteEventProcessor
+  extends ServicePointSynchronizationEventProcessor {
+
+  private static final Logger log = LogManager.getLogger(
+    ServicePointSynchronizationDeleteEventProcessor.class);
+
+  public ServicePointSynchronizationDeleteEventProcessor(DomainEvent<Servicepoint> domainEvent) {
+    super(ServicePointEventType.SERVICE_POINT_DELETED, domainEvent);
+  }
+
+  @Override
+  protected Future<String> processEvent(ServicePointService servicePointService, String servicePointId) {
+    return servicePointService.deleteServicePoint(servicePointId).map(servicePointId);
+  }
+
+  @Override
+  protected boolean validateEventEntity() {
+    try {
+      var servicePoint = PostgresClient.pojo2JsonObject(domainEvent.getOldEntity())
+        .mapTo(Servicepoint.class);
+      if (servicePoint == null) {
+        log.warn("validateEventEntity:: service point is null");
+        return false;
+      }
+      return true;
+    } catch (Exception e) {
+      log.error("validateEventEntity:: failed due to {}", e.getMessage(), e);
+    }
+    return false;
+  }
+}
diff --git a/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessor.java b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessor.java
new file mode 100644
index 000000000..2dff9e92f
--- /dev/null
+++ b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessor.java
@@ -0,0 +1,80 @@
+package org.folio.services.consortium.processor;
+
+import static io.vertx.core.Future.succeededFuture;
+import static java.lang.Boolean.FALSE;
+import static org.folio.okapi.common.XOkapiHeaders.TENANT;
+import static org.folio.utils.ConsortiumUtils.isCentralTenant;
+
+import io.vertx.core.Future;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.rest.jaxrs.model.Servicepoint;
+import org.folio.services.consortium.SynchronizationContext;
+import org.folio.services.domainevent.DomainEvent;
+import org.folio.services.domainevent.ServicePointEventType;
+import org.folio.services.servicepoint.ServicePointService;
+import org.folio.utils.Environment;
+
+public abstract class ServicePointSynchronizationEventProcessor {
+
+  private static final Logger log = LogManager.getLogger(
+    ServicePointSynchronizationEventProcessor.class);
+  private static final String ECS_TLR_FEATURE_ENABLED = "ECS_TLR_FEATURE_ENABLED";
+
+  protected final DomainEvent<Servicepoint> domainEvent;
+  private final ServicePointEventType servicePointEventType;
+
+  protected ServicePointSynchronizationEventProcessor(ServicePointEventType eventType,
+    DomainEvent<Servicepoint> domainEvent) {
+    this.servicePointEventType = eventType;
+    this.domainEvent = domainEvent;
+  }
+
+  public Future<String> process(String eventKey, SynchronizationContext context) {
+    var future = succeededFuture(eventKey);
+    if (!isCentralTenant(domainEvent.getTenant(), context.consortiaData())
+      || !isEcsTlrFeatureEnabled()
+      || servicePointEventType.getPayloadType() != domainEvent.getType()) {
+
+      log.info("process:: tenant: {}, event type: {}, ECS_TLR_FEATURE_ENABLED: {}",
+        domainEvent.getTenant(), domainEvent.getType(), isEcsTlrFeatureEnabled());
+      return future;
+    }
+    if (!validateEventEntity()) {
+      log.warn("process:: event entity validation failed");
+      return future;
+    }
+    var vertxContext = context.vertx().getOrCreateContext();
+    var headers = context.headers();
+    for (String memberTenant : context.consortiaData().memberTenants()) {
+      log.info("process:: tenant {} servicePointId {}", memberTenant, eventKey);
+      future = future.eventually(() -> prepareHeaders(headers, memberTenant)
+        .compose(lendingTenantHeader -> {
+          var servicePointService = new ServicePointService(vertxContext, lendingTenantHeader);
+          return processEvent(servicePointService, eventKey);
+        })
+        .onFailure(e ->
+          log.warn("process:: tenant {} servicePointId {} failed", memberTenant, eventKey, e)));
+    }
+    return future;
+  }
+
+  protected abstract Future<String> processEvent(ServicePointService servicePointService,
+    String servicePointId);
+
+  protected abstract boolean validateEventEntity();
+
+  private boolean isEcsTlrFeatureEnabled() {
+    return Boolean.parseBoolean(Environment.getEnvVar(ECS_TLR_FEATURE_ENABLED, FALSE.toString()));
+  }
+
+  private Future<Map<String, String>> prepareHeaders(Map<String, String> headers,
+    String memberTenant) {
+
+    var map = new HashMap<>(headers);
+    map.put(TENANT, memberTenant);
+    return succeededFuture(map);
+  }
+
+}
diff --git a/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationUpdateEventProcessor.java b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationUpdateEventProcessor.java
new file mode 100644
index 000000000..920b96301
--- /dev/null
+++ b/src/main/java/org/folio/services/consortium/processor/ServicePointSynchronizationUpdateEventProcessor.java
@@ -0,0 +1,66 @@
+package org.folio.services.consortium.processor;
+
+import static io.vertx.core.Future.failedFuture;
+
+import io.vertx.core.Future;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.rest.impl.ServicePointApi;
+import org.folio.rest.jaxrs.model.Servicepoint;
+import org.folio.rest.persist.PostgresClient;
+import org.folio.services.domainevent.DomainEvent;
+import org.folio.services.domainevent.ServicePointEventType;
+import org.folio.services.servicepoint.ServicePointService;
+
+public class ServicePointSynchronizationUpdateEventProcessor
+  extends ServicePointSynchronizationEventProcessor {
+
+  private static final Logger log = LogManager.getLogger(
+    ServicePointSynchronizationUpdateEventProcessor.class);
+
+  public ServicePointSynchronizationUpdateEventProcessor(DomainEvent<Servicepoint> domainEvent) {
+    super(ServicePointEventType.SERVICE_POINT_UPDATED, domainEvent);
+  }
+
+  @Override
+  protected Future<String> processEvent(ServicePointService servicePointService, String servicePointId) {
+    try {
+      Servicepoint servicepoint = PostgresClient.pojo2JsonObject(domainEvent.getNewEntity())
+        .mapTo(Servicepoint.class);
+      return servicePointService.updateServicePoint(servicePointId, servicepoint)
+        .map(servicePointId);
+    } catch (Exception e) {
+      log.warn("processEvent:: failed due to {}", e.getMessage(), e);
+      return failedFuture(e);
+    }
+  }
+
+  @Override
+  protected boolean validateEventEntity() {
+    try {
+      var oldServicePoint = PostgresClient.pojo2JsonObject(domainEvent.getOldEntity())
+        .mapTo(Servicepoint.class);
+      Servicepoint newServicePoint = PostgresClient.pojo2JsonObject(domainEvent.getNewEntity())
+        .mapTo(Servicepoint.class);
+
+      if (oldServicePoint == null || newServicePoint == null) {
+        log.warn("validateEventEntity:: failed due to oldServicePoint {} newServicePoint {}",
+          oldServicePoint, newServicePoint);
+        return false;
+      }
+      if (newServicePoint.equals(oldServicePoint)) {
+        log.warn("validateEventEntity:: old/new service points are identical");
+        return false;
+      }
+      String validationMessage = ServicePointApi.validateServicePoint(newServicePoint);
+      if (validationMessage != null) {
+        log.warn("validateEventEntity:: {}", validationMessage);
+        return false;
+      }
+      return true;
+    } catch (Exception e) {
+      log.error("validateEventEntity:: failed due to {}", e.getMessage(), e);
+    }
+    return false;
+  }
+}
diff --git a/src/main/java/org/folio/services/domainevent/ServicePointDomainEventPublisher.java b/src/main/java/org/folio/services/domainevent/ServicePointDomainEventPublisher.java
index 88201e6bc..3b0a2f46f 100644
--- a/src/main/java/org/folio/services/domainevent/ServicePointDomainEventPublisher.java
+++ b/src/main/java/org/folio/services/domainevent/ServicePointDomainEventPublisher.java
@@ -21,4 +21,8 @@ public Future publishUpdated(Servicepoint servicePoint, Servicepoint updat
   public Future publishDeleted(Servicepoint servicePoint) {
     return publishRecordRemoved(servicePoint.getId(), servicePoint);
   }
+
+  public Future publishCreated(Servicepoint servicePoint) {
+    return publishRecordCreated(servicePoint.getId(), servicePoint);
+  }
 }
diff --git
a/src/main/java/org/folio/services/domainevent/ServicePointEventType.java b/src/main/java/org/folio/services/domainevent/ServicePointEventType.java new file mode 100644 index 000000000..9a97122e9 --- /dev/null +++ b/src/main/java/org/folio/services/domainevent/ServicePointEventType.java @@ -0,0 +1,31 @@ +package org.folio.services.domainevent; + +import static org.folio.InventoryKafkaTopic.SERVICE_POINT; +import static org.folio.services.domainevent.DomainEventType.CREATE; +import static org.folio.services.domainevent.DomainEventType.DELETE; +import static org.folio.services.domainevent.DomainEventType.UPDATE; + +import org.folio.kafka.services.KafkaTopic; + +public enum ServicePointEventType { + + SERVICE_POINT_CREATED(SERVICE_POINT, CREATE), + SERVICE_POINT_UPDATED(SERVICE_POINT, UPDATE), + SERVICE_POINT_DELETED(SERVICE_POINT, DELETE); + + private final KafkaTopic kafkaTopic; + private final DomainEventType payloadType; + + ServicePointEventType(KafkaTopic kafkaTopic, DomainEventType payloadType) { + this.kafkaTopic = kafkaTopic; + this.payloadType = payloadType; + } + + public KafkaTopic getKafkaTopic() { + return kafkaTopic; + } + + public DomainEventType getPayloadType() { + return payloadType; + } +} diff --git a/src/main/java/org/folio/services/servicepoint/ServicePointService.java b/src/main/java/org/folio/services/servicepoint/ServicePointService.java index d6b5a1c80..4c7cc6808 100644 --- a/src/main/java/org/folio/services/servicepoint/ServicePointService.java +++ b/src/main/java/org/folio/services/servicepoint/ServicePointService.java @@ -1,6 +1,9 @@ package org.folio.services.servicepoint; import static io.vertx.core.Future.succeededFuture; +import static org.folio.rest.impl.ServicePointApi.LOCATION_PREFIX; +import static org.folio.rest.jaxrs.resource.ServicePoints.PostServicePointsResponse.headersFor201; +import static org.folio.rest.jaxrs.resource.ServicePoints.PostServicePointsResponse.respond201WithApplicationJson; import io.vertx.core.Context; import io.vertx.core.Future; @@ -45,6 +48,15 @@ public Future updateServicePoint(String servicePointId, Servicepoint e .map(x -> ItemStorage.PutItemStorageItemsByItemIdResponse.respond204()); } + public Future createServicePoint(String servicePointId, Servicepoint servicePoint) { + servicePoint.setId(servicePointId); + return servicePointRepository.save(servicePointId, servicePoint) + .compose(notUsed -> + servicePointDomainEventPublisher.publishCreated(servicePoint) + .map(resp -> respond201WithApplicationJson(servicePoint, headersFor201() + .withLocation(LOCATION_PREFIX + servicePointId)))); + } + public Future deleteServicePoint(String servicePointId) { log.debug("deleteServicePoint:: parameters servicePointId: {}", servicePointId); diff --git a/src/main/java/org/folio/utils/Environment.java b/src/main/java/org/folio/utils/Environment.java new file mode 100644 index 000000000..ecd34e7a9 --- /dev/null +++ b/src/main/java/org/folio/utils/Environment.java @@ -0,0 +1,8 @@ +package org.folio.utils; + +public record Environment() { + + public static String getEnvVar(String key, String defaultVal) { + return System.getenv().getOrDefault(key, defaultVal); + } +} diff --git a/src/test/java/org/folio/rest/api/ServicePointSynchronizationVerticleTest.java b/src/test/java/org/folio/rest/api/ServicePointSynchronizationVerticleTest.java new file mode 100644 index 000000000..1d037460d --- /dev/null +++ b/src/test/java/org/folio/rest/api/ServicePointSynchronizationVerticleTest.java @@ -0,0 +1,341 @@ +package org.folio.rest.api; + +import static 
com.github.tomakehurst.wiremock.client.WireMock.equalToIgnoreCase; +import static java.lang.String.format; +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.awaitility.Awaitility.waitAtMost; +import static org.folio.kafka.services.KafkaEnvironmentProperties.environment; +import static org.folio.kafka.services.KafkaEnvironmentProperties.host; +import static org.folio.kafka.services.KafkaEnvironmentProperties.port; +import static org.folio.rest.api.ServicePointTest.createHoldShelfExpiryPeriod; +import static org.folio.rest.support.http.InterfaceUrls.servicePointsUrl; +import static org.folio.rest.tools.utils.ModuleName.getModuleName; +import static org.folio.rest.tools.utils.ModuleName.getModuleVersion; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_CREATED; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_DELETED; +import static org.folio.services.domainevent.ServicePointEventType.SERVICE_POINT_UPDATED; +import static org.folio.utility.LocationUtility.createServicePoint; +import static org.folio.utility.ModuleUtility.getClient; +import static org.folio.utility.ModuleUtility.getVertx; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.github.tomakehurst.wiremock.client.WireMock; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpResponseHead; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.kafka.admin.KafkaAdminClient; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.kafka.client.consumer.OffsetAndMetadata; +import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.SneakyThrows; +import org.folio.okapi.common.XOkapiHeaders; +import org.folio.rest.jaxrs.model.Servicepoint; +import org.folio.utility.ModuleUtility; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import uk.org.webcompere.systemstubs.rules.EnvironmentVariablesRule; + +@RunWith(VertxUnitRunner.class) +public class ServicePointSynchronizationVerticleTest extends TestBaseWithInventoryUtil { + + private static final String CENTRAL_TENANT_ID = "consortium"; + private static final String COLLEGE_TENANT_ID = "college"; + private static final String SERVICE_POINT_TOPIC = format( + "%s.%s.inventory.service-point", environment(), CENTRAL_TENANT_ID); + private static final String KAFKA_SERVER_URL = format("%s:%s", host(), port()); + private static final String SERVICE_POINT_ID = UUID.randomUUID().toString(); + private static 
final String CONSORTIUM_ID = UUID.randomUUID().toString(); + private static final String CONSORTIUM_TENANTS_PATH = "/consortia/%s/tenants".formatted( + CONSORTIUM_ID); + private static final String ECS_TLR_FEATURE_ENABLED = "ECS_TLR_FEATURE_ENABLED"; + private static KafkaProducer producer; + private static KafkaAdminClient adminClient; + @Rule + public EnvironmentVariablesRule environmentVariablesRule = + new EnvironmentVariablesRule(ECS_TLR_FEATURE_ENABLED, "true"); + + @BeforeClass + public static void setUpClass() throws Exception { + ModuleUtility.prepareTenant(CENTRAL_TENANT_ID, false); + ModuleUtility.prepareTenant(COLLEGE_TENANT_ID, false); + + producer = createProducer(); + adminClient = createAdminClient(); + } + + @Before + public void setUp() { + clearData(CENTRAL_TENANT_ID); + clearData(COLLEGE_TENANT_ID); + mockUserTenantsForConsortiumMember(); + mockConsortiumTenants(); + mockUserTenantsForNonConsortiumMember(); + assertTrue(Boolean.parseBoolean(System.getenv().getOrDefault(ECS_TLR_FEATURE_ENABLED, + "false"))); + } + + @AfterClass + public static void tearDownClass() throws ExecutionException, InterruptedException, + TimeoutException { + + ModuleUtility.removeTenant(CENTRAL_TENANT_ID); + ModuleUtility.removeTenant(COLLEGE_TENANT_ID); + waitFor(producer.close().compose(v -> adminClient.close()) + ); + } + + @Test + public void shouldPropagateCreationOfServicePointOnLendingTenant(TestContext context) { + var servicePointFromCentralTenant = createServicePointAgainstTenant(CENTRAL_TENANT_ID, false); + + int initialOffset = getOffsetForServicePointCreateEvents(); + publishServicePointCreateEvent(servicePointFromCentralTenant); + waitUntilValueIsIncreased(initialOffset, + ServicePointSynchronizationVerticleTest::getOffsetForServicePointCreateEvents); + getServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(collegeServicePoint -> + context.assertEquals(servicePointFromCentralTenant.getId(), collegeServicePoint.getId()))); + } + + @Test + public void shouldPropagateUpdateOfServicePointOnLendingTenant(TestContext context) { + var servicePointFromCentralTenant = createServicePointAgainstTenant(CENTRAL_TENANT_ID, + true); + var servicePointFromDataTenant = createServicePointAgainstTenant(COLLEGE_TENANT_ID, + false); + + int initialOffset = getOffsetForServicePointUpdateEvents(); + publishServicePointUpdateEvent(servicePointFromDataTenant, servicePointFromCentralTenant); + waitUntilValueIsIncreased(initialOffset, + ServicePointSynchronizationVerticleTest::getOffsetForServicePointUpdateEvents); + getServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(collegeServicePoint -> + context.assertEquals(servicePointFromCentralTenant.getDiscoveryDisplayName(), + collegeServicePoint.getDiscoveryDisplayName()))); + } + + @Test + public void shouldPropagateDeleteOfServicePointOnLendingTenant(TestContext context) { + var servicePointFromCentralTenant = createServicePointAgainstTenant(CENTRAL_TENANT_ID, false); + var servicePointFromDataTenant = createServicePointAgainstTenant(COLLEGE_TENANT_ID, false); + + getServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(servicePoint -> + context.assertEquals(servicePointFromCentralTenant.getId(), + servicePointFromDataTenant.getId()))); + + int initialOffset = getOffsetForServicePointDeleteEvents(); + publishServicePointDeleteEvent(servicePointFromDataTenant); + waitUntilValueIsIncreased(initialOffset, + ServicePointSynchronizationVerticleTest::getOffsetForServicePointDeleteEvents); + 
getStatusCodeOfServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(statusCode -> + context.assertEquals(HTTP_NOT_FOUND, statusCode))); + } + + @Test + public void shouldHandleUpdateEventForNonExistingServicePoint(TestContext context) { + Servicepoint nonExistingServicePoint = new Servicepoint().withId(UUID.randomUUID().toString()); + publishServicePointUpdateEvent(nonExistingServicePoint, nonExistingServicePoint); + + getStatusCodeOfServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(statusCode -> + context.assertEquals(HTTP_NOT_FOUND, statusCode))); + } + + @Test + public void shouldHandleDeleteEventForNonExistingServicePoint(TestContext context) { + Servicepoint nonExistingServicePoint = new Servicepoint().withId(UUID.randomUUID().toString()); + publishServicePointDeleteEvent(nonExistingServicePoint); + + getStatusCodeOfServicePointById(COLLEGE_TENANT_ID) + .onComplete(context.asyncAssertSuccess(statusCode -> + context.assertEquals(HTTP_NOT_FOUND, statusCode))); + } + + @SneakyThrows + public static T waitFor(Future future, int timeoutSeconds) { + return future.toCompletionStage() + .toCompletableFuture() + .get(timeoutSeconds, TimeUnit.SECONDS); + } + + public static T waitFor(Future future) { + return waitFor(future, 10); + } + + private Future getServicePointById(String tenantId) { + Promise> promise = Promise.promise(); + getClient().get(servicePointsUrl("/" + SERVICE_POINT_ID), tenantId, promise::complete); + return promise.future().map(resp -> { + MatcherAssert.assertThat(resp.statusCode(), CoreMatchers.is(HTTP_OK)); + return resp.bodyAsJson(Servicepoint.class); + }); + } + + private Future getStatusCodeOfServicePointById(String tenantId) { + Promise> promise = Promise.promise(); + getClient().get(servicePointsUrl("/" + SERVICE_POINT_ID), tenantId, promise::complete); + return promise.future().map(HttpResponseHead::statusCode); + } + + @SneakyThrows(Exception.class) + private static Servicepoint createServicePointAgainstTenant(String tenantId, boolean updated) { + String discoveryDisplayName = "Circulation Desk -- Basement" + (updated ? 
"(updated)" : ""); + return createServicePoint(UUID.fromString(SERVICE_POINT_ID), "Circ Desk 2522", "cd2522", + discoveryDisplayName, null, 20, + true, createHoldShelfExpiryPeriod(), tenantId) + .getJson().mapTo(Servicepoint.class); + } + + private static int waitUntilValueIsIncreased(int previousValue, Callable valueSupplier) { + return waitAtMost(60, SECONDS) + .until(valueSupplier, newValue -> newValue > previousValue); + } + + private static JsonObject buildCreateEvent(Servicepoint newVersion) { + return new JsonObject() + .put("tenant", CENTRAL_TENANT_ID) + .put("type", "CREATE") + .put("new", newVersion); + } + + private static JsonObject buildUpdateEvent(Servicepoint oldVersion, Servicepoint newVersion) { + return new JsonObject() + .put("tenant", CENTRAL_TENANT_ID) + .put("type", "UPDATE") + .put("old", oldVersion) + .put("new", newVersion); + } + + private static JsonObject buildDeleteEvent(Servicepoint object) { + return new JsonObject() + .put("tenant", CENTRAL_TENANT_ID) + .put("type", "DELETE") + .put("old", object); + } + + private void publishServicePointCreateEvent( + Servicepoint newServicePoint) { + + publishEvent(buildCreateEvent(newServicePoint)); + } + + private void publishServicePointUpdateEvent(Servicepoint oldServicePoint, + Servicepoint newServicePoint) { + + publishEvent(buildUpdateEvent(oldServicePoint, newServicePoint)); + } + + private void publishServicePointDeleteEvent(Servicepoint servicePoint) { + publishEvent(buildDeleteEvent(servicePoint)); + } + + private static Integer getOffsetForServicePointCreateEvents() { + return getOffset(buildConsumerGroupId(SERVICE_POINT_CREATED.name())); + } + + private static Integer getOffsetForServicePointUpdateEvents() { + return getOffset(buildConsumerGroupId(SERVICE_POINT_UPDATED.name())); + } + + private static Integer getOffsetForServicePointDeleteEvents() { + return getOffset(buildConsumerGroupId(SERVICE_POINT_DELETED.name())); + } + + private void publishEvent(JsonObject eventPayload) { + var kafkaRecord = KafkaProducerRecord.create(SERVICE_POINT_TOPIC, SERVICE_POINT_ID, + eventPayload); + kafkaRecord.addHeader("X-Okapi-Tenant".toLowerCase(Locale.ROOT), CENTRAL_TENANT_ID); + kafkaRecord.addHeader("X-Okapi-Token".toLowerCase(Locale.ROOT), + "test-token".toLowerCase(Locale.ROOT)); + kafkaRecord.addHeader("X-Okapi-Url", mockServer.baseUrl().toLowerCase(Locale.ROOT)); + waitFor(producer.write(kafkaRecord)); + } + + private static KafkaProducer createProducer() { + Properties config = new Properties(); + config.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL); + config.put(ACKS_CONFIG, "1"); + + return KafkaProducer.create(getVertx(), config, String.class, JsonObject.class); + } + + private static KafkaAdminClient createAdminClient() { + Map config = Map.of(BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL); + return KafkaAdminClient.create(getVertx(), config); + } + + private static String buildConsumerGroupId(String eventType) { + return format("%s.%s-%s", eventType, getModuleName().replace("_", "-"), getModuleVersion()); + } + + private static int getOffset(String consumerGroupId) { + return waitFor( + adminClient.listConsumerGroupOffsets(consumerGroupId) + .map(partitions -> Optional.ofNullable(partitions.get(new TopicPartition(SERVICE_POINT_TOPIC, 0))) + .map(OffsetAndMetadata::getOffset) + .map(Long::intValue) + .orElse(0)) // if topic does not exist yet + ); + } + + private void mockUserTenantsForConsortiumMember() { + JsonObject userTenantsCollection = new JsonObject() + .put("userTenants", new JsonArray() + .add(new 
JsonObject()
+      .put("centralTenantId", CENTRAL_TENANT_ID)
+      .put("consortiumId", CONSORTIUM_ID)));
+    WireMock.stubFor(WireMock.get(USER_TENANTS_PATH)
+      .withHeader("X-Okapi-Tenant", equalToIgnoreCase(CENTRAL_TENANT_ID))
+      .willReturn(WireMock.ok().withBody(userTenantsCollection.encodePrettily())));
+  }
+
+  public static void mockConsortiumTenants() {
+    JsonObject tenantsCollection = new JsonObject()
+      .put("tenants", new JsonArray()
+        .add(new JsonObject()
+          .put("id", CENTRAL_TENANT_ID)
+          .put("isCentral", true))
+        .add(new JsonObject()
+          .put("id", COLLEGE_TENANT_ID)
+          .put("isCentral", false)));
+    WireMock.stubFor(WireMock.get(CONSORTIUM_TENANTS_PATH)
+      .willReturn(WireMock.ok().withBody(tenantsCollection.encodePrettily())));
+  }
+
+  public static void mockUserTenantsForNonConsortiumMember() {
+    JsonObject emptyUserTenantsCollection = new JsonObject()
+      .put("userTenants", JsonArray.of());
+    WireMock.stubFor(WireMock.get(USER_TENANTS_PATH)
+      .withHeader(XOkapiHeaders.TENANT, equalToIgnoreCase(COLLEGE_TENANT_ID))
+      .willReturn(WireMock.ok().withBody(emptyUserTenantsCollection.encodePrettily())));
+  }
+
+}
diff --git a/src/test/java/org/folio/rest/api/ServicePointTest.java b/src/test/java/org/folio/rest/api/ServicePointTest.java
index 65cfe7409..d295cffc0 100644
--- a/src/test/java/org/folio/rest/api/ServicePointTest.java
+++ b/src/test/java/org/folio/rest/api/ServicePointTest.java
@@ -1,14 +1,19 @@
 package org.folio.rest.api;
 
+import static java.util.Collections.emptyList;
 import static org.folio.rest.impl.ServicePointApi.SERVICE_POINT_CREATE_ERR_MSG_WITHOUT_BEING_PICKUP_LOC;
 import static org.folio.rest.impl.ServicePointApi.SERVICE_POINT_CREATE_ERR_MSG_WITHOUT_HOLD_EXPIRY;
 import static org.folio.rest.support.http.InterfaceUrls.servicePointsUrl;
 import static org.folio.rest.support.http.InterfaceUrls.servicePointsUsersUrl;
 import static org.folio.utility.LocationUtility.createServicePoint;
+import static org.folio.utility.RestUtility.TENANT_ID;
 import static org.folio.utility.RestUtility.send;
+import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
 
 import io.vertx.core.http.HttpMethod;
 import io.vertx.core.json.Json;
@@ -23,6 +28,8 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
 import lombok.SneakyThrows;
 import org.folio.rest.jaxrs.model.HoldShelfExpiryPeriod;
 import org.folio.rest.jaxrs.model.Servicepoint;
@@ -33,7 +40,9 @@
 import org.folio.rest.support.messages.ServicePointEventMessageChecks;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
+@RunWith(JUnitParamsRunner.class)
 public class ServicePointTest extends TestBase {
   private static final String SUPPORTED_CONTENT_TYPE_JSON_DEF = "application/json";
   private final ServicePointEventMessageChecks servicePointEventMessageChecks =
@@ -74,6 +83,7 @@ public void canCreateServicePoint()
     assertThat(response.getJson().getString("id"), notNullValue());
     assertThat(response.getJson().getString("code"), is("cd1"));
     assertThat(response.getJson().getString("name"), is("Circ Desk 1"));
+    assertThat(response.getJson().getBoolean("ecsRequestRouting"), is(false));
   }
 
   @Test
@@ -710,6 +720,44 @@ public void cannotCreateServicePointWithStaffSlipsMissingFields()
     assertThat(response.getStatusCode(), is(422));
   }
 
+  @Test
+  @Parameters({
+    "false, ", // no query parameters
+    "false, ?query=cql.allRecords=1%20sortby%20name&limit=1000",
+    "false, ?includeRoutingServicePoints=false",
+    "true, ?includeRoutingServicePoints=true",
+    "false, ?includeRoutingServicePoints=false&query=cql.allRecords=1%20sortby%20name&limit=1000",
+    "true, ?includeRoutingServicePoints=true&query=cql.allRecords=1%20sortby%20name&limit=1000"
+  })
+  public void ecsRequestRoutingServicePointsAreReturnedOnlyWhenExplicitlyRequested(
+    boolean shouldReturnRoutingServicePoints, String queryParameters) throws Exception {
+
+    UUID regularServicePointId1 = UUID.randomUUID();
+    UUID regularServicePointId2 = UUID.randomUUID();
+    UUID routingServicePointId = UUID.randomUUID();
+
+    createServicePoint(regularServicePointId1, "Circ Desk 1", "cd1", "Circulation Desk 1",
+      null, 20, true, createHoldShelfExpiryPeriod(), emptyList(), null, TENANT_ID);
+    createServicePoint(regularServicePointId2, "Circ Desk 2", "cd2", "Circulation Desk 2",
+      null, 20, true, createHoldShelfExpiryPeriod(), emptyList(), false, TENANT_ID);
+    createServicePoint(routingServicePointId, "Circ Desk 3", "cd3", "Circulation Desk 3",
+      null, 20, true, createHoldShelfExpiryPeriod(), emptyList(), true, TENANT_ID);
+
+    List<String> servicePointIds = get(queryParameters)
+      .stream()
+      .map(json -> json.getString("id"))
+      .toList();
+
+    assertThat(servicePointIds,
+      hasItems(regularServicePointId1.toString(), regularServicePointId2.toString()));
+    if (shouldReturnRoutingServicePoints) {
+      assertThat(servicePointIds, hasItem(routingServicePointId.toString()));
+      assertThat(servicePointIds, hasSize(3));
+    } else {
+      assertThat(servicePointIds, hasSize(2));
+    }
+  }
+
   @Test
   public void canUpdateServicePointWithStaffSlips()
     throws InterruptedException, ExecutionException, TimeoutException, MalformedURLException {
@@ -765,9 +813,15 @@ public void canFilterByPickupLocation() throws Exception {
 
   private List<JsonObject> getMany(String cql, Object... args) throws InterruptedException,
     ExecutionException, TimeoutException {
 
+    return get("?query=" + String.format(cql, args));
+  }
+
+  private List<JsonObject> get(String queryParams) throws InterruptedException,
+    ExecutionException, TimeoutException {
+
     final CompletableFuture<Response> getCompleted = new CompletableFuture<>();
 
-    send(servicePointsUrl("?query=" + String.format(cql, args)),
+    send(servicePointsUrl(queryParams),
       HttpMethod.GET, null, SUPPORTED_CONTENT_TYPE_JSON_DEF,
       ResponseHandler.json(getCompleted));
@@ -780,8 +834,7 @@ private List getMany(String cql, Object... args) throws InterruptedE
 
   private Response getById(UUID id) throws InterruptedException,
     ExecutionException,
-    TimeoutException,
-    MalformedURLException {
+    TimeoutException {
 
     CompletableFuture<Response> getCompleted = new CompletableFuture<>();
diff --git a/src/test/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessorTest.java b/src/test/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessorTest.java
new file mode 100644
index 000000000..009c2fad4
--- /dev/null
+++ b/src/test/java/org/folio/services/consortium/processor/ServicePointSynchronizationEventProcessorTest.java
@@ -0,0 +1,124 @@
+package org.folio.services.consortium.processor;
+
+import static org.folio.services.domainevent.DomainEvent.createEvent;
+import static org.folio.services.domainevent.DomainEvent.deleteEvent;
+import static org.folio.services.domainevent.DomainEvent.updateEvent;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.folio.rest.jaxrs.model.HoldShelfExpiryPeriod;
+import org.folio.rest.jaxrs.model.Servicepoint;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+@ExtendWith(VertxExtension.class)
+class ServicePointSynchronizationEventProcessorTest {
+  private static final String TENANT = "tenant";
+
+  @Test
+  void shouldFailToUpdateEventDueToProcessEventException(VertxTestContext testContext) {
+    var updateEventProcessor = new ServicePointSynchronizationUpdateEventProcessor(updateEvent(
+      new Servicepoint(), new Servicepoint(), TENANT));
+    processEventToThrowException(updateEventProcessor, testContext);
+  }
+
+  @Test
+  void shouldFailToCreateEventDueToProcessEventException(VertxTestContext testContext) {
+    var createEventProcessor = new ServicePointSynchronizationCreateEventProcessor(createEvent(
+      new Servicepoint(), TENANT));
+    processEventToThrowException(createEventProcessor, testContext);
+  }
+
+  @ParameterizedTest
+  @MethodSource("servicePointProvider")
+  void shouldReturnFalseIfServicePointsAreNull(Servicepoint oldServicepoint, Servicepoint newServicepoint) {
+    String tenant = "tenant";
+    var updateEventProcessor = new ServicePointSynchronizationUpdateEventProcessor(
+      updateEvent(oldServicepoint, newServicepoint, tenant));
+
+    assertFalse(updateEventProcessor.validateEventEntity());
+  }
+
+  static Stream<Arguments> servicePointProvider() {
+    return Stream.of(
+      Arguments.of(null, null),
+      Arguments.of(null, new Servicepoint()),
+      Arguments.of(new Servicepoint(), null));
+  }
+
+  @Test
+  void shouldReturnFalseIfServicePointIsNull() {
+    var createEventProcessor = new ServicePointSynchronizationCreateEventProcessor(
+      createEvent(null, TENANT));
+    var deleteEventProcessor = new ServicePointSynchronizationDeleteEventProcessor(
+      deleteEvent(null, TENANT));
+
+    assertFalse(createEventProcessor.validateEventEntity());
+    assertFalse(deleteEventProcessor.validateEventEntity());
+  }
+
+  @Test
+  void shouldReturnFalseIfServicePointsAreIdentical() {
+    var servicepoint = new Servicepoint();
+    var updateEventProcessor = new ServicePointSynchronizationUpdateEventProcessor(
+      updateEvent(servicepoint, servicepoint, TENANT));
+
+    assertFalse(updateEventProcessor.validateEventEntity());
+  }
+
+  @Test
+  void shouldReturnTrueForUpdateIfNewServicePointIsValid() {
+    var oldServicepoint = new Servicepoint().withId(UUID.randomUUID().toString());
+    var newServicepoint = new Servicepoint().withId(UUID.randomUUID().toString());
+
+    var updateEventProcessor = new ServicePointSynchronizationUpdateEventProcessor(
+      updateEvent(oldServicepoint, newServicepoint, TENANT));
+
+    assertTrue(updateEventProcessor.validateEventEntity());
+  }
+
+  @Test
+  void shouldReturnTrueForCreateAndDeleteIfServicePointIsValid() {
+    var servicepoint = new Servicepoint().withId(UUID.randomUUID().toString());
+    var createEventProcessor = new ServicePointSynchronizationCreateEventProcessor(
+      createEvent(servicepoint, TENANT));
+    var deleteEventProcessor = new ServicePointSynchronizationDeleteEventProcessor(
+      deleteEvent(servicepoint, TENANT));
+
+    assertTrue(createEventProcessor.validateEventEntity());
+    assertTrue(deleteEventProcessor.validateEventEntity());
+  }
+
+  @Test
+  void shouldReturnFalseIfValidationMessageIsNotNull() {
+    Servicepoint oldServicepoint = new Servicepoint();
+    Servicepoint newServicepoint = new Servicepoint()
+      .withHoldShelfExpiryPeriod(new HoldShelfExpiryPeriod());
+
+    var updateEventProcessor = new ServicePointSynchronizationUpdateEventProcessor(
+      updateEvent(oldServicepoint, newServicepoint, TENANT));
+    var createEventProcessor = new ServicePointSynchronizationCreateEventProcessor(
+      createEvent(newServicepoint, TENANT));
+
+    assertFalse(updateEventProcessor.validateEventEntity());
+    assertFalse(createEventProcessor.validateEventEntity());
+  }
+
+  private void processEventToThrowException(ServicePointSynchronizationEventProcessor processor,
+    VertxTestContext testContext) {
+
+    processor.processEvent(null, "")
+      .onComplete(ar ->
+        testContext.verify(() -> {
+          assertTrue(ar.cause() instanceof RuntimeException);
+          testContext.completeNow();
+        }));
+  }
+}
diff --git a/src/test/java/org/folio/utility/LocationUtility.java b/src/test/java/org/folio/utility/LocationUtility.java
index d2cb9b939..81312a359 100644
--- a/src/test/java/org/folio/utility/LocationUtility.java
+++ b/src/test/java/org/folio/utility/LocationUtility.java
@@ -176,7 +176,7 @@ public static Response createServicePoint(UUID id, String name, String code,
     throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
 
     return createServicePoint(id, name, code, discoveryDisplayName, description,
-      shelvingLagTime, pickupLocation, shelfExpiryPeriod, Collections.emptyList(), TENANT_ID);
+      shelvingLagTime, pickupLocation, shelfExpiryPeriod, Collections.emptyList(), null, TENANT_ID);
   }
 
   public static Response createServicePoint(UUID id, String name, String code,
@@ -186,7 +186,7 @@ public static Response createServicePoint(UUID id, String name, String code,
     throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
 
     return createServicePoint(id, name, code, discoveryDisplayName, description,
-      shelvingLagTime, pickupLocation, shelfExpiryPeriod, Collections.emptyList(), tenantId);
+      shelvingLagTime, pickupLocation, shelfExpiryPeriod, Collections.emptyList(), null, tenantId);
   }
 
   public static Response createServicePoint(UUID id, String name, String code,
@@ -196,13 +196,13 @@ public static Response createServicePoint(UUID id, String name, String code,
     throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
 
     return createServicePoint(id, name, code, discoveryDisplayName, description,
       shelvingLagTime, pickupLocation,
-      shelfExpiryPeriod, slips, TENANT_ID);
+      shelfExpiryPeriod, slips, null, TENANT_ID);
   }
 
   public static Response createServicePoint(UUID id, String name, String code,
     String discoveryDisplayName, String description, Integer shelvingLagTime,
     Boolean pickupLocation, HoldShelfExpiryPeriod shelfExpiryPeriod,
-    List slips, String tenantId)
+    List slips, Boolean ecsRequestRouting, String tenantId)
     throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
 
     final CompletableFuture<Response> createServicePoint = new CompletableFuture<>();
@@ -211,6 +211,9 @@ public static Response createServicePoint(UUID id, String name, String code,
       .put("name", name)
       .put("code", code)
       .put("discoveryDisplayName", discoveryDisplayName);
+    if (ecsRequestRouting != null) {
+      request.put("ecsRequestRouting", ecsRequestRouting);
+    }
     if (id != null) {
       request.put("id", id.toString());
     }