-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AN-226] Enable submitting workflows to Cromwell's GCP Batch backend #3141
Changes from 21 commits
1c37f55
6adc2cc
9f0a1bc
e5c70f3
6493663
4c21a9d
0924efb
fb0bbc7
d065d7f
fef634e
bb29484
60b70c7
1af1359
f33844a
5f142a8
01b9f48
6f7de22
02dddf7
3187133
25e2c4e
b8bc763
024e36d
1a4de46
d05642c
4ae9be2
0697b44
8ee7126
0d90360
2d8bf9d
9c3b890
d39caae
1129375
c854091
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ object WorkflowSubmissionActor { | |
useWorkflowCollectionLabel: Boolean, | ||
defaultNetworkCromwellBackend: CromwellBackend, | ||
highSecurityNetworkCromwellBackend: CromwellBackend, | ||
gcpBatchBackend: CromwellBackend, | ||
methodConfigResolver: MethodConfigResolver, | ||
bardService: BardService, | ||
workspaceSettingRepository: WorkspaceSettingRepository | ||
|
@@ -69,6 +70,7 @@ object WorkflowSubmissionActor { | |
useWorkflowCollectionLabel, | ||
defaultNetworkCromwellBackend, | ||
highSecurityNetworkCromwellBackend, | ||
gcpBatchBackend, | ||
methodConfigResolver, | ||
bardService, | ||
workspaceSettingRepository | ||
|
@@ -105,6 +107,7 @@ class WorkflowSubmissionActor(val dataSource: SlickDataSource, | |
val useWorkflowCollectionLabel: Boolean, | ||
val defaultNetworkCromwellBackend: CromwellBackend, | ||
val highSecurityNetworkCromwellBackend: CromwellBackend, | ||
val gcpBatchBackend: CromwellBackend, | ||
val methodConfigResolver: MethodConfigResolver, | ||
val bardService: BardService, | ||
val workspaceSettingRepository: WorkspaceSettingRepository | ||
|
@@ -159,6 +162,7 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths | |
val useWorkflowCollectionLabel: Boolean | ||
val defaultNetworkCromwellBackend: CromwellBackend | ||
val highSecurityNetworkCromwellBackend: CromwellBackend | ||
val gcpBatchBackend: CromwellBackend | ||
val methodConfigResolver: MethodConfigResolver | ||
val bardService: BardService | ||
val workspaceSettingRepository: WorkspaceSettingRepository | ||
|
@@ -301,7 +305,16 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths | |
// - final_workflow_outputs_dir = submissions/final-outputs | ||
// - final_workflow_outputs_mode = "copy". | ||
|
||
useCromwellGcpBatchBackend: Boolean = currentSettings | ||
.collectFirst { case setting: UseCromwellGcpBatchBackendSetting => | ||
setting | ||
} | ||
.exists(_.config.enabled) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this would be clearer if we did it in a single invocation (further tweaking welcome, this is just an idea):
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you mean by a single invocation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean going from a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should probably create a reusable utility method to retrieve a strongly-typed workspace setting from the settings list, a la: def findSetting[T](allSettings: List[WorkspaceSetting]): Option[T] = {
allSettings.collectFirst {
case found:T => found
}
} if we had that, then this code could be a bit clearer, like: useBatchSetting: Option[UseCromwellGcpBatchBackendSetting] = somwhereTheUtilityLives
.findSetting[UseCromwellGcpBatchBackendSetting](currentSettings) or useCromwellGcpBatchBackend: Boolean = somwhereTheUtilityLives
.findSetting[UseCromwellGcpBatchBackendSetting](currentSettings).exists(_.config.enabled) you could use the same utility to calculate I'd consider this a stretch goal/out of scope/does not block this PR. I don't want to scope-creep! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! @davidangb I agree that refactoring would be great for clarity and reusability in the future, but I have implemented Janet's suggestion for now. |
||
cromwellSubmissionBackend = | ||
if (useCromwellGcpBatchBackend) gcpBatchBackend else highSecurityNetworkCromwellBackend | ||
|
||
executionServiceWorkflowOptions = ExecutionServiceWorkflowOptions( | ||
submission.submissionRoot, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this meant to be duplicated? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes! David was confused by the same thing. I added a clarifying comment as he suggested! |
||
submission.submissionRoot, | ||
Comment on lines
+318
to
319
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's a good comment on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a clarifying comment, thanks! |
||
final_workflow_outputs_dir, | ||
final_workflow_outputs_dir_metadata, | ||
|
@@ -315,7 +328,7 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths | |
deleteIntermediateOutputFiles, | ||
useReferenceDisks, | ||
memoryRetryMultiplier, | ||
highSecurityNetworkCromwellBackend, | ||
cromwellSubmissionBackend, | ||
workflowFailureMode, | ||
google_labels = Map("terra-submission-id" -> s"terra-${submission.id.toString}"), | ||
ignoreEmptyOutputs, | ||
|
@@ -467,6 +480,7 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths | |
monitoringImageScript = submissionRec.monitoringImageScript | ||
) | ||
} yield { | ||
logger.error("Test workflow options: " + wfOpts.toString); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove or demote below There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please see my standalone comment below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will remove this log before merging. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this is still here! |
||
val submissionAndWorkspaceLabels = | ||
Map("submission-id" -> submissionRec.id.toString, "workspace-id" -> workspaceRec.id.toString) | ||
val wfLabels = workspaceRec.workflowCollection match { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,10 @@ | |
import org.broadinstitute.dsde.rawls.dataaccess._ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there any non-scalafmt or whitespace changes to this file? If not, can you back out these changes? It's better to not have files modified at all if we don't need to change them functionally. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm pretty certain that I made the commit to scalafmt this file because it was giving me an error on the scalafmt PR check. I see that that action claims to check only modified files, but maybe that file was being checked because the merge commit from dev modified it. Is there a way to revert the scalafmt changes without failing the check? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, maybe reverse all of the changes so that the diff between this branch and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was about to revert the commit that scalafmt-ed this file, but I decided to check the diff first and I couldn't see my changes anymore. It turns out that Bria must have been having the same problem, and this PR that was merged yesterday also fixed the formatting. Does it matter now whether or not I revert the commit that made the same changes on my branch? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, if the diff looks good you're all set! |
||
import org.broadinstitute.dsde.rawls.dataaccess.slick.{TestDriverComponent, WorkflowRecord} | ||
import org.broadinstitute.dsde.rawls.expressions.{BoundOutputExpression, OutputExpression} | ||
import org.broadinstitute.dsde.rawls.jobexec.SubmissionMonitorActor.{ExecutionServiceStatusResponse, StatusCheckComplete} | ||
import org.broadinstitute.dsde.rawls.jobexec.SubmissionMonitorActor.{ | ||
ExecutionServiceStatusResponse, | ||
StatusCheckComplete | ||
} | ||
import org.broadinstitute.dsde.rawls.metrics.RawlsStatsDTestUtils | ||
import org.broadinstitute.dsde.rawls.mock.{MockSamDAO, RemoteServicesMockServer} | ||
import org.broadinstitute.dsde.rawls.model._ | ||
|
@@ -39,7 +42,7 @@ | |
* Created by dvoet on 7/1/15. | ||
*/ | ||
//noinspection NameBooleanParameters,ScalaUnnecessaryParentheses,TypeAnnotation,ScalaUnusedSymbol | ||
class SubmissionMonitorSpec(_system: ActorSystem) | ||
Check failure on line 45 in core/src/test/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorSpec.scala GitHub Actions / Test Report pushSubmissionMonitorSpec.SubmissionMonitor should save workflow error messages
Raw output
|
||
extends TestKit(_system) | ||
with AnyFlatSpecLike | ||
with Matchers | ||
|
@@ -1348,22 +1351,33 @@ | |
dataSource: SlickDataSource => | ||
val cheapWorkflowId = UUID.randomUUID().toString | ||
val expensiveWorkflowId = UUID.randomUUID().toString | ||
val cheapWorkflow = Workflow(Some(cheapWorkflowId), WorkflowStatuses.Submitted, new DateTime(), Some(testData.sample1.toReference), Seq.empty) | ||
val expensiveWorkflow = Workflow(Some(expensiveWorkflowId), WorkflowStatuses.Submitted, new DateTime(), Some(testData.sample2.toReference), Seq.empty) | ||
val submission = testData.submission1.copy(submissionId = UUID.randomUUID().toString, workflows = Seq(cheapWorkflow, expensiveWorkflow)) | ||
val cheapWorkflow = Workflow(Some(cheapWorkflowId), | ||
WorkflowStatuses.Submitted, | ||
new DateTime(), | ||
Some(testData.sample1.toReference), | ||
Seq.empty | ||
) | ||
val expensiveWorkflow = Workflow(Some(expensiveWorkflowId), | ||
WorkflowStatuses.Submitted, | ||
new DateTime(), | ||
Some(testData.sample2.toReference), | ||
Seq.empty | ||
) | ||
val submission = testData.submission1.copy(submissionId = UUID.randomUUID().toString, | ||
workflows = Seq(cheapWorkflow, expensiveWorkflow) | ||
) | ||
runAndWait(submissionQuery.create(testData.workspace, submission)) | ||
runAndWait(updateWorkflowExecutionServiceKey("unittestdefault")) | ||
|
||
class CostCapTestExecutionServiceDAO(status: String) extends SubmissionTestExecutionServiceDAO(status) { | ||
override def getCost(id: String, userInfo: UserInfo): Future[WorkflowCostBreakdown] = { | ||
override def getCost(id: String, userInfo: UserInfo): Future[WorkflowCostBreakdown] = | ||
if (id.equals(cheapWorkflowId)) { | ||
Future.successful(WorkflowCostBreakdown(id, BigDecimal(1), "USD", status, Seq.empty)) | ||
} else if (id.equals(expensiveWorkflowId)) { | ||
Future.successful(WorkflowCostBreakdown(id, BigDecimal(11), "USD", status, Seq.empty)) | ||
} else { | ||
Future.failed(new Exception("Unexpected workflow ID")) | ||
} | ||
} | ||
} | ||
|
||
val monitor = createSubmissionMonitor( | ||
|
@@ -1377,7 +1391,8 @@ | |
) | ||
|
||
val workflowCosts = await(monitor.queryExecutionServiceForStatus()).statusResponse.collect { | ||
case Success(Some(recordWithOutputs)) => recordWithOutputs._1.externalId.get -> (recordWithOutputs._1.status, recordWithOutputs._1.cost) | ||
case Success(Some(recordWithOutputs)) => | ||
recordWithOutputs._1.externalId.get -> (recordWithOutputs._1.status, recordWithOutputs._1.cost) | ||
}.toMap | ||
|
||
workflowCosts(cheapWorkflowId) shouldEqual (WorkflowStatuses.Running.toString, Option(BigDecimal(1))) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now I recommend only checking in the content in the
Publish rawls-model
section.I think a larger rewrite of the instructions is in order, which I intend to do in AN-297.
In particular, I don't think it makes sense to write instructions on what doesn't work, people want to know what does work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the note from lines 177-180 — I am happy to defer updates related to local workflow submission to your ticket. Did you also think I should revert my other updates other than those related to the Rawls model? I think my additional VPN reminder and additional context about using the dev database based on what we learned will help future developers not make the same mistakes that I did.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please stick to updating the content in the Publish rawls-model section.
We will have horrible merge conflicts otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will definitely pull you in to review that PR so you can confirm I covered everything you ran into.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#3146
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, thanks! I have reverted the other changes.