[MODDATAIMP-953] [Orchid] Pass user information directly into Kafka (#312)

* [MODDATAIMP-953] Use original user instead of system user to process jobs

* Fix submodule

* Update permissions

* fix nit
ncovercash authored Oct 30, 2023
1 parent 95d9b31 commit b6443c9
Showing 22 changed files with 124 additions and 2,414 deletions.
24 changes: 0 additions & 24 deletions README.md
@@ -18,7 +18,6 @@ Version 2.0. See the file "[LICENSE](LICENSE)" for more information.
* [File splitting configuration](#file-splitting-configuration)
* [Interaction with AWS S3/Minio](#interaction-with-aws-s3minio)
* [Queue prioritization algorithm](#queue-prioritization-algorithm)
* [System user](#system-user)
* [Interaction with Kafka](#interaction-with-kafka)
* [Other system properties](#other-system-properties)
* [Issue tracker](#issue-tracker)
@@ -195,29 +194,6 @@ For information on what these mean, how to configure them, how scores are calculated…
> [!NOTE]
> We recommend the suggested values above; however, there is a lot of room for customization and extension as needed. Please see the doc for more information.

## System user

> [!WARNING]
> This module creates a system user upon installation with the following properties (if splitting is enabled):
>
> - Name `SystemDataImport`
> - Username `data-import-system-user` (customizable via env variable `SYSTEM_PROCESSING_USERNAME`)
> - Password customizable via env variable `SYSTEM_PROCESSING_PASSWORD` (**must be set, or the module will fail to start**)
> - [permissions to perform any import-related activities](/src/main/resources/permissions.txt).

To enable asynchronous job launching (as part of the file splitting process), the module creates
a system user upon installation. The system user is named `SystemDataImport`,
and its credentials may be customized with the following environment variables:

| Name | Type | Required | Default | Description |
| ---------------------------- | ------ | ---------------------------- | ------------------------- | -------------------- |
| `SYSTEM_PROCESSING_USERNAME` | string | no | `data-import-system-user` | System user username |
| `SYSTEM_PROCESSING_PASSWORD` | string | yes, if splitting is enabled | _none_ | System user password |

This user is granted [many of the same permissions](/src/main/resources/permissions.txt) as the module
requires for the `/data-import/uploadDefinitions/{uploadDefinitionId}/processFiles` endpoint, enabling it
to complete any import-related tasks across compatible modules.
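
For context, a minimal sketch of how the credentials described above could be resolved, assuming a plain environment-variable lookup (the structure and error message are illustrative, not the module's actual code):

```java
// Sketch only: resolve the system user's credentials as documented above.
// SYSTEM_PROCESSING_USERNAME falls back to its default value;
// SYSTEM_PROCESSING_PASSWORD is mandatory when file splitting is enabled.
String username = System.getenv()
  .getOrDefault("SYSTEM_PROCESSING_USERNAME", "data-import-system-user");
String password = System.getenv("SYSTEM_PROCESSING_PASSWORD");
if (password == null || password.isBlank()) {
  // the module refuses to start in this case
  throw new IllegalStateException("SYSTEM_PROCESSING_PASSWORD must be set");
}
```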

## Interaction with Kafka

All modules involved in data import (mod-data-import, mod-source-record-manager, mod-source-record-storage, mod-inventory, mod-invoice) communicate directly via Kafka. Therefore, to enable data import, Kafka must be set up properly and all the necessary parameters must be set for the modules.
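
Since this commit replaces the system user with the original user's context carried alongside each message, a rough sketch of the idea using the standard Kafka client may help. The header names are the conventional Okapi ones; the topic, key, payload, and values are placeholders, not the module's actual producer code:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: attach the originating user's Okapi context to an outgoing event so
// downstream modules can act on that user's behalf instead of a system user.
String tenant = "diku";                  // placeholder values
String okapiUrl = "http://okapi:9130";
String okapiToken = "<jwt>";
ProducerRecord<String, String> record =
  new ProducerRecord<>("DI_PLACEHOLDER_TOPIC", "key", "payload");
record.headers().add("x-okapi-url", okapiUrl.getBytes(StandardCharsets.UTF_8));
record.headers().add("x-okapi-tenant", tenant.getBytes(StandardCharsets.UTF_8));
record.headers().add("x-okapi-token", okapiToken.getBytes(StandardCharsets.UTF_8));
```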
29 changes: 3 additions & 26 deletions descriptors/ModuleDescriptor-template.json
@@ -6,18 +6,6 @@
"id": "source-manager-job-executions",
"version": "3.3"
},
{
"id": "login",
"version": "7.0"
},
{
"id": "authtoken",
"version": "2.0"
},
{
"id": "permissions",
"version": "5.3"
},
{
"id": "users",
"version": "15.0 16.0"
@@ -255,7 +243,8 @@
"acquisitions-units-storage.memberships.collection.get",
"isbn-utils.convert-to-13.get",
"instance-authority-links.instances.collection.get",
"instance-authority-links.instances.collection.put"
"instance-authority-links.instances.collection.put",
"organizations.organizations.collection.get"
],
"permissionsDesired": [
"invoices.acquisitions-units-assignments.assign",
@@ -340,18 +329,7 @@
"methods": [
"POST"
],
"pathPattern": "/_/tenant",
"modulePermissions": [
"users.collection.get",
"users.item.post",
"users.item.put",
"login.item.post",
"login.item.delete",
"perms.users.get",
"perms.users.assign.immutable",
"perms.users.item.post",
"perms.users.item.put"
]
"pathPattern": "/_/tenant"
},
{
"methods": [
@@ -520,7 +498,6 @@
{ "name": "SCORE_PART_NUMBER_FIRST", "value": "1" },
{ "name": "SCORE_PART_NUMBER_LAST", "value": "0" },
{ "name": "SCORE_PART_NUMBER_LAST_REFERENCE", "value": "100" },
{ "name": "SYSTEM_PROCESSING_USERNAME", "value": "data-import-system-user" },
{ "name": "ASYNC_PROCESSOR_POLL_INTERVAL_MS", "value": "5000" },
{ "name": "ASYNC_PROCESSOR_MAX_WORKERS_COUNT", "value": "1" }
]
2 changes: 1 addition & 1 deletion ramls/raml-storage
23 changes: 14 additions & 9 deletions src/main/java/org/folio/dao/DataImportQueueItemDaoImpl.java
@@ -42,13 +42,13 @@ public class DataImportQueueItemDaoImpl implements DataImportQueueItemDao {
private static final String GET_BY_ID_SQL =
"SELECT * FROM %s.%s WHERE id = $1";
private static final String INSERT_SQL =
"INSERT INTO %s.%s (id, job_execution_id, upload_definition_id, tenant, original_size, file_path, timestamp, part_number, processing, okapi_url, data_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)";
"INSERT INTO %s.%s (id, job_execution_id, upload_definition_id, tenant, original_size, file_path, timestamp, part_number, processing, okapi_url, data_type, okapi_token, okapi_permissions) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)";
private static final String UPDATE_BY_ID_SQL =
"UPDATE %s.%s SET job_execution_id = $2, upload_definition_id = $3, tenant = $4, original_size = $5, file_path = $6, timestamp = $7, part_number = $8, processing = $9, okapi_url = $10, data_type = $11 WHERE id = $1";
"UPDATE %s.%s SET job_execution_id = $2, upload_definition_id = $3, tenant = $4, original_size = $5, file_path = $6, timestamp = $7, part_number = $8, processing = $9, okapi_url = $10, data_type = $11, okapi_token = $12, okapi_permissions = $13 WHERE id = $1";
private static final String DELETE_BY_ID_SQL =
"DELETE FROM %s.%s WHERE id = $1";
private static final String DELETE_BY_JOB_ID_SQL =
"DELETE FROM %s.%s WHERE job_execution_id = $1";
"DELETE FROM %s.%s WHERE job_execution_id = $1";
private static final String LOCK_ACCESS_EXCLUSIVE_SQL =
"LOCK TABLE %s.%s IN ACCESS EXCLUSIVE MODE";

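The two new bind parameters (`$12`, `$13`) in `INSERT_SQL` and `UPDATE_BY_ID_SQL` imply matching `okapi_token` and `okapi_permissions` columns on the queue-item table. A hypothetical sketch of the corresponding DDL, in the same string-constant style as the DAO (the real migration lives in the module's schema scripts, which this excerpt does not show):

```java
// Hypothetical: the columns bound as $12/$13 above must exist on the table.
private static final String ADD_USER_CONTEXT_COLUMNS_SQL =
  "ALTER TABLE %s.%s ADD COLUMN okapi_token TEXT, ADD COLUMN okapi_permissions TEXT";
```
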
@@ -229,7 +229,9 @@ public Future<String> addQueueItem(DataImportQueueItem dataImportQueueItem) {
dataImportQueueItem.getPartNumber(),
dataImportQueueItem.getProcessing(),
dataImportQueueItem.getOkapiUrl(),
dataImportQueueItem.getDataType()
dataImportQueueItem.getDataType(),
dataImportQueueItem.getOkapiToken(),
dataImportQueueItem.getOkapiPermissions()
),
promise
);
@@ -265,7 +267,9 @@ public Future<DataImportQueueItem> updateDataImportQueueItem(
dataImportQueueItem.getPartNumber(),
dataImportQueueItem.getProcessing(),
dataImportQueueItem.getOkapiUrl(),
dataImportQueueItem.getDataType()
dataImportQueueItem.getDataType(),
dataImportQueueItem.getOkapiToken(),
dataImportQueueItem.getOkapiPermissions()
),
promise
);
@@ -288,6 +292,7 @@ public Future<DataImportQueueItem> updateDataImportQueueItem(
)
);
}

@Override
public Future<Void> deleteDataImportQueueItemByJobExecutionId(String id) {
String query = format(
@@ -311,6 +316,7 @@ public Future<Void> deleteDataImportQueueItemByJobExecutionId(String id) {
return Future.failedFuture(notFoundException);
});
}

@Override
public Future<Void> deleteDataImportQueueItem(String id) {
String query = format(
@@ -325,10 +331,7 @@ public Future<Void> deleteDataImportQueueItem(String id) {
if (result.rowCount() == 1) {
return Future.succeededFuture();
}
String message = format(
"Error deleting queue item with id '%s'",
id
);
String message = format("Error deleting queue item with id '%s'", id);
NotFoundException notFoundException = new NotFoundException(message);
LOGGER.error(message, notFoundException);
return Future.failedFuture(notFoundException);
@@ -354,6 +357,8 @@ private DataImportQueueItem mapRowJsonToQueueItem(Row rowAsJson) {
queueItem.setProcessing(rowAsJson.getBoolean("processing"));
queueItem.setOkapiUrl(rowAsJson.getString("okapi_url"));
queueItem.setDataType(rowAsJson.getString("data_type"));
queueItem.setOkapiToken(rowAsJson.getString("okapi_token"));
queueItem.setOkapiPermissions(rowAsJson.getString("okapi_permissions"));
return queueItem;
}
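
Downstream, the restored `okapi_token` and `okapi_permissions` fields can be turned back into Okapi headers so a split-file job executes as the original user. A hypothetical consumer-side sketch, not part of this diff (`queueItem` is the object produced by `mapRowJsonToQueueItem` above):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: rebuild Okapi headers from a restored queue item so the
// job runs with the original user's token and permissions.
Map<String, String> okapiHeaders = new HashMap<>();
okapiHeaders.put("x-okapi-url", queueItem.getOkapiUrl());
okapiHeaders.put("x-okapi-tenant", queueItem.getTenant());
okapiHeaders.put("x-okapi-token", queueItem.getOkapiToken());
okapiHeaders.put("x-okapi-permissions", queueItem.getOkapiPermissions());
```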

11 changes: 0 additions & 11 deletions src/main/java/org/folio/rest/impl/ModTenantAPI.java
@@ -14,12 +14,10 @@
import org.folio.rest.jaxrs.model.FileExtensionCollection;
import org.folio.rest.jaxrs.model.TenantAttributes;
import org.folio.rest.tools.utils.TenantTool;
import org.folio.service.auth.SystemUserAuthService;
import org.folio.service.cleanup.StorageCleanupService;
import org.folio.service.fileextension.FileExtensionService;
import org.folio.spring.SpringContextUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

public class ModTenantAPI extends TenantAPI {

@@ -34,21 +32,12 @@ public class ModTenantAPI extends TenantAPI {
@Autowired
private StorageCleanupService storageCleanupService;

@Autowired
private SystemUserAuthService systemUserAuthService;

@Value("${SPLIT_FILES_ENABLED:false}")
private boolean fileSplittingEnabled;

public ModTenantAPI() {
SpringContextUtil.autowireDependencies(this, Vertx.currentContext());
}

@Override
Future<Integer> loadData(TenantAttributes attributes, String tenantId, Map<String, String> headers, Context context) {
if (fileSplittingEnabled) {
systemUserAuthService.initializeSystemUser(headers);
}
return super.loadData(attributes, tenantId, headers, context)
.compose(num -> {
initStorageCleanupService(headers, context);
163 changes: 0 additions & 163 deletions src/main/java/org/folio/service/auth/ApiClient.java

This file was deleted.

