
#incidence_summary temp table on DataBricks #66

Open
anthonysena opened this issue Sep 9, 2024 · 21 comments
@anthonysena
Collaborator

Running CohortIncidence via Strategus multiple times on DataBricks produced the following error:

java.sql.SQLException: [Databricks][DatabricksJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: org.apache.hive.service.cli.HiveSQLException: Error running query: [TABLE_OR_VIEW_ALREADY_EXISTS] org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `scratch_asena5`.`l693slnoincidence_summary` because it already exists.
Choose a different name, drop the existing object, add the IF NOT EXISTS clause to tolerate pre-existing objects, add the OR REPLACE clause to replace the existing materialized view, or add the OR REFRESH clause to refresh the existing streaming table. SQLSTATE: 42P07
	at org.apache.spark.sql.hive.thriftserver.HiveThriftServerErrors$.runningQueryError(HiveThriftServerErrors.scala:49)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$execute$1(SparkExecuteStatementOperation.scala:743)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:585)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:413)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:244)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:240)
	at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
	at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:43)
	at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
	at com.databricks.logging.AttributionContextTracing.withAttributionTags(AttributionContextTracing.scala:95)
	at com.databricks.logging.AttributionContextTracing.withAttributionTags$(AttributionContextTracing.scala:76)
	at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
	at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:72)
	at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:172)
	at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491)
	at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603)
	at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612)
	at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491)
	at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withAttributionTags(SparkExecuteStatementOperation.scala:67)
	at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.$anonfun$withLocalProperties$11(ThriftLocalProperties.scala:195)
	at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
	at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties(ThriftLocalProperties.scala:190)
	at org.apache.spark.sql.hive.thriftserver.ThriftLocalProperties.withLocalProperties$(ThriftLocalProperties.scala:71)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:67)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:391)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:377)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:425)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `scratch_asena5`.`l693slnoincidence_summary` because it already exists.
Choose a different name, drop the existing object, add the IF NOT EXISTS clause to tolerate pre-existing objects, add the OR REPLACE clause to replace the existing materialized view, or add the OR REFRESH clause to refresh the existing streaming table. SQLSTATE: 42P07
	at org.apache.spark.sql.errors.QueryCompilationErrors$.tableAlreadyExistsError(QueryCompilationErrors.scala:1726)
	at org.apache.spark.sql.execution.datasources.v2.AtomicCreateTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:161)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.$anonfun$result$2(V2CommandExec.scala:48)
	at org.apache.spark.sql.execution.SparkPlan.runCommandWithAetherOff(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.runCommandInAetherOrSpark(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.$anonfun$result$1(V2CommandExec.scala:48)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:47)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:56)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$5(QueryExecution.scala:382)
	at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$4(QueryExecution.scala:382)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:167)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:382)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$9(SQLExecution.scala:400)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:719)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:278)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:656)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:378)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1176)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:374)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:325)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:371)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:347)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:505)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:505)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:40)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:379)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:375)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:40)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:40)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:481)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:347)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:436)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:347)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:284)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:281)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:339)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$analyzeQuery$2(SparkExecuteStatementOperation.scala:580)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$analyzeQuery$1(SparkExecuteStatementOperation.scala:533)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.getOrCreateDF(SparkExecuteStatementOperation.scala:519)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.analyzeQuery(SparkExecuteStatementOperation.scala:533)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$execute$5(SparkExecuteStatementOperation.scala:623)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:532)
	at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.$anonfun$execute$1(SparkExecuteStatementOperation.scala:623)
	... 40 more
, Query: CREATE TAB***.

I'm guessing that #incidence_summary is not dropped for some reason? Given the CohortIncidence build options here, I presumed it was not using temp tables, so I'm a bit confused as to why this would happen.
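For context on the mangled name in the error: when a dialect lacks native temp tables, SqlRender rewrites #table references into session-prefixed physical tables in the schema given by tempEmulationSchema. A minimal sketch of that behavior (the schema and emitted prefix are illustrative, and the exact translated DDL varies by SqlRender version):

SqlRender::translate(
  "CREATE TABLE #incidence_summary (ref_id INT);",
  targetDialect = "spark",
  tempEmulationSchema = "scratch_asena5"
)
#> yields something like:
#> "CREATE TABLE scratch_asena5.l693slnoincidence_summary (ref_id INT) USING DELTA"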

anthonysena changed the title from "#incidence_summary temp tables on DataBricks" to "#incidence_summary temp table on DataBricks" on Sep 9, 2024
@chrisknoll
Contributor

It should be dropped. In temp table mode, though, the temp table holds the final result that you query to get the results, and then the cleanup should be run to drop all the temp tables. Let me talk to you offline and figure out what's going on.

@chrisknoll
Contributor

So, I've looked through the code and, at least in the R context, the cleanup SQL is being invoked (CohortIncidence.R@175):

  results = list()
  #download results into dataframe.  We don't specify ref_id because the temp table will only contain this session results
  results$incidence_summary <- DatabaseConnector::querySql(conn, SqlRender::translate("select * from #incidence_summary", targetDialect = targetDialect));
  results$target_def <- DatabaseConnector::querySql(conn, SqlRender::translate("select * from #target_def", targetDialect = targetDialect));
  results$outcome_def <- DatabaseConnector::querySql(conn, SqlRender::translate("select * from #outcome_def", targetDialect = targetDialect));
  results$tar_def <- DatabaseConnector::querySql(conn, SqlRender::translate("select * from #tar_def", targetDialect = targetDialect));
  results$age_group_def <- DatabaseConnector::querySql(conn, SqlRender::translate("select * from #age_group_def", targetDialect = targetDialect));
  results$subgroup_def <- DatabaseConnector::querySql(conn, SqlRender::translate("select * from #subgroup_def", targetDialect = targetDialect));
  
  # use the getCleanupSql to fetch the DROP TABLE expressions for the tables that were created in tempDDL.
  cleanupSql <- SqlRender::translate(CohortIncidence::getCleanupSql(useTempTables=T), targetDialect);  
  rlang::inform("Drop tables created from temporary DDL")
  DatabaseConnector::executeSql(conn, cleanupSql);

And the cleanup sql is:

DROP TABLE @schemaName.incidence_summary;
DROP TABLE @schemaName.target_def;
DROP TABLE @schemaName.outcome_def;
DROP TABLE @schemaName.tar_def;
DROP TABLE @schemaName.age_group_def;
DROP TABLE @schemaName.subgroup_def;

So between each execution of executeAnalysis, it should be removing those tables. I'm not disputing that you're getting this error; I'm just not sure why.

@anthonysena
Collaborator Author

Thanks @chrisknoll - it is also unclear to me as well. Linking this to OHDSI/CohortGenerator#188 since these appeared in the same run.

@anthonysena
Collaborator Author

I'm still unclear on why this is happening, but I'll close this out since it does not appear to be anything specific to this package.

@chrisknoll
Contributor

Did other temp tables persist? I'm happy to trace through it to see if the drop table command isn't being executed.

@anthonysena
Collaborator Author

Yes - there were other temp tables that persisted from CohortGenerator, Characterization, and FeatureExtraction, but it is unclear why that happened. Re-running things has only made it more confusing, as the packages are properly dropping the temp tables.

@chrisknoll
Contributor

One thing I can think of: there's an error during analysis execution, where you 1) create the result tables up front, 2) run the analysis, and then 3) clean up. If it fails at 2, then 3 never runs and the tables are left behind. This wouldn't be catastrophic, since temp table names should be unique between connections, but if for some reason they are not and you get the same session token prefixed to the temp tables, you could run into a situation where a table already exists.
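One way to guard against that failure mode is to run the cleanup in a finally block, so step 3 runs even when step 2 fails. A minimal sketch (connectionDetails and the *Sql variables are hypothetical placeholders):

conn <- DatabaseConnector::connect(connectionDetails)
tryCatch({
  DatabaseConnector::executeSql(conn, createTablesSql)  # 1) create the result tables up front
  DatabaseConnector::executeSql(conn, analysisSql)      # 2) run the analysis
}, finally = {
  DatabaseConnector::executeSql(conn, cleanupSql)       # 3) cleanup runs even if step 2 errors
  DatabaseConnector::disconnect(conn)
})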

@anthonysena
Collaborator Author

One thing I can think of: there's an error during analysis execution, where you 1) create the result tables up front, 2) run the analysis, and then 3) clean up. If it fails at 2, then 3 never runs and the tables are left behind. This wouldn't be catastrophic, since temp table names should be unique between connections, but if for some reason they are not and you get the same session token prefixed to the temp tables, you could run into a situation where a table already exists.

Yes, I am thinking this is exactly the scenario that played out. The original error in this issue indicated that a temp table couldn't be created since it already exists. It's unclear to me where an error may have occurred.

@anthonysena
Collaborator Author

In speaking with @schuemie, we agreed we should update our SQL so that we're dropping temp tables ahead of creating them, in an effort to prevent problems on those RDBMS platforms that use a tempEmulationSchema. Putting this here for reference to illustrate that point:

https://github.com/OHDSI/CohortMethod/blob/main/inst/sql/CreateCohorts.sql#L33
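As a sketch of that pattern (the table name is illustrative, not the actual CohortMethod table), OHDSI SQL typically guards creation with an OBJECT_ID check, which SqlRender translates to a DROP TABLE IF EXISTS on dialects that need it:

sql <- "
IF OBJECT_ID('tempdb..#cohort_person', 'U') IS NOT NULL
  DROP TABLE #cohort_person;
CREATE TABLE #cohort_person (person_id BIGINT);
"
SqlRender::translate(sql, targetDialect = "spark", tempEmulationSchema = "scratch_asena5")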

@chrisknoll
Contributor

Would like to offer some pushback on this @schuemie and @anthonysena:

I understand the need to solve a problem, but what is happening here doesn't make sense, and I worry about applying a technique across all our SQL packages (and the future ones yet to be written) when it's not solving the actual problem but obscuring the real one.

This is the real problem I think we are obscuring:

When a connection is started, a unique session ID should be associated with it so that all temp tables are unique within the context of that connection. If we are running an analysis and it gets killed or errors out for some reason before it gets to the point of dropping the temp tables, it should never be the case that starting a new connection and re-running somehow produces colliding temp table names. If that's happening (which I believe is happening here), it's a problem with our temp table handling, not a sign that we need to drop before creating. In fact, we want to see the error that the table exists; it's demonstrating the actual problem!

If we go this route (of drop-before-create) and I'm right about the cause, this is what is going to happen:

You will have two processes running an analysis. Each process is going to create a conflicting temp table name. The first process runs and is in the middle of generating results. The second one starts and drops and recreates the tables created by the first run. This will be a silent error, and we will have no way of reproducing it given the concurrency/race conditions of parallel tasks.

So, I'm imploring us to get to the underlying cause of why this 'table exists' error is happening, and recognize that the error actually uncovered an issue and we shouldn't take steps to hide it.

@anthonysena
Collaborator Author

Thanks @chrisknoll - just trying to dig into the details around why this might be happening:

This is the real problem I think we are obscuring:

When a connection is started, a unique session ID should be associated with it so that all temp tables are unique within the context of that connection.

SqlRender creates a unique session ID (see here) for the queries translated during a given session; this ID is prefixed to the temp table names within the schema specified by the tempEmulationSchema option. It uses the Random class with no seed, so it should be unique across sessions. The value is stored in SqlTranslate's globalSessionId so that it may be reused for subsequent translation tasks, keeping the same prefix active for a given session.

If we are running an analysis and it gets killed or errors out for some reason before it gets to the point of dropping the temp tables, it should never be the case that starting a new connection and re-running somehow produces colliding temp table names. If that's happening (which I believe is happening here), it's a problem with our temp table handling, not a sign that we need to drop before creating. In fact, we want to see the error that the table exists; it's demonstrating the actual problem!

After reviewing the Java code, my assumption about the root cause of this issue is that the org.ohdsi.sql.SqlTranslate class is instantiated but never garbage collected by the JVM, thereby leaving the globalSessionId set for the next attempt to translate a query. If the process fails as you described, it isn't guaranteed that the SqlTranslate class is disposed, and we could wind up with the same session ID being used across what is presumably multiple sessions. In this scenario, we'd want to drop the "temp" table ahead of creating it.

If we go this route (of drop-before-create) and I'm right about the cause, this is what is going to happen:

You will have two processes running an analysis. Each process is going to create a conflicting temp table name. The first process runs and is in the middle of generating results. The second one starts and drops and recreates the tables created by the first run. This will be a silent error, and we will have no way of reproducing it given the concurrency/race conditions of parallel tasks.

I'm assuming more than one process would require a new instance of the SqlTranslate class, and then we should not face the conflict you've described here. Again, this is just an assumption based on the description of the way the Random class is working.

So, I'm imploring us to get to the underlying cause of why this 'table exists' error is happening, and recognize that the error actually uncovered an issue and we shouldn't take steps to hide it.

Appreciate the pushback here - it is important to make sure we're not masking a larger issue. I think if there are any changes required (beyond the whole dropping temp tables ahead of creating them) it would be in SqlRender.

@schuemie please correct anything I may have gotten wrong in this write up.

@chrisknoll
Contributor

chrisknoll commented Sep 18, 2024

SqlRender creates a unique session ID (see here) for the queries translated during a given session; this ID is prefixed to the temp table names within the schema specified by the tempEmulationSchema option. It uses the Random class with no seed, so it should be unique across sessions. The value is stored in SqlTranslate's globalSessionId so that it may be reused for subsequent translation tasks, keeping the same prefix active for a given session.

I think the primary issue with this is that the session IDs shouldn't be managed by the rendering engine, but by the connection. I'm not sure of the difference between a database connection and a 'session', but if connections can be re-created within the same session, then our scheme of creating these unique session IDs for temp tables is flawed.

In this scenario, we'd want to drop the "temp" table ahead of creating it.

But this would obscure a temp table issue that's a real problem, which is the main basis for my objection.

I'm assuming more than one process would require a new instance of the SqlTranslate class, and then we should not face the conflict you've described here. Again, this is just an assumption based on the description of the way the Random class is working.

I'm not sure we can assume this: when parallel forks R sessions, there could be process cloning (something that can happen on Unix and macOS, maybe even Windows) where memory and state are copied from the main session into the child sessions (this is so it can transfer state between the forked processes). So I'm not sure we can assume new processes will automatically yield new instances with new state; the opposite is more likely true.

Appreciate the pushback here - it is important to make sure we're not masking a larger issue. I think if there are any changes required (beyond the whole dropping temp tables ahead of creating them) it would be in SqlRender.

No worries; again, I'm just trying to avoid a fix that could either 1) not address the problem or 2) obscure an actual problem that we need to know about. I don't think we should pursue the drop-table option; we should understand when the unique session ID is yielded/rotated/decached. I also think this might be a function of DatabaseConnector (i.e., when you invoke connect(), a unique session ID should be attached to the connection). If we do want to make it a function of SqlRender, then we shouldn't depend on the automatic creation of the session (because it will never know when to reset it) but rather have the sessionId specifically allocated and applied for the series of SQL renders that are being performed in a given analysis. If it is the case that we have to go back to all our analytic queries to 'fix' them, I'd rather go through all the code and fix it by yielding a correct sessionId vs. drop-create in the SQL. IMO, the former is the correct approach, and we'd have to evaluate the same amount of code for the changes as if we had to apply a drop-create.

@anthonysena
Collaborator Author

anthonysena commented Sep 18, 2024

I think the primary issue with this is that the session IDs shouldn't be managed by the rendering engine, but by the connection. I'm not sure of the difference between a database connection and a 'session', but if connections can be re-created within the same session, then our scheme of creating these unique session IDs for temp tables is flawed.

I'll let @schuemie comment on this point, as he designed SqlRender & DatabaseConnector. Best I can tell, the reason to put this into SqlRender is that there is no guarantee of how the query will be executed. For example, I might render some SQL using SqlRender and then paste it into my favorite query editor to run it.

Let me attempt to break this issue down into a few scenarios for discussion. For all of these scenarios, I'm assuming you are running this on a single machine using RStudio.

The process stops unexpectedly.

Let's start with what I believe is the scenario that prompted the original issue: you run CI (or any package that creates temp tables) and the process stops unexpectedly. You then re-run and find that the temp table is still present, leading to the error noted in this issue. In this scenario, the R session was not restarted, thus leading to the problem of re-using the same temp table prefix. To illustrate this, here is a reprex that shows that SqlRender produces the same temp table prefix during a single R session:

SqlRender::getTempTablePrefix()
#> [1] "q4qwdvbt"
SqlRender::getTempTablePrefix()
#> [1] "q4qwdvbt"
SqlRender::getTempTablePrefix()
#> [1] "q4qwdvbt"

Created on 2024-09-18 with reprex v2.1.1

Unfortunately, I couldn't find a way to restart RStudio within a single reprex, so trust me that I restarted my R session and re-ran the code above to get:

SqlRender::getTempTablePrefix()
#> [1] "rqsar57a"
SqlRender::getTempTablePrefix()
#> [1] "rqsar57a"
SqlRender::getTempTablePrefix()
#> [1] "rqsar57a"

Created on 2024-09-18 with reprex v2.1.1

So one option would be to restart your R session if something unexpectedly fails. I think that's too big a burden to put on the user, but it is an option. This at least demonstrates that if something fails but the R session is not restarted, you'll get the same temp table prefix.

Prefix collisions in multiple processes

Credit to @gowthamrao for this script, which he posted internally on this topic. I'm sharing it here since it nicely illustrates that the temp table prefix will be unique across R sessions when using ParallelLogger:

# All credit to @gowthamrao for this reprex!
# Setup parameters for the test
numberOfClusters <- 5

# Create a temporary directory for storing session information
path <- file.path(tempdir(), "ParallelLoggerTest", tempfile() |> basename())
dir.create(path = path, showWarnings = FALSE, recursive = TRUE)

# Generate a unique parent session ID (to differentiate from cluster session IDs)
parentSessionId <- SqlRender::getTempTablePrefix()

# Define a helper function to generate a session ID and save it to a CSV file
getSessionId <- function(path) {
  sessionInformation <- dplyr::tibble(
    sessionId = SqlRender::getTempTablePrefix(),   # Generate a temporary session ID
    sessionSysTime = Sys.time()                    # Record the current system time
  )
  
  # Create a random string to ensure unique file names
  randomString <- tempfile() |> basename()
  
  # Define the file path for saving session information
  filePath <- file.path(path, paste0("sessionInformation_", randomString, ".csv"))
  
  # Save the session information to the file
  write.csv(sessionInformation, filePath, row.names = FALSE)
}

# Set up cluster configurations, assigning a path to each cluster node
clusterConfig <- list()
for (i in 1:numberOfClusters) {
  clusterConfig[[i]] <- list(path = path)
}

# Initialize the cluster with the specified number of threads
cluster <- ParallelLogger::makeCluster(numberOfThreads = length(clusterConfig))
#> Initiating cluster with 5 threads

# Execute the session ID generation function across all cluster nodes
ParallelLogger::clusterApply(cluster = cluster, x = clusterConfig, fun = getSessionId)
#> |======================================================================| 100%
#> [[1]]
#> NULL
#> 
#> [[2]]
#> NULL
#> 
#> [[3]]
#> NULL
#> 
#> [[4]]
#> NULL
#> 
#> [[5]]
#> NULL

# Stop the cluster after execution
ParallelLogger::stopCluster(cluster = cluster)
#> Stopping cluster

# Define a helper function to find and combine all session information CSV files
appendMatchingFiles <- function(path) {
  # Match all files with the sessionInformation_*.csv pattern
  filePattern <- file.path(path, "sessionInformation_*.csv")
  matchingFiles <- Sys.glob(filePattern)
  
  # Initialize an empty tibble to store all session data
  allData <- dplyr::tibble()
  
  # Read each matching file and append its data to the combined tibble
  for (file in matchingFiles) {
    fileData <- read.csv(file, stringsAsFactors = FALSE)
    allData <- dplyr::bind_rows(allData, dplyr::as_tibble(fileData))
  }
  
  return(allData)
}

# Combine all session data from generated files
allSessions <- appendMatchingFiles(path = path) |>
  dplyr::mutate(parentSessionId = parentSessionId)  # Append the parent session ID for comparison

allSessions
#> # A tibble: 5 × 3
#>   sessionId sessionSysTime             parentSessionId
#>   <chr>     <chr>                      <chr>          
#> 1 vme40vwd  2024-09-18 13:46:13.807492 r9zugdw1       
#> 2 ymytk1gp  2024-09-18 13:46:13.803506 r9zugdw1       
#> 3 hepypvac  2024-09-18 13:46:13.782942 r9zugdw1       
#> 4 dnf9f64h  2024-09-18 13:46:13.751967 r9zugdw1       
#> 5 rat9iomb  2024-09-18 13:46:13.767976 r9zugdw1

# Test that no session ID in the generated sessions matches the parent session ID
#testthat::expect_true(nrow(allSessions |> dplyr::filter(parentSessionId == sessionId)) == 0)

# Test that all generated session IDs are unique
#testthat::expect_true(length(unique(allSessions$sessionId)) == length(allSessions$sessionId))

# Clean up: remove the temporary directory and its contents
unlink(path, recursive = TRUE, force = TRUE)

Created on 2024-09-18 with reprex v2.1.1

This provides evidence that the session ID generated by SqlRender will be unique per R session, even in subprocesses. As a result, I think it would then make sense to defensively code the SQL to drop those temp tables if they exist before creating them, since there is the possibility of reusing the same prefix during the same R session.

@schuemie
Member

Just a reminder that temp table emulation has been working without issue for about 10 years now, on all supported operating systems and DBMSs. It does require cleaning up your temp tables.

I know there's something to be said for having the prefix be unique to a connection, but that would require splitting translation into two steps, whereas now it is one step (done by SqlRender). Separate R threads will always have their own prefix. If not, that would mean somehow the Java VM is shared, in which case a duplicate prefix would be the least of our worries.

@chrisknoll
Contributor

Just a reminder that temp table emulation has been working without issue for about 10 years now, on all supported operating systems and DBMSs. It does require cleaning up your temp tables.

I think we should also consider that in those 10 years, 99.99999% of the environments have supported temp tables natively. It took us two weeks to uncover this issue when we went into an environment that doesn't support temp tables natively (DataBricks).

Separate R threads will always have their own prefix. If not, that would mean somehow the Java VM is shared, in which case a duplicate prefix would be the least of our worries.

I think it would be a useful optimization of R process memory if JVMs were instantiated once and shared across R sessions, and this could possibly be a future direction of rJava, so I don't think we should assume that there are unique JVMs per R session for all time in the future. We may want to clean up our semantics around these session IDs (and I'm beginning to think that when we say 'session' we mean the R session, which could span multiple DB connections).

@anthonysena:

Thanks for the reprexes and detailed description. I think it's slightly side-stepping my argument, which was about how re-used session IDs would lead to problems. If I take your examples above and adjust them slightly:

analysisQueryList <- buildBatchAnalysisQueries(analysisSpec, dialect) # SQL is created and translated here; all queries get the same session prefix

if (runMode == "sequential") {
  # run each analysis query in sequence
} else {
  # run the queries in parallel... this will lead to conflicting temp tables across the processes
}

By the way, if we drop-before-create as we're proposing, the parallel branches will blow away each other's intermediate temp tables.

If I'm understanding you correctly, SqlRender::getTempTablePrefix() (invoked here, defined here) reads from a static variable in the JVM, which is initialized only once for the entire JVM lifetime (and only the first time getGlobalSessionId() is invoked).

OK, so that's the flaw: the temp table prefix can't be re-used across DB connections, or else you get into the problem we're seeing here (this breaks the temp table semantics we're trying to emulate).

Solutions? Well, on one hand, we have to write our code with more attention to temp table emulation. Personally, I thought this was handled under the covers, but after thinking it through, it's impossible for the render layer to handle this because it doesn't know the connection context that the renderings occur under. But the developer does, and the developer will have to be responsible for properly associating the correct session IDs with the series of queries that are being translated. I think this is an inescapable truth.

So, what can we do? One idea: provide a mechanism to reset the global session ID. Maybe in translate() we can pass a flag (defaulting to false for backwards compatibility) that will reset the global session ID before translation. The reason you might need to make this choice is that if you have multiple SQL statements that are translated across multiple translate() invocations, they will need to share the same session ID for that series of translations. Other times, the entire analysis SQL is one huge block, and you'd want a new session ID for this one render because it's the only one you do for the connection. But, normally, I think you'll be doing multiple translates per one connection to the DB, so I feel the answer here is that we just need to be conscious about the temp table semantics in SqlRender.

Example: In CI, if you run in temp table mode, we actually do four steps:

  1. Set up result tables as temp -- get a new session ID here
  2. execute analysis -- use same session id
  3. download results to csv
  4. clean up result temp tables. -- use same session ID

These four steps occur within the context of one connection, but steps 1, 2, and 4 each involve their own translate, so they need to use the same session identifier; otherwise, you'll get different temp table names for the tables you set up in step 1 and clean up in step 4. We need a way to direct translate to get a new ID or use the existing one.

But, even better, I think how I will approach this is that I will go into the places where translate is called, fetch my own unique session ID, and use it across the translate() calls. Unfortunately, looking at the API for SqlRender, it doesn't look like you can pass in your own session ID for the duration of the translate, although the underlying Java function does accept this parameter.

So, I propose that we expose the sessionId parameter in the translate calls at the R level, and direct developers to use this parameter correctly for emulated temporary tables: fetch a new session ID to be used for the duration of your DB connection, and use that session ID in the calls to translate().
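A sketch of what that could look like at the R level (the sessionId argument to translate() is the proposed addition and does not exist in the current API; the *Sql variables are placeholders):

sessionId <- SqlRender::getTempTablePrefix()  # or a new generateSessionId() helper
ddlSql      <- SqlRender::translate(ddlSql, targetDialect = "spark", sessionId = sessionId)       # step 1
analysisSql <- SqlRender::translate(analysisSql, targetDialect = "spark", sessionId = sessionId)  # step 2
cleanupSql  <- SqlRender::translate(cleanupSql, targetDialect = "spark", sessionId = sessionId)   # step 4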

Conclusion:
I don't think we can depend on the JVM instantiation semantics to handle session ID uniqueness. Only the developers will know the 'transactional boundaries' of their calls to translate, and therefore they have to make conscious decisions about providing the session ID to the calls. This should be the same level of work as looking for all places in code where we need to do a drop-before-create as suggested above, but by handling the session ID properly, we will actually solve the root problem we're experiencing.

@anthonysena
Collaborator Author

anthonysena commented Sep 19, 2024

analysisQueryList <- buildBatchAnalysisQueries(analysisSpec, dialect) # SQL is created and translated here; all queries get the same session prefix

if (runMode == "sequential") {
  # run each analysis query in sequence
} else {
  # run the queries in parallel... this will lead to conflicting temp tables across the processes
}

In the pseudocode above, when would you translate the query list and then execute in parallel? I'm not really sure where we are doing parallel SQL operations in HADES where this pattern might exist. In the scenario I put forward earlier, if we use ParallelLogger to carry out parallel SQL tasks (with the translation performed in each subprocess), you'd have a guarantee that there are no temp table prefix conflicts between those processes.

I understand your desire to have control of how the temp table prefix is specified/recycled, but in my view that feels more complex than simply ensuring that, for a given R session, the temp table is dropped ahead of being used.

@chrisknoll
Contributor

In the pseudocode above, when would you translate the query list and then execute in parallel?

The translate was done in buildBatchAnalysisQueries, so it was formulated within one R session (and therefore the translation occurred in the R session with a single session ID). The next part of the code was just showing how it would work sequentially, but in parallel you'd have the risk of colliding temp tables, where simultaneous queries would be dropping and creating each other's temp tables. In either the sequential case or the parallel one, the queries would be invoked over their own connections. In normal temp table semantics, each temp table would be isolated from the other connections. For our temp-table emulation, we need to take more care in doing it. Enforcing drop-before-create logic is not addressing that; it's just 'making it work until it doesn't.'

For an example in practice: the Characterization package introduced a sort of splitting-up / incremental mode where one characterization design can be split up and run in parallel (I suggested that a choice be given whether the pieces run in sequence, to consume fewer resources on the server at a time, or in parallel, to maximize resource utilization). So the above example is not so much contrived as based on a real-world case.

I understand your desire to have control of how the temp table prefix is specified/recycled, but in my view that feels more complex than simply ensuring that, for a given R session, the temp table is dropped ahead of being used.

I understand, but as long as this suggestion keeps coming up, I'll continue to state how it will not solve the problem. The complexity at play here is managing temp tables, and considering that not every DBMS supports them, it is not a simple thing. But if we're going to support platforms that do not have native temp tables, then we have to accept the complexity in the codebase. A simple drop-before-create solution isn't the way to do this. If you run the risk of one connection being able to touch a table that was created in another connection, then you are not providing the correct temp-table semantics that we are trying to emulate. The way to do that is to provide a way to specify what the session ID is going to be at translation time, and the developer (due to the requirement that they need to account for fake-temp tables) will have to understand how to specify the session IDs during translation. But the R API doesn't let you pass in a session ID during translation; I would prefer that it added this, and then I'd be happy to alter code to assign the correct session ID for the series of translations. But I'm not happy looking at a solution (drop-before-create) that does not solve the problem.

Final thought: we had some question about what you should do to clean up temp tables after an error. We have a current pattern where we do a connect() call and then add a handler after the function to do a dbDisconnect() type of call. I think we could implement something that would clean up this session's temp tables by fetching the tables from the server in the temp schema, filtering to tables beginning with the session ID, and then dropping them. This way you have automatic cleanup. Obviously this won't work if you allow session IDs to be used across connections, but this is why the crux of my solution is to prevent that from happening.
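A sketch of such a helper (dropSessionTempTables() is hypothetical; DatabaseConnector::getTableNames() and SqlRender::render() are existing functions):

# Hypothetical helper: drop emulated temp tables left behind by a session.
dropSessionTempTables <- function(connection, tempEmulationSchema, sessionId) {
  tables <- DatabaseConnector::getTableNames(connection, tempEmulationSchema)
  leftovers <- tables[startsWith(tolower(tables), tolower(sessionId))]
  for (table in leftovers) {
    sql <- SqlRender::render("DROP TABLE @schema.@table;",
                             schema = tempEmulationSchema, table = table)
    DatabaseConnector::executeSql(connection, sql)
  }
}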

@chrisknoll
Contributor

chrisknoll commented Sep 19, 2024

Hey, Everyone,

After today's HADES call, we got to a point where there was some question about whether this is an actual problem, given that forked processes will create their own session IDs and we won't have temp table conflicts in practice, so we don't really need to concern ourselves with reproducing the temp-table semantics.

So, to try to illustrate my point, I'll flip the script a bit by asking a simple question:

Why do we need to drop-and-create tables?

In proper temp table semantics, we wouldn't need to do that. And if no one can see the problem with how we've implemented 'fake-temp-table' semantics (which worked fine for 10 years), why do we need to drop-and-create now? Shouldn't I be able to execute con <- connect(); executeSql(con, SqlRender::translate("create temp table #test", dialect = 'databricks')); dbDisconnect(con) multiple times and have it run without error? But it won't under our current implementation. Why not? Shouldn't the answer be 'it should'? Because that's how normal temp tables work.

If we wrote these sorts of queries with fake-temp-table semantics in mind from the jump, it would look something like this:

con <- connect()
sessionId <- SqlRender::generateSessionId()  # proposed API; does not exist today
executeSql(con, SqlRender::translate("create temp table #test", dialect = 'databricks', sessionId = sessionId))
dbDisconnect(con)

The astute reader would recognize this would leave fake-temp-tables littered around our temp schema, so we may also need to adopt the habit of cleaning up our temp space:

con <- connect()
sessionId <- SqlRender::generateSessionId()  # proposed API
tryCatch({
  executeSql(con, SqlRender::translate("create temp table #test", dialect = 'databricks', sessionId = sessionId))
}, error = function(e) {
  # handle / log the error
}, finally = {
  # find and drop tables in the temp schema that start with sessionId (proposed helper)
  DatabaseConnector::dropTempTables(tempSchema, sessionId)
  dbDisconnect(con)
})

The very astute reader may realize that the error may leave the connection in a closed state, so there's no way to go back and clean up the temp tables. This is just an unfortunate reality of trying to emulate temp tables: there are going to be some corner cases where we will have to rely on manual intervention (like scheduled DB maintenance to clean up orphaned tables).

@schuemie
Member

Please let us leave all the theoretical and metaphysical concerns aside. The only real problem we have is this:

If a user runs an analysis,
and the analysis for some reason breaks down
and the user restarts the analysis in the same R session
and the platform requires temp table emulation
then the user gets an error that the temp table already exists.

The current proposed solutions are:

  1. Add DROP TABLE IF EXISTS before all attempts to create temp tables.

  2. Add extra rules to SqlRender to automatically insert the DROP TABLE IF EXISTS when creating a temp table.

  3. Move the temp table handling from SqlRender to DatabaseConnector. So translated SQL would still have regular temp table names (e.g. #temp), and in all DatabaseConnector functions we add code that appends random prefixes.

  4. Require the developer to provide session IDs to each call to SqlRender::translate(). This means every use of SQL translation in HADES would need to be changed into something like this:

    connection <- DatabaseConnector::connect(connectionDetails)
    sessionId <- SqlRender::getNewSessionId()
    sql <- "..."
    sql <- SqlRender::translate(sql, connectionDetails$dbms, sessionId)
    DatabaseConnector::executeSql(connection, sql)
    sql <- "..."
    sql <- SqlRender::translate(sql, connectionDetails$dbms, sessionId)
    data <- DatabaseConnector::querySql(connection, sql)

@chrisknoll : Did you have any solutions you would like to add to this?

Solution 3 would introduce several problems, including that the translated SQL now only works with DatabaseConnector. It also moves DatabaseConnector in the opposite direction of where we are going with DatabaseConnector 7, where we are trying to make it lighter and do less.

Solution 4 would require major rework of our code, requiring weeks of developer time, and introduce many opportunities for errors. It also would add significantly to our code complexity, making maintenance harder.

My vote is for solution 2, which seems the least amount of work, would keep our code clean, and readily solves the problem we have.
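To illustrate the intended effect of solution 2 (the emitted guard below is an assumption about the future translation rule, not current behavior):

SqlRender::translate("CREATE TABLE #test (id INT);",
                     targetDialect = "spark",
                     tempEmulationSchema = "scratch")
#> with the proposed rule, the translation would begin with something like:
#> DROP TABLE IF EXISTS scratch.<prefix>test;
#> CREATE TABLE scratch.<prefix>test (id INT) USING DELTA;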

@chrisknoll
Contributor

No, only that Solution 4 was my contribution to this discussion, and that it would only impact those queries that involve temp tables (which, admittedly, is a majority of our analytical queries).

Solution 2 is the first I've heard of it, and it means we can target this behavior only to dialects that don't support native temp tables. We also don't have to change all of our queries around it, so that would be one I'd compromise on. However, we're still not fixing the temp table semantics, because session IDs can still be shared across connections, and that will continue to be my objection. When this theoretical/metaphysical situation becomes an actual problem, I reserve the right to an 'I told you so'.

@anthonysena
Collaborator Author

Solution 2 from the earlier post works for me too. Thanks!
