From f182fec60639e032ed614b15a4403df6c90762de Mon Sep 17 00:00:00 2001 From: Noah Overcash Date: Mon, 9 Oct 2023 11:21:00 -0400 Subject: [PATCH] Use token-based login system for Poppy --- pom.xml | 6 + .../org/folio/service/auth/AuthClient.java | 39 ++++- .../service/auth/SystemUserAuthService.java | 5 +- .../service/file/S3JobRunningVerticle.java | 158 +++++++++--------- .../folio/service/auth/AuthClientTest.java | 57 +++++-- .../file/S3JobRunningVerticleUnitTest.java | 40 +++-- 6 files changed, 190 insertions(+), 115 deletions(-) diff --git a/pom.xml b/pom.xml index f9814aae..55a74622 100644 --- a/pom.xml +++ b/pom.xml @@ -225,6 +225,12 @@ 3.6.0 jar + + org.folio.okapi + okapi-common + 5.1.0 + jar + org.folio folio-kafka-wrapper diff --git a/src/main/java/org/folio/service/auth/AuthClient.java b/src/main/java/org/folio/service/auth/AuthClient.java index 60acc6dc..812db52c 100644 --- a/src/main/java/org/folio/service/auth/AuthClient.java +++ b/src/main/java/org/folio/service/auth/AuthClient.java @@ -1,5 +1,7 @@ package org.folio.service.auth; +import io.vertx.core.Future; +import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import java.util.Optional; import lombok.AllArgsConstructor; @@ -8,18 +10,45 @@ import lombok.NoArgsConstructor; import org.apache.http.client.methods.HttpPost; import org.folio.dataimport.util.OkapiConnectionParams; +import org.folio.okapi.common.WebClientFactory; +import org.folio.okapi.common.refreshtoken.client.ClientOptions; +import org.folio.okapi.common.refreshtoken.client.impl.LoginClient; +import org.folio.okapi.common.refreshtoken.tokencache.TenantUserCache; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class AuthClient extends ApiClient { - private static final String LOGIN_ENDPOINT = "authn/login"; private static final String CREDENTIALS_ENDPOINT = "authn/credentials"; - public String login(OkapiConnectionParams params, LoginCredentials payload) { - return post(params, LOGIN_ENDPOINT, payload) - .orElseThrow() - .getString("okapiToken"); + private static final int CACHE_SIZE = 5; + + private TenantUserCache cache; + private Vertx vertx; + + @Autowired + public AuthClient(Vertx vertx) { + this.vertx = vertx; + + this.cache = new TenantUserCache(CACHE_SIZE); + } + + public Future login( + OkapiConnectionParams params, + LoginCredentials payload + ) { + // use standardized RTR token utility + return new LoginClient( + new ClientOptions() + .okapiUrl(params.getOkapiUrl()) + .webClient(WebClientFactory.getWebClient(vertx)), + cache, + payload.getTenant(), + payload.getUsername(), + () -> Future.succeededFuture(payload.getPassword()) + ) + .getToken(); } public void saveCredentials( diff --git a/src/main/java/org/folio/service/auth/SystemUserAuthService.java b/src/main/java/org/folio/service/auth/SystemUserAuthService.java index bb345ca8..239edffc 100644 --- a/src/main/java/org/folio/service/auth/SystemUserAuthService.java +++ b/src/main/java/org/folio/service/auth/SystemUserAuthService.java @@ -1,5 +1,6 @@ package org.folio.service.auth; +import io.vertx.core.Future; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -94,7 +95,9 @@ public void initializeSystemUser(Map headers) { LOGGER.info("System user created successfully!"); } - public String getAuthToken(OkapiConnectionParams okapiConnectionParams) { + public Future getAuthToken( + OkapiConnectionParams okapiConnectionParams + ) { LOGGER.info("Attempting {}", getLoginCredentials(okapiConnectionParams)); return authClient.login( diff --git a/src/main/java/org/folio/service/file/S3JobRunningVerticle.java b/src/main/java/org/folio/service/file/S3JobRunningVerticle.java index dd942e67..d1bbd8e6 100644 --- a/src/main/java/org/folio/service/file/S3JobRunningVerticle.java +++ b/src/main/java/org/folio/service/file/S3JobRunningVerticle.java @@ -143,76 +143,78 @@ protected Future processQueueItem(DataImportQueueItem queueItem) { queueItem.getJobExecutionId() ); - OkapiConnectionParams params = getConnectionParams(queueItem); - // we need to store out here to ensure it is properly deleted // on failure and success AtomicReference localFile = new AtomicReference<>(); - return Future - .succeededFuture(new QueueJob().withQueueItem(queueItem)) - .compose((QueueJob job) -> - createLocalFile(queueItem) - .map((File file) -> { - localFile.set(file); - return job.withFile(file); + return getConnectionParams(queueItem) + .compose(params -> + Future + .succeededFuture(new QueueJob().withQueueItem(queueItem)) + .compose((QueueJob job) -> + createLocalFile(queueItem) + .map((File file) -> { + localFile.set(file); + return job.withFile(file); + }) + ) + .compose(job -> + uploadDefinitionService + .getJobExecutionById(queueItem.getJobExecutionId(), params) + .map(job::withJobExecution) + ) + .compose(job -> + updateJobExecutionStatusSafely( + job.getJobExecution().getId(), + new StatusDto() + .withStatus(StatusDto.Status.PROCESSING_IN_PROGRESS), + params + ) + .map(job) + ) + .compose(this::downloadFromS3) + .compose(job -> + fileProcessor + .processFile( + job.getFile(), + job.getJobExecution().getId(), + // this is the only part used on our end + new JobProfileInfo() + .withDataType( + JobProfileInfo.DataType.fromValue( + job.getQueueItem().getDataType() + ) + ), + params + ) + .map(job) + ) + .onFailure((Throwable err) -> { + LOGGER.error("Unable to start chunk {}", queueItem, err); + + updateJobExecutionStatusSafely( + queueItem.getJobExecutionId(), + new StatusDto() + .withErrorStatus(ErrorStatus.FILE_PROCESSING_ERROR) + .withStatus(StatusDto.Status.ERROR), + params + ); }) - ) - .compose(job -> - uploadDefinitionService - .getJobExecutionById(queueItem.getJobExecutionId(), params) - .map(job::withJobExecution) - ) - .compose(job -> - updateJobExecutionStatusSafely( - job.getJobExecution().getId(), - new StatusDto().withStatus(StatusDto.Status.PROCESSING_IN_PROGRESS), - params - ) - .map(job) - ) - .compose(this::downloadFromS3) - .compose(job -> - fileProcessor - .processFile( - job.getFile(), - job.getJobExecution().getId(), - // this is the only part used on our end - new JobProfileInfo() - .withDataType( - JobProfileInfo.DataType.fromValue( - job.getQueueItem().getDataType() - ) - ), - params + .onSuccess((QueueJob result) -> + LOGGER.info( + "Completed processing job execution {}!", + queueItem.getJobExecutionId() + ) ) - .map(job) - ) - .onFailure((Throwable err) -> { - LOGGER.error("Unable to start chunk {}", queueItem, err); - - updateJobExecutionStatusSafely( - queueItem.getJobExecutionId(), - new StatusDto() - .withErrorStatus(ErrorStatus.FILE_PROCESSING_ERROR) - .withStatus(StatusDto.Status.ERROR), - params - ); - }) - .onSuccess((QueueJob result) -> - LOGGER.info( - "Completed processing job execution {}!", - queueItem.getJobExecutionId() - ) - ) - .onComplete((AsyncResult v) -> { - queueItemDao.deleteQueueItemById(queueItem.getId()); + .onComplete((AsyncResult v) -> { + queueItemDao.deleteQueueItemById(queueItem.getId()); - File file = localFile.get(); - if (file != null) { - vertx.fileSystem().delete(file.toString()); - } - }); + File file = localFile.get(); + if (file != null) { + vertx.fileSystem().delete(file.toString()); + } + }) + ); } protected Future updateJobExecutionStatusSafely( @@ -272,7 +274,7 @@ protected Future createLocalFile(DataImportQueueItem queueItem) { /** * Authenticate and get connection parameters (Okapi URL/token) */ - protected OkapiConnectionParams getConnectionParams( + protected Future getConnectionParams( DataImportQueueItem queueItem ) { OkapiConnectionParams provisionalParams = new OkapiConnectionParams( @@ -288,19 +290,21 @@ protected OkapiConnectionParams getConnectionParams( vertx ); - String token = systemUserService.getAuthToken(provisionalParams); - - return new OkapiConnectionParams( - Map.of( - XOkapiHeaders.URL.toLowerCase(), - queueItem.getOkapiUrl(), - XOkapiHeaders.TENANT.toLowerCase(), - queueItem.getTenant(), - XOkapiHeaders.TOKEN.toLowerCase(), - token - ), - vertx - ); + return systemUserService + .getAuthToken(provisionalParams) + .map(token -> + new OkapiConnectionParams( + Map.of( + XOkapiHeaders.URL.toLowerCase(), + queueItem.getOkapiUrl(), + XOkapiHeaders.TENANT.toLowerCase(), + queueItem.getTenant(), + XOkapiHeaders.TOKEN.toLowerCase(), + token + ), + vertx + ) + ); } @Override diff --git a/src/test/java/org/folio/service/auth/AuthClientTest.java b/src/test/java/org/folio/service/auth/AuthClientTest.java index 7e729f13..4660d5be 100644 --- a/src/test/java/org/folio/service/auth/AuthClientTest.java +++ b/src/test/java/org/folio/service/auth/AuthClientTest.java @@ -6,7 +6,6 @@ import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson; import static com.github.tomakehurst.wiremock.client.WireMock.exactly; import static com.github.tomakehurst.wiremock.client.WireMock.ok; -import static com.github.tomakehurst.wiremock.client.WireMock.okJson; import static com.github.tomakehurst.wiremock.client.WireMock.post; import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; import static org.hamcrest.MatcherAssert.assertThat; @@ -14,23 +13,28 @@ import static org.junit.Assert.assertThrows; import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.common.Slf4jNotifier; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; import java.util.Map; -import java.util.NoSuchElementException; import org.folio.dataimport.util.OkapiConnectionParams; import org.folio.service.auth.AuthClient.LoginCredentials; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(VertxUnitRunner.class) public class AuthClientTest { - private static final String LOGIN_ENDPOINT = "/authn/login"; + private static final String LOGIN_ENDPOINT = "/authn/login-with-expiry"; private static final String CREDENTIALS_ENDPOINT = "/authn/credentials"; - AuthClient client = new AuthClient(); + AuthClient client = new AuthClient(Vertx.vertx()); LoginCredentials testLoginCredentials = LoginCredentials .builder() @@ -74,30 +78,49 @@ public void teardown() { } @Test - public void testLogin() { + public void testLogin(TestContext context) { mockServer.stubFor( post(LOGIN_ENDPOINT) .withRequestBody( - equalToJson(JsonObject.mapFrom(testLoginCredentials).toString()) + equalToJson( + JsonObject + .of("username", "username", "password", "password") + .toString() + ) + ) + .willReturn( + WireMock.created().withHeader("Set-Cookie", "folioAccessToken=result") ) - .willReturn(okJson(JsonObject.of("okapiToken", "result").toString())) ); - assertThat(client.login(params, testLoginCredentials), is("result")); - - mockServer.verify(exactly(1), anyRequestedFor(urlMatching(LOGIN_ENDPOINT))); + client + .login(params, testLoginCredentials) + .onComplete( + context.asyncAssertSuccess(token -> { + assertThat(token, is("result")); + + mockServer.verify( + exactly(1), + anyRequestedFor(urlMatching(LOGIN_ENDPOINT)) + ); + }) + ); } @Test - public void testLoginError() { + public void testLoginError(TestContext context) { mockServer.stubFor(post(LOGIN_ENDPOINT).willReturn(badRequest())); - assertThrows( - NoSuchElementException.class, - () -> client.login(params, testLoginCredentials) - ); - - mockServer.verify(exactly(1), anyRequestedFor(urlMatching(LOGIN_ENDPOINT))); + client + .login(params, testLoginCredentials) + .onComplete( + context.asyncAssertFailure(v -> { + mockServer.verify( + exactly(1), + anyRequestedFor(urlMatching(LOGIN_ENDPOINT)) + ); + }) + ); } @Test diff --git a/src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java b/src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java index bce9bdff..2aecbfe3 100644 --- a/src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java +++ b/src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java @@ -34,7 +34,6 @@ import java.util.Optional; import org.apache.commons.io.FileUtils; import org.folio.dao.DataImportQueueItemDao; -import org.folio.dataimport.util.OkapiConnectionParams; import org.folio.rest.jaxrs.model.DataImportQueueItem; import org.folio.rest.jaxrs.model.JobExecution; import org.folio.rest.jaxrs.model.StatusDto; @@ -119,20 +118,25 @@ public void setUp() throws IOException { } @Test - public void testConnectionParams() { - when(systemUserService.getAuthToken(any())).thenReturn("token"); + public void testConnectionParams(TestContext context) { + when(systemUserService.getAuthToken(any())) + .thenReturn(Future.succeededFuture("token")); - OkapiConnectionParams params = verticle.getConnectionParams( - new DataImportQueueItem().withTenant("tenant").withOkapiUrl("okapi-url") - ); - - assertThat(params.getTenantId(), is("tenant")); - assertThat(params.getOkapiUrl(), is("okapi-url")); - assertThat(params.getToken(), is("token")); + verticle + .getConnectionParams( + new DataImportQueueItem().withTenant("tenant").withOkapiUrl("okapi-url") + ) + .onComplete( + context.asyncAssertSuccess(params -> { + assertThat(params.getTenantId(), is("tenant")); + assertThat(params.getOkapiUrl(), is("okapi-url")); + assertThat(params.getToken(), is("token")); - verify(systemUserService, times(1)).getAuthToken(any()); + verify(systemUserService, times(1)).getAuthToken(any()); - verifyNoMoreInteractions(systemUserService); + verifyNoMoreInteractions(systemUserService); + }) + ); } @Test @@ -386,7 +390,9 @@ public void testProcessQueueItemSuccess(TestContext context) .withJobExecutionId("job-exec-id") .withDataType("MARC"); - doReturn(null).when(verticle).getConnectionParams(any()); + doReturn(Future.succeededFuture(null)) + .when(verticle) + .getConnectionParams(any()); doReturn(Future.succeededFuture(tempFile)) .when(verticle) @@ -455,7 +461,9 @@ public void testProcessQueueItemFailure(TestContext context) .withJobExecutionId("job-exec-id") .withDataType("MARC"); - doReturn(null).when(verticle).getConnectionParams(any()); + doReturn(Future.succeededFuture()) + .when(verticle) + .getConnectionParams(any()); doReturn(Future.succeededFuture(tempFile)) .when(verticle) @@ -530,7 +538,9 @@ public void testProcessQueueItemEarlyFailure(TestContext context) .withJobExecutionId("job-exec-id") .withDataType("MARC"); - doReturn(null).when(verticle).getConnectionParams(any()); + doReturn(Future.succeededFuture()) + .when(verticle) + .getConnectionParams(any()); doThrow(new UncheckedIOException(new IOException())) .when(verticle)