Skip to content

Commit

Permalink
Initial implementation; needs tests
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonysena committed Jun 18, 2024
1 parent b088443 commit aadf624
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 21 deletions.
88 changes: 67 additions & 21 deletions R/CohortConstruction.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
#' cohortDefinitionSet, should we stop processing the other
#' cohorts? The default is TRUE; when set to FALSE, failures will
#' be identified in the return value from this function.
#'
#' @param timeout A numeric, greater than 0, specifying the maximum number of seconds the cohort generation task
#' is allowed to run before being interrupted by the timeout. This timeout is used
#' by @seealso [R.utils::withTimeout] to interrupt the cohort generation query.
#' In the case that a timeout occurs, the return value for that generation will
#' be "TIMEOUT" and the end time of the generation will reflect the timeout.
#' A timeout is not considered an error.
#'
#' @param incremental Create only cohorts that haven't been created before?
#'
Expand Down Expand Up @@ -66,6 +73,7 @@ generateCohortSet <- function(connectionDetails = NULL,
cohortTableNames = getCohortTableNames(),
cohortDefinitionSet = NULL,
stopOnError = TRUE,
timeout = 0,
incremental = FALSE,
incrementalFolder = NULL) {
checkmate::assertDataFrame(cohortDefinitionSet, min.rows = 1, col.names = "named")
Expand All @@ -76,6 +84,7 @@ generateCohortSet <- function(connectionDetails = NULL,
"sql"
)
)
checkmate::assertNumeric(timeout, lower = 0)
assertLargeInteger(cohortDefinitionSet$cohortId)
# Verify that cohort IDs are not repeated in the cohort definition
# set before generating
Expand Down Expand Up @@ -159,6 +168,7 @@ generateCohortSet <- function(connectionDetails = NULL,
cohortDatabaseSchema = cohortDatabaseSchema,
cohortTableNames = cohortTableNames,
stopIfError = stopOnError,
timeout = timeout,
incremental = incremental,
recordKeepingFile = recordKeepingFile,
stopOnError = stopOnError,
Expand All @@ -178,6 +188,7 @@ generateCohortSet <- function(connectionDetails = NULL,
cohortDatabaseSchema = cohortDatabaseSchema,
cohortTableNames = cohortTableNames,
stopIfError = stopOnError,
timeout = timeout,
incremental = incremental,
recordKeepingFile = recordKeepingFile,
stopOnError = stopOnError,
Expand Down Expand Up @@ -230,6 +241,7 @@ generateCohort <- function(cohortId = NULL,
cohortDatabaseSchema,
cohortTableNames,
stopIfError = TRUE,
timeout = 0,
incremental,
recordKeepingFile) {
# Get the index of the cohort record for the current cohortId
Expand Down Expand Up @@ -310,28 +322,62 @@ generateCohort <- function(cohortId = NULL,
# outermost assignment will assign generationInfo based on the return
# value in the error() block. If the expr() function evaluates without
# error, the inner most assignment of generationInfo will take place.
generationInfo <- tryCatch(expr = {
startTime <- lubridate::now()
generationInfo <- runCohortSql(
sql = sql,
startTime = startTime,
incremental = incremental,
cohortId = cohortDefinitionSet$cohortId[i],
checksum = cohortDefinitionSet$checksum[i],
recordKeepingFile = recordKeepingFile
)
}, error = function(e) {
endTime <- lubridate::now()
ParallelLogger::logError("An error occurred while generating cohortName = ", cohortName, ". Error: ", e)
if (stopIfError) {
stop()
generationInfo <- tryCatch(
expr = {
startTime <- lubridate::now()
# ANOTHER NOTE: Adding the ability to time-out a function
# after an elapsed time defined by the timeout parameter.
# When this value is a positive numeric, we'll use the
# R.utils::withTimeout to call runCohortSql.
if (timeout > 0) {
generationInfo <- R.utils::withTimeout({
generationInfo <- runCohortSql(
sql = sql,
startTime = startTime,
incremental = incremental,
cohortId = cohortDefinitionSet$cohortId[i],
checksum = cohortDefinitionSet$checksum[i],
recordKeepingFile = recordKeepingFile
)
}, timeout = timeout)
} else {
generationInfo <- runCohortSql(
sql = sql,
startTime = startTime,
incremental = incremental,
cohortId = cohortDefinitionSet$cohortId[i],
checksum = cohortDefinitionSet$checksum[i],
recordKeepingFile = recordKeepingFile
)
}
},
TimeoutException = function(ex) {
# This block is only active when R.utils::withTimeout is activated
# by timeout > 0
endTime <- lubridate::now()
warning("Timeout. Skipping cohort generation.")
return(list(
generationStatus = "TIMEOUT",
startTime = startTime,
endTime = endTime
))
},
error = function(e) {
endTime <- lubridate::now()
ParallelLogger::logError("An error occurred while generating cohortName = ",
cohortName,
". Error: ",
e)
if (stopIfError) {
stop()
}
return(list(
generationStatus = "FAILED",
startTime = startTime,
endTime = endTime
))
}
return(list(
generationStatus = "FAILED",
startTime = startTime,
endTime = endTime
))
})
)
} else {
generationInfo <- list(
generationStatus = "SKIPPED",
Expand Down
67 changes: 67 additions & 0 deletions extras/TestTimeout.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# MODIFIED CODE FROM THE R.utils::withTimeout help page ----------------
# - - - - - - - - - - - - - - - - - - - - - - - - -
# Function that takes "a long" time to run
# - - - - - - - - - - - - - - - - - - - - - - - - -
foo <- function() {
print("Tic")
for (kk in 1:100) {
print(kk)
Sys.sleep(0.1)
if (kk == 2) {
stop("tossing error")
}
}
print("Tac")
}

# - - - - - - - - - - - - - - - - - - - - - - - - -
# Evaluate code, if it takes too long, generate
# a timeout by throwing a TimeoutException.
# - - - - - - - - - - - - - - - - - - - - - - - - -
res <- NULL
res <- tryCatch({
res <- R.utils::withTimeout({
foo()
}, timeout = -1)
}, TimeoutException = function(ex) {
message("Timeout. Skipping.")
return(1)
}, error = function(ex) {
message("Actual error")
return(2)
})

print(res)

# Adaptation for CG ----------------

generationInfo <- tryCatch(expr = {
startTime <- lubridate::now()
generationInfo <- R.utils::withTimeout({
generationInfo <- runCohortSql(
startTime = startTime,
sleep = 10,
throwError = T
)}, timeout = 5)
}, TimeoutException = function(ex) {
endTime <- lubridate::now()
warning("Timeout. Skipping cohort generation.")
return(list(
generationStatus = "TIMEOUT",
startTime = startTime,
endTime = endTime
))
}, error = function(ex) {
endTime <- lubridate::now()
ParallelLogger::logError("An error occurred. Error: ", ex)
# if (stopIfError) {
# stop()
# }
return(list(
generationStatus = "FAILED",
startTime = startTime,
endTime = endTime
))
})

print(generationInfo)
8 changes: 8 additions & 0 deletions man/generateCohortSet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit aadf624

Please sign in to comment.