From 120e8e0e1fb7c5a1c0692be739e1e4d96e8a266b Mon Sep 17 00:00:00 2001 From: Miles Johnson Date: Fri, 18 Oct 2024 16:38:59 -0700 Subject: [PATCH] fix: Fix task timeout not applying to retries. (#1695) * Move timeout from job to runner. * Finalize timeout. * Bump. * Fix versions. --- .yarn/versions/a679b82f.yml | 16 +++ CHANGELOG.md | 9 ++ Cargo.lock | 1 + .../action-graph/src/action_graph_builder.rs | 1 - crates/action-pipeline/src/action_pipeline.rs | 4 - crates/action-pipeline/src/job.rs | 46 -------- crates/action/src/action_node.rs | 2 - crates/process/src/lib.rs | 1 + crates/project-expander/src/tasks_expander.rs | 10 -- crates/task-runner/Cargo.toml | 1 + crates/task-runner/src/command_executor.rs | 103 ++++++++++++++---- packages/types/src/pipeline.ts | 1 - 12 files changed, 111 insertions(+), 84 deletions(-) create mode 100644 .yarn/versions/a679b82f.yml diff --git a/.yarn/versions/a679b82f.yml b/.yarn/versions/a679b82f.yml new file mode 100644 index 00000000000..462d5dd6f3c --- /dev/null +++ b/.yarn/versions/a679b82f.yml @@ -0,0 +1,16 @@ +releases: + '@moonrepo/cli': patch + '@moonrepo/core-linux-arm64-gnu': patch + '@moonrepo/core-linux-arm64-musl': patch + '@moonrepo/core-linux-x64-gnu': patch + '@moonrepo/core-linux-x64-musl': patch + '@moonrepo/core-macos-arm64': patch + '@moonrepo/core-macos-x64': patch + '@moonrepo/core-windows-x64-msvc': patch + '@moonrepo/types': patch + +declined: + - '@moonrepo/nx-compat' + - '@moonrepo/report' + - '@moonrepo/runtime' + - website diff --git a/CHANGELOG.md b/CHANGELOG.md index 7749b65b5bf..1ee875df362 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ ## Unreleased +#### 🚀 Updates + +- Removed the warning around `.env` files not existing in certain environments. + +#### 🐞 Fixes + +- Fixed an issue where the task option `timeout` would apply to the overall run, and not for each + attempt when using the `retryCount` option. + #### ⚙️ Internal - Updated Rust to v1.82. diff --git a/Cargo.lock b/Cargo.lock index e62ff0953ba..4514423d501 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3951,6 +3951,7 @@ dependencies = [ "starbase_utils", "thiserror", "tokio", + "tokio-util", "tracing", ] diff --git a/crates/action-graph/src/action_graph_builder.rs b/crates/action-graph/src/action_graph_builder.rs index 93286934e6b..30f9442c058 100644 --- a/crates/action-graph/src/action_graph_builder.rs +++ b/crates/action-graph/src/action_graph_builder.rs @@ -319,7 +319,6 @@ impl<'app> ActionGraphBuilder<'app> { persistent: task.is_persistent(), runtime: self.get_runtime(project, task.platform, true), target: task.target.to_owned(), - timeout: task.options.timeout, id: None, }); diff --git a/crates/action-pipeline/src/action_pipeline.rs b/crates/action-pipeline/src/action_pipeline.rs index 73ab7afaf87..4e9a6a1e1d2 100644 --- a/crates/action-pipeline/src/action_pipeline.rs +++ b/crates/action-pipeline/src/action_pipeline.rs @@ -412,10 +412,6 @@ async fn dispatch_job( action_context: Arc, ) { let job = Job { - timeout: match &node { - ActionNode::RunTask(inner) => inner.timeout, - _ => None, - }, node, node_index, context: job_context, diff --git a/crates/action-pipeline/src/job.rs b/crates/action-pipeline/src/job.rs index e935cc6f2ab..ed631bf525f 100644 --- a/crates/action-pipeline/src/job.rs +++ b/crates/action-pipeline/src/job.rs @@ -4,10 +4,6 @@ use moon_action::{Action, ActionNode, ActionStatus}; use moon_action_context::ActionContext; use moon_app_context::AppContext; use std::sync::Arc; -use std::time::Duration; -use tokio::task::JoinHandle; -use tokio::time::{sleep, timeout}; -use tokio_util::sync::CancellationToken; use tracing::{instrument, trace}; pub struct Job { @@ -18,17 +14,11 @@ pub struct Job { pub context: JobContext, pub app_context: Arc, pub action_context: Arc, - - /// Maximum seconds to run before it's cancelled - pub timeout: Option, } impl Job { #[instrument(skip_all)] pub async fn dispatch(self) { - let timeout_token = CancellationToken::new(); - let timeout_handle = self.monitor_timeout(self.timeout, timeout_token.clone()); - let mut action = Action::new(self.node); action.node_index = self.node_index; @@ -56,17 +46,6 @@ impl Job { action.finish(ActionStatus::Skipped); } - // Cancel if we have timed out - _ = timeout_token.cancelled() => { - trace!( - index = self.node_index, - timeout = self.timeout, - "Job timed out", - ); - - action.finish(ActionStatus::TimedOut); - } - // Or run the job to completion _ = run_action( &mut action, @@ -78,32 +57,7 @@ impl Job { ) => {}, }; - // Cleanup before sending the result - if let Some(handle) = timeout_handle { - handle.abort(); - } - // Send the result back to the pipeline self.context.send_result(action).await; } - - fn monitor_timeout( - &self, - duration: Option, - timeout_token: CancellationToken, - ) -> Option> { - duration.map(|duration| { - tokio::spawn(async move { - if timeout( - Duration::from_secs(duration), - sleep(Duration::from_secs(86400)), // 1 day - ) - .await - .is_err() - { - timeout_token.cancel(); - } - }) - }) - } } diff --git a/crates/action/src/action_node.rs b/crates/action/src/action_node.rs index b7744fbcf4d..593e33e28e0 100644 --- a/crates/action/src/action_node.rs +++ b/crates/action/src/action_node.rs @@ -30,7 +30,6 @@ pub struct RunTaskNode { pub persistent: bool, // Never terminates pub runtime: Runtime, pub target: Target, - pub timeout: Option, pub id: Option, // For action graph states } @@ -43,7 +42,6 @@ impl RunTaskNode { persistent: false, runtime, target, - timeout: None, id: None, } } diff --git a/crates/process/src/lib.rs b/crates/process/src/lib.rs index 2d5cec67a93..413365410ac 100644 --- a/crates/process/src/lib.rs +++ b/crates/process/src/lib.rs @@ -5,6 +5,7 @@ mod output; mod process_error; mod shell; +pub use async_command::*; pub use command::*; pub use moon_args as args; pub use output::*; diff --git a/crates/project-expander/src/tasks_expander.rs b/crates/project-expander/src/tasks_expander.rs index c9e89287943..fd0ae13940e 100644 --- a/crates/project-expander/src/tasks_expander.rs +++ b/crates/project-expander/src/tasks_expander.rs @@ -1,7 +1,6 @@ use crate::expander_context::*; use crate::tasks_expander_error::TasksExpanderError; use crate::token_expander::TokenExpander; -use moon_common::color; use moon_config::{TaskArgs, TaskDependencyConfig}; use moon_project::Project; use moon_task::{Target, TargetScope, Task}; @@ -292,15 +291,6 @@ impl<'graph, 'query> TasksExpander<'graph, 'query> { for (key, val) in merged_env_vars { env.entry(key).or_insert(val); } - - if !missing_paths.is_empty() { - warn!( - target = task.target.as_str(), - env_files = ?missing_paths, - "Setting {} is enabled but file(s) don't exist, skipping as this may be intentional", - color::property("options.envFile"), - ); - } } task.env = substitute_env_vars(env); diff --git a/crates/task-runner/Cargo.toml b/crates/task-runner/Cargo.toml index 61276f182ed..2dcea2b2d87 100644 --- a/crates/task-runner/Cargo.toml +++ b/crates/task-runner/Cargo.toml @@ -31,6 +31,7 @@ starbase_archive = { workspace = true } starbase_utils = { workspace = true, features = ["glob"] } thiserror = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } tracing = { workspace = true } [dev-dependencies] diff --git a/crates/task-runner/src/command_executor.rs b/crates/task-runner/src/command_executor.rs index 9e9c4d7ba93..5f91925732e 100644 --- a/crates/task-runner/src/command_executor.rs +++ b/crates/task-runner/src/command_executor.rs @@ -4,13 +4,14 @@ use moon_app_context::AppContext; use moon_common::{color, is_ci, is_test_env}; use moon_config::TaskOutputStyle; use moon_console::TaskReportItem; -use moon_process::args::join_args; -use moon_process::Command; +use moon_process::{args::join_args, AsyncCommand, Command}; use moon_project::Project; use moon_task::Task; +use std::process::Output; use std::time::Duration; use tokio::task::{self, JoinHandle}; -use tokio::time::sleep; +use tokio::time::{sleep, timeout}; +use tokio_util::sync::CancellationToken; use tracing::{debug, instrument}; fn is_ci_env() -> bool { @@ -81,7 +82,7 @@ impl<'task> CommandExecutor<'task> { self.prepare_state(context, report_item); // For long-running process, log a message on an interval to indicate it's still running - self.start_monitoring(); + self.monitor_running_status(); // Execute the command on a loop as an attempt for every retry count we have let command_line = self.get_command_line(context); @@ -106,28 +107,70 @@ impl<'task> CommandExecutor<'task> { self.print_command_line(&command_line)?; // Attempt to execute command - let mut command = self.command.create_async(); + async fn execute_command( + mut command: AsyncCommand<'_>, + stream: bool, + interactive: bool, + ) -> miette::Result { + match (stream, interactive) { + (true, true) | (false, true) => command.exec_stream_output().await, + (true, false) => command.exec_stream_and_capture_output().await, + _ => command.exec_capture_output().await, + } + } + + let timeout_token = CancellationToken::new(); + let timeout_handle = + self.monitor_timeout(self.task.options.timeout, timeout_token.clone()); + + let attempt_result = tokio::select! { + // Run conditions in order! + biased; - let attempt_result = match (self.stream, self.interactive) { - (true, true) | (false, true) => command.exec_stream_output().await, - (true, false) => command.exec_stream_and_capture_output().await, - _ => command.exec_capture_output().await, + // Cancel if we have timed out + _ = timeout_token.cancelled() => { + Ok(None) + } + + // Or run the job to completion + result = execute_command( + self.command.create_async(), + self.stream, + self.interactive, + ) => result.map(Some), }; + // Cleanup before sending the result + if let Some(handle) = timeout_handle { + handle.abort(); + } + // Handle the execution result match attempt_result { // Zero and non-zero exit codes - Ok(output) => { - let is_success = output.status.success(); + Ok(maybe_output) => { + let mut is_success = false; - debug!( - task = self.task.target.as_str(), - command = self.command.bin.to_str(), - exit_code = output.status.code(), - "Ran task, checking conditions", - ); + if let Some(output) = maybe_output { + is_success = output.status.success(); - attempt.finish_from_output(output); + debug!( + task = self.task.target.as_str(), + command = self.command.bin.to_str(), + exit_code = output.status.code(), + "Ran task, checking conditions", + ); + + attempt.finish_from_output(output); + } else { + debug!( + task = self.task.target.as_str(), + command = self.command.bin.to_str(), + "Task timed out", + ); + + attempt.finish(ActionStatus::TimedOut); + } self.app.console.reporter.on_task_finished( &self.task.target, @@ -162,7 +205,7 @@ impl<'task> CommandExecutor<'task> { else { debug!( task = self.task.target.as_str(), - "Task was unsuccessful, failing early as we hit our max attempts", + "Task was unsuccessful, failing as we hit our max attempts", ); break None; @@ -202,7 +245,7 @@ impl<'task> CommandExecutor<'task> { }) } - fn start_monitoring(&mut self) { + fn monitor_running_status(&mut self) { if self.persistent || self.interactive { return; } @@ -222,6 +265,26 @@ impl<'task> CommandExecutor<'task> { })); } + fn monitor_timeout( + &self, + duration: Option, + timeout_token: CancellationToken, + ) -> Option> { + duration.map(|duration| { + tokio::spawn(async move { + if timeout( + Duration::from_secs(duration), + sleep(Duration::from_secs(86400)), // 1 day + ) + .await + .is_err() + { + timeout_token.cancel(); + } + }) + }) + } + fn stop_monitoring(&mut self) { if let Some(handle) = self.handle.take() { handle.abort(); diff --git a/packages/types/src/pipeline.ts b/packages/types/src/pipeline.ts index 280de8aa779..a728a3201ac 100644 --- a/packages/types/src/pipeline.ts +++ b/packages/types/src/pipeline.ts @@ -167,7 +167,6 @@ export interface ActionNodeRunTask { persistent: boolean; runtime: Runtime; target: string; - timeout: number | null; id: number | null; }; }