diff --git a/.Rbuildignore b/.Rbuildignore index e91063e..a0ad27e 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -4,3 +4,11 @@ ^\.Rproj\.user$ ^\.idea$ ^\.github$ +_pkgdown\.yml +compare_versions +deploy.sh +docs +extras +man-roxygen +^cran-comments\.md$ +^CRAN-SUBMISSION$ diff --git a/.github/workflows/R_CMD_check_Hades.yaml b/.github/workflows/R_CMD_check_Hades.yaml index 564181c..df47d5f 100644 --- a/.github/workflows/R_CMD_check_Hades.yaml +++ b/.github/workflows/R_CMD_check_Hades.yaml @@ -20,9 +20,6 @@ jobs: fail-fast: false matrix: config: - - {os: windows-latest, r: '4.2.3', rtools: '42', rspm: "https://cloud.r-project.org"} - - {os: macOS-latest, r: '4.2.3', rtools: '42', rspm: "https://cloud.r-project.org"} - - {os: ubuntu-20.04, r: '4.2.3', rtools: '42', rspm: "https://packagemanager.rstudio.com/cran/__linux__/focal/latest"} - {os: windows-latest, r: 'release', rtools: '', rspm: "https://cloud.r-project.org"} - {os: macOS-latest, r: 'release', rtools: '', rspm: "https://cloud.r-project.org"} - {os: ubuntu-20.04, r: 'release', rtools: '', rspm: "https://packagemanager.rstudio.com/cran/__linux__/focal/latest"} @@ -50,15 +47,29 @@ jobs: CDM5_REDSHIFT_PASSWORD: ${{ secrets.CDM5_REDSHIFT_PASSWORD }} CDM5_REDSHIFT_SERVER: ${{ secrets.CDM5_REDSHIFT_SERVER }} CDM5_REDSHIFT_USER: ${{ secrets.CDM5_REDSHIFT_USER }} + CDM_SNOWFLAKE_CDM53_SCHEMA: ${{ secrets.CDM_SNOWFLAKE_CDM53_SCHEMA }} + CDM_SNOWFLAKE_OHDSI_SCHEMA: ${{ secrets.CDM_SNOWFLAKE_OHDSI_SCHEMA }} + CDM_SNOWFLAKE_PASSWORD: ${{ secrets.CDM_SNOWFLAKE_PASSWORD }} + CDM_SNOWFLAKE_CONNECTION_STRING: ${{ secrets.CDM_SNOWFLAKE_CONNECTION_STRING }} + CDM_SNOWFLAKE_USER: ${{ secrets.CDM_SNOWFLAKE_USER }} + CDM5_SPARK_USER: ${{ secrets.CDM5_SPARK_USER }} + CDM5_SPARK_PASSWORD: ${{ secrets.CDM5_SPARK_PASSWORD }} + CDM5_SPARK_CONNECTION_STRING: ${{ secrets.CDM5_SPARK_CONNECTION_STRING }} + CDM5_SPARK_CDM_SCHEMA: ${{ secrets.CDM5_SPARK_CDM_SCHEMA }} + CDM5_SPARK_OHDSI_SCHEMA: ${{ secrets.CDM5_SPARK_OHDSI_SCHEMA }} + CDM_BIG_QUERY_CONNECTION_STRING: ${{ secrets.CDM_BIG_QUERY_CONNECTION_STRING }} + CDM_BIG_QUERY_KEY_FILE: ${{ secrets.CDM_BIG_QUERY_KEY_FILE }} + CDM_BIG_QUERY_CDM_SCHEMA: ${{ secrets.CDM_BIG_QUERY_CDM_SCHEMA }} + CDM_BIG_QUERY_OHDSI_SCHEMA: ${{ secrets.CDM_BIG_QUERY_OHDSI_SCHEMA }} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: r-lib/actions/setup-r@v2 with: r-version: ${{ matrix.config.r }} rtools-version: ${{ matrix.config.rtools }} - + - uses: r-lib/actions/setup-tinytex@v2 - uses: r-lib/actions/setup-pandoc@v2 @@ -91,6 +102,13 @@ jobs: eval sudo $cmd done < <(Rscript -e 'writeLines(remotes::system_requirements("ubuntu", "20.04"))') + - name: Setup Java + if: runner.os == 'macOS' + uses: actions/setup-java@v4 + with: + distribution: 'corretto' + java-version: '8' + - name: Install libssh if: runner.os == 'Linux' run: | @@ -98,6 +116,7 @@ jobs: - name: Install dependencies run: | + install.packages("cachem") remotes::install_deps(dependencies = TRUE, INSTALL_opts=c("--no-multiarch")) remotes::install_cran("rcmdcheck") shell: Rscript {0} @@ -121,14 +140,14 @@ jobs: - name: Upload check results if: failure() - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: ${{ runner.os }}-r${{ matrix.config.r }}-results path: check - name: Upload source package if: success() && runner.os == 'macOS' && github.event_name != 'pull_request' && github.ref == 'refs/heads/main' - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: package_tarball path: 
check/*.tar.gz @@ -192,7 +211,7 @@ jobs: - name: Download package tarball if: ${{ env.new_version != '' }} - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: package_tarball diff --git a/.github/workflows/R_CMD_check_main_weekly.yaml b/.github/workflows/R_CMD_check_main_weekly.yaml index ecf43ff..30c8dea 100644 --- a/.github/workflows/R_CMD_check_main_weekly.yaml +++ b/.github/workflows/R_CMD_check_main_weekly.yaml @@ -20,30 +20,41 @@ jobs: GITHUB_PAT: ${{ secrets.GH_TOKEN }} R_REMOTES_NO_ERRORS_FROM_WARNINGS: true RSPM: ${{ matrix.config.rspm }} - CDM5_ORACLE_CDM_SCHEMA: ${{ secrets.CDM5_ORACLE_CDM_SCHEMA }} + CDM5_ORACLE_CDM_SCHEMA: ${{ secrets.CDM5_ORACLE_CDM54_SCHEMA }} CDM5_ORACLE_OHDSI_SCHEMA: ${{ secrets.CDM5_ORACLE_OHDSI_SCHEMA }} CDM5_ORACLE_PASSWORD: ${{ secrets.CDM5_ORACLE_PASSWORD }} CDM5_ORACLE_SERVER: ${{ secrets.CDM5_ORACLE_SERVER }} CDM5_ORACLE_USER: ${{ secrets.CDM5_ORACLE_USER }} - CDM5_POSTGRESQL_CDM_SCHEMA: ${{ secrets.CDM5_POSTGRESQL_CDM_SCHEMA }} + CDM5_POSTGRESQL_CDM_SCHEMA: ${{ secrets.CDM5_POSTGRESQL_CDM54_SCHEMA }} CDM5_POSTGRESQL_OHDSI_SCHEMA: ${{ secrets.CDM5_POSTGRESQL_OHDSI_SCHEMA }} CDM5_POSTGRESQL_PASSWORD: ${{ secrets.CDM5_POSTGRESQL_PASSWORD }} CDM5_POSTGRESQL_SERVER: ${{ secrets.CDM5_POSTGRESQL_SERVER }} CDM5_POSTGRESQL_USER: ${{ secrets.CDM5_POSTGRESQL_USER }} - CDM5_SQL_SERVER_CDM_SCHEMA: ${{ secrets.CDM5_SQL_SERVER_CDM_SCHEMA }} + CDM5_SQL_SERVER_CDM_SCHEMA: ${{ secrets.CDM5_SQL_SERVER_CDM54_SCHEMA }} CDM5_SQL_SERVER_OHDSI_SCHEMA: ${{ secrets.CDM5_SQL_SERVER_OHDSI_SCHEMA }} CDM5_SQL_SERVER_PASSWORD: ${{ secrets.CDM5_SQL_SERVER_PASSWORD }} CDM5_SQL_SERVER_SERVER: ${{ secrets.CDM5_SQL_SERVER_SERVER }} CDM5_SQL_SERVER_USER: ${{ secrets.CDM5_SQL_SERVER_USER }} - CDM5_REDSHIFT_CDM_SCHEMA: ${{ secrets.CDM5_REDSHIFT_CDM_SCHEMA }} + CDM5_REDSHIFT_CDM_SCHEMA: ${{ secrets.CDM5_REDSHIFT_CDM54_SCHEMA }} CDM5_REDSHIFT_OHDSI_SCHEMA: ${{ secrets.CDM5_REDSHIFT_OHDSI_SCHEMA }} CDM5_REDSHIFT_PASSWORD: ${{ secrets.CDM5_REDSHIFT_PASSWORD }} CDM5_REDSHIFT_SERVER: ${{ secrets.CDM5_REDSHIFT_SERVER }} CDM5_REDSHIFT_USER: ${{ secrets.CDM5_REDSHIFT_USER }} + CDM_SNOWFLAKE_CDM53_SCHEMA: ${{ secrets.CDM_SNOWFLAKE_CDM53_SCHEMA }} + CDM_SNOWFLAKE_OHDSI_SCHEMA: ${{ secrets.CDM_SNOWFLAKE_OHDSI_SCHEMA }} + CDM_SNOWFLAKE_PASSWORD: ${{ secrets.CDM_SNOWFLAKE_PASSWORD }} + CDM_SNOWFLAKE_CONNECTION_STRING: ${{ secrets.CDM_SNOWFLAKE_CONNECTION_STRING }} + CDM_SNOWFLAKE_USER: ${{ secrets.CDM_SNOWFLAKE_USER }} CDM5_SPARK_USER: ${{ secrets.CDM5_SPARK_USER }} CDM5_SPARK_PASSWORD: ${{ secrets.CDM5_SPARK_PASSWORD }} CDM5_SPARK_CONNECTION_STRING: ${{ secrets.CDM5_SPARK_CONNECTION_STRING }} - + CDM5_SPARK_CDM_SCHEMA: ${{ secrets.CDM5_SPARK_CDM_SCHEMA }} + CDM5_SPARK_OHDSI_SCHEMA: ${{ secrets.CDM5_SPARK_OHDSI_SCHEMA }} + CDM_BIG_QUERY_CONNECTION_STRING: ${{ secrets.CDM_BIG_QUERY_CONNECTION_STRING }} + CDM_BIG_QUERY_KEY_FILE: ${{ secrets.CDM_BIG_QUERY_KEY_FILE }} + CDM_BIG_QUERY_CDM_SCHEMA: ${{ secrets.CDM_BIG_QUERY_CDM_SCHEMA }} + CDM_BIG_QUERY_OHDSI_SCHEMA: ${{ secrets.CDM_BIG_QUERY_OHDSI_SCHEMA }} + steps: - uses: actions/checkout@v2 diff --git a/CRAN-SUBMISSION b/CRAN-SUBMISSION new file mode 100644 index 0000000..15cff24 --- /dev/null +++ b/CRAN-SUBMISSION @@ -0,0 +1,3 @@ +Version: 0.11.0 +Date: 2024-09-09 13:18:20 UTC +SHA: 39c45bae218f8ffd983b9bc9a6a5914ad4f7f9df diff --git a/DESCRIPTION b/DESCRIPTION index 94d1c28..fbd83a7 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,8 +1,8 @@ Package: CohortGenerator Type: Package -Title: An R Package for Cohort 
Generation Against the OMOP CDM -Version: 0.8.1 -Date: 2023-10-10 +Title: Cohort Generation for the OMOP Common Data Model +Version: 0.11.2 +Date: 2024-09-30 Authors@R: c( person("Anthony", "Sena", email = "sena@ohdsi.org", role = c("aut", "cre")), person("Jamie", "Gilbert", role = c("aut")), @@ -11,40 +11,41 @@ Authors@R: c( person("Observational Health Data Science and Informatics", role = c("cph")) ) Maintainer: Anthony Sena <sena@ohdsi.org> -Description: An R package for that encapsulates the functions for generating cohorts against the OMOP CDM. +Description: Generate cohorts and subsets using an Observational + Medical Outcomes Partnership (OMOP) Common Data Model (CDM) Database. + Cohorts are defined using 'CIRCE' (<https://github.com/ohdsi/circe-be>) or + SQL compatible with 'SqlRender' (<https://github.com/OHDSI/SqlRender>). Depends: DatabaseConnector (>= 5.0.0), R (>= 3.6.0), R6 Imports: - bit64, checkmate, digest, dplyr, lubridate, + methods, ParallelLogger (>= 3.0.0), readr (>= 2.1.0), rlang, RJSONIO, jsonlite, + ResultModelManager, SqlRender (>= 1.11.1), - stringi (>= 1.7.6) + stringi (>= 1.7.6), + tibble Suggests: CirceR (>= 1.1.1), Eunomia, knitr, rmarkdown, - ROhdsiWebApi, testthat, - withr -Remotes: - ohdsi/CirceR, - ohdsi/Eunomia, - ohdsi/ROhdsiWebApi + withr, + zip License: Apache License VignetteBuilder: knitr URL: https://ohdsi.github.io/CohortGenerator/, https://github.com/OHDSI/CohortGenerator BugReports: https://github.com/OHDSI/CohortGenerator/issues -RoxygenNote: 7.2.3 +RoxygenNote: 7.3.1 Encoding: UTF-8 Language: en-US diff --git a/NAMESPACE b/NAMESPACE index b10a82b..62124bb 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -16,6 +16,7 @@ export(createDemographicSubset) export(createEmptyCohortDefinitionSet) export(createEmptyNegativeControlOutcomeCohortSet) export(createLimitSubset) +export(createResultsDataModel) export(createSubsetCohortWindow) export(dropCohortStatsTables) export(exportCohortStatsTables) @@ -23,9 +24,12 @@ export(generateCohortSet) export(generateNegativeControlOutcomeCohorts) export(getCohortCounts) export(getCohortDefinitionSet) +export(getCohortInclusionRules) export(getCohortStats) export(getCohortTableNames) +export(getDataMigrator) export(getRequiredTasks) +export(getResultsDataModelSpecifications) export(getSubsetDefinitions) export(insertInclusionRuleNames) export(isCamelCase) @@ -33,16 +37,21 @@ export(isCohortDefinitionSet) export(isFormattedForDatabaseUpload) export(isSnakeCase) export(isTaskRequired) +export(migrateDataModel) export(readCsv) export(recordTasksDone) +export(runCohortGeneration) +export(sampleCohortDefinitionSet) export(saveCohortDefinitionSet) export(saveCohortSubsetDefinition) export(saveIncremental) +export(uploadResults) export(writeCsv) import(DatabaseConnector) import(R6) -importFrom(dplyr,"%>%") +import(dplyr) importFrom(grDevices,rgb) +importFrom(methods,is) importFrom(rlang,':=') importFrom(rlang,.data) importFrom(stats,aggregate) diff --git a/NEWS.md b/NEWS.md index f1cb257..9271fa2 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,9 +1,65 @@ +CohortGenerator 0.11.2 +======================= + +- Ensure temp tables are dropped before creating them (#188) + +CohortGenerator 0.11.1 +======================= + +- CohortGenerator added to CRAN (#77) + +CohortGenerator 0.11.0 +======================= + +New Features + +- Add support for minimum cell count (#176) + +Bug Fixes + +- Multiple calls to export stats causing duplicates in cohort inclusion file (#179) +- Updates to subset documentation (#180, #181) +- Negative control outcome generation bug (#177) + +CohortGenerator 0.10.0 +======================= + +New 
Features + +- Add `runCohortGeneration` function (Issue #165) +- Adopt ResultModelManager for handling results data models & uploading. Extend results data model to include information on cohort subsets (#154, #162) +- Remove REMOTES entries for CirceR and Eunomia which are now in CRAN (#145) +- Unit tests now running on all OHDSI DB Platforms (#151) + +Bug Fixes + +- Negation of cohort subset operator must join on `subject_id` AND `start_date` (#167) +- Allow integer as cohort ID (#146) +- Use native messaging functions for output vs. ParallelLogger (#97) +- Prevent upload of inclusion rule information (#78) +- Expose `colTypes` when working with .csv files (#59) +- Remove `bit64` from package (mostly) (#152) +- Updated documentation for cohort subset negate feature (#111) + +CohortGenerator 0.9.0 +======================= +- Random sample functionality (for development only) (Issue #129) +- Incremental mode for negative control cohort generation (Issue #137) +- Fixes getCohortCounts() if cohortIds is not specified, but cohortDefinitionSet is. (Issue #136) +- Add cohort ID to generation output messages (Issue #132) +- Add databaseId to output of getStatsTable() (Issue #116) +- Prevent duplicate cohort IDs in cohortDefinitionSet (Issue #130) +- Fix cohort stats query for Oracle (Issue #143) +- Ensure databaseId applied to all returned cohort counts (Issue #144) +- Preserve backwards compatibility if cohort sample table is not in the list of cohort table names (Issue #147) + + CohortGenerator 0.8.1 ======================= - Include cohorts with 0 people in cohort counts (Issue #91). - Use numeric for cohort ID (Issue #98) - Allow big ints for target pairs (#103) -- Pass `tempEmulationSchema` when creating negative controlc ohorts (#104) +- Pass `tempEmulationSchema` when creating negative control cohorts (#104) - Target CDM v5.4 for unit tests (#119) - Fix for subset references (#115) - Allow for subset cohort name templating (#118) diff --git a/R/CohortConstruction.R b/R/CohortConstruction.R index 7a79ee4..ee4d9b7 100644 --- a/R/CohortConstruction.R +++ b/R/CohortConstruction.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -76,6 +76,13 @@ generateCohortSet <- function(connectionDetails = NULL, "sql" ) ) + assertLargeInteger(cohortDefinitionSet$cohortId) + # Verify that cohort IDs are not repeated in the cohort definition + # set before generating + if (length(unique(cohortDefinitionSet$cohortId)) != length(cohortDefinitionSet$cohortId)) { + duplicatedCohortIds <- cohortDefinitionSet$cohortId[duplicated(cohortDefinitionSet$cohortId)] + stop("Cannot generate! Duplicate cohort IDs found in your cohortDefinitionSet: ", paste(duplicatedCohortIds, collapse = ","), ". 
Please fix your cohortDefinitionSet and try again.") + } if (is.null(connection) && is.null(connectionDetails)) { stop("You must provide either a database connection or the connection details.") } @@ -94,28 +101,7 @@ generateCohortSet <- function(connectionDetails = NULL, on.exit(DatabaseConnector::disconnect(connection)) } - # Verify the cohort tables exist and if they do not - # stop the generation process - tableExistsFlagList <- lapply(cohortTableNames, FUN = function(x) { - x <- FALSE - }) - tables <- DatabaseConnector::getTableNames(connection, cohortDatabaseSchema) - for (i in 1:length(cohortTableNames)) { - if (toupper(cohortTableNames[i]) %in% toupper(tables)) { - tableExistsFlagList[i] <- TRUE - } - } - - if (!all(unlist(tableExistsFlagList, use.names = FALSE))) { - errorMsg <- "The following tables have not been created: \n" - for (i in 1:length(cohortTableNames)) { - if (!tableExistsFlagList[[i]]) { - errorMsg <- paste0(errorMsg, " - ", cohortTableNames[i], "\n") - } - } - errorMsg <- paste(errorMsg, "Please use the createCohortTables function to ensure all tables exist before generating cohorts.", sep = "\n") - stop(errorMsg) - } + .checkCohortTables(connection, cohortDatabaseSchema, cohortTableNames) generatedTemplateCohorts <- c() if ("isTemplatedCohort" %in% colnames(cohortDefinitionSet)) { @@ -283,7 +269,7 @@ generateCohort <- function(cohortId = NULL, connection <- DatabaseConnector::connect(connectionDetails) on.exit(DatabaseConnector::disconnect(connection)) } - ParallelLogger::logInfo(i, "/", nrow(cohortDefinitionSet), "- Generating cohort: ", cohortName) + rlang::inform(paste0(i, "/", nrow(cohortDefinitionSet), "- Generating cohort: ", cohortName, " (id = ", cohortId, ")")) sql <- cohortDefinitionSet$sql[i] if (!isSubset) { diff --git a/R/CohortCount.R b/R/CohortCount.R index 3e44424..7adad6c 100644 --- a/R/CohortCount.R +++ b/R/CohortCount.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -27,7 +27,10 @@ #' @template CohortTable #' #' @param cohortIds The cohort Id(s) used to reference the cohort in the cohort -#' table. If left empty, all cohorts in the table will be included. +#' table. If left empty and no `cohortDefinitionSet` argument is +#' specified, all cohorts in the table will be included. If +#' you specify the `cohortIds` AND `cohortDefinitionSet`, the counts will +#' reflect the `cohortIds` from the `cohortDefinitionSet`. 
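A minimal usage sketch for the getCohortCounts() behavior described in this hunk; the connectionDetails object and the schema, table, and database names are illustrative placeholders:

# Count all cohorts in the cohort table, tagging each row with a database identifier
counts <- getCohortCounts(
  connectionDetails = connectionDetails,
  cohortDatabaseSchema = "results",
  cohortTable = "cohort",
  databaseId = "my_cdm"
)

# Restrict the counts to a cohortDefinitionSet; cohorts with no rows in the
# cohort table come back with cohortEntries = 0 and cohortSubjects = 0
counts <- getCohortCounts(
  connectionDetails = connectionDetails,
  cohortDatabaseSchema = "results",
  cohortTable = "cohort",
  cohortDefinitionSet = cohortDefinitionSet
)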
#' #' @template CohortDefinitionSet #' @@ -57,27 +60,37 @@ getCohortCounts <- function(connectionDetails = NULL, sql = sql, cohort_database_schema = cohortDatabaseSchema, cohort_table = cohortTable, - cohort_ids = cohortIds, - database_id = ifelse(test = is.null(databaseId), yes = "", no = databaseId) + cohort_ids = cohortIds ) sql <- SqlRender::translate(sql = sql, targetDialect = connection@dbms) tablesInServer <- tolower(DatabaseConnector::getTableNames(conn = connection, databaseSchema = cohortDatabaseSchema)) if (tolower(cohortTable) %in% tablesInServer) { counts <- DatabaseConnector::querySql(connection, sql, snakeCaseToCamelCase = TRUE) delta <- Sys.time() - start - ParallelLogger::logInfo(paste("Counting cohorts took", signif(delta, 3), attr(delta, "units"))) + rlang::inform(paste("Counting cohorts took", signif(delta, 3), attr(delta, "units"))) if (!is.null(cohortDefinitionSet)) { + # If the user has NOT specified a list of cohortIds + # to use to filter the cohortDefinitionSet, then + # extract the unique cohortIds + if (length(cohortIds) == 0) { + cohortIds <- cohortDefinitionSet$cohortId + } counts <- merge( x = counts, - y = cohortDefinitionSet[cohortDefinitionSet$cohortId %in% cohortIds, ], + y = cohortDefinitionSet[cohortDefinitionSet$cohortId %in% cohortIds, , drop = FALSE], by = "cohortId", all.y = TRUE ) - counts <- transform( - counts, - cohortEntries = ifelse(is.na(cohortEntries), 0L, cohortEntries), - cohortSubjects = ifelse(is.na(cohortSubjects), 0L, cohortSubjects) - ) + counts <- with(counts, { + transform( + counts, + cohortEntries = ifelse(is.na(cohortEntries), 0L, cohortEntries), + cohortSubjects = ifelse(is.na(cohortSubjects), 0L, cohortSubjects) + ) + }) + } + if (!is.null(databaseId)) { + counts$databaseId <- databaseId } return(counts) } else { diff --git a/R/CohortDefinitionSet.R b/R/CohortDefinitionSet.R index ac78915..6482cfd 100644 --- a/R/CohortDefinitionSet.R +++ b/R/CohortDefinitionSet.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -29,15 +29,41 @@ #' @export createEmptyCohortDefinitionSet <- function(verbose = FALSE) { checkmate::assert_logical(verbose) - cohortDefinitionSetSpec <- .getCohortDefinitionSetSpecification() + df <- data.frame( + cohortId = numeric(), + cohortName = character(), + sql = character(), + json = character() + ) if (verbose) { - print(cohortDefinitionSetSpec) + print(df) } - # Build the data.frame dynamically from the cohort definition set spec - df <- .createEmptyDataFrameFromSpecification(cohortDefinitionSetSpec) invisible(df) } +.cohortDefinitionSetHasRequiredColumns <- function(x, emitWarning = FALSE) { + checkmate::assert_data_frame(x) + df <- createEmptyCohortDefinitionSet(verbose = FALSE) + + # Compare the column names from the input x to an empty cohort + # definition set to ensure the required columns are present + cohortDefinitionSetColumns <- colnames(df) + matchingColumns <- intersect(x = colnames(x), y = cohortDefinitionSetColumns) + columnNamesMatch <- setequal(matchingColumns, cohortDefinitionSetColumns) + + if (!columnNamesMatch && emitWarning) { + columnsMissing <- setdiff(x = cohortDefinitionSetColumns, y = colnames(x)) + warningMessage <- paste0( + "The following columns were missing in your cohortDefinitionSet: ", + paste(columnsMissing, collapse = ","), + ". 
A cohortDefinitionSet requires the following columns: ", + paste(cohortDefinitionSetColumns, collapse = ",") + ) + warning(warningMessage) + } + invisible(columnNamesMatch) +} + #' Is the data.frame a cohort definition set? #' #' @description @@ -99,7 +125,6 @@ checkAndFixCohortDefinitionSetDataTypes <- function(x, fixDataTypes = TRUE, emit checkmate::assert_data_frame(x) df <- createEmptyCohortDefinitionSet(verbose = FALSE) cohortDefinitionSetColumns <- colnames(df) - cohortDefinitionSetSpec <- .getCohortDefinitionSetSpecification() columnNamesMatch <- .cohortDefinitionSetHasRequiredColumns(x = x, emitWarning = emitWarning) if (!columnNamesMatch) { @@ -107,7 +132,8 @@ checkAndFixCohortDefinitionSetDataTypes <- function(x, fixDataTypes = TRUE, emit } # Compare the data types from the input x to an empty cohort - # definition set to ensure the same data types are present + # definition set to ensure the same data types (or close enough) + # are present dataTypesMatch <- FALSE # Subset x to the required columns xSubset <- x[, cohortDefinitionSetColumns] @@ -116,7 +142,14 @@ checkAndFixCohortDefinitionSetDataTypes <- function(x, fixDataTypes = TRUE, emit # Get the reference data types cohortDefinitionSetDataTypes <- sapply(df, typeof) # Check if the data types match - dataTypesMatch <- identical(x = xDataTypes, y = cohortDefinitionSetDataTypes) + # NOTE: createEmptyCohortDefinitionSet() is the reference for the data + # types. cohortId is declared as a numeric but an integer is also fine + dataTypesMatch <- (xDataTypes[1] %in% c("integer", "double") && all(xDataTypes[2:4] == "character")) + # Create the cohortDefinitionSetSpec from the names/data types for reference + cohortDefinitionSetSpec <- data.frame( + columnName = names(xDataTypes), + dataType = xDataTypes + ) if (!dataTypesMatch && emitWarning) { dataTypesMismatch <- setdiff(x = cohortDefinitionSetDataTypes, y = xDataTypes) # Create a column for the warning message @@ -145,50 +178,6 @@ checkAndFixCohortDefinitionSetDataTypes <- function(x, fixDataTypes = TRUE, emit )) } -.cohortDefinitionSetHasRequiredColumns <- function(x, emitWarning = FALSE) { - checkmate::assert_data_frame(x) - df <- createEmptyCohortDefinitionSet(verbose = FALSE) - cohortDefinitionSetSpec <- .getCohortDefinitionSetSpecification() - - # Compare the column names from the input x to an empty cohort - # definition set to ensure the required columns are present - cohortDefinitionSetColumns <- colnames(df) - matchingColumns <- intersect(x = colnames(x), y = cohortDefinitionSetColumns) - columnNamesMatch <- setequal(matchingColumns, cohortDefinitionSetColumns) - - if (!columnNamesMatch && emitWarning) { - columnsMissing <- setdiff(x = cohortDefinitionSetColumns, y = colnames(x)) - warningMessage <- paste0( - "The following columns were missing in your cohortDefinitionSet: ", - paste(columnsMissing, collapse = ","), - ". 
A cohortDefinitionSet requires the following columns: ", - paste(cohortDefinitionSetColumns, collapse = ",") - ) - warning(warningMessage) - } - invisible(columnNamesMatch) -} - -#' Helper function to return the specification description of a -#' cohortDefinitionSet -#' -#' @description -#' This function reads from the cohortDefinitionSetSpecificationDescription.csv -#' to return a data.frame that describes the required columns in a -#' cohortDefinitionSet -#' -#' @return -#' Returns a data.frame that defines a cohortDefinitionSet -#' -#' @noRd -#' @keywords internal -.getCohortDefinitionSetSpecification <- function() { - return(readCsv(system.file("cohortDefinitionSetSpecificationDescription.csv", - package = "CohortGenerator", - mustWork = TRUE - ))) -} - #' Get a cohort definition set #' #' @description @@ -244,7 +233,7 @@ getCohortDefinitionSet <- function(settingsFileName = "Cohorts.csv", path <- system.file(fileName, package = packageName) } if (verbose) { - ParallelLogger::logInfo(paste0(" -- Loading ", basename(fileName), " from ", path)) + rlang::inform(paste0(" -- Loading ", basename(fileName), " from ", path)) } if (!file.exists(path)) { if (grepl(".json$", tolower(basename(fileName))) && warnOnMissingJson) { @@ -259,10 +248,10 @@ getCohortDefinitionSet <- function(settingsFileName = "Cohorts.csv", } # Read the settings file which holds the cohortDefinitionSet - ParallelLogger::logInfo("Loading cohortDefinitionSet") + rlang::inform("Loading cohortDefinitionSet") settings <- readCsv(file = getPath(fileName = settingsFileName), warnOnCaseMismatch = FALSE) - assert_settings_columns(names(settings), getPath(fileName = settingsFileName)) + assertSettingsColumns(names(settings), getPath(fileName = settingsFileName)) checkmate::assert_true(all(cohortFileNameValue %in% names(settings))) checkmate::assert_true((!all(.getFileDataColumns() %in% names(settings)))) @@ -313,12 +302,12 @@ getCohortDefinitionSet <- function(settingsFileName = "Cohorts.csv", # Loading cohort subset definitions with their associated targets if (loadSubsets & nrow(subsetsToLoad) > 0) { if (dir.exists(subsetJsonFolder)) { - ParallelLogger::logInfo("Loading Cohort Subset Definitions") + rlang::inform("Loading Cohort Subset Definitions") ## Loading subsets that apply to the saved definition sets for (i in unique(subsetsToLoad$subsetDefinitionId)) { subsetFile <- file.path(subsetJsonFolder, paste0(i, ".json")) - ParallelLogger::logInfo("Loading Cohort Subset Defintion ", subsetFile) + rlang::inform(paste0("Loading Cohort Subset Definition ", subsetFile)) subsetDef <- CohortSubsetDefinition$new(ParallelLogger::loadSettingsFromJson(subsetFile)) # Find target cohorts for this subset definition subsetTargetIds <- unique(subsetsToLoad[subsetsToLoad$subsetDefinitionId == i, ]$subsetParent) @@ -382,7 +371,7 @@ saveCohortDefinitionSet <- function(cohortDefinitionSet, checkmate::assertDataFrame(cohortDefinitionSet, min.rows = 1, col.names = "named") checkmate::assert_vector(cohortFileNameValue) checkmate::assert_true(length(cohortFileNameValue) > 0) - assert_settings_columns(names(cohortDefinitionSet)) + assertSettingsColumns(names(cohortDefinitionSet)) checkmate::assert_true(all(cohortFileNameValue %in% names(cohortDefinitionSet))) if (length(.getTemplateDefinitions(cohortDefinitionSet)) > 0) { @@ -404,7 +393,7 @@ saveCohortDefinitionSet <- function(cohortDefinitionSet, # Export the cohortDefinitionSet to the settings folder if (verbose) { - ParallelLogger::logInfo("Exporting cohortDefinitionSet to ", settingsFileName) + 
rlang::inform(paste0("Exporting cohortDefinitionSet to ", settingsFileName)) } # Write the settings file and ensure that the "sql" and "json" columns are # not included @@ -432,7 +421,7 @@ saveCohortDefinitionSet <- function(cohortDefinitionSet, } if (verbose) { - ParallelLogger::logInfo("Exporting (", i, "/", nrow(cohortDefinitionSet), "): ", cohortName) + rlang::inform(paste0("Exporting (", i, "/", nrow(cohortDefinitionSet), "): ", cohortName)) } if (!is.na(json) && nchar(json) > 0) { @@ -448,7 +437,7 @@ saveCohortDefinitionSet <- function(cohortDefinitionSet, } } - ParallelLogger::logInfo("Cohort definition saved") + rlang::inform("Cohort definition saved") } .getSettingsFileRequiredColumns <- function() { @@ -514,17 +503,49 @@ checkSettingsColumns <- function(columnNames, settingsFileName = NULL) { } } -.createEmptyDataFrameFromSpecification <- function(specifications) { - # Build the data.frame dynamically from the cohort definition set spec - df <- data.frame() - for (i in 1:nrow(specifications)) { - colName <- specifications$columnName[i] - dataType <- specifications$dataType[i] - if (dataType == "integer64") { - df <- df %>% dplyr::mutate(!!colName := do.call(what = bit64::as.integer64, args = list())) - } else { - df <- df %>% dplyr::mutate(!!colName := do.call(what = dataType, args = list())) - } +#' Custom checkmate assertion for ensuring a vector contains only integer numbers, +#' including large ones +#' +#' @description +#' This function is used to provide a more informative message to the +#' user when a supplied number is not an integer. Since the +#' cohort definition set allows for storing `numeric` data types, we need +#' to make sure that the cohort ID has no fractional part. +#' NOTE: This function is necessary since checkmate::assert_integerish +#' still throws an error for large integers, which is not +#' desirable. +#' +#' @param x The vector containing integer/numeric values +#' +#' @param columnName The name of the column where this vector came from. This +#' is used when displaying the error message. +#' @return +#' Returns TRUE if all the values in x are integers +#' @noRd +#' @keywords internal +checkLargeInteger <- function(x, columnName = "cohortId") { + # NOTE: suppressWarnings used to mask + # warning from R which may happen for + # large values in x. + res <- all(suppressWarnings(x %% 1) == 0) + if (!isTRUE(res)) { + errorMessage <- paste0("The column ", columnName, " included non-integer values. 
Please update and retry") + return(errorMessage) + } else { + return(TRUE) } - invisible(df) +} + +.copySubsetDefinitions <- function(copyToCds, copyFromCds) { + # deep clone any subset definitions + if (hasSubsetDefinitions(copyFromCds)) { + subsetDefinitions <- lapply(attr(copyFromCds, "cohortSubsetDefinitions"), function(subsetDefinition) { + subsetDefinition$clone(deep = TRUE) + }) + attr(copyToCds, "cohortSubsetDefinitions") <- subsetDefinitions + attr(copyToCds, "hasSubsetDefinitions") <- TRUE + } + + copyToCds } diff --git a/R/CohortGenerator.R b/R/CohortGenerator.R index 3a1cc51..d4588b4 100644 --- a/R/CohortGenerator.R +++ b/R/CohortGenerator.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -19,12 +19,14 @@ #' @import DatabaseConnector #' @import R6 +#' @import dplyr #' @importFrom grDevices rgb +#' @importFrom methods is #' @importFrom stats aggregate setNames #' @importFrom utils write.csv install.packages menu packageVersion sessionInfo -#' @importFrom dplyr "%>%" #' @importFrom rlang .data ':=' NULL # Add custom assertions -assert_settings_columns <- checkmate::makeAssertionFunction(checkSettingsColumns) +assertSettingsColumns <- checkmate::makeAssertionFunction(checkSettingsColumns) +assertLargeInteger <- checkmate::makeAssertionFunction(checkLargeInteger) diff --git a/R/CohortSample.R b/R/CohortSample.R new file mode 100644 index 0000000..e522244 --- /dev/null +++ b/R/CohortSample.R @@ -0,0 +1,302 @@ +# Copyright 2024 Observational Health Data Sciences and Informatics +# +# This file is part of CohortGenerator +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +#' Get sample set for a given cohort. 
+#' @description +#' Returns a set of integers of size n unless count is less than n, in which case it returns count integers +#' +#' @noRd +.getSampleSet <- function(connection, + n = NULL, + sampleFraction = NULL, + seed, + seedArgs, + cohortDatabaseSchema, + targetCohortId, + targetTable) { + countSql <- "SELECT COUNT(DISTINCT SUBJECT_ID) as cnt FROM @cohort_database_schema.@target_table + WHERE cohort_definition_id = @target_cohort_id" + count <- DatabaseConnector::renderTranslateQuerySql(connection, + countSql, + cohort_database_schema = cohortDatabaseSchema, + target_cohort_id = targetCohortId, + target_table = targetTable + ) %>% + dplyr::pull() + + if (!is.null(sampleFraction)) { + n <- round(count * sampleFraction) + } + + if (count > n) { + if (is.null(seedArgs)) { + seedArgs <- list() + } + seedArgs$seed <- seed + do.call(set.seed, seedArgs) + return(data.frame(rand_id = sort(sample(1:count, n, replace = FALSE)))) + } else if (count == 0) { + return(data.frame()) + } + return(data.frame(rand_id = 1:count)) +} + +#' Sample cohort +#' @description +#' Samples a cohort with a specified integer set +#' @noRd +.sampleCohort <- function(connection, + targetCohortId, + targetTable, + outputCohortId, + outputTable, + cohortDatabaseSchema, + outputDatabaseSchema, + sampleTable, + seed, + tempEmulationSchema) { + randSampleTableName <- paste0("#SAMPLE_TABLE_", seed) + DatabaseConnector::insertTable( + connection = connection, + data = sampleTable, + dropTableIfExists = TRUE, + tempTable = TRUE, + tempEmulationSchema = tempEmulationSchema, + tableName = randSampleTableName + ) + + execSql <- SqlRender::readSql(system.file("sql", "sql_server", "sampling", "RandomSample.sql", package = "CohortGenerator")) + DatabaseConnector::renderTranslateExecuteSql(connection, + execSql, + tempEmulationSchema = tempEmulationSchema, + random_sample_table = randSampleTableName, + target_cohort_id = targetCohortId, + output_cohort_id = outputCohortId, + cohort_database_schema = cohortDatabaseSchema, + output_database_schema = outputDatabaseSchema, + output_table = outputTable, + target_table = targetTable + ) +} + + +.computeIdentifierExpression <- function(identifierExpression, cohortId, seed) { + allowed_vars <- c("cohortId", "seed") + vars_in_expression <- intersect(all.vars(parse(text = identifierExpression)), allowed_vars) + + if (length(setdiff(all.vars(parse(text = identifierExpression)), vars_in_expression)) > 0) { + stop("Invalid variable in expression.") + } + + expr <- parse(text = identifierExpression) + result <- eval(expr, list(cohortId = cohortId, seed = seed)) + return(result) +} + + +.checkUniqueOutputIds <- function(cohortIds, seed, identifierExpression, cohortTableNames) { + idSet <- Map(function(.x, .s) { + .computeIdentifierExpression(identifierExpression, .x, .s) + }, cohortIds, seed) + + # If the output table differs from the base table, target ids don't need to be distinct from output ids + if (cohortTableNames$cohortTable == cohortTableNames$cohortSampleTable) { + idSet <- c(idSet, cohortIds) + } + errorMessage <- "identifier expression does not produce unique output for cohort ids" + if (length(unique(idSet)) != length(idSet)) stop(errorMessage) + invisible(NULL) +} + +#' Sample Cohort Definition Set +#' +#' @description +#' Create one or more samples of size n from a cohort definition set +#' +#' Subsetted cohorts can be sampled, as with any other subset form. +#' However, subsetting a sampled cohort is not recommended and is not currently supported. 
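A hedged sketch of calling the sampling function documented below; the connection details, schema, and sizes are placeholders:

# Draw a reproducible sample of up to 1000 subjects from each generated cohort;
# output cohort IDs come from the default expression cohortId * 1000 + seed
sampledCds <- sampleCohortDefinitionSet(
  cohortDefinitionSet = cohortDefinitionSet,
  connectionDetails = connectionDetails,
  cohortDatabaseSchema = "results",
  cohortTableNames = getCohortTableNames(cohortTable = "cohort"),
  n = 1000,
  seed = 64374
)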
+#' In the case where n > cohort count, the entire cohort is copied unmodified. +#' +#' As different databases have different forms of randomness, the random selection is computed in +#' R, based on the count for each cohort. This is, therefore, db platform independent. +#' +#' Note, this function assumes cohorts have already been generated. +#' +#' Lifecycle Note: This functionality is considered experimental and not intended for use inside analytic packages. +#' +#' @param n Sample size. Ignored if sample fraction is set +#' @param sampleFraction Fraction of cohort to sample +#' @param identifierExpression Optional string R expression used to compute output cohort id. Can only use variables +#' cohortId and seed. Default is "cohortId * 1000 + seed", which is substituted and evaluated +#' @param cohortIds Optional subset of cohortIds to generate. By default this function will sample all cohorts +#' @param seed Vector of seeds to give to the R pseudorandom number generator +#' @param seedArgs Optional arguments to pass to set.seed +#' @param outputDatabaseSchema Optional schema to output cohorts to (if different from cohortDatabaseSchema) +#' @export +#' @returns sampledCohortDefinitionSet - a data.frame-like object that contains the resulting identifiers and modified names of cohorts +#' @inheritParams generateCohortSet +sampleCohortDefinitionSet <- function(cohortDefinitionSet, + cohortIds = cohortDefinitionSet$cohortId, + connectionDetails = NULL, + connection = NULL, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + cohortDatabaseSchema, + outputDatabaseSchema = cohortDatabaseSchema, + cohortTableNames = getCohortTableNames(), + n = NULL, + sampleFraction = NULL, + seed = 64374, + seedArgs = NULL, + identifierExpression = "cohortId * 1000 + seed", + incremental = FALSE, + incrementalFolder = NULL) { + checkmate::assertIntegerish(n, len = 1, null.ok = TRUE) + checkmate::assertNumeric(sampleFraction, len = 1, null.ok = TRUE, lower = 0, upper = 1.0) + checkmate::assertIntegerish(seed, min.len = 1) + checkmate::assertDataFrame(cohortDefinitionSet, min.rows = 1, col.names = "named") + checkmate::assertNames(colnames(cohortDefinitionSet), + must.include = c( + "cohortId", + "cohortName", + "sql" + ) + ) + + if (is.null(n) && is.null(sampleFraction)) { + stop("Must specify n or sampleFraction") + } + + if (is.null(connection) && is.null(connectionDetails)) { + stop("You must provide either a database connection or the connection details.") + } + + if (incremental) { + if (is.null(incrementalFolder)) { + stop("Must specify incrementalFolder when incremental = TRUE") + } + if (!file.exists(incrementalFolder)) { + dir.create(incrementalFolder, recursive = TRUE) + } + + recordKeepingFile <- file.path(incrementalFolder, "GeneratedCohortSamples.csv") + } + # check uniqueness of output ids + .checkUniqueOutputIds(cohortIds, seed, identifierExpression, cohortTableNames) + + start <- Sys.time() + if (is.null(connection)) { + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + } + + .checkCohortTables(connection, cohortDatabaseSchema, cohortTableNames) + sampledCohorts <- + base::Map(function(seed, targetCohortId) { + sampledCohortDefinition <- cohortDefinitionSet %>% + dplyr::filter(.data$cohortId == targetCohortId) + + sampledCohortDefinition$isSample <- TRUE + sampledCohortDefinition$status <- "ungenerated" + outputCohortId <- .computeIdentifierExpression( + identifierExpression, + 
sampledCohortDefinition$cohortId, + seed + ) + sampledCohortDefinition$sampleTargetCohortId <- sampledCohortDefinition$cohortId + sampledCohortDefinition$cohortId <- outputCohortId + + if (!is.null(sampleFraction)) { + sampledCohortDefinition$cohortName <- sprintf( + "%s [%s%% SAMPLE seed=%s]", + sampledCohortDefinition$cohortName, sampleFraction * 100, seed + ) + } else { + sampledCohortDefinition$cohortName <- sprintf( + "%s [SAMPLE seed=%s n=%s]", + sampledCohortDefinition$cohortName, seed, n + ) + } + + if (hasSubsetDefinitions(cohortDefinitionSet)) { + # must maintain mapping for subset parent ids + sampledCohortDefinition$subsetParent <- .computeIdentifierExpression( + identifierExpression, + sampledCohortDefinition$subsetParent, + seed + ) + } + + if (incremental && !isTaskRequired( + cohortId = outputCohortId, + seed = seed, + checksum = computeChecksum(paste0(sampledCohortDefinition$sql, n, seed, outputCohortId)), + recordKeepingFile = recordKeepingFile + )) { + sampledCohortDefinition$status <- "skipped" + return(sampledCohortDefinition) + } + # check incremental task for cohort sampling + sampleTable <- .getSampleSet( + connection = connection, + n = n, + sampleFraction = sampleFraction, + seed = seed + targetCohortId, # Seed is unique to each target cohort + seedArgs = seedArgs, + cohortDatabaseSchema = cohortDatabaseSchema, + targetCohortId = targetCohortId, + targetTable = cohortTableNames$cohortTable + ) + + if (nrow(sampleTable) == 0) { + rlang::inform(paste0("No entries found for ", targetCohortId, "; was it generated?")) + return(sampledCohortDefinition) + } + # Called only for side effects + .sampleCohort( + connection = connection, + targetCohortId = targetCohortId, + targetTable = cohortTableNames$cohortTable, + outputCohortId = outputCohortId, + outputTable = cohortTableNames$cohortSampleTable, + cohortDatabaseSchema = cohortDatabaseSchema, + outputDatabaseSchema = outputDatabaseSchema, + sampleTable = sampleTable, + seed = seed + targetCohortId, # Seed is unique to each target cohort + tempEmulationSchema = tempEmulationSchema + ) + + sampledCohortDefinition$status <- "generated" + if (incremental) { + recordTasksDone( + cohortId = sampledCohortDefinition$cohortId, + seed = seed, + checksum = computeChecksum(paste0(sampledCohortDefinition$sql, n, seed, outputCohortId)), + recordKeepingFile = recordKeepingFile + ) + } + return(sampledCohortDefinition) + }, seed, cohortIds) %>% + dplyr::bind_rows() + + + + attr(sampledCohorts, "isSampledCohortDefinition") <- TRUE + sampledCohorts <- .copySubsetDefinitions(sampledCohorts, cohortDefinitionSet) + delta <- Sys.time() - start + writeLines(paste("Generating sample set took", round(delta, 2), attr(delta, "units"))) + return(sampledCohorts) +} diff --git a/R/CohortStats.R b/R/CohortStats.R index 7659fa9..3fe0ac2 100644 --- a/R/CohortStats.R +++ b/R/CohortStats.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -47,14 +47,6 @@ insertInclusionRuleNames <- function(connectionDetails = NULL, stop("You must provide either a database connection or the connection details.") } - checkmate::assertDataFrame(cohortDefinitionSet, min.rows = 1, col.names = "named") - checkmate::assertNames(colnames(cohortDefinitionSet), - must.include = c( - "cohortId", - "cohortName", - "json" - ) - ) if (is.null(connection)) { connection <- DatabaseConnector::connect(connectionDetails) 
on.exit(DatabaseConnector::disconnect(connection)) @@ -65,44 +57,7 @@ insertInclusionRuleNames <- function(connectionDetails = NULL, stop(paste0(cohortInclusionTable, " table not found in schema: ", cohortDatabaseSchema, ". Please make sure the table is created using the createCohortTables() function before calling this function.")) } - # Assemble the cohort inclusion rules - # NOTE: This data frame must match the @cohort_inclusion_table - # structure as defined in inst/sql/sql_server/CreateCohortTables.sql - inclusionRules <- data.frame( - cohortDefinitionId = bit64::integer64(), - ruleSequence = integer(), - name = character(), - description = character() - ) - # Remove any cohort definitions that do not include the JSON property - cohortDefinitionSet <- cohortDefinitionSet[!(is.null(cohortDefinitionSet$json) | is.na(cohortDefinitionSet$json)), ] - for (i in 1:nrow(cohortDefinitionSet)) { - cohortDefinition <- RJSONIO::fromJSON(content = cohortDefinitionSet$json[i], digits = 23) - if (!is.null(cohortDefinition$InclusionRules)) { - nrOfRules <- length(cohortDefinition$InclusionRules) - if (nrOfRules > 0) { - for (j in 1:nrOfRules) { - ruleName <- cohortDefinition$InclusionRules[[j]]$name - ruleDescription <- cohortDefinition$InclusionRules[[j]]$description - if (is.na(ruleName) || ruleName == "") { - ruleName <- paste0("Unamed rule (Sequence ", j - 1, ")") - } - if (is.null(ruleDescription)) { - ruleDescription <- "" - } - inclusionRules <- rbind( - inclusionRules, - data.frame( - cohortDefinitionId = bit64::as.integer64(cohortDefinitionSet$cohortId[i]), - ruleSequence = as.integer(j - 1), - name = ruleName, - description = ruleDescription - ) - ) - } - } - } - } + inclusionRules <- getCohortInclusionRules(cohortDefinitionSet) # Remove any existing data to prevent duplication DatabaseConnector::renderTranslateExecuteSql( @@ -116,7 +71,7 @@ insertInclusionRuleNames <- function(connectionDetails = NULL, # Insert the inclusion rules if (nrow(inclusionRules) > 0) { - ParallelLogger::logInfo("Inserting inclusion rule names") + rlang::inform("Inserting inclusion rule names") DatabaseConnector::insertTable( connection = connection, databaseSchema = cohortDatabaseSchema, @@ -152,8 +107,8 @@ getStatsTable <- function(connectionDetails, databaseId <- NULL } - ParallelLogger::logInfo("- Fetching data from ", table) - sql <- "SELECT {@database_id != ''}?{CAST('@database_id' as VARCHAR(255)) as database_id,} * FROM @cohort_database_schema.@table" + rlang::inform(paste0("- Fetching data from ", table)) + sql <- "SELECT {@database_id != ''}?{CAST('@database_id' as VARCHAR(255)) as database_id,} t.* FROM @cohort_database_schema.@table t" data <- DatabaseConnector::renderTranslateQuerySql( sql = sql, connection = connection, @@ -174,6 +129,7 @@ getStatsTable <- function(connectionDetails, } #' Get Cohort Inclusion Stats Table Data +#' #' @description #' This function returns a data frame of the data in the Cohort Inclusion Tables. #' Results are organized in to a list with 5 different data frames: @@ -238,8 +194,73 @@ getCohortStats <- function(connectionDetails, cohortDatabaseSchema = cohortDatabaseSchema, table = cohortTableNames[[table]], snakeCaseToCamelCase = snakeCaseToCamelCase, - includeDatabaseId = includeDatabaseId + includeDatabaseId = includeDatabaseId, + databaseId = databaseId ) } return(results) } + + +#' Get Cohort Inclusion Rules from a cohort definition set +#' +#' @description +#' This function returns a data frame of the inclusion rules defined +#' in a cohort definition set. 
+#' +#' @md +#' @template CohortDefinitionSet +#' +#' @export +getCohortInclusionRules <- function(cohortDefinitionSet) { + checkmate::assertDataFrame(cohortDefinitionSet, min.rows = 1, col.names = "named") + checkmate::assertNames(colnames(cohortDefinitionSet), + must.include = c( + "cohortId", + "cohortName", + "json" + ) + ) + + # Assemble the cohort inclusion rules + # NOTE: This data frame must match the @cohort_inclusion_table + # structure as defined in inst/sql/sql_server/CreateCohortTables.sql + inclusionRules <- data.frame( + cohortDefinitionId = numeric(), + ruleSequence = integer(), + name = character(), + description = character() + ) + + # Remove any cohort definitions that do not include the JSON property + cohortDefinitionSet <- cohortDefinitionSet[!(is.null(cohortDefinitionSet$json) | is.na(cohortDefinitionSet$json)), ] + for (i in 1:nrow(cohortDefinitionSet)) { + cohortDefinition <- RJSONIO::fromJSON(content = cohortDefinitionSet$json[i], digits = 23) + if (!is.null(cohortDefinition$InclusionRules)) { + nrOfRules <- length(cohortDefinition$InclusionRules) + if (nrOfRules > 0) { + for (j in 1:nrOfRules) { + ruleName <- cohortDefinition$InclusionRules[[j]]$name + ruleDescription <- cohortDefinition$InclusionRules[[j]]$description + if (is.na(ruleName) || ruleName == "") { + ruleName <- paste0("Unnamed rule (Sequence ", j - 1, ")") + } + if (is.null(ruleDescription)) { + ruleDescription <- "" + } + inclusionRules <- rbind( + inclusionRules, + data.frame( + cohortDefinitionId = as.numeric(cohortDefinitionSet$cohortId[i]), + ruleSequence = as.integer(j - 1), + name = ruleName, + description = ruleDescription + ) + ) + } + } + } + } + + invisible(inclusionRules) +} diff --git a/R/CohortTables.R b/R/CohortTables.R index ee88ee2..19a57c7 100644 --- a/R/CohortTables.R +++ b/R/CohortTables.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -23,6 +23,7 @@ #' and cohort statistics tables. #' #' @param cohortTable Name of the cohort table. +#' @param cohortSampleTable Name of the cohort table for sampled cohorts (defaults to the same as the cohort table). #' #' @param cohortInclusionTable Name of the inclusion table, one of the tables for storing #' inclusion rule statistics. 
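A small sketch of the new cohortSampleTable option added in this file; the table and schema names are illustrative:

# Route sampled cohorts to their own table; cohortSampleTable defaults to
# cohortTable when omitted
cohortTableNames <- getCohortTableNames(
  cohortTable = "my_cohort",
  cohortSampleTable = "my_cohort_sample"
)
createCohortTables(
  connectionDetails = connectionDetails,
  cohortDatabaseSchema = "results",
  cohortTableNames = cohortTableNames
)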
@@ -40,6 +41,7 @@ #' #' @export getCohortTableNames <- function(cohortTable = "cohort", + cohortSampleTable = cohortTable, cohortInclusionTable = paste0(cohortTable, "_inclusion"), cohortInclusionResultTable = paste0(cohortTable, "_inclusion_result"), cohortInclusionStatsTable = paste0(cohortTable, "_inclusion_stats"), @@ -47,6 +49,7 @@ getCohortTableNames <- function(cohortTable = "cohort", cohortCensorStatsTable = paste0(cohortTable, "_censor_stats")) { return(list( cohortTable = cohortTable, + cohortSampleTable = cohortSampleTable, cohortInclusionTable = cohortInclusionTable, cohortInclusionResultTable = cohortInclusionResultTable, cohortInclusionStatsTable = cohortInclusionStatsTable, @@ -95,24 +98,31 @@ createCohortTables <- function(connectionDetails = NULL, for (i in 1:length(cohortTableNames)) { if (toupper(cohortTableNames[i]) %in% toupper(tables)) { createTableFlagList[i] <- FALSE - ParallelLogger::logInfo("Table \"", cohortTableNames[i], "\" already exists and in incremental mode, so not recreating it.") + rlang::inform(paste0("Table \"", cohortTableNames[i], "\" already exists and in incremental mode, so not recreating it.")) } } } if (any(unlist(createTableFlagList, use.names = FALSE))) { - ParallelLogger::logInfo("Creating cohort tables") + rlang::inform("Creating cohort tables") + createSampleTable <- ifelse( + test = is.null(createTableFlagList$cohortSampleTable), + yes = FALSE, + no = (createTableFlagList$cohortSampleTable && cohortTableNames$cohortSampleTable != cohortTableNames$cohortTable) + ) sql <- SqlRender::readSql(system.file("sql/sql_server/CreateCohortTables.sql", package = "CohortGenerator", mustWork = TRUE)) sql <- SqlRender::render( sql = sql, cohort_database_schema = cohortDatabaseSchema, create_cohort_table = createTableFlagList$cohortTable, + create_cohort_sample_table = createSampleTable, create_cohort_inclusion_table = createTableFlagList$cohortInclusionTable, create_cohort_inclusion_result_table = createTableFlagList$cohortInclusionResultTable, create_cohort_inclusion_stats_table = createTableFlagList$cohortInclusionStatsTable, create_cohort_summary_stats_table = createTableFlagList$cohortSummaryStatsTable, create_cohort_censor_stats_table = createTableFlagList$cohortCensorStatsTable, cohort_table = cohortTableNames$cohortTable, + cohort_sample_table = cohortTableNames$cohortSampleTable, cohort_inclusion_table = cohortTableNames$cohortInclusionTable, cohort_inclusion_result_table = cohortTableNames$cohortInclusionResultTable, cohort_inclusion_stats_table = cohortTableNames$cohortInclusionStatsTable, @@ -127,7 +137,7 @@ createCohortTables <- function(connectionDetails = NULL, DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) logCreateTableMessage <- function(schema, tableName) { - ParallelLogger::logInfo("- Created table ", schema, ".", tableName) + rlang::inform(paste0("- Created table ", schema, ".", tableName)) } for (i in 1:length(createTableFlagList)) { if (createTableFlagList[[i]]) { @@ -136,7 +146,7 @@ createCohortTables <- function(connectionDetails = NULL, } delta <- Sys.time() - start - ParallelLogger::logInfo("Creating cohort tables took ", round(delta, 2), attr(delta, "units")) + rlang::inform(paste0("Creating cohort tables took ", round(delta, 2), attr(delta, "units"))) } } @@ -163,7 +173,7 @@ dropCohortStatsTables <- function(connectionDetails = NULL, # Export the stats dropTable <- function(table) { - ParallelLogger::logInfo("- Dropping ", table) + rlang::inform(paste0("- Dropping ", table)) sql 
<- "TRUNCATE TABLE @cohort_database_schema.@table; DROP TABLE @cohort_database_schema.@table;" DatabaseConnector::renderTranslateExecuteSql( @@ -185,3 +195,30 @@ dropCohortStatsTables <- function(connectionDetails = NULL, dropTable(cohortTableNames$cohortTable) } } + +.checkCohortTables <- function(connection, + cohortDatabaseSchema, + cohortTableNames) { + # Verify the cohort tables exist and if they do not + # stop the generation process + tableExistsFlagList <- lapply(cohortTableNames, FUN = function(x) { + x <- FALSE + }) + tables <- DatabaseConnector::getTableNames(connection, cohortDatabaseSchema) + for (i in 1:length(cohortTableNames)) { + if (toupper(cohortTableNames[i]) %in% toupper(tables)) { + tableExistsFlagList[i] <- TRUE + } + } + + if (!all(unlist(tableExistsFlagList, use.names = FALSE))) { + errorMsg <- "The following tables have not been created: \n" + for (i in 1:length(cohortTableNames)) { + if (!tableExistsFlagList[[i]]) { + errorMsg <- paste0(errorMsg, " - ", cohortTableNames[i], "\n") + } + } + errorMsg <- paste(errorMsg, "Please use the createCohortTables function to ensure all tables exist before generating cohorts.", sep = "\n") + stop(errorMsg) + } +} diff --git a/R/CsvHelper.R b/R/CsvHelper.R index a469507..a1a55b1 100644 --- a/R/CsvHelper.R +++ b/R/CsvHelper.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -28,12 +28,42 @@ #' @param warnOnCaseMismatch When TRUE, raise a warning if column headings #' in the .csv are not in snake_case format #' +#' @param colTypes Corresponds to the `col_types` in the `readr::read_csv` function. +#' One of `NULL`, a [readr::cols()] specification, or +#' a string. See `vignette("readr")` for more details. +#' +#' If `NULL`, all column types will be inferred from `guess_max` rows of the +#' input, interspersed throughout the file. This is convenient (and fast), +#' but not robust. If the guessed types are wrong, you'll need to increase +#' `guess_max` or supply the correct types yourself. +#' +#' Column specifications created by [list()] or [cols()] must contain +#' one column specification for each column. +#' +#' Alternatively, you can use a compact string representation where each +#' character represents one column: +#' - c = character +#' - i = integer +#' - n = number +#' - d = double +#' - l = logical +#' - f = factor +#' - D = date +#' - T = date time +#' - t = time +#' - ? = guess +#' - _ or - = skip +#' +#' By default, reading a file without a column specification will print a +#' message showing what `readr` guessed they were. To remove this message, +#' set `show_col_types = FALSE` or set `options(readr.show_col_types = FALSE)`. 
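An illustrative use of the colTypes pass-through documented above; the file path and column names are hypothetical:

# Force cohort_id to be read as a double and all remaining columns as character
settings <- readCsv(
  file = "settings/Cohorts.csv",
  colTypes = readr::cols(
    cohort_id = readr::col_double(),
    .default = readr::col_character()
  )
)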
+#' #' @return #' A tibble with the .csv contents #' #' @export -readCsv <- function(file, warnOnCaseMismatch = TRUE) { - fileContents <- .readCsv(file = file) +readCsv <- function(file, warnOnCaseMismatch = TRUE, colTypes = readr::cols()) { + fileContents <- .readCsv(file = file, colTypes = colTypes) columnNames <- colnames(fileContents) columnNamesInSnakeCaseFormat <- isSnakeCase(columnNames) if (!all(columnNamesInSnakeCaseFormat) && warnOnCaseMismatch) { @@ -58,10 +88,10 @@ readCsv <- function(file, warnOnCaseMismatch = TRUE) { #' #' @noRd #' @keywords internal -.readCsv <- function(file) { +.readCsv <- function(file, colTypes = readr::cols()) { invisible(readr::read_csv( file = file, - col_types = readr::cols(), + col_types = colTypes, lazy = FALSE )) } diff --git a/R/Export.R b/R/Export.R index ea958b6..0b7c9eb 100644 --- a/R/Export.R +++ b/R/Export.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -19,7 +19,14 @@ #' @description #' This function retrieves the data from the cohort statistics tables and #' writes them to the inclusion statistics folder specified in the function -#' call. +#' call. NOTE: inclusion rule names are handled in one of two ways: +#' +#' 1. You can specify the cohortDefinitionSet parameter and the inclusion rule +#' names will be extracted from the data.frame. +#' 2. You can insert the inclusion rule names into the database using the +#' insertInclusionRuleNames function of this package. +#' +#' The first approach is preferred, as it avoids the warning emitted. #' #' @template Connection #' @@ -38,6 +45,12 @@ #' #' @param databaseId Optional - when specified, the databaseId will be added #' to the exported results +#' @template minCellCount +#' +#' @template CohortDefinitionSet +#' +#' @param tablePrefix Optional - allows appending a prefix to the exported +#' file names. 
#' #' @export exportCohortStatsTables <- function(connectionDetails, @@ -48,7 +61,10 @@ exportCohortStatsTables <- function(connectionDetails, snakeCaseToCamelCase = TRUE, fileNamesInSnakeCase = FALSE, incremental = FALSE, - databaseId = NULL) { + databaseId = NULL, + minCellCount = 5, + cohortDefinitionSet = NULL, + tablePrefix = "") { if (is.null(connection)) { # Establish the connection and ensure the cleanup is performed connection <- DatabaseConnector::connect(connectionDetails) @@ -59,68 +75,199 @@ exportCohortStatsTables <- function(connectionDetails, dir.create(cohortStatisticsFolder, recursive = TRUE) } - # Export the stats - exportStats <- function(table, + # Internal function to export the stats + exportStats <- function(data, fileName, - includeDatabaseId) { - data <- getStatsTable( - connection = connection, - table = table, - snakeCaseToCamelCase = snakeCaseToCamelCase, - databaseId = databaseId, - cohortDatabaseSchema = cohortDatabaseSchema, - includeDatabaseId = includeDatabaseId - ) + resultsDataModelTableName, + tablePrefix) { + fullFileName <- file.path(cohortStatisticsFolder, paste0(tablePrefix, fileName)) + primaryKeyColumns <- getPrimaryKey(resultsDataModelTableName) + columnsToCensor <- getColumnsToCensor(resultsDataModelTableName) + rlang::inform(paste0("- Saving data to - ", fullFileName)) + + # Make sure the data is censored before saving + if (length(columnsToCensor) > 0) { + for (i in seq_along(columnsToCensor)) { + colName <- ifelse(isTRUE(snakeCaseToCamelCase), yes = columnsToCensor[i], no = SqlRender::camelCaseToSnakeCase(columnsToCensor[i])) + data <- data %>% + enforceMinCellValue(colName, minCellCount) + } + } - fullFileName <- file.path(cohortStatisticsFolder, fileName) - ParallelLogger::logInfo("- Saving data to - ", fullFileName) if (incremental) { - if (snakeCaseToCamelCase) { - cohortDefinitionIds <- unique(data$cohortDefinitionId) - saveIncremental(data, fullFileName, cohortDefinitionId = cohortDefinitionIds) - } else { - cohortDefinitionIds <- unique(data$cohort_definition_id) - saveIncremental(data, fullFileName, cohort_definition_id = cohortDefinitionIds) + # Dynamically build the arguments to the saveIncremental + # to specify the primary key(s) for the file + args <- list( + data = data, + file = fullFileName + ) + for (i in seq_along(primaryKeyColumns)) { + colName <- ifelse(isTRUE(snakeCaseToCamelCase), yes = primaryKeyColumns[i], no = SqlRender::camelCaseToSnakeCase(primaryKeyColumns[i])) + args[[colName]] <- data[[colName]] } + do.call( + what = CohortGenerator::saveIncremental, + args = args + ) } else { .writeCsv(x = data, file = fullFileName) } } tablesToExport <- data.frame( - tableName = cohortTableNames$cohortInclusionTable, - fileName = "cohort_inclusion.csv", - includeDatabaseId = FALSE + tableName = c("cohortInclusionResultTable", "cohortInclusionStatsTable", "cohortSummaryStatsTable", "cohortCensorStatsTable"), + fileName = c("cohort_inc_result.csv", "cohort_inc_stats.csv", "cohort_summary_stats.csv", "cohort_censor_stats.csv"), + resultsDataModelTableName = c("cg_cohort_inc_result", "cg_cohort_inc_stats", "cg_cohort_summary_stats", "cg_cohort_censor_stats") + ) + + if (is.null(cohortDefinitionSet)) { + warning("No cohortDefinitionSet specified; please make sure you've inserted the inclusion rule names using the insertInclusionRuleNames function.") + tablesToExport <- rbind(tablesToExport, data.frame( + tableName = "cohortInclusionTable", + fileName = "cohort_inclusion.csv", + resultsDataModelTableName = "cg_cohort_inclusion" + )) 
+ } else { + inclusionRules <- getCohortInclusionRules(cohortDefinitionSet) + names(inclusionRules) <- SqlRender::camelCaseToSnakeCase(names(inclusionRules)) + exportStats( + data = inclusionRules, + fileName = "cohort_inclusion.csv", + resultsDataModelTableName = "cg_cohort_inclusion", + tablePrefix = tablePrefix + ) + } + + # Get the cohort statistics + cohortStats <- getCohortStats( + connectionDetails = connectionDetails, + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + databaseId = databaseId, + snakeCaseToCamelCase = snakeCaseToCamelCase, + cohortTableNames = cohortTableNames ) - tablesToExport <- rbind(tablesToExport, data.frame( - tableName = cohortTableNames$cohortInclusionResultTable, - fileName = "cohort_inc_result.csv", - includeDatabaseId = TRUE - )) - tablesToExport <- rbind(tablesToExport, data.frame( - tableName = cohortTableNames$cohortInclusionStatsTable, - fileName = "cohort_inc_stats.csv", - includeDatabaseId = TRUE - )) - tablesToExport <- rbind(tablesToExport, data.frame( - tableName = cohortTableNames$cohortSummaryStatsTable, - fileName = "cohort_summary_stats.csv", - includeDatabaseId = TRUE - )) - tablesToExport <- rbind(tablesToExport, data.frame( - tableName = cohortTableNames$cohortCensorStatsTable, - fileName = "cohort_censor_stats.csv", - includeDatabaseId = TRUE - )) + for (i in 1:nrow(tablesToExport)) { fileName <- ifelse(test = fileNamesInSnakeCase, yes = tablesToExport$fileName[i], no = SqlRender::snakeCaseToCamelCase(tablesToExport$fileName[i]) ) exportStats( - table = tablesToExport$tableName[i], + data = cohortStats[[tablesToExport$tableName[i]]], fileName = fileName, - includeDatabaseId = tablesToExport$includeDatabaseId[i] + resultsDataModelTableName = tablesToExport$resultsDataModelTableName[[i]], + tablePrefix = tablePrefix ) } } + +exportCohortDefinitionSet <- function(outputFolder, cohortDefinitionSet = NULL) { + cohortDefinitions <- createEmptyResult("cg_cohort_definition") + cohortSubsets <- createEmptyResult("cg_cohort_subset_definition") + if (!is.null(cohortDefinitionSet)) { + cdsCohortSubsets <- getSubsetDefinitions(cohortDefinitionSet) + if (length(cdsCohortSubsets) > 0) { + for (i in seq_along(cdsCohortSubsets)) { + cohortSubsets <- rbind( + cohortSubsets, + data.frame( + subsetDefinitionId = cdsCohortSubsets[[i]]$definitionId, + json = as.character(cdsCohortSubsets[[i]]$toJSON()) + ) + ) + } + } else { + # NOTE: In this case the cohortDefinitionSet has no subsets defined + # and so we need to add the additional columns that are defined + # in the function: addCohortSubsetDefinition. To do this, + # we'll construct a copy of the cohortDefinitionSet with a single + # subset to get the proper structure and filter it to the + # cohorts of interest. 
+ cdsCopy <- cohortDefinitionSet %>% + addCohortSubsetDefinition( + cohortSubsetDefintion = createCohortSubsetDefinition( + definitionId = 1, + name = "empty", + subsetOperators = list( + createDemographicSubset() + ) + ) + ) %>% + dplyr::filter(.data$cohortId %in% cohortDefinitionSet$cohortId) + cohortDefinitionSet <- cdsCopy + } + # Massage and save the cohort definition set + colsToRename <- c("cohortId", "cohortName", "sql", "json") + colInd <- which(names(cohortDefinitionSet) %in% colsToRename) + names(cohortDefinitionSet)[colInd] <- c("cohortDefinitionId", "cohortName", "sqlCommand", "json") + if (!"description" %in% names(cohortDefinitionSet)) { + cohortDefinitionSet$description <- "" + } + cohortDefinitions <- cohortDefinitionSet[, intersect(names(cohortDefinitions), names(cohortDefinitionSet))] + } + writeCsv( + x = cohortDefinitions, + file = file.path(outputFolder, "cg_cohort_definition.csv") + ) + writeCsv( + x = cohortSubsets, + file = file.path(outputFolder, "cg_cohort_subset_definition.csv") + ) +} + +createEmptyResult <- function(tableName) { + columns <- readCsv( + file = system.file("csv", "resultsDataModelSpecification.csv", package = "CohortGenerator") + ) %>% + dplyr::filter(.data$tableName == !!tableName) %>% + dplyr::pull(.data$columnName) %>% + SqlRender::snakeCaseToCamelCase() + result <- vector(length = length(columns)) + names(result) <- columns + result <- tibble::as_tibble(t(result), .name_repair = "check_unique") + result <- result[FALSE, ] + return(result) +} + +getPrimaryKey <- function(tableName) { + columns <- readCsv( + file = system.file("csv", "resultsDataModelSpecification.csv", package = "CohortGenerator") + ) %>% + dplyr::filter(.data$tableName == !!tableName & tolower(.data$primaryKey) == "yes") %>% + dplyr::pull(.data$columnName) %>% + SqlRender::snakeCaseToCamelCase() + return(columns) +} + +getColumnsToCensor <- function(tableName) { + columns <- readCsv( + file = system.file("csv", "resultsDataModelSpecification.csv", package = "CohortGenerator") + ) %>% + dplyr::filter(.data$tableName == !!tableName & tolower(.data$minCellCount) == "yes") %>% + dplyr::pull(.data$columnName) %>% + SqlRender::snakeCaseToCamelCase() + return(columns) +} + +enforceMinCellValue <- function(data, fieldName, minValues, silent = FALSE) { + toCensor <- !is.na(dplyr::pull(data, fieldName)) & dplyr::pull(data, fieldName) < minValues & dplyr::pull(data, fieldName) != 0 + if (!silent) { + percent <- round(100 * sum(toCensor) / nrow(data), 1) + message( + " censoring ", + sum(toCensor), + " values (", + percent, + "%) from ", + fieldName, + " because value below minimum" + ) + } + if (length(minValues) == 1) { + data[toCensor, fieldName] <- -minValues + } else { + data[toCensor, fieldName] <- -minValues[toCensor] + } + return(data) +}
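For orientation, a usage sketch of the extended export; the connection details, schema names, folders, and cohortDefinitionSet are illustrative placeholders, not part of the patch:

# Hedged sketch: assumes connectionDetails and cohortDefinitionSet already exist.
cohortTableNames <- CohortGenerator::getCohortTableNames(cohortTable = "my_cohort")
CohortGenerator::exportCohortStatsTables(
  connectionDetails = connectionDetails,
  cohortTableNames = cohortTableNames,
  cohortDatabaseSchema = "my_results_schema",
  cohortStatisticsFolder = file.path("export", "stats"),
  snakeCaseToCamelCase = FALSE,
  fileNamesInSnakeCase = TRUE,
  incremental = TRUE,
  databaseId = "MyCdm",
  minCellCount = 5,                          # counts 1-4 in censored columns become -5
  cohortDefinitionSet = cohortDefinitionSet, # supplies the inclusion rule names
  tablePrefix = "cg_"                        # e.g. writes cg_cohort_inc_result.csv
)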
diff --git a/R/Incremental.R b/R/Incremental.R index ff9ce9c..cb578b0 100644 --- a/R/Incremental.R +++ b/R/Incremental.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -70,7 +70,7 @@ isTaskRequired <- function(..., checksum, recordKeepingFile, verbose = TRUE) { if (verbose) { key <- list(...) key <- paste(sprintf("%s = '%s'", names(key), key), collapse = ", ") - ParallelLogger::logInfo("Skipping ", key, " because it is unchanged from earlier run") + rlang::inform(paste0("Skipping ", key, " because it is unchanged from earlier run")) } return(FALSE) } else { @@ -115,7 +115,7 @@ getRequiredTasks <- function(..., checksum, recordKeepingFile) { tasks$checksum <- NULL if (length(idx) > 0) { text <- paste(sprintf("%s = %s", names(tasks), tasks[idx, ]), collapse = ", ") - ParallelLogger::logInfo("Skipping ", text, " because it is unchanged from earlier run") + rlang::inform(paste0("Skipping ", text, " because it is unchanged from earlier run")) tasks <- tasks[-idx, ] } }
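The incremental mode added in the next file follows the record-keeping pattern sketched here (a paraphrase of the calls used later in this patch; the task definition and file path are illustrative):

recordKeepingFile <- file.path("incremental", "GeneratedTasks.csv")
checksum <- computeChecksum(jsonlite::toJSON(list(taskName = "myTask")))[[1]]
if (isTaskRequired(paramHash = checksum, checksum = checksum, recordKeepingFile = recordKeepingFile)) {
  # ... perform the expensive work only when the parameters changed ...
  recordTasksDone(paramHash = checksum, checksum = checksum, recordKeepingFile = recordKeepingFile)
}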
diff --git a/R/NegativeControlCohorts.R b/R/NegativeControlCohorts.R index 016c5b4..afd816a 100644 --- a/R/NegativeControlCohorts.R +++ b/R/NegativeControlCohorts.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -29,36 +29,17 @@ #' @export createEmptyNegativeControlOutcomeCohortSet <- function(verbose = FALSE) { checkmate::assert_logical(verbose) - negativeControlOutcomeCohortSetSpecification <- .getNegativeControlOutcomeCohortSetSpecification() + df <- data.frame( + cohortId = numeric(), + cohortName = character(), + outcomeConceptId = numeric() + ) if (verbose) { - print(negativeControlOutcomeCohortSetSpecification) + print(df) } - # Build the data.frame dynamically - df <- .createEmptyDataFrameFromSpecification(negativeControlOutcomeCohortSetSpecification) invisible(df) } -#' Helper function to return the specification description of a -#' negativeControlOutcomeCohortSet -#' -#' @description -#' This function reads from the negativeControlOutcomeCohortSetSpecificationDescription.csv -#' to return a data.frame that describes the required columns in a -#' negativeControlOutcomeCohortSet -#' -#' @return -#' Returns a data.frame that defines a negativeControlOutcomeCohortSet -#' -#' @noRd -#' @keywords internal -.getNegativeControlOutcomeCohortSetSpecification <- function() { - return(readCsv(system.file("negativeControlOutcomeCohortSetSpecificationDescription.csv", - package = "CohortGenerator", - mustWork = TRUE - ))) -} - - #' Generate a set of negative control outcome cohorts #' #' @description @@ -84,6 +65,10 @@ createEmptyNegativeControlOutcomeCohortSet <- function(verbose = FALSE) { #' @param detectOnDescendants When set to TRUE, detectOnDescendants will use the vocabulary to find negative control #' outcomes using the outcomeConceptId and all descendants via the concept_ancestor table. #' When FALSE, only the exact outcomeConceptId will be used to detect the outcome. +#' @param incremental Create only cohorts that haven't been created before? +#' +#' @param incrementalFolder If \code{incremental = TRUE}, specify a folder where records are +#' kept of which definition has been executed. #' #' @return #' Invisibly returns "FINISHED" when generation completes, or "SKIPPED" when an incremental run found no changes #' @export generateNegativeControlOutcomeCohorts <- function(connectionDetails = NULL, @@ -97,6 +82,8 @@ generateNegativeControlOutcomeCohorts <- function(connectionDetails = NULL, cohortTable = getCohortTableNames()$cohortTable, negativeControlOutcomeCohortSet, occurrenceType = "all", + incremental = FALSE, + incrementalFolder = NULL, detectOnDescendants = FALSE) { if (is.null(connection) && is.null(connectionDetails)) { stop("You must provide either a database connection or the connection details.") @@ -105,12 +92,44 @@ generateNegativeControlOutcomeCohorts <- function(connectionDetails = NULL, checkmate::assert_choice(x = tolower(occurrenceType), choices = c("all", "first")) checkmate::assert_logical(detectOnDescendants) checkmate::assertNames(colnames(negativeControlOutcomeCohortSet), - must.include = .getNegativeControlOutcomeCohortSetSpecification()$columnName + must.include = names(createEmptyNegativeControlOutcomeCohortSet()) ) checkmate::assert_data_frame( x = negativeControlOutcomeCohortSet, min.rows = 1 ) + assertLargeInteger(negativeControlOutcomeCohortSet$cohortId) + assertLargeInteger(negativeControlOutcomeCohortSet$outcomeConceptId, columnName = "outcomeConceptId") + + # Verify that cohort IDs are not repeated in the negative control + # cohort definition set before generating + if (length(unique(negativeControlOutcomeCohortSet$cohortId)) != length(negativeControlOutcomeCohortSet$cohortId)) { + duplicatedCohortIds <- negativeControlOutcomeCohortSet$cohortId[duplicated(negativeControlOutcomeCohortSet$cohortId)] + stop("Cannot generate! Duplicate cohort IDs found in your negativeControlOutcomeCohortSet: ", paste(duplicatedCohortIds, collapse = ", "), ". Please fix your negativeControlOutcomeCohortSet and try again.") + } + + if (incremental) { + if (is.null(incrementalFolder)) { + stop("Must specify incrementalFolder when incremental = TRUE") + } + if (!file.exists(incrementalFolder)) { + dir.create(incrementalFolder, recursive = TRUE) + } + + recordKeepingFile <- file.path(incrementalFolder, "GeneratedNegativeControls.csv") + checksum <- computeChecksum(jsonlite::toJSON( + list( + negativeControlOutcomeCohortSet = negativeControlOutcomeCohortSet, + occurrenceType = occurrenceType, + detectOnDescendants = detectOnDescendants + ) + ))[[1]] + + if (!isTaskRequired(paramHash = checksum, checksum = checksum, recordKeepingFile = recordKeepingFile)) { + writeLines("Negative control set skipped") + return(invisible("SKIPPED")) + } + } start <- Sys.time() if (is.null(connection)) { @@ -127,21 +146,7 @@ generateNegativeControlOutcomeCohorts <- function(connectionDetails = NULL, stop(paste0("Table: ", cohortTable, " not found in schema: ", cohortDatabaseSchema, ". Please use `createCohortTable` to ensure the cohort table is created before generating cohorts.")) }
- ParallelLogger::logInfo("Generating negative control outcome cohorts") - - # Send the negative control outcome cohort set to the server for use - # in processing. This temp table will hold the mapping between - # cohort_definition_id and the outcomeConceptId in the data.frame() - DatabaseConnector::insertTable( - connection = connection, - data = negativeControlOutcomeCohortSet, - tempEmulationSchema = tempEmulationSchema, - tableName = "#nc_set", - camelCaseToSnakeCase = TRUE, - dropTableIfExists = TRUE, - createTable = TRUE, - tempTable = TRUE - ) + rlang::inform("Generating negative control outcome cohorts") sql <- createNegativeControlOutcomesQuery( connection = connection, @@ -150,7 +155,8 @@ generateNegativeControlOutcomeCohorts <- function(connectionDetails = NULL, cohortDatabaseSchema = cohortDatabaseSchema, cohortTable = cohortTable, occurrenceType = occurrenceType, - detectOnDescendants = detectOnDescendants + detectOnDescendants = detectOnDescendants, + negativeControlOutcomeCohortSet = negativeControlOutcomeCohortSet ) DatabaseConnector::executeSql( @@ -159,6 +165,16 @@ ) delta <- Sys.time() - start writeLines(paste("Generating negative control outcomes set took", round(delta, 2), attr(delta, "units"))) + + if (incremental) { + recordTasksDone( + paramHash = checksum, + checksum = checksum, + recordKeepingFile = recordKeepingFile + ) + } + + invisible("FINISHED") } createNegativeControlOutcomesQuery <- function(connection, @@ -167,7 +183,32 @@ createNegativeControlOutcomesQuery <- function(connection, cohortDatabaseSchema, cohortTable, occurrenceType, - detectOnDescendants) { + detectOnDescendants, + negativeControlOutcomeCohortSet) { + selectClause <- "" + for (i in 1:nrow(negativeControlOutcomeCohortSet)) { + selectClause <- paste0( + selectClause, + "SELECT CAST(", negativeControlOutcomeCohortSet$cohortId[i], " AS BIGINT), ", + "CAST(", negativeControlOutcomeCohortSet$outcomeConceptId[i], " AS BIGINT)" + ) + if (i < nrow(negativeControlOutcomeCohortSet)) { + selectClause <- paste0(selectClause, "\nUNION\n") + } + } + ncSetQuery <- paste0( + "DROP TABLE IF EXISTS #nc_set;", + "CREATE TABLE #nc_set (", + " cohort_id bigint NOT NULL,", + " outcome_concept_id bigint NOT NULL", + ")", + ";", + "INSERT INTO #nc_set (cohort_id, outcome_concept_id)\n", + selectClause, + "\n;" + ) + sql <- SqlRender::readSql(system.file("sql/sql_server/NegativeControlOutcomes.sql", package = "CohortGenerator", mustWork = TRUE)) sql <- SqlRender::render( sql = sql, @@ -176,6 +217,7 @@ createNegativeControlOutcomesQuery <- function(connection, cohort_table = cohortTable, detect_on_descendants = detectOnDescendants, occurrence_type = occurrenceType, + nc_set_query = ncSetQuery, warnOnMissingParameters = TRUE ) sql <- SqlRender::translate(
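A hedged usage sketch of the new incremental generation; the cohort ids and concept ids are hypothetical placeholders and connectionDetails is assumed to exist:

negativeControlOutcomeCohortSet <- data.frame(
  cohortId = c(101, 102),               # must be unique, per the check above
  cohortName = c("Negative control 1", "Negative control 2"),
  outcomeConceptId = c(999001, 999002)  # hypothetical concept ids
)
status <- CohortGenerator::generateNegativeControlOutcomeCohorts(
  connectionDetails = connectionDetails,
  cdmDatabaseSchema = "main",
  cohortDatabaseSchema = "main",
  cohortTable = CohortGenerator::getCohortTableNames()$cohortTable,
  negativeControlOutcomeCohortSet = negativeControlOutcomeCohortSet,
  occurrenceType = "first",
  detectOnDescendants = TRUE,
  incremental = TRUE,
  incrementalFolder = file.path("incremental")
)
# status is invisible("SKIPPED") on an unchanged re-run, otherwise "FINISHED".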
diff --git a/R/ResultsDataModel.R b/R/ResultsDataModel.R new file mode 100644 index 0000000..07643f0 --- /dev/null +++ b/R/ResultsDataModel.R @@ -0,0 +1,147 @@ +# Copyright 2024 Observational Health Data Sciences and Informatics +# +# This file is part of CohortGenerator +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#' Get specifications for CohortGenerator results data model +#' +#' @return +#' A tibble data frame object with specifications +#' +#' @export +getResultsDataModelSpecifications <- function() { + resultsDataModelSpecifications <- readCsv( + file = system.file("csv", "resultsDataModelSpecification.csv", package = "CohortGenerator") + ) + return(resultsDataModelSpecifications) +} + +#' Create the results data model tables on a database server. +#' +#' @details +#' Only PostgreSQL and SQLite servers are supported. +#' +#' @param connectionDetails DatabaseConnector connectionDetails instance (see [DatabaseConnector::createConnectionDetails()]) +#' @param databaseSchema The schema on the server where the tables will be created. +#' @param tablePrefix (Optional) string to insert before table names for database table names +#' @export +createResultsDataModel <- function(connectionDetails = NULL, + databaseSchema, + tablePrefix = "") { + if (connectionDetails$dbms == "sqlite" && databaseSchema != "main") { + stop("Invalid schema for sqlite, use databaseSchema = 'main'") + } + + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + + # Create first version of results model: + sql <- SqlRender::readSql(system.file("sql/sql_server/CreateResultsDataModel.sql", package = "CohortGenerator", mustWork = TRUE)) + sql <- SqlRender::render( + sql = sql, + database_schema = databaseSchema, + table_prefix = tablePrefix + ) + sql <- SqlRender::translate(sql = sql, targetDialect = connection@dbms) + DatabaseConnector::executeSql(connection, sql) + # Migrate to current version: + migrateDataModel( + connectionDetails = connectionDetails, + databaseSchema = databaseSchema, + tablePrefix = tablePrefix + ) +} + +#' Upload results to the database server. +#' +#' @description +#' Requires that the results data model tables have been created using the \code{\link{createResultsDataModel}} function. +#' +#' @param connectionDetails An object of type \code{connectionDetails} as created using the +#' \code{\link[DatabaseConnector]{createConnectionDetails}} function in the +#' DatabaseConnector package. +#' @param schema The schema on the server where the tables have been created. +#' @param resultsFolder The folder holding the results in .csv files +#' @param forceOverWriteOfSpecifications If TRUE, specifications of the phenotypes, cohort definitions, and analysis +#' will be overwritten if they already exist on the database. Only use this if these specifications +#' have changed since the last upload. +#' @param purgeSiteDataBeforeUploading If TRUE, before inserting data for a specific databaseId all the data for +#' that site will be dropped. This assumes the resultsFolder file contains the full data for that +#' data site. +#' @param tablePrefix (Optional) string to insert before table names for database table names +#' @param ... See ResultModelManager::uploadResults +#' @export +uploadResults <- function(connectionDetails, + schema, + resultsFolder, + forceOverWriteOfSpecifications = FALSE, + purgeSiteDataBeforeUploading = TRUE, + tablePrefix = "", + ...) {
+ ResultModelManager::uploadResults( + connectionDetails = connectionDetails, + schema = schema, + resultsFolder = resultsFolder, + tablePrefix = tablePrefix, + forceOverWriteOfSpecifications = forceOverWriteOfSpecifications, + purgeSiteDataBeforeUploading = purgeSiteDataBeforeUploading, + runCheckAndFixCommands = FALSE, + specifications = getResultsDataModelSpecifications(), + warnOnMissingTable = FALSE, + ... + ) +} + +#' Migrate data model +#' @description +#' Migrate data from current state to next state +#' +#' It is strongly advised that you have a backup of all data before migrating: either the sqlite files, a backup +#' database (if you are using a postgres backend), or the csv/zip files from your data generation. +#' +#' @inheritParams getDataMigrator +#' @export +migrateDataModel <- function(connectionDetails, databaseSchema, tablePrefix = "") { + rlang::inform("Migrating data set") + migrator <- getDataMigrator( + connectionDetails = connectionDetails, + databaseSchema = databaseSchema, + tablePrefix = tablePrefix + ) + migrator$executeMigrations() + migrator$finalize() +} + +#' Get database migrations instance +#' @description +#' Returns a ResultModelManager DataMigrationManager instance. +#' @seealso [ResultModelManager::DataMigrationManager] which this function is a utility for. +#' +#' @param connectionDetails DatabaseConnector connection details object +#' @param databaseSchema String schema where database schema lives +#' @param tablePrefix (Optional) Use if a table prefix is used before table names (e.g. "cg_") +#' @returns Instance of ResultModelManager::DataMigrationManager that has interface for converting existing data models +#' @export +getDataMigrator <- function(connectionDetails, databaseSchema, tablePrefix = "") { + ResultModelManager::DataMigrationManager$new( + connectionDetails = connectionDetails, + databaseSchema = databaseSchema, + tablePrefix = tablePrefix, + packageTablePrefix = "cg_", + migrationPath = "migrations", + packageName = "CohortGenerator" + ) +}
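Taken together, a sketch of the intended round trip, assuming a local SQLite results database; the paths are placeholders:

resultsConnectionDetails <- DatabaseConnector::createConnectionDetails(
  dbms = "sqlite",
  server = file.path("results", "results.sqlite")
)
CohortGenerator::createResultsDataModel(
  connectionDetails = resultsConnectionDetails,
  databaseSchema = "main"   # sqlite requires databaseSchema = "main"
)
CohortGenerator::uploadResults(
  connectionDetails = resultsConnectionDetails,
  schema = "main",
  resultsFolder = "export"  # the .csv files produced by runCohortGeneration()
)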
diff --git a/R/RunCohortGeneration.R b/R/RunCohortGeneration.R new file mode 100644 index 0000000..7a19b23 --- /dev/null +++ b/R/RunCohortGeneration.R @@ -0,0 +1,320 @@ +# Copyright 2024 Observational Health Data Sciences and Informatics +# +# This file is part of CohortGenerator +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#' Run a cohort generation and export results +#' +#' @details +#' Run a cohort generation for a set of cohorts and negative control outcomes. +#' This function will also export the results of the run to the `outputFolder`. +#' +#' @param connectionDetails An object of type \code{connectionDetails} as created using the +#' \code{\link[DatabaseConnector]{createConnectionDetails}} function in the +#' DatabaseConnector package. +#' +#' @template CdmDatabaseSchema +#' +#' @template TempEmulationSchema +#' +#' @template CohortTableNames +#' +#' @template CohortDefinitionSet +#' +#' @template NegativeControlOutcomeCohortSet +#' +#' @param occurrenceType For negative control outcomes, the occurrenceType +#' will detect either: the first time an +#' outcomeConceptId occurs or all times the +#' outcomeConceptId occurs for a person. Values +#' accepted: 'all' or 'first'. +#' +#' @param detectOnDescendants For negative control outcomes, when set to TRUE, +#' detectOnDescendants will use the vocabulary to +#' find negative control outcomes using the +#' outcomeConceptId and all descendants via the +#' concept_ancestor table. When FALSE, only the exact +#' outcomeConceptId will be used to detect the +#' outcome. +#' +#' @param stopOnError If an error happens while generating one of the +#' cohorts in the cohortDefinitionSet, should we +#' stop processing the other cohorts? The default is +#' TRUE; when set to FALSE, failures will be +#' identified in the return value from this function. +#' +#' @param outputFolder Name of the folder where all the outputs will be written to. +#' +#' @param databaseId A unique ID for the database. This will be appended to +#' most tables. +#' +#' @template minCellCount +#' +#' @param incremental Create only cohorts that haven't been created before? +#' +#' @param incrementalFolder If \code{incremental = TRUE}, specify a folder where +#' records are kept of which definition has been +#' executed. +#' +#' @export +runCohortGeneration <- function(connectionDetails, + cdmDatabaseSchema, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + cohortDatabaseSchema = cdmDatabaseSchema, + cohortTableNames = getCohortTableNames(), + cohortDefinitionSet = NULL, + negativeControlOutcomeCohortSet = NULL, + occurrenceType = "all", + detectOnDescendants = FALSE, + stopOnError = TRUE, + outputFolder, + databaseId = 1, + minCellCount = 5, + incremental = FALSE, + incrementalFolder = NULL) { + if (is.null(cohortDefinitionSet) && is.null(negativeControlOutcomeCohortSet)) { + stop("You must supply at least 1 cohortDefinitionSet OR 1 negativeControlOutcomeCohortSet") + } + errorMessages <- checkmate::makeAssertCollection() + if (is(connectionDetails, "connectionDetails")) { + checkmate::assertClass(connectionDetails, "connectionDetails", add = errorMessages) + } else { + checkmate::assertClass(connectionDetails, "ConnectionDetails", add = errorMessages) + } + checkmate::assertCharacter(cdmDatabaseSchema, len = 1, add = errorMessages) + checkmate::assertCharacter(tempEmulationSchema, len = 1, null.ok = TRUE, add = errorMessages) + checkmate::assertCharacter(cohortDatabaseSchema, len = 1, add = errorMessages) + checkmate::assertList(cohortTableNames, min.len = 1, add = errorMessages) + checkmate::assertDataFrame(cohortDefinitionSet, min.rows = 1, null.ok = TRUE, add = errorMessages) + checkmate::assertDataFrame(negativeControlOutcomeCohortSet, min.rows = 1, null.ok = TRUE, add = errorMessages) + checkmate::assert_choice(x = tolower(occurrenceType), choices = c("all", "first"), add = errorMessages) + checkmate::assert_logical(detectOnDescendants, add = errorMessages) + checkmate::assert_logical(stopOnError, add = errorMessages) + checkmate::reportAssertions(collection = errorMessages) + + # Establish the connection and ensure the cleanup is performed + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + + # Create the export folder
+ if (!dir.exists(outputFolder)) { + dir.create(outputFolder, recursive = TRUE) + } + + # Create the cohort tables + createCohortTables( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTableNames = cohortTableNames, + incremental = incremental + ) + + generateAndExportCohorts( + connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTableNames = cohortTableNames, + cohortDefinitionSet = cohortDefinitionSet, + stopOnError = stopOnError, + outputFolder = outputFolder, + databaseId = databaseId, + minCellCount = minCellCount, + incremental = incremental, + incrementalFolder = incrementalFolder + ) + + generateAndExportNegativeControls( + connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTableNames = cohortTableNames, + negativeControlOutcomeCohortSet = negativeControlOutcomeCohortSet, + occurrenceType = occurrenceType, + detectOnDescendants = detectOnDescendants, + outputFolder = outputFolder, + databaseId = databaseId, + minCellCount = minCellCount, + incremental = incremental, + incrementalFolder = incrementalFolder + ) + + # Export the results data model specification + file.copy( + from = system.file("csv", "resultsDataModelSpecification.csv", package = "CohortGenerator"), + to = outputFolder + ) + + rlang::inform("Cohort generation complete.") +} + +generateAndExportCohorts <- function(connection, + cdmDatabaseSchema, + tempEmulationSchema, + cohortDatabaseSchema, + cohortTableNames, + cohortDefinitionSet, + stopOnError, + outputFolder, + databaseId, + minCellCount, + incremental, + incrementalFolder) { + # Generate the cohorts + cohortsGenerated <- createEmptyResult("cg_cohort_generation") + cohortsGeneratedFileName <- file.path(outputFolder, "cg_cohort_generation.csv") + cohortCounts <- createEmptyResult("cg_cohort_count") + cohortCountsFileName <- file.path(outputFolder, "cg_cohort_count.csv") + if (!is.null(cohortDefinitionSet)) { + # Generate cohorts, get counts, write results + cohortsGenerated <- generateCohortSet( + connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTableNames = cohortTableNames, + cohortDefinitionSet = cohortDefinitionSet, + stopOnError = stopOnError, + incremental = incremental, + incrementalFolder = incrementalFolder + ) + + cohortCountsFromDb <- getCohortCounts( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTableNames$cohortTable, + cohortDefinitionSet = cohortDefinitionSet, + databaseId = databaseId + ) + + # Filter to columns in the results data model + cohortCounts <- cohortCountsFromDb[names(cohortCounts)] + } + + # Save the generation information + rlang::inform("Saving cohort generation information") + if (!is.null(cohortsGenerated) && nrow(cohortsGenerated) > 0) { + cohortsGenerated$databaseId <- databaseId + # Remove any cohorts that were skipped + cohortsGenerated <- cohortsGenerated[toupper(cohortsGenerated$generationStatus) != "SKIPPED", ] + if (incremental) { + # Format the data for saving + names(cohortsGenerated) <- SqlRender::camelCaseToSnakeCase(names(cohortsGenerated)) + saveIncremental( + data = cohortsGenerated, + fileName = cohortsGeneratedFileName, + cohort_id = cohortsGenerated$cohort_id + ) + } else { + writeCsv(
+ x = cohortsGenerated, + file = cohortsGeneratedFileName + ) + } + } + + rlang::inform("Saving cohort counts") + cohortCounts <- cohortCounts %>% + enforceMinCellValue("cohortEntries", minCellCount) %>% + enforceMinCellValue("cohortSubjects", minCellCount) + writeCsv( + x = cohortCounts, + file = cohortCountsFileName + ) + + rlang::inform("Saving cohort statistics") + exportCohortStatsTables( + connection = connection, + cohortTableNames = cohortTableNames, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortStatisticsFolder = outputFolder, + snakeCaseToCamelCase = FALSE, + fileNamesInSnakeCase = TRUE, + incremental = incremental, + databaseId = databaseId, + minCellCount = minCellCount, + cohortDefinitionSet = cohortDefinitionSet, + tablePrefix = "cg_" + ) + + # Export the cohort definition set + rlang::inform("Saving cohort definition set") + exportCohortDefinitionSet(outputFolder, cohortDefinitionSet) +} + +generateAndExportNegativeControls <- function(connection, + cdmDatabaseSchema, + tempEmulationSchema, + cohortDatabaseSchema, + cohortTableNames, + negativeControlOutcomeCohortSet, + occurrenceType, + detectOnDescendants, + outputFolder, + databaseId, + minCellCount, + incremental, + incrementalFolder) { + # Generate any negative controls + negativeControlOutcomes <- createEmptyResult("cg_cohort_definition_neg_ctrl") + negativeControlOutcomesFileName <- file.path(outputFolder, "cg_cohort_definition_neg_ctrl.csv") + cohortCountsNegativeControlOutcomes <- createEmptyResult("cg_cohort_count_neg_ctrl") + cohortCountsNegativeControlOutcomesFileName <- file.path(outputFolder, "cg_cohort_count_neg_ctrl.csv") + if (!is.null(negativeControlOutcomeCohortSet)) { + generateNegativeControlOutcomeCohorts( + connection = connection, + cdmDatabaseSchema = cdmDatabaseSchema, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTableNames$cohortTable, + negativeControlOutcomeCohortSet = negativeControlOutcomeCohortSet, + tempEmulationSchema = tempEmulationSchema, + occurrenceType = occurrenceType, + detectOnDescendants = detectOnDescendants, + incremental = incremental, + incrementalFolder = incrementalFolder + ) + + # Assemble the negativeControlOutcomes for export + negativeControlOutcomes <- cbind( + negativeControlOutcomeCohortSet, + occurrenceType = rep(occurrenceType, nrow(negativeControlOutcomeCohortSet)), + detectOnDescendants = rep(detectOnDescendants, nrow(negativeControlOutcomeCohortSet)) + ) + + # Count the negative controls + cohortCountsNegativeControlOutcomes <- getCohortCounts( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTableNames$cohortTable, + databaseId = databaseId, + cohortDefinitionSet = negativeControlOutcomeCohortSet[, c("cohortId"), drop = FALSE] + ) + } + + rlang::inform("Saving negative control outcome cohort definition") + writeCsv( + x = negativeControlOutcomes, + file = negativeControlOutcomesFileName + ) + + rlang::inform("Saving negative control outcome cohort counts") + cohortCountsNegativeControlOutcomes <- cohortCountsNegativeControlOutcomes %>% + enforceMinCellValue("cohortEntries", minCellCount) %>% + enforceMinCellValue("cohortSubjects", minCellCount) + writeCsv( + x = cohortCountsNegativeControlOutcomes, + file = cohortCountsNegativeControlOutcomesFileName + ) +}
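An end-to-end sketch of runCohortGeneration(), assuming the Eunomia demo package and previously built cohortDefinitionSet and negativeControlOutcomeCohortSet objects:

connectionDetails <- Eunomia::getEunomiaConnectionDetails()
CohortGenerator::runCohortGeneration(
  connectionDetails = connectionDetails,
  cdmDatabaseSchema = "main",
  cohortDatabaseSchema = "main",
  cohortTableNames = CohortGenerator::getCohortTableNames(),
  cohortDefinitionSet = cohortDefinitionSet,
  negativeControlOutcomeCohortSet = negativeControlOutcomeCohortSet,
  occurrenceType = "first",
  detectOnDescendants = TRUE,
  outputFolder = "export",
  databaseId = "Eunomia",
  minCellCount = 5,
  incremental = TRUE,
  incrementalFolder = "incremental"
)
# outputFolder now holds cg_*.csv files plus resultsDataModelSpecification.csv,
# ready for uploadResults().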
diff --git a/R/SubsetDefinitions.R b/R/SubsetDefinitions.R index 311afcc..9b2804e 100644 --- a/R/SubsetDefinitions.R +++ b/R/SubsetDefinitions.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -64,7 +64,7 @@ CohortSubsetDefinition <- R6::R6Class( subsetOperators = lapply(self$subsetOperators, function(operator) { operator$toList() }), - packageVersion = jsonlite::unbox(as.character(utils::packageVersion(utils::packageName()))), + packageVersion = jsonlite::unbox(as.character(utils::packageVersion("CohortGenerator"))), identifierExpression = jsonlite::unbox(as.character(private$.identifierExpression)), operatorNameConcatString = jsonlite::unbox(as.character(private$.operatorNameConcatString)), subsetCohortNameTemplate = jsonlite::unbox(as.character(private$.subsetCohortNameTemplate)) @@ -476,7 +476,7 @@ hasSubsetDefinitions <- function(x) { #' @description #' This is generally used as part of saveCohortDefinitionSet #' -#' @param subsetDefinition The subset definition object {@seealso CohortSubsetDefinition} +#' @param subsetDefinition The subset definition object @seealso [CohortSubsetDefinition] #' #' @export #' @inheritParams saveCohortDefinitionSet diff --git a/R/SubsetQueryBuilders.R b/R/SubsetQueryBuilders.R index 195d9b3..4edef34 100644 --- a/R/SubsetQueryBuilders.R +++ b/R/SubsetQueryBuilders.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # diff --git a/R/Subsets.R b/R/Subsets.R index 1444448..7bbcc5a 100644 --- a/R/Subsets.R +++ b/R/Subsets.R @@ -1,4 +1,4 @@ -# Copyright 2023 Observational Health Data Sciences and Informatics +# Copyright 2024 Observational Health Data Sciences and Informatics # # This file is part of CohortGenerator # @@ -35,7 +35,7 @@ # SubsetCohortWindow ------------- -#' SubsetCohortWindow settings +#' @title Time Window For Cohort Subset Operator #' @export #' @description #' Representation of a time window to use when subsetting a target cohort with a subset cohort @@ -47,7 +47,6 @@ SubsetCohortWindow <- R6::R6Class( .targetAnchor = "cohortStart" ), public = list( - #' @title to List #' @description List representation of object toList = function() { objRepr <- list() @@ -128,7 +127,7 @@ createSubsetCohortWindow <- function(startDay, endDay, targetAnchor) { } # SubsetOperator ------------------------------ -#' @title SubsetOperator +#' @title Abstract base class for subsets. #' @export #' @description #' Abstract Base Class for subsets. Subsets should inherit from this and implement their own requirements.
@@ -403,7 +402,7 @@ CohortSubsetOperator <- R6::R6Class( #' #' @param startWindow A SubsetCohortWindow that patients must fall inside (see createSubsetCohortWindow) #' @param endWindow A SubsetCohortWindow that patients must fall inside (see createSubsetCohortWindow) -#' @param negate The opposite of this definition - include patients who do NOT meet the specified criteria (NOT YET IMPLEMENTED) +#' @param negate The opposite of this definition - include patients who do NOT meet the specified criteria #' @returns a CohortSubsetOperator instance createCohortSubset <- function(name = NULL, cohortIds, cohortCombinationOperator, negate, startWindow, endWindow) { subset <- CohortSubsetOperator$new() @@ -418,7 +417,10 @@ createCohortSubset <- function(name = NULL, cohortIds, cohortCombinationOperator } # DemographicSubsetOperator ------------------------------ -#' Criteria Subset +#' @title Demographic Subset Operator +#' @description +#' Operators for subsetting a cohort by demographic criteria +#' #' @export DemographicSubsetOperator <- R6::R6Class( classname = "DemographicSubsetOperator", @@ -635,9 +637,9 @@ DemographicSubsetOperator <- R6::R6Class( #' @param name Optional char name #' @param ageMin The minimum age #' @param ageMax The maximum age -#' @param gender Gender demographics - concepts - 0, 8532, 8507, 0 "male", "female". -#' Any string that is not (case insensitive) "male" or "female" is converted to gender concept 0 -#' https://www.ohdsi.org/web/wiki/doku.php?id=documentation:vocabulary:gender +#' @param gender Gender demographics - concept IDs 0 (unknown), 8532 ("female"), 8507 ("male"). +#' Any string that is not "male" or "female" (case insensitive) is converted to gender concept 0. +#' https://athena.ohdsi.org/search-terms/terms?standardConcept=Standard&domain=Gender&page=1&pageSize=15&query= #' Specific concept ids not in this set can be used but are not explicitly validated #' @param race Race demographics - concept ID list #' @param ethnicity Ethnicity demographics - concept ID list @@ -836,8 +838,8 @@ LimitSubsetOperator <- R6::R6Class( #' Subset cohorts using specified limit criteria #' @export #' @param name Name of operation -#' @param priorTime Required prior observation window -#' @param followUpTime Required post observation window +#' @param priorTime Required prior observation window (specified as a positive integer) +#' @param followUpTime Required post observation window (specified as a positive integer) #' @param limitTo character one of: #' "firstEver" - only first entry in patient history #' "earliestRemaining" - only first entry after washout set by priorTime
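A hedged sketch combining the subset operators documented above; the cohort ids and window bounds are hypothetical placeholders:

subsetDef <- CohortGenerator::createCohortSubsetDefinition(
  name = "Prior cohort 999, aged 40+, first ever",
  definitionId = 1,
  subsetOperators = list(
    CohortGenerator::createCohortSubset(
      cohortIds = 999,  # hypothetical subset cohort id
      cohortCombinationOperator = "all",
      negate = FALSE,
      startWindow = CohortGenerator::createSubsetCohortWindow(
        startDay = -9999, endDay = 0, targetAnchor = "cohortStart"
      ),
      endWindow = CohortGenerator::createSubsetCohortWindow(
        startDay = 0, endDay = 9999, targetAnchor = "cohortStart"
      )
    ),
    CohortGenerator::createDemographicSubset(ageMin = 40),
    CohortGenerator::createLimitSubset(priorTime = 365, limitTo = "firstEver")
  )
)
cohortDefinitionSet <- cohortDefinitionSet %>%
  CohortGenerator::addCohortSubsetDefinition(subsetDef)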
diff --git a/README.md b/README.md index a394e74..73ea329 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # CohortGenerator -[![Build Status](https://github.com/OHDSI/CohortGenerator/workflows/R-CMD-check/badge.svg)](https://github.com/OHDSI/CohortGenerator/actions?query=workflow%3AR-CMD-check) [![codecov.io](https://codecov.io/github/OHDSI/CohortGenerator/coverage.svg?branch=main)](https://codecov.io/github/OHDSI/CohortGenerator?branch=main) +[![CRAN status](https://www.r-pkg.org/badges/version/CohortGenerator)](https://CRAN.R-project.org/package=CohortGenerator) +[![Build Status](https://github.com/OHDSI/CohortGenerator/workflows/R-CMD-check/badge.svg)](https://github.com/OHDSI/CohortGenerator/actions?query=workflow%3AR-CMD-check) [![codecov.io](https://codecov.io/github/OHDSI/CohortGenerator/coverage.svg?branch=main)](https://app.codecov.io/github/OHDSI/CohortGenerator?branch=main) CohortGenerator is part of [HADES](https://ohdsi.github.io/Hades/). diff --git a/_pkgdown.yml b/_pkgdown.yml index 8ee695f..4f346be 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -1,5 +1,8 @@ +url: https://ohdsi.github.io/CohortGenerator/ + template: - params: + bootstrap: 5 + bslib: bootswatch: cosmo home: @@ -8,6 +11,13 @@ home: href: http://forums.ohdsi.org reference: + - title: "Cohort Generation" + desc: > + Functions that support generating cohorts. + contents: + - runCohortGeneration + - generateCohortSet + - title: "Cohort Tables" desc: > Functions that support creating the necessary cohort tables. @@ -25,13 +35,6 @@ reference: - checkAndFixCohortDefinitionSetDataTypes - isCohortDefinitionSet - - title: "Cohort Generation" - desc: > - Functions that support generating cohorts. - contents: - - generateCohortSet - - createEmptyCohortDefinitionSet - - title: "Cohort Counts" desc: > Function for obtaining the counts of subjects and events for one or @@ -39,23 +42,27 @@ reference: contents: - getCohortCounts - - title: "Cohort Subset" + - title: "Cohort Subset Functions" desc: > - Functions and R6 classes for creating cohort subset definitions and subset - operators. + Functions for creating cohort subset definitions and subset operators. contents: - addCohortSubsetDefinition - - CohortSubsetDefinition - - CohortSubsetOperator - createCohortSubset - createCohortSubsetDefinition - createDemographicSubset - createLimitSubset - createSubsetCohortWindow - - DemographicSubsetOperator - getSubsetDefinitions - - LimitSubsetOperator - saveCohortSubsetDefinition + + - title: "Cohort Subset Classes" + desc: > + R6 classes for cohort subset definitions and subset operators. + contents: + - CohortSubsetDefinition + - CohortSubsetOperator + - DemographicSubsetOperator + - LimitSubsetOperator - SubsetCohortWindow - SubsetOperator @@ -63,9 +70,12 @@ reference: desc: > Functions for inserting inclusion rule names from a cohort definition, exporting the cohort statistics to the file system and a helper function - for dropping those tables when they are no longer needed. + for dropping those tables when they are no longer needed. These functions + assume you are using [Circe](https://github.com/OHDSI/circe-be) for + inclusion rules and cohort statistics. contents: - getCohortStats + - getCohortInclusionRules - insertInclusionRuleNames - exportCohortStatsTables - dropCohortStatsTables @@ -77,6 +87,17 @@ reference: contents: - createEmptyNegativeControlOutcomeCohortSet - generateNegativeControlOutcomeCohorts + + - title: "Result Model Management" + desc: > + Functions for managing the results of running Cohort Generator via + `runCohortGeneration` + contents: + - createResultsDataModel + - getDataMigrator + - getResultsDataModelSpecifications + - migrateDataModel + - uploadResults - title: "CSV File Helpers" desc: > @@ -102,8 +123,16 @@ reference: - isTaskRequired - saveIncremental - computeChecksum + + - title: "Cohort Sampling" + desc: > + Functions that support sampling a cohort. Please note this is only for + software development purposes and NOT for running studies. + contents: + - sampleCohortDefinitionSet navbar: + bg: dark structure: right: [hades, github] components: diff --git a/cran-comments.md b/cran-comments.md new file mode 100644 index 0000000..858617d --- /dev/null +++ b/cran-comments.md @@ -0,0 +1,5 @@ +## R CMD check results + +0 errors | 0 warnings | 1 note + +* This is a new release.
diff --git a/docs/404.html b/docs/404.html index 81aa5d3..88ce9be 100644
[regenerated pkgdown HTML omitted]
diff --git a/docs/articles/CreatingCohortSubsetDefinitions.html b/docs/articles/CreatingCohortSubsetDefinitions.html index 835a6cc..24f7e3c 100644
[regenerated pkgdown HTML omitted; the rendered vignette tables now include the new example cohort 1778214 "celecoxibCensored" and its subset cohort 1778214001, and the vignette now restores user options with options(old)]
diff --git a/docs/articles/GeneratingCohorts.html b/docs/articles/GeneratingCohorts.html index aa97899..53a58de 100644
[regenerated pkgdown HTML omitted]