Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DC-1160: Use a random prefix for relationship tables to avoid name collision #1735

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import bio.terra.service.snapshot.SnapshotTable;
import bio.terra.service.snapshot.exception.InvalidSnapshotException;
import bio.terra.service.tabulardata.WalkRelationship;
import bio.terra.stairway.ShortUUID;
import com.azure.core.credential.AzureSasCredential;
import com.azure.storage.blob.BlobUrlParts;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -71,7 +72,6 @@
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
Expand Down Expand Up @@ -393,19 +393,15 @@ FROM OPENROWSET(BULK '<parquetFileLocation>',
"DROP DATABASE SCOPED CREDENTIAL [<resourceName>];";

private final ApplicationConfiguration applicationConfiguration;
private final DrsIdService drsIdService;
private final ObjectMapper objectMapper;

@Autowired
public AzureSynapsePdao(
AzureResourceConfiguration azureResourceConfiguration,
ApplicationConfiguration applicationConfiguration,
DrsIdService drsIdService,
ObjectMapper objectMapper,
@Qualifier("synapseJdbcTemplate") NamedParameterJdbcTemplate synapseJdbcTemplate) {
this.azureResourceConfiguration = azureResourceConfiguration;
this.applicationConfiguration = applicationConfiguration;
this.drsIdService = drsIdService;
this.objectMapper = objectMapper;
this.synapseJdbcTemplate = synapseJdbcTemplate;
}
Expand Down Expand Up @@ -788,7 +784,6 @@ public Map<String, Long> createSnapshotParquetFilesByQuery(
* table snapshot
* @return tableRowCounts - hash map of snapshot table names and number of rows included in
* snapshot
* @throws SQLException
*/
public Map<String, Long> createSnapshotParquetFilesByAsset(
AssetSpecification assetSpec,
Expand All @@ -798,7 +793,7 @@ public Map<String, Long> createSnapshotParquetFilesByAsset(
SnapshotRequestAssetModel requestModel,
boolean isGlobalFieldIds,
String compactIdPrefix)
throws SQLException, PdaoException {
throws PdaoException {
Map<String, Long> tableRowCounts = new HashMap<>();

// First handle root table
Expand Down Expand Up @@ -916,7 +911,6 @@ public void walkRelationships(
* @param tableRowCounts Map tracking the number of rows included in each table of the snapshot
* @param isGlobalFieldIds If true, configure query to use the global drs id format
* @param compactIdPrefix If specified, configure the query to use a compact drs id format
* @throws SQLException
*/
public void createSnapshotParquetFilesByRelationship(
UUID snapshotId,
Expand Down Expand Up @@ -946,11 +940,11 @@ public void createSnapshotParquetFilesByRelationship(
// If there are no rows in the "from" table's snapshot, then there is nothing to be added
// to the "to" snapshot table
Long fromTableCount = tableRowCounts.get(fromTableName);
if (fromTableCount == null || fromTableCount <= Long.valueOf(0)) {
if (fromTableCount == null || fromTableCount <= 0L) {
logger.info(
"Snapshot by Asset - No rows included in from table {}. Parquet file for snapshot table will not be created.",
fromTableName);
tableRowCounts.put(toTableName, (long) 0);
tableRowCounts.put(toTableName, 0L);
return;
}

Expand All @@ -961,7 +955,9 @@ public void createSnapshotParquetFilesByRelationship(
toTableName,
snapshotId,
IngestUtils.getSnapshotSliceParquetFilePath(
toTableName, String.format("%s_%s_relationship", fromTableName, toTableName)),
toTableName,
String.format(
"%s_%s_%s_relationship", ShortUUID.get(), fromTableName, toTableName)),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the interesting change here

datasetDataSourceName,
snapshotDataSourceName,
toAssetTable.getSynapseColumns(),
Expand Down Expand Up @@ -1016,7 +1012,7 @@ public void createSnapshotParquetFilesByRelationship(
@VisibleForTesting
ST buildSnapshotByAssetQueryTemplate(Map<String, Long> tableRowCounts, String toTableName) {
Long toTableRowCount = tableRowCounts.get(toTableName);
boolean toTableAlreadyHasRows = toTableRowCount != null && toTableRowCount > Long.valueOf(0);
boolean toTableAlreadyHasRows = toTableRowCount != null && toTableRowCount > 0L;

return new ST(
toTableAlreadyHasRows
Expand Down Expand Up @@ -1052,7 +1048,7 @@ public Map<String, Long> createSnapshotParquetFilesByRowId(
table.getColumns().stream()
.filter(c -> columnsToInclude.contains(c.getName()))
.map(Column::toSynapseColumn)
.collect(Collectors.toList());
.toList();
sqlCreateSnapshotTableTemplate = new ST(CREATE_SNAPSHOT_TABLE_BY_ROW_ID_TEMPLATE);
query =
generateSnapshotParquetCreateQuery(
Expand All @@ -1078,10 +1074,12 @@ public Map<String, Long> createSnapshotParquetFilesByRowId(
rows = synapseJdbcTemplate.update(query, params);
} catch (DataAccessException ex) {
logger.warn(
"No rows were added to the Snapshot for table "
+ table.getName()
+ ". This may be because the source dataset was empty or because the rows were filtered out by the query/asset specification defined in the snapshot create request. Exception: "
+ ex.getMessage());
"No rows were added to the Snapshot for table {}. "
+ "This may be because the source dataset was empty or because the rows were filtered "
+ "out by the query/asset specification defined in the snapshot create request. "
+ "Exception: {}",
table.getName(),
ex.getMessage());
}
tableRowCounts.put(table.getName(), (long) rows);
}
Expand Down Expand Up @@ -1317,14 +1315,9 @@ public List<SynapseDataResultModel> getTableData(
CollectionType collectionType) {

// Ensure that the sort column is a valid column
if (!sort.equals(PDAO_ROW_ID_COLUMN)) {
table
.getColumnByName(sort)
.orElseThrow(
() ->
new InvalidColumnException(
"Column %s was not found in the snapshot table %s"
.formatted(sort, tableName)));
if (!sort.equals(PDAO_ROW_ID_COLUMN) && table.getColumnByName(sort).isEmpty()) {
throw new InvalidColumnException(
"Column %s was not found in the snapshot table %s".formatted(sort, tableName));
}

List<SynapseColumn> columns =
Expand All @@ -1333,7 +1326,7 @@ public List<SynapseDataResultModel> getTableData(
Column.toSynapseColumn(
new Column().name(PDAO_ROW_ID_COLUMN).type(TableDataType.STRING))),
table.getSynapseColumns());
boolean includeTotalRowCount = collectionType.equals(CollectionType.DATASET);
boolean includeTotalRowCount = collectionType == CollectionType.DATASET;
final String sql =
new ST(QUERY_FROM_DATASOURCE_TEMPLATE)
.add("columns", columns)
Expand Down Expand Up @@ -1431,17 +1424,15 @@ private SQLServerDataSource getDatasource(String databaseName) {
}

private void cleanup(List<String> resourceNames, String sql) {
resourceNames.stream()
.forEach(
resource -> {
try {
ST sqlTemplate = new ST(sql);
sqlTemplate.add("resourceName", resource);
executeSynapseQuery(sqlTemplate.render());
} catch (Exception ex) {
logger.warn("Unable to clean up synapse resource {}.", resource, ex);
}
});
for (String resource : resourceNames) {
try {
ST sqlTemplate = new ST(sql);
sqlTemplate.add("resourceName", resource);
executeSynapseQuery(sqlTemplate.render());
} catch (Exception ex) {
logger.warn("Unable to clean up synapse resource {}.", resource, ex);
}
}
}

public static String getCredentialName(UUID collectionId, String email) {
Expand All @@ -1464,11 +1455,10 @@ private Object extractValue(ResultSet resultSet, Column column) {
case BYTES -> resultSet.getBytes(column.getName());
case DIRREF, FILEREF, STRING, TEXT, DATE, DATETIME, TIMESTAMP -> resultSet.getString(
column.getName());
case FLOAT -> resultSet.getFloat(column.getName());
case FLOAT, NUMERIC -> resultSet.getFloat(column.getName());
case FLOAT64 -> resultSet.getDouble(column.getName());
case INTEGER -> resultSet.getInt(column.getName());
case INT64 -> resultSet.getLong(column.getName());
case NUMERIC -> resultSet.getFloat(column.getName());
case TIME -> resultSet.getTime(column.getName());
default -> throw new IllegalArgumentException(
"Unknown datatype '" + column.getType() + "'");
Expand All @@ -1477,7 +1467,7 @@ private Object extractValue(ResultSet resultSet, Column column) {
throw new PdaoException("Error reading data", e);
}
} else {
String rawValue = null;
String rawValue;
try {
rawValue = resultSet.getString(column.getName());
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import bio.terra.stairway.FlightMap;
import bio.terra.stairway.StepResult;
import bio.terra.stairway.StepStatus;
import java.sql.SQLException;
import java.util.Map;
import java.util.UUID;

Expand Down Expand Up @@ -54,7 +53,7 @@ public StepResult doStep(FlightContext context) throws InterruptedException {
snapshotReq.isGlobalFileIds(),
snapshotReq.getCompactIdPrefix());
workingMap.put(SnapshotWorkingMapKeys.TABLE_ROW_COUNT_MAP, tableRowCounts);
} catch (SQLException | PdaoException ex) {
} catch (PdaoException ex) {
return new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL, ex);
}
return StepResult.getStepResultSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ record IngestSource(String tableName, String ingestFile, long expectedRowCount)

static final List<IngestSource> TABLES =
List.of(
new IngestSource("concept", "omop/concept-table-data.jsonl", 7),
new IngestSource("concept", "omop/concept-table-data.jsonl", 8),
new IngestSource("person", "omop/person-table-data.jsonl", 23),
new IngestSource("relationship", "omop/relationship.jsonl", 2),
new IngestSource("concept_ancestor", "omop/concept-ancestor-table-data.jsonl", 10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import bio.terra.service.dataset.AssetSpecification;
import bio.terra.service.dataset.exception.TableNotFoundException;
import bio.terra.service.dataset.flight.ingest.IngestUtils;
import bio.terra.service.filedata.DrsIdService;
import bio.terra.service.resourcemanagement.azure.AzureResourceConfiguration;
import bio.terra.service.snapshot.SnapshotTable;
import bio.terra.service.snapshot.exception.InvalidSnapshotException;
Expand Down Expand Up @@ -73,7 +72,6 @@ void setup() {
new AzureSynapsePdao(
azureResourceConfiguration,
mock(ApplicationConfiguration.class),
mock(DrsIdService.class),
new ObjectMapper(),
synapseJdbcTemplate);
assetSpec = AssetUtils.buildTestAssetSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -92,8 +93,6 @@
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
Expand All @@ -111,8 +110,6 @@
@Category(Connected.class)
@EmbeddedDatabaseTest
public class BigQueryPdaoTest {
private static final Logger logger = LoggerFactory.getLogger(BigQueryPdaoTest.class);

@Autowired private JsonLoader jsonLoader;
@Autowired private ConnectedTestConfiguration testConfig;
@Autowired private BigQuerySnapshotPdao bigQuerySnapshotPdao;
Expand All @@ -134,9 +131,6 @@ public class BigQueryPdaoTest {

private final Storage storage = StorageOptions.getDefaultInstance().getService();

private final List<UUID> datasetIdsToDelete = new ArrayList<>();
private final List<Dataset> bqDatasetsToDelete = new ArrayList<>();

private final List<BlobInfo> blobsToDelete = new ArrayList<>();

private DatasetSummaryModel datasetSummaryModel;
Expand Down Expand Up @@ -306,7 +300,7 @@ record IngestSource(String tableName, String ingestFile, int expectedRowCount) {

static final List<IngestSource> TABLES =
List.of(
new IngestSource("concept", "omop/concept-table-data.jsonl", 7),
new IngestSource("concept", "omop/concept-table-data.jsonl", 8),
new IngestSource("person", "omop/person-table-data.jsonl", 23),
new IngestSource("relationship", "omop/relationship.jsonl", 2),
new IngestSource("concept_ancestor", "omop/concept-ancestor-table-data.jsonl", 10),
Expand Down Expand Up @@ -408,14 +402,14 @@ public void createSnapshotByRequestId() throws Exception {
String rowId = "datarepo_row_id";
List<String> personIds =
queryForIds(snapshot.getName(), "person", rowId, bigQuerySnapshotProject, rowId);
assertThat(personIds.size(), is(23));
assertThat(personIds, hasSize(23));
List<String> conditionOccurrenceIds =
queryForIds(
snapshot.getName(), "condition_occurrence", rowId, bigQuerySnapshotProject, rowId);
assertThat(conditionOccurrenceIds.size(), is(49));
assertThat(conditionOccurrenceIds, hasSize(49));
List<String> conceptIds =
queryForIds(snapshot.getName(), "concept", rowId, bigQuerySnapshotProject, rowId);
assertThat(conceptIds.size(), is(5));
assertThat(conceptIds, hasSize(6));
}

@Test
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/omop/concept-table-data.jsonl
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
{"concept_id": 3, "domain_id": "Condition", "concept_name": "concept3", "concept_code": 13, "standard_concept": "S"}
{"concept_id": 4, "domain_id": "Condition", "concept_name": "concept4", "concept_code": 14, "standard_concept": "S"}
{"concept_id": 5, "domain_id": "Condition", "concept_name": "concept5", "concept_code": 15}
{"concept_id": 100, "domain_id": "Domain", "concept_name": "concept type", "concept_code": 100}
{"concept_id": 44818723, "domain_id": "Metadata", "concept_name": "Subsumes"}
{"concept_id": 44818821, "domain_id": "Metadata", "concept_name": "Is a"}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"condition_concept_id": 1, "person_id": 100}
{"condition_concept_id": 1, "person_id": 100, "condition_type_concept_id": 100}
{"condition_concept_id": 1, "person_id": 101}
{"condition_concept_id": 1, "person_id": 102}
{"condition_concept_id": 1, "person_id": 103}
Expand Down
34 changes: 33 additions & 1 deletion src/test/resources/omop/snapshot-access-request.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
{"sourceSnapshotId": "00000000-0000-0000-0000-000000000000",
"name": "simple_snapshot_access_request",
"researchPurposeStatement": "purpose",
"snapshotBuilderRequest": {"cohorts": [{"name": "test", "criteriaGroups": [{"name": "Group 1", "meetAll": false, "criteria": [], "mustMeet": true}]}], "valueSets": [{"name": "Condition", "values": []}], "conceptSets": [{"name": "Condition", "concept": {"id": 19, "code": null, "name": "Condition", "hasChildren": false}, "featureValueGroupName": "Condition"}]}
"snapshotBuilderRequest": {
"cohorts": [
{
"name": "test",
"criteriaGroups": [
{
"name": "Group 1",
"meetAll": false,
"criteria": [],
"mustMeet": true
}
]
}
],
"valueSets": [
{
"name": "Condition",
"values": []
}
],
"conceptSets": [
{
"name": "Condition",
"concept": {
"id": 19,
"code": null,
"name": "Condition",
"hasChildren": false
},
"featureValueGroupName": "Condition"
}
]
}
}
Loading