-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AN-226] Enable submitting workflows to Cromwell's GCP Batch backend #3141
Changes from 27 commits
1c37f55
6adc2cc
9f0a1bc
e5c70f3
6493663
4c21a9d
0924efb
fb0bbc7
d065d7f
fef634e
bb29484
60b70c7
1af1359
f33844a
5f142a8
01b9f48
6f7de22
02dddf7
3187133
25e2c4e
b8bc763
024e36d
1a4de46
d05642c
4ae9be2
0697b44
8ee7126
0d90360
2d8bf9d
9c3b890
d39caae
1129375
c854091
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ object WorkflowSubmissionActor { | |
useWorkflowCollectionLabel: Boolean, | ||
defaultNetworkCromwellBackend: CromwellBackend, | ||
highSecurityNetworkCromwellBackend: CromwellBackend, | ||
gcpBatchBackend: CromwellBackend, | ||
methodConfigResolver: MethodConfigResolver, | ||
bardService: BardService, | ||
workspaceSettingRepository: WorkspaceSettingRepository | ||
|
@@ -69,6 +70,7 @@ object WorkflowSubmissionActor { | |
useWorkflowCollectionLabel, | ||
defaultNetworkCromwellBackend, | ||
highSecurityNetworkCromwellBackend, | ||
gcpBatchBackend, | ||
methodConfigResolver, | ||
bardService, | ||
workspaceSettingRepository | ||
|
@@ -105,6 +107,7 @@ class WorkflowSubmissionActor(val dataSource: SlickDataSource, | |
val useWorkflowCollectionLabel: Boolean, | ||
val defaultNetworkCromwellBackend: CromwellBackend, | ||
val highSecurityNetworkCromwellBackend: CromwellBackend, | ||
val gcpBatchBackend: CromwellBackend, | ||
val methodConfigResolver: MethodConfigResolver, | ||
val bardService: BardService, | ||
val workspaceSettingRepository: WorkspaceSettingRepository | ||
|
@@ -159,6 +162,7 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths | |
val useWorkflowCollectionLabel: Boolean | ||
val defaultNetworkCromwellBackend: CromwellBackend | ||
val highSecurityNetworkCromwellBackend: CromwellBackend | ||
val gcpBatchBackend: CromwellBackend | ||
val methodConfigResolver: MethodConfigResolver | ||
val bardService: BardService | ||
val workspaceSettingRepository: WorkspaceSettingRepository | ||
|
@@ -301,7 +305,16 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths | |
// - final_workflow_outputs_dir = submissions/final-outputs | ||
// - final_workflow_outputs_mode = "copy". | ||
|
||
useCromwellGcpBatchBackend: Boolean = currentSettings | ||
.collectFirst { case setting: UseCromwellGcpBatchBackendSetting => | ||
setting | ||
} | ||
.exists(_.config.enabled) | ||
cromwellSubmissionBackend = | ||
if (useCromwellGcpBatchBackend) gcpBatchBackend else highSecurityNetworkCromwellBackend | ||
|
||
executionServiceWorkflowOptions = ExecutionServiceWorkflowOptions( | ||
submission.submissionRoot, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this meant to be duplicated? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes! David was confused by the same thing. I added a clarifying comment as he suggested! |
||
submission.submissionRoot, | ||
Comment on lines
+318
to
319
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's a good comment on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a clarifying comment, thanks! |
||
final_workflow_outputs_dir, | ||
final_workflow_outputs_dir_metadata, | ||
|
@@ -315,7 +328,7 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths | |
deleteIntermediateOutputFiles, | ||
useReferenceDisks, | ||
memoryRetryMultiplier, | ||
highSecurityNetworkCromwellBackend, | ||
cromwellSubmissionBackend, | ||
workflowFailureMode, | ||
google_labels = Map("terra-submission-id" -> s"terra-${submission.id.toString}"), | ||
ignoreEmptyOutputs, | ||
|
@@ -467,6 +480,7 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths | |
monitoringImageScript = submissionRec.monitoringImageScript | ||
) | ||
} yield { | ||
logger.error("Test workflow options: " + wfOpts.toString); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove or demote below There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please see my standalone comment below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will remove this log before merging. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this is still here! |
||
val submissionAndWorkspaceLabels = | ||
Map("submission-id" -> submissionRec.id.toString, "workspace-id" -> workspaceRec.id.toString) | ||
val wfLabels = workspaceRec.workflowCollection match { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,9 +23,11 @@ import org.broadinstitute.dsde.rawls.jobexec.WorkflowSubmissionActor.{ | |
import org.broadinstitute.dsde.rawls.metrics.{BardService, RawlsStatsDTestUtils} | ||
import org.broadinstitute.dsde.rawls.mock.{MockBardService, MockSamDAO, RemoteServicesMockServer} | ||
import org.broadinstitute.dsde.rawls.model.ExecutionJsonSupport.ExecutionServiceWorkflowOptionsFormat | ||
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingConfig.SeparateSubmissionFinalOutputsConfig | ||
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingTypes.SeparateSubmissionFinalOutputs | ||
import org.broadinstitute.dsde.rawls.model.{WorkspaceSetting, _} | ||
import org.broadinstitute.dsde.rawls.model.WorkspaceSettingConfig.{ | ||
SeparateSubmissionFinalOutputsConfig, | ||
UseCromwellGcpBatchBackendConfig | ||
} | ||
import org.broadinstitute.dsde.rawls.model._ | ||
import org.broadinstitute.dsde.rawls.util.MockitoTestUtils | ||
import org.broadinstitute.dsde.rawls.workspace.WorkspaceSettingRepository | ||
import org.broadinstitute.dsde.rawls.{RawlsExceptionWithErrorReport, RawlsTestUtils} | ||
|
@@ -120,6 +122,7 @@ class WorkflowSubmissionSpec(_system: ActorSystem) | |
val useWorkflowCollectionLabel: Boolean = false, | ||
val defaultNetworkCromwellBackend: CromwellBackend = CromwellBackend("PAPIv2"), | ||
val highSecurityNetworkCromwellBackend: CromwellBackend = CromwellBackend("PAPIv2-CloudNAT"), | ||
val gcpBatchBackend: CromwellBackend = CromwellBackend("GCPBatch"), | ||
val methodConfigResolver: MethodConfigResolver = methodConfigResolver, | ||
val bardService: BardService = mockBardService, | ||
val workspaceSettingRepository: WorkspaceSettingRepository = mockWorkspaceSettingRepository | ||
|
@@ -381,6 +384,7 @@ class WorkflowSubmissionSpec(_system: ActorSystem) | |
Some( | ||
ExecutionServiceWorkflowOptions( | ||
jes_gcs_root = s"gs://${testData.workspace.bucketName}/${testData.submission1.submissionId}", | ||
gcp_batch_gcs_root = s"gs://${testData.workspace.bucketName}/${testData.submission1.submissionId}", | ||
None, | ||
None, | ||
google_project = testData.workspace.googleProjectId.value, | ||
|
@@ -937,6 +941,7 @@ class WorkflowSubmissionSpec(_system: ActorSystem) | |
false, | ||
CromwellBackend("PAPIv2"), | ||
CromwellBackend("PAPIv2-CloudNAT"), | ||
CromwellBackend("GCPBatch"), | ||
methodConfigResolver, | ||
mockBardService, | ||
mockWorkspaceSettingRepository | ||
|
@@ -1005,6 +1010,7 @@ class WorkflowSubmissionSpec(_system: ActorSystem) | |
false, | ||
CromwellBackend("PAPIv2"), | ||
CromwellBackend("PAPIv2-CloudNAT"), | ||
CromwellBackend("GCPBatch"), | ||
methodConfigResolver, | ||
mockBardService, | ||
mockWorkspaceSettingRepository | ||
|
@@ -1202,6 +1208,90 @@ class WorkflowSubmissionSpec(_system: ActorSystem) | |
} | ||
} | ||
|
||
it should "submit workflows to Cromwell's GCP Batch backend when UseCromwellGcpBatchBackendSetting is true" in withDefaultTestDatabase { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice tests! |
||
val mockExecCluster = MockShardedExecutionServiceCluster.fromDAO(new MockExecutionServiceDAO(), slickDataSource) | ||
val workspaceSettingRepository = mock[WorkspaceSettingRepository] | ||
when(workspaceSettingRepository.getWorkspaceSettings(UUID.fromString(testData.workspace.workspaceId))).thenReturn( | ||
Future.successful(List(UseCromwellGcpBatchBackendSetting(UseCromwellGcpBatchBackendConfig(true)))) | ||
) | ||
|
||
val workflowSubmission = | ||
new TestWorkflowSubmission(slickDataSource, workspaceSettingRepository = workspaceSettingRepository) { | ||
override val executionServiceCluster = mockExecCluster | ||
} | ||
|
||
val (workflowRecs, submissionRec, workspaceRec) = | ||
getWorkflowSubmissionWorkspaceRecords(testData.regionalSubmission, testData.workspace) | ||
|
||
Await.result( | ||
workflowSubmission.submitWorkflowBatch(WorkflowBatch(workflowRecs.map(_.id), submissionRec, workspaceRec)), | ||
Duration.Inf | ||
) | ||
|
||
val workflowOptions = mockExecCluster.getDefaultSubmitMember | ||
.asInstanceOf[MockExecutionServiceDAO] | ||
.submitOptions | ||
.map(_.parseJson.convertTo[ExecutionServiceWorkflowOptions]) | ||
|
||
workflowOptions.get.backend should be(CromwellBackend("GCPBatch")) | ||
} | ||
|
||
it should "submit workflows to Cromwell's high security network backend when UseCromwellGcpBatchBackendSetting is false" in withDefaultTestDatabase { | ||
val mockExecCluster = MockShardedExecutionServiceCluster.fromDAO(new MockExecutionServiceDAO(), slickDataSource) | ||
val workspaceSettingRepository = mock[WorkspaceSettingRepository] | ||
when(workspaceSettingRepository.getWorkspaceSettings(UUID.fromString(testData.workspace.workspaceId))).thenReturn( | ||
Future.successful(List(UseCromwellGcpBatchBackendSetting(UseCromwellGcpBatchBackendConfig(false)))) | ||
) | ||
|
||
val workflowSubmission = | ||
new TestWorkflowSubmission(slickDataSource, workspaceSettingRepository = workspaceSettingRepository) { | ||
override val executionServiceCluster = mockExecCluster | ||
} | ||
|
||
val (workflowRecs, submissionRec, workspaceRec) = | ||
getWorkflowSubmissionWorkspaceRecords(testData.regionalSubmission, testData.workspace) | ||
|
||
Await.result( | ||
workflowSubmission.submitWorkflowBatch(WorkflowBatch(workflowRecs.map(_.id), submissionRec, workspaceRec)), | ||
Duration.Inf | ||
) | ||
|
||
val workflowOptions = mockExecCluster.getDefaultSubmitMember | ||
.asInstanceOf[MockExecutionServiceDAO] | ||
.submitOptions | ||
.map(_.parseJson.convertTo[ExecutionServiceWorkflowOptions]) | ||
|
||
workflowOptions.get.backend should be(CromwellBackend("PAPIv2-CloudNAT")) | ||
} | ||
|
||
it should "submit workflows to Cromwell's high security network backend when UseCromwellGcpBatchBackendSetting is not set" in withDefaultTestDatabase { | ||
val mockExecCluster = MockShardedExecutionServiceCluster.fromDAO(new MockExecutionServiceDAO(), slickDataSource) | ||
val workspaceSettingRepository = mock[WorkspaceSettingRepository] | ||
when(workspaceSettingRepository.getWorkspaceSettings(UUID.fromString(testData.workspace.workspaceId))).thenReturn( | ||
Future.successful(List()) | ||
) | ||
|
||
val workflowSubmission = | ||
new TestWorkflowSubmission(slickDataSource, workspaceSettingRepository = workspaceSettingRepository) { | ||
override val executionServiceCluster = mockExecCluster | ||
} | ||
|
||
val (workflowRecs, submissionRec, workspaceRec) = | ||
getWorkflowSubmissionWorkspaceRecords(testData.regionalSubmission, testData.workspace) | ||
|
||
Await.result( | ||
workflowSubmission.submitWorkflowBatch(WorkflowBatch(workflowRecs.map(_.id), submissionRec, workspaceRec)), | ||
Duration.Inf | ||
) | ||
|
||
val workflowOptions = mockExecCluster.getDefaultSubmitMember | ||
.asInstanceOf[MockExecutionServiceDAO] | ||
.submitOptions | ||
.map(_.parseJson.convertTo[ExecutionServiceWorkflowOptions]) | ||
|
||
workflowOptions.get.backend should be(CromwellBackend("PAPIv2-CloudNAT")) | ||
} | ||
|
||
private def setWorkflowBatchToQueued(batchSize: Int, submissionId: String): Seq[WorkflowRecord] = | ||
runAndWait( | ||
for { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -126,6 +126,7 @@ class ExecutionModelSpec extends AnyFlatSpec with Matchers { | |
"ExecutionServiceWorkflowOptions" should "serialize/deserialize to/from JSON" in { | ||
val test = ExecutionServiceWorkflowOptions( | ||
jes_gcs_root = "jes_gcs_root", | ||
gcp_batch_gcs_root = "example_gcp_batch_gcs_root", | ||
final_workflow_outputs_dir = None, | ||
final_workflow_outputs_dir_metadata = None, | ||
google_project = "google_project", | ||
|
@@ -162,6 +163,7 @@ class ExecutionModelSpec extends AnyFlatSpec with Matchers { | |
""" | ||
|{ | ||
| "jes_gcs_root": "jes_gcs_root", | ||
| "gcp_batch_gcs_root": "example_gcp_batch_gcs_root", | ||
| "google_project": "google_project", | ||
| "account_name": "account_name", | ||
| "google_compute_service_account": "[email protected]", | ||
|
@@ -192,6 +194,7 @@ class ExecutionModelSpec extends AnyFlatSpec with Matchers { | |
""" | ||
|{ | ||
| "jes_gcs_root": "jes_gcs_root", | ||
| "gcp_batch_gcs_root": "example_gcp_batch_gcs_root", | ||
| "google_project": "google_project", | ||
| "account_name": "account_name", | ||
| "google_compute_service_account": "[email protected]", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,8 +93,8 @@ dataRepo { | |
|
||
resourceBuffer { | ||
projectPool { | ||
regular = "cwb_ws_dev_v7" | ||
exfiltrationControlled = "vpc_sc_v10" | ||
regular = "cwb_ws_dev_v8" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jgainerdewar I updated these pools in the local dev config to reflect the new pools you recently created because the old values were giving me trouble when trying to run Rawls locally. Do these new values seem correct? (The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, those look correct! |
||
exfiltrationControlled = "vpc_sc_v13" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whoops, should have updated these when I switched to the new pools but I didn't know they existed! |
||
} | ||
url = "https://buffer.dsde-dev.broadinstitute.org" | ||
saEmail = "[email protected]" | ||
|
@@ -132,6 +132,7 @@ executionservice { | |
|
||
defaultNetworkBackend = "PAPIv2-beta" | ||
highSecurityNetworkBackend = "PAPIv2-CloudNAT" | ||
gcpBatchBackend = "GCPBatch" | ||
|
||
cromiamUrl = "https://cromiam-priv.dsde-dev.broadinstitute.org" | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be clearer if we did it in a single invocation (further tweaking welcome, this is just an idea):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by a single invocation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean going from a `collectFirst` followed by an `exists`, to just an `exists`. I find it easier to read because I don't have to think about the type coming out of the `collectFirst`. I also like making it obvious that we are handling two different `false` cases - the setting exists and is set to false, or the setting doesn't exist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should probably create a reusable utility method to retrieve a strongly-typed workspace setting from the settings list, a la:
if we had that, then this code could be a bit clearer, like:
or
you could use the same utility to calculate `separateSubmissionSetting` up at line 293, too.
I'd consider this a stretch goal/out of scope/does not block this PR. I don't want to scope-creep!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! @davidangb I agree that refactoring would be great for clarity and reusability in the future, but I have implemented Janet's suggestion for now.