Skip to content

Commit

Permalink
Initial RMM implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonysena committed Jun 12, 2024
1 parent 7b85cfc commit 7b34d03
Show file tree
Hide file tree
Showing 23 changed files with 568 additions and 16 deletions.
4 changes: 3 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Imports:
rlang,
RJSONIO,
jsonlite,
ResultModelManager,
SqlRender (>= 1.11.1),
stringi (>= 1.7.6)
Suggests:
Expand All @@ -38,7 +39,8 @@ Suggests:
testthat,
withr
Remotes:
ohdsi/ROhdsiWebApi
ohdsi/ResultModelManager,
ohdsi/ROhdsiWebApi,
License: Apache License
VignetteBuilder: knitr
URL: https://ohdsi.github.io/CohortGenerator/, https://github.com/OHDSI/CohortGenerator
Expand Down
5 changes: 5 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export(createDemographicSubset)
export(createEmptyCohortDefinitionSet)
export(createEmptyNegativeControlOutcomeCohortSet)
export(createLimitSubset)
export(createResultsDataModel)
export(createSubsetCohortWindow)
export(dropCohortStatsTables)
export(exportCohortStatsTables)
Expand All @@ -26,20 +27,24 @@ export(getCohortDefinitionSet)
export(getCohortInclusionRules)
export(getCohortStats)
export(getCohortTableNames)
export(getDataMigrator)
export(getRequiredTasks)
export(getResultsDataModelSpecifications)
export(getSubsetDefinitions)
export(insertInclusionRuleNames)
export(isCamelCase)
export(isCohortDefinitionSet)
export(isFormattedForDatabaseUpload)
export(isSnakeCase)
export(isTaskRequired)
export(migrateDataModel)
export(readCsv)
export(recordTasksDone)
export(sampleCohortDefinitionSet)
export(saveCohortDefinitionSet)
export(saveCohortSubsetDefinition)
export(saveIncremental)
export(uploadResults)
export(writeCsv)
import(DatabaseConnector)
import(R6)
Expand Down
2 changes: 1 addition & 1 deletion R/CohortCount.R
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ getCohortCounts <- function(connectionDetails = NULL,
on.exit(DatabaseConnector::disconnect(connection))
}

sql <- SqlRender::readSql(system.file("sql/sql_server/CohortCounts.sql", package = "CohortGenerator", mustWork = TRUE))
sql <- SqlRender::readSql(system.file("sql/sql_server/CohortCounts.sql", package = utils::packageName(), mustWork = TRUE))
sql <- SqlRender::render(
sql = sql,
cohort_database_schema = cohortDatabaseSchema,
Expand Down
4 changes: 2 additions & 2 deletions R/CohortDefinitionSet.R
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ checkAndFixCohortDefinitionSetDataTypes <- function(x, fixDataTypes = TRUE, emit
#' @noRd
#' @keywords internal
.getCohortDefinitionSetSpecification <- function() {
return(readCsv(system.file("cohortDefinitionSetSpecificationDescription.csv",
package = "CohortGenerator",
return(readCsv(system.file("csv", "cohortDefinitionSetSpecificationDescription.csv",
package = utils::packageName(),
mustWork = TRUE
)))
}
Expand Down
2 changes: 1 addition & 1 deletion R/CohortSample.R
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
tableName = randSampleTableName
)

execSql <- SqlRender::readSql(system.file("sql", "sql_server", "sampling", "RandomSample.sql", package = "CohortGenerator"))
execSql <- SqlRender::readSql(system.file("sql", "sql_server", "sampling", "RandomSample.sql", utils::packageName()))
DatabaseConnector::renderTranslateExecuteSql(connection,
execSql,
tempEmulationSchema = tempEmulationSchema,
Expand Down
2 changes: 1 addition & 1 deletion R/CohortTables.R
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ createCohortTables <- function(connectionDetails = NULL,
yes = FALSE,
no = (createTableFlagList$cohortSampleTable && cohortTableNames$cohortSampleTable != cohortTableNames$cohortTable)
)
sql <- SqlRender::readSql(system.file("sql/sql_server/CreateCohortTables.sql", package = "CohortGenerator", mustWork = TRUE))
sql <- SqlRender::readSql(system.file("sql/sql_server/CreateCohortTables.sql", utils::packageName(), mustWork = TRUE))
sql <- SqlRender::render(
sql = sql,
cohort_database_schema = cohortDatabaseSchema,
Expand Down
6 changes: 3 additions & 3 deletions R/NegativeControlCohorts.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ createEmptyNegativeControlOutcomeCohortSet <- function(verbose = FALSE) {
#' @noRd
#' @keywords internal
.getNegativeControlOutcomeCohortSetSpecification <- function() {
return(readCsv(system.file("negativeControlOutcomeCohortSetSpecificationDescription.csv",
package = "CohortGenerator",
return(readCsv(system.file("csv", "negativeControlOutcomeCohortSetSpecificationDescription.csv",
package = utils::packageName(),
mustWork = TRUE
)))
}
Expand Down Expand Up @@ -214,7 +214,7 @@ createNegativeControlOutcomesQuery <- function(connection,
cohortTable,
occurrenceType,
detectOnDescendants) {
sql <- sql <- SqlRender::readSql(system.file("sql/sql_server/NegativeControlOutcomes.sql", package = "CohortGenerator", mustWork = TRUE))
sql <- sql <- SqlRender::readSql(system.file("sql/sql_server/NegativeControlOutcomes.sql", utils::packageName(), mustWork = TRUE))
sql <- SqlRender::render(
sql = sql,
cdm_database_schema = cdmDatabaseSchema,
Expand Down
156 changes: 156 additions & 0 deletions R/ResultsDataModel.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Copyright 2024 Observational Health Data Sciences and Informatics
#
# This file is part of CohortGenerator
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#' Get specifications for CohortGenerator results data model
#'
#' @return
#' A tibble data frame object with specifications
#'
#' @export
getResultsDataModelSpecifications <- function() {
  # The specification ships with the package under inst/csv/.
  specificationFile <- system.file(
    "csv",
    "resultsDataModelSpecification.csv",
    package = utils::packageName()
  )
  readCsv(file = specificationFile)
}

#' Create the results data model tables on a database server.
#'
#' @details
#' Only PostgreSQL and SQLite servers are supported.
#'
#' @param connectionDetails An object of type \code{connectionDetails} as created using the
#'                          \code{\link[DatabaseConnector]{createConnectionDetails}} function in the
#'                          DatabaseConnector package.
#' @param databaseSchema    The schema on the server where the tables will be created.
#' @param tablePrefix       (Optional) string to insert before table names for database table names
#' @export
createResultsDataModel <- function(connectionDetails = NULL,
                                   databaseSchema,
                                   tablePrefix = "") {
  # Fail fast with a clear message; without this, connectionDetails$dbms on NULL
  # yields a zero-length condition and a cryptic "argument is of length zero" error.
  if (is.null(connectionDetails)) {
    stop("connectionDetails is required")
  }
  # SQLite has a single, fixed schema named "main"; any other value cannot work.
  # Use scalar && (not vectorized &) in an if() condition.
  if (connectionDetails$dbms == "sqlite" && databaseSchema != "main") {
    stop("Invalid schema for sqlite, use databaseSchema = 'main'")
  }

  connection <- DatabaseConnector::connect(connectionDetails)
  # Always release the connection, even if table creation fails.
  on.exit(DatabaseConnector::disconnect(connection))

  # Create first version of results model:
  sql <- SqlRender::loadRenderTranslateSql(
    sqlFilename = "CreateResultsDataModel.sql",
    packageName = utils::packageName(),
    dbms = connection@dbms,
    database_schema = databaseSchema,
    table_prefix = tablePrefix
  )
  DatabaseConnector::executeSql(connection, sql)
  # Migrate to current version:
  migrateDataModel(
    connectionDetails = connectionDetails,
    databaseSchema = databaseSchema,
    tablePrefix = tablePrefix
  )
}

#' Upload results to the database server.
#'
#' @description
#' Requires the results data model tables have been created using the \code{\link{createResultsDataModel}} function.
#'
#' @param connectionDetails An object of type \code{connectionDetails} as created using the
#'                          \code{\link[DatabaseConnector]{createConnectionDetails}} function in the
#'                          DatabaseConnector package.
#' @param schema The schema on the server where the tables have been created.
#' @param zipFileName The name of the zip file.
#' @param forceOverWriteOfSpecifications If TRUE, specifications of the phenotypes, cohort definitions, and analysis
#'                                       will be overwritten if they already exist on the database. Only use this if these specifications
#'                                       have changed since the last upload.
#' @param purgeSiteDataBeforeUploading If TRUE, before inserting data for a specific databaseId all the data for
#'                                     that site will be dropped. This assumes the input zip file contains the full data for that
#'                                     data site.
#' @param tempFolder A folder on the local file system where the zip files are extracted to. Will be cleaned
#'                   up when the function is finished. Can be used to specify a temp folder on a drive that
#'                   has sufficient space if the default system temp space is too limited.
#' @param tablePrefix (Optional) string to insert before table names for database table names
#' @param ... See ResultModelManager::uploadResults
#' @export
uploadResults <- function(connectionDetails,
                          schema,
                          zipFileName,
                          forceOverWriteOfSpecifications = FALSE,
                          purgeSiteDataBeforeUploading = TRUE,
                          tempFolder = tempdir(),
                          tablePrefix = "",
                          ...) {
  # Extract the archive into a throw-away folder that is removed on exit,
  # even if the upload fails part-way through.
  extractionFolder <- tempfile("unzipTempFolder", tmpdir = tempFolder)
  dir.create(path = extractionFolder, recursive = TRUE)
  on.exit(unlink(extractionFolder, recursive = TRUE), add = TRUE)

  ParallelLogger::logInfo("Unzipping ", zipFileName)
  zip::unzip(zipFileName, exdir = extractionFolder)

  # Delegate the actual upload to ResultModelManager, driven by this
  # package's results data model specification.
  ResultModelManager::uploadResults(
    connectionDetails = connectionDetails,
    schema = schema,
    resultsFolder = extractionFolder,
    tablePrefix = tablePrefix,
    forceOverWriteOfSpecifications = forceOverWriteOfSpecifications,
    purgeSiteDataBeforeUploading = purgeSiteDataBeforeUploading,
    runCheckAndFixCommands = FALSE,
    specifications = getResultsDataModelSpecifications(),
    warnOnMissingTable = FALSE,
    ...
  )
}

#' Migrate Data model
#' @description
#' Migrate data from current state to next state
#'
#' It is strongly advised that you have a backup of all data (either sqlite files, a backup database (in the case you
#' are using a postgres backend) or have kept the csv/zip files from your data generation.
#'
#' @inheritParams getDataMigrator
#' @export
migrateDataModel <- function(connectionDetails, databaseSchema, tablePrefix = "") {
  ParallelLogger::logInfo("Migrating data set")
  migrator <- getDataMigrator(
    connectionDetails = connectionDetails,
    databaseSchema = databaseSchema,
    tablePrefix = tablePrefix
  )
  # Finalize the migrator even when a migration fails part-way through;
  # previously finalize() was skipped on error, leaking whatever resources
  # (e.g. a database connection) the migration manager holds.
  on.exit(migrator$finalize(), add = TRUE)
  migrator$executeMigrations()
}

#' Get database migrations instance
#'
#' @description
#' Returns a ResultModelManager DataMigrationsManager instance.
#'
#' @seealso [ResultModelManager::DataMigrationManager] which this function is a utility for.
#'
#' @param connectionDetails DatabaseConnector connection details object
#' @param databaseSchema String schema where database schema lives
#' @param tablePrefix (Optional) Use if a table prefix is used before table names (e.g. "cd_")
#' @returns Instance of ResultModelManager::DataMigrationManager that has interface for converting existing data models
#' @export
getDataMigrator <- function(connectionDetails, databaseSchema, tablePrefix = "") {
  # "cg_" is this package's fixed results-table prefix; migration scripts are
  # presumably resolved from a "migrations" folder inside this package's
  # installed files — confirm against ResultModelManager's documentation.
  ResultModelManager::DataMigrationManager$new(
    connectionDetails = connectionDetails,
    databaseSchema = databaseSchema,
    tablePrefix = tablePrefix,
    packageTablePrefix = "cg_",
    migrationPath = "migrations",
    packageName = utils::packageName()
  )
}
2 changes: 1 addition & 1 deletion R/SubsetDefinitions.R
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ CohortSubsetDefinition <- R6::R6Class(
dropTables <- c(dropTables, targetTable)
}

sql <- c(sql, SqlRender::readSql(system.file("sql", "sql_server", "subsets", "CohortSubsetDefinition.sql", package = "CohortGenerator")))
sql <- c(sql, SqlRender::readSql(system.file("sql", "sql_server", "subsets", "CohortSubsetDefinition.sql", utils::packageName())))
# Cleanup after execution
for (table in dropTables) {
sql <- c(sql, SqlRender::render("DROP TABLE IF EXISTS @table;", table = table))
Expand Down
6 changes: 3 additions & 3 deletions R/SubsetQueryBuilders.R
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ CohortSubsetQb <- R6::R6Class(
inherit = QueryBuilder,
private = list(
innerQuery = function(targetTable) {
sql <- SqlRender::readSql(system.file("sql", "sql_server", "subsets", "CohortSubsetOperator.sql", package = "CohortGenerator"))
sql <- SqlRender::readSql(system.file("sql", "sql_server", "subsets", "CohortSubsetOperator.sql", utils::packageName()))
sql <- SqlRender::render(sql,
target_table = targetTable,
output_table = self$getTableObjectId(),
Expand Down Expand Up @@ -83,7 +83,7 @@ LimitSubsetQb <- R6::R6Class(
inherit = QueryBuilder,
private = list(
innerQuery = function(targetTable) {
sql <- SqlRender::readSql(system.file("sql", "sql_server", "subsets", "LimitSubsetOperator.sql", package = "CohortGenerator"))
sql <- SqlRender::readSql(system.file("sql", "sql_server", "subsets", "LimitSubsetOperator.sql", utils::packageName()))
sql <- SqlRender::render(sql,
calendar_end_date = ifelse(is.null(private$operator$calendarEndDate), yes = "0", no = "1"),
calendar_end_date_day = ifelse(is.null(private$operator$calendarEndDate), yes = "", no = lubridate::day(private$operator$calendarEndDate)),
Expand Down Expand Up @@ -111,7 +111,7 @@ DemographicSubsetQb <- R6::R6Class(
inherit = QueryBuilder,
private = list(
innerQuery = function(targetTable) {
sql <- SqlRender::readSql(system.file("sql", "sql_server", "subsets", "DemographicSubsetOperator.sql", package = "CohortGenerator"))
sql <- SqlRender::readSql(system.file("sql", "sql_server", "subsets", "DemographicSubsetOperator.sql", utils::packageName()))
sql <- SqlRender::render(sql,
target_table = targetTable,
output_table = self$getTableObjectId(),
Expand Down
46 changes: 46 additions & 0 deletions inst/csv/resultsDataModelSpecification.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
table_name,column_name,data_type,is_required,primary_key,min_cell_count,description
cohort_definition,cohort_definition_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_definition,cohort_name,varchar,Yes,No,No,The name of the cohort definition
cohort_definition,description,varchar,No,No,No,A description of the cohort definition
cohort_definition,json,text,No,No,No,The circe-be compliant JSON expression
cohort_definition,sql_command,text,No,No,No,The OHDSI-SQL command used to construct the cohort
cohort_definition,subset_parent,bigint,No,No,No,The parent cohort id if this cohort is a subset
cohort_definition,is_subset,int,No,No,No,This value is 1 when the cohort is a subset
cohort_definition,subset_definition_id,bigint,No,No,No,The cohort subset definition
cohort_generation,cohort_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_generation,cohort_name,varchar,Yes,No,No,The name of the cohort generated
cohort_generation,generation_status,varchar,No,No,No,The cohort generation status
cohort_generation,start_time,Timestamp,No,No,No,The start time of the generation process
cohort_generation,end_time,Timestamp,No,No,No,The end time of the generation process
cohort_generation,database_id,varchar,Yes,Yes,No,The database identifier for this information
cohort_inclusion,cohort_definition_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_inclusion,rule_sequence,int,Yes,Yes,No,The rule sequence for the inclusion rule
cohort_inclusion,name,varchar,Yes,Yes,No,The name of the inclusion rule
cohort_inclusion,description,varchar,No,No,No,The description of the inclusion rule
cohort_inc_result,database_id,varchar,Yes,Yes,No,The database identifier for this information
cohort_inc_result,cohort_definition_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_inc_result,inclusion_rule_mask,int,Yes,Yes,No,A bit-mask for the inclusion rule
cohort_inc_result,person_count,bigint,Yes,Yes,Yes,The number of persons satisfying the inclusion rule
cohort_inc_result,mode_id,int,Yes,Yes,No,The mode of the inclusion rule.
cohort_inc_stats,database_id,varchar,Yes,Yes,No,The database identifier for this information
cohort_inc_stats,cohort_definition_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_inc_stats,rule_sequence,int,Yes,Yes,No,The rule sequence
cohort_inc_stats,person_count,bigint,Yes,Yes,Yes,The person count
cohort_inc_stats,gain_count,bigint,Yes,Yes,No,The gain count
cohort_inc_stats,person_total,bigint,Yes,Yes,Yes,The person total
cohort_inc_stats,mode_id,int,Yes,Yes,No,The mode id
cohort_summary_stats,database_id,varchar,Yes,Yes,No,The database identifier for this information
cohort_summary_stats,cohort_definition_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_summary_stats,base_count,bigint,Yes,Yes,Yes,The base count
cohort_summary_stats,final_count,bigint,Yes,Yes,Yes,The final count
cohort_summary_stats,mode_id,int,Yes,Yes,No,The mode id
cohort_censor_stats,cohort_definition_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_censor_stats,lost_count,bigint,Yes,Yes,Yes,The number lost due to censoring
cohort_count,database_id,varchar,Yes,Yes,No,The database identifier for this information
cohort_count,cohort_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_count,cohort_entries,bigint,Yes,Yes,Yes,The number of cohort entries
cohort_count,cohort_subjects,bigint,Yes,Yes,Yes,The number of unique subjects
cohort_count_neg_ctrl,database_id,varchar,Yes,Yes,No,The database identifier for this information
cohort_count_neg_ctrl,cohort_id,bigint,Yes,Yes,No,The unique identifier for the cohort definition
cohort_count_neg_ctrl,cohort_entries,bigint,Yes,Yes,Yes,The number of cohort entries
cohort_count_neg_ctrl,cohort_subjects,bigint,Yes,Yes,Yes,The number of unique subjects
Loading

0 comments on commit 7b34d03

Please sign in to comment.