Skip to content

Commit

Permalink
fix: Fix task timeout not applying to retries. (#1695)
Browse files Browse the repository at this point in the history
* Move timeout from job to runner.

* Finalize timeout.

* Bump.

* Fix versions.
  • Loading branch information
milesj authored Oct 18, 2024
1 parent 9e37211 commit 120e8e0
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 84 deletions.
16 changes: 16 additions & 0 deletions .yarn/versions/a679b82f.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
releases:
'@moonrepo/cli': patch
'@moonrepo/core-linux-arm64-gnu': patch
'@moonrepo/core-linux-arm64-musl': patch
'@moonrepo/core-linux-x64-gnu': patch
'@moonrepo/core-linux-x64-musl': patch
'@moonrepo/core-macos-arm64': patch
'@moonrepo/core-macos-x64': patch
'@moonrepo/core-windows-x64-msvc': patch
'@moonrepo/types': patch

declined:
- '@moonrepo/nx-compat'
- '@moonrepo/report'
- '@moonrepo/runtime'
- website
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## Unreleased

#### 🚀 Updates

- Removed the warning around `.env` files not existing in certain environments.

#### 🐞 Fixes

- Fixed an issue where the task option `timeout` applied to the overall run, rather than to each
individual attempt, when using the `retryCount` option.

#### ⚙️ Internal

- Updated Rust to v1.82.
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/action-graph/src/action_graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ impl<'app> ActionGraphBuilder<'app> {
persistent: task.is_persistent(),
runtime: self.get_runtime(project, task.platform, true),
target: task.target.to_owned(),
timeout: task.options.timeout,
id: None,
});

Expand Down
4 changes: 0 additions & 4 deletions crates/action-pipeline/src/action_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,6 @@ async fn dispatch_job(
action_context: Arc<ActionContext>,
) {
let job = Job {
timeout: match &node {
ActionNode::RunTask(inner) => inner.timeout,
_ => None,
},
node,
node_index,
context: job_context,
Expand Down
46 changes: 0 additions & 46 deletions crates/action-pipeline/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ use moon_action::{Action, ActionNode, ActionStatus};
use moon_action_context::ActionContext;
use moon_app_context::AppContext;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use tokio_util::sync::CancellationToken;
use tracing::{instrument, trace};

pub struct Job {
Expand All @@ -18,17 +14,11 @@ pub struct Job {
pub context: JobContext,
pub app_context: Arc<AppContext>,
pub action_context: Arc<ActionContext>,

/// Maximum seconds to run before it's cancelled
pub timeout: Option<u64>,
}

impl Job {
#[instrument(skip_all)]
pub async fn dispatch(self) {
let timeout_token = CancellationToken::new();
let timeout_handle = self.monitor_timeout(self.timeout, timeout_token.clone());

let mut action = Action::new(self.node);
action.node_index = self.node_index;

Expand Down Expand Up @@ -56,17 +46,6 @@ impl Job {
action.finish(ActionStatus::Skipped);
}

// Cancel if we have timed out
_ = timeout_token.cancelled() => {
trace!(
index = self.node_index,
timeout = self.timeout,
"Job timed out",
);

action.finish(ActionStatus::TimedOut);
}

// Or run the job to completion
_ = run_action(
&mut action,
Expand All @@ -78,32 +57,7 @@ impl Job {
) => {},
};

// Cleanup before sending the result
if let Some(handle) = timeout_handle {
handle.abort();
}

// Send the result back to the pipeline
self.context.send_result(action).await;
}

/// Starts a watchdog task for the job's timeout, if one is configured.
///
/// With `duration` set to `Some(secs)`, a task is spawned that cancels
/// `timeout_token` once `secs` seconds have elapsed, and the task's handle
/// is returned. With `None`, nothing is spawned and `None` is returned.
fn monitor_timeout(
    &self,
    duration: Option<u64>,
    timeout_token: CancellationToken,
) -> Option<JoinHandle<()>> {
    let secs = duration?;

    Some(tokio::spawn(async move {
        // Placeholder future wrapped by the timeout; it lasts one day.
        // NOTE(review): with a limit of a day or more this sleep would appear
        // to finish first, leaving the token uncancelled — confirm intent.
        let one_day = sleep(Duration::from_secs(86400)); // 1 day
        let expired = timeout(Duration::from_secs(secs), one_day)
            .await
            .is_err();

        // An error from `timeout` means the limit elapsed before the sleep did
        if expired {
            timeout_token.cancel();
        }
    }))
}
}
2 changes: 0 additions & 2 deletions crates/action/src/action_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub struct RunTaskNode {
pub persistent: bool, // Never terminates
pub runtime: Runtime,
pub target: Target,
pub timeout: Option<u64>,
pub id: Option<u32>, // For action graph states
}

Expand All @@ -43,7 +42,6 @@ impl RunTaskNode {
persistent: false,
runtime,
target,
timeout: None,
id: None,
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod output;
mod process_error;
mod shell;

pub use async_command::*;
pub use command::*;
pub use moon_args as args;
pub use output::*;
Expand Down
10 changes: 0 additions & 10 deletions crates/project-expander/src/tasks_expander.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::expander_context::*;
use crate::tasks_expander_error::TasksExpanderError;
use crate::token_expander::TokenExpander;
use moon_common::color;
use moon_config::{TaskArgs, TaskDependencyConfig};
use moon_project::Project;
use moon_task::{Target, TargetScope, Task};
Expand Down Expand Up @@ -292,15 +291,6 @@ impl<'graph, 'query> TasksExpander<'graph, 'query> {
for (key, val) in merged_env_vars {
env.entry(key).or_insert(val);
}

if !missing_paths.is_empty() {
warn!(
target = task.target.as_str(),
env_files = ?missing_paths,
"Setting {} is enabled but file(s) don't exist, skipping as this may be intentional",
color::property("options.envFile"),
);
}
}

task.env = substitute_env_vars(env);
Expand Down
1 change: 1 addition & 0 deletions crates/task-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ starbase_archive = { workspace = true }
starbase_utils = { workspace = true, features = ["glob"] }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
Expand Down
103 changes: 83 additions & 20 deletions crates/task-runner/src/command_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use moon_app_context::AppContext;
use moon_common::{color, is_ci, is_test_env};
use moon_config::TaskOutputStyle;
use moon_console::TaskReportItem;
use moon_process::args::join_args;
use moon_process::Command;
use moon_process::{args::join_args, AsyncCommand, Command};
use moon_project::Project;
use moon_task::Task;
use std::process::Output;
use std::time::Duration;
use tokio::task::{self, JoinHandle};
use tokio::time::sleep;
use tokio::time::{sleep, timeout};
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument};

fn is_ci_env() -> bool {
Expand Down Expand Up @@ -81,7 +82,7 @@ impl<'task> CommandExecutor<'task> {
self.prepare_state(context, report_item);

// For long-running process, log a message on an interval to indicate it's still running
self.start_monitoring();
self.monitor_running_status();

// Execute the command on a loop as an attempt for every retry count we have
let command_line = self.get_command_line(context);
Expand All @@ -106,28 +107,70 @@ impl<'task> CommandExecutor<'task> {
self.print_command_line(&command_line)?;

// Attempt to execute command
let mut command = self.command.create_async();
/// Runs `command` once and returns the resulting process output.
///
/// The execution method is chosen from the two flags:
/// - `interactive` set (regardless of `stream`): `exec_stream_output`
/// - `stream` set, not interactive: `exec_stream_and_capture_output`
/// - neither set: `exec_capture_output`
async fn execute_command(
    mut command: AsyncCommand<'_>,
    stream: bool,
    interactive: bool,
) -> miette::Result<Output> {
    // Interactive takes precedence over the stream setting
    if interactive {
        return command.exec_stream_output().await;
    }

    if stream {
        command.exec_stream_and_capture_output().await
    } else {
        command.exec_capture_output().await
    }
}

let timeout_token = CancellationToken::new();
let timeout_handle =
self.monitor_timeout(self.task.options.timeout, timeout_token.clone());

let attempt_result = tokio::select! {
// Run conditions in order!
biased;

let attempt_result = match (self.stream, self.interactive) {
(true, true) | (false, true) => command.exec_stream_output().await,
(true, false) => command.exec_stream_and_capture_output().await,
_ => command.exec_capture_output().await,
// Cancel if we have timed out
_ = timeout_token.cancelled() => {
Ok(None)
}

// Or run the command to completion
result = execute_command(
self.command.create_async(),
self.stream,
self.interactive,
) => result.map(Some),
};

// Stop the timeout monitor before handling the result
if let Some(handle) = timeout_handle {
handle.abort();
}

// Handle the execution result
match attempt_result {
// Zero and non-zero exit codes
Ok(output) => {
let is_success = output.status.success();
Ok(maybe_output) => {
let mut is_success = false;

debug!(
task = self.task.target.as_str(),
command = self.command.bin.to_str(),
exit_code = output.status.code(),
"Ran task, checking conditions",
);
if let Some(output) = maybe_output {
is_success = output.status.success();

attempt.finish_from_output(output);
debug!(
task = self.task.target.as_str(),
command = self.command.bin.to_str(),
exit_code = output.status.code(),
"Ran task, checking conditions",
);

attempt.finish_from_output(output);
} else {
debug!(
task = self.task.target.as_str(),
command = self.command.bin.to_str(),
"Task timed out",
);

attempt.finish(ActionStatus::TimedOut);
}

self.app.console.reporter.on_task_finished(
&self.task.target,
Expand Down Expand Up @@ -162,7 +205,7 @@ impl<'task> CommandExecutor<'task> {
else {
debug!(
task = self.task.target.as_str(),
"Task was unsuccessful, failing early as we hit our max attempts",
"Task was unsuccessful, failing as we hit our max attempts",
);

break None;
Expand Down Expand Up @@ -202,7 +245,7 @@ impl<'task> CommandExecutor<'task> {
})
}

fn start_monitoring(&mut self) {
fn monitor_running_status(&mut self) {
if self.persistent || self.interactive {
return;
}
Expand All @@ -222,6 +265,26 @@ impl<'task> CommandExecutor<'task> {
}));
}

/// Spawns a background task that cancels `timeout_token` once `duration`
/// seconds have elapsed.
///
/// Returns `None` (and spawns nothing) when `duration` is `None`. Otherwise
/// returns the handle of the spawned task, so the caller can abort it once
/// the timeout is no longer needed.
fn monitor_timeout(
    &self,
    duration: Option<u64>,
    timeout_token: CancellationToken,
) -> Option<JoinHandle<()>> {
    duration.map(|duration| {
        tokio::spawn(async move {
            // Race the deadline against a future that never resolves, so the
            // token is cancelled for any configured duration. Racing against
            // a one-day `sleep` let the sleep win for limits of a day or
            // more, in which case the timeout never fired.
            if timeout(
                Duration::from_secs(duration),
                std::future::pending::<()>(),
            )
            .await
            .is_err()
            {
                timeout_token.cancel();
            }
        })
    })
}

fn stop_monitoring(&mut self) {
if let Some(handle) = self.handle.take() {
handle.abort();
Expand Down
1 change: 0 additions & 1 deletion packages/types/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ export interface ActionNodeRunTask {
persistent: boolean;
runtime: Runtime;
target: string;
timeout: number | null;
id: number | null;
};
}
Expand Down

0 comments on commit 120e8e0

Please sign in to comment.