Skip to content

Commit

Permalink
fix: Ensure action node is always present. (#1132)
Browse files Browse the repository at this point in the history
* Use an arc.

* Fix tests.
  • Loading branch information
milesj authored Oct 21, 2023
1 parent 956c608 commit 9e5473e
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 52 deletions.
2 changes: 1 addition & 1 deletion crates/core/action-pipeline/src/actions/run_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn run_task(
color::label(&task.target)
);

runner.node = action.node.clone();
runner.node = Arc::clone(&action.node);
action.allow_failure = task.options.allow_failure;

// If a dependency failed, we should skip this target
Expand Down
6 changes: 1 addition & 5 deletions crates/core/action-pipeline/src/estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ impl Estimator {
// Bucket every ran target based on task name,
// and aggregate all tasks of the same name.
for result in results {
let Some(node) = &result.node else {
continue;
};

let Some(duration) = &result.duration else {
continue;
};
Expand All @@ -66,7 +62,7 @@ impl Estimator {
task_duration *= 10;
}

match node {
match &*result.node {
ActionNode::SetupTool { .. }
| ActionNode::InstallDeps { .. }
| ActionNode::InstallProjectDeps { .. } => {
Expand Down
6 changes: 3 additions & 3 deletions crates/core/action-pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ impl Pipeline {
}

term.line(label_checkpoint(
match &result.node {
Some(ActionNode::RunTask { target, .. }) => target.as_str(),
match &*result.node {
ActionNode::RunTask { target, .. } => target.as_str(),
_ => &result.label,
},
Checkpoint::RunFailed,
Expand Down Expand Up @@ -470,7 +470,7 @@ impl Pipeline {
let mut skipped_count = 0;

for result in results {
if compact && !matches!(result.node.as_ref().unwrap(), ActionNode::RunTask { .. }) {
if compact && !matches!(*result.node, ActionNode::RunTask { .. }) {
continue;
}

Expand Down
13 changes: 6 additions & 7 deletions crates/core/action-pipeline/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::actions::run_task::run_task;
use crate::actions::setup_tool::setup_tool;
use crate::actions::sync_project::sync_project;
use crate::actions::sync_workspace::sync_workspace;
use moon_action::{Action, ActionNode};
use moon_action::{Action, ActionNode, ActionStatus};
use moon_action_context::ActionContext;
use moon_emitter::{Emitter, Event};
use moon_logger::trace;
Expand All @@ -29,7 +29,7 @@ pub async fn process_action(
) -> miette::Result<Action> {
action.start();

let node = action.node.take().unwrap();
let node = Arc::clone(&action.node);
let log_action_label = color::muted_light(&action.label);

trace!(
Expand All @@ -51,7 +51,9 @@ pub async fn process_action(
})
.await?;

let result = match &node {
let result = match &*node {
ActionNode::None => Ok(ActionStatus::Skipped),

// Setup and install the specific tool
ActionNode::SetupTool { runtime } => {
local_emitter
Expand Down Expand Up @@ -214,7 +216,7 @@ pub async fn process_action(
if action.has_failed() {
// If these fail, we should abort instead of trying to continue
if matches!(
node,
*node,
ActionNode::SetupTool { .. } | ActionNode::InstallDeps { .. }
) {
action.abort();
Expand Down Expand Up @@ -245,8 +247,5 @@ pub async fn process_action(
);
}

// Reassign the node for reuse
action.node = Some(node);

Ok(action)
}
55 changes: 28 additions & 27 deletions crates/core/action-pipeline/tests/estimator_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ use moon_action::{Action, ActionNode, ActionStatus};
use moon_action_pipeline::estimator::{Estimator, TaskEstimate};
use moon_platform::Runtime;
use rustc_hash::FxHashMap;
use std::sync::Arc;
use std::time::Duration;

const NANOS_PER_MILLI: u32 = 1_000_000;
const HALF_SECOND: u32 = NANOS_PER_MILLI * 500;

fn create_run_task_action(runtime: Runtime, target: &str) -> ActionNode {
ActionNode::RunTask {
fn create_run_task_action(runtime: Runtime, target: &str) -> Arc<ActionNode> {
Arc::new(ActionNode::RunTask {
interactive: false,
persistent: false,
runtime,
target: target.into(),
}
})
}

mod estimator {
Expand All @@ -40,7 +41,7 @@ mod estimator {
let est = Estimator::calculate(
&[Action {
duration: Some(Duration::new(10, 0)),
node: Some(create_run_task_action(Runtime::system(), "proj:task")),
node: create_run_task_action(Runtime::system(), "proj:task"),
..Action::default()
}],
Duration::new(5, 0),
Expand All @@ -67,27 +68,27 @@ mod estimator {
&[
Action {
duration: Some(Duration::new(10, 0)),
node: Some(create_run_task_action(Runtime::system(), "a:build")),
node: create_run_task_action(Runtime::system(), "a:build"),
..Action::default()
},
Action {
duration: Some(Duration::new(5, 0)),
node: Some(create_run_task_action(Runtime::system(), "a:lint")),
node: create_run_task_action(Runtime::system(), "a:lint"),
..Action::default()
},
Action {
duration: Some(Duration::new(15, 0)),
node: Some(create_run_task_action(Runtime::system(), "b:build")),
node: create_run_task_action(Runtime::system(), "b:build"),
..Action::default()
},
Action {
duration: Some(Duration::new(8, 0)),
node: Some(create_run_task_action(Runtime::system(), "c:test")),
node: create_run_task_action(Runtime::system(), "c:test"),
..Action::default()
},
Action {
duration: Some(Duration::new(12, 0)),
node: Some(create_run_task_action(Runtime::system(), "d:lint")),
node: create_run_task_action(Runtime::system(), "d:lint"),
..Action::default()
},
],
Expand Down Expand Up @@ -122,21 +123,21 @@ mod estimator {
&[
Action {
duration: Some(Duration::new(10, 0)),
node: Some(ActionNode::SetupTool {
node: Arc::new(ActionNode::SetupTool {
runtime: Runtime::system(),
}),
..Action::default()
},
Action {
duration: Some(Duration::new(25, 0)),
node: Some(ActionNode::InstallDeps {
node: Arc::new(ActionNode::InstallDeps {
runtime: Runtime::system(),
}),
..Action::default()
},
Action {
duration: Some(Duration::new(10, 0)),
node: Some(create_run_task_action(Runtime::system(), "proj:task")),
node: create_run_task_action(Runtime::system(), "proj:task"),
..Action::default()
},
],
Expand Down Expand Up @@ -166,7 +167,7 @@ mod estimator {
let est = Estimator::calculate(
&[Action {
duration: Some(Duration::new(3, 0)),
node: Some(create_run_task_action(Runtime::system(), "proj:task")),
node: create_run_task_action(Runtime::system(), "proj:task"),
status: ActionStatus::Cached,
..Action::default()
}],
Expand Down Expand Up @@ -194,41 +195,41 @@ mod estimator {
&[
Action {
duration: Some(Duration::new(10, 0)),
node: Some(ActionNode::SetupTool {
node: Arc::new(ActionNode::SetupTool {
runtime: Runtime::system(),
}),
..Action::default()
},
Action {
duration: Some(Duration::new(25, 0)),
node: Some(ActionNode::InstallDeps {
node: Arc::new(ActionNode::InstallDeps {
runtime: Runtime::system(),
}),
..Action::default()
},
Action {
duration: Some(Duration::new(10, 0)),
node: Some(create_run_task_action(Runtime::system(), "a:build")),
node: create_run_task_action(Runtime::system(), "a:build"),
..Action::default()
},
Action {
duration: Some(Duration::new(5, 0)),
node: Some(create_run_task_action(Runtime::system(), "a:lint")),
node: create_run_task_action(Runtime::system(), "a:lint"),
..Action::default()
},
Action {
duration: Some(Duration::new(15, 0)),
node: Some(create_run_task_action(Runtime::system(), "b:build")),
node: create_run_task_action(Runtime::system(), "b:build"),
..Action::default()
},
Action {
duration: Some(Duration::new(8, 0)),
node: Some(create_run_task_action(Runtime::system(), "c:test")),
node: create_run_task_action(Runtime::system(), "c:test"),
..Action::default()
},
Action {
duration: Some(Duration::new(12, 0)),
node: Some(create_run_task_action(Runtime::system(), "d:lint")),
node: create_run_task_action(Runtime::system(), "d:lint"),
..Action::default()
},
],
Expand Down Expand Up @@ -267,41 +268,41 @@ mod estimator {
&[
Action {
duration: Some(Duration::new(10, 0)),
node: Some(ActionNode::SetupTool {
node: Arc::new(ActionNode::SetupTool {
runtime: Runtime::system(),
}),
..Action::default()
},
Action {
duration: Some(Duration::new(25, 0)),
node: Some(ActionNode::InstallDeps {
node: Arc::new(ActionNode::InstallDeps {
runtime: Runtime::system(),
}),
..Action::default()
},
Action {
duration: Some(Duration::new(10, 0)),
node: Some(create_run_task_action(Runtime::system(), "a:build")),
node: create_run_task_action(Runtime::system(), "a:build"),
..Action::default()
},
Action {
duration: Some(Duration::new(5, 0)),
node: Some(create_run_task_action(Runtime::system(), "a:lint")),
node: create_run_task_action(Runtime::system(), "a:lint"),
..Action::default()
},
Action {
duration: Some(Duration::new(15, 0)),
node: Some(create_run_task_action(Runtime::system(), "b:build")),
node: create_run_task_action(Runtime::system(), "b:build"),
..Action::default()
},
Action {
duration: Some(Duration::new(8, 0)),
node: Some(create_run_task_action(Runtime::system(), "c:test")),
node: create_run_task_action(Runtime::system(), "c:test"),
..Action::default()
},
Action {
duration: Some(Duration::new(12, 0)),
node: Some(create_run_task_action(Runtime::system(), "d:lint")),
node: create_run_task_action(Runtime::system(), "d:lint"),
..Action::default()
},
],
Expand Down
5 changes: 3 additions & 2 deletions crates/core/action/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use moon_action_graph::ActionNode;
use moon_common::color;
use moon_utils::time::{chrono::prelude::*, now_timestamp};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, Instant};

fn has_failed(status: &ActionStatus) -> bool {
Expand Down Expand Up @@ -100,7 +101,7 @@ pub struct Action {
pub log_target: String,

#[serde(skip)]
pub node: Option<ActionNode>,
pub node: Arc<ActionNode>,

#[serde(skip)]
pub node_index: usize,
Expand All @@ -126,7 +127,7 @@ impl Action {
flaky: false,
label: node.label(),
log_target: String::new(),
node: Some(node),
node: Arc::new(node),
node_index: 0,
started_at: None,
start_time: None,
Expand Down
10 changes: 5 additions & 5 deletions crates/core/runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use moon_workspace::Workspace;
use rustc_hash::FxHashMap;
use starbase_styles::color;
use starbase_utils::glob;
use std::sync::Arc;
use tokio::{
task,
time::{sleep, Duration},
Expand All @@ -38,7 +39,7 @@ pub enum HydrateFrom {
pub struct Runner<'a> {
pub cache: CacheItem<RunTargetState>,

pub node: Option<ActionNode>,
pub node: Arc<ActionNode>,

emitter: &'a Emitter,

Expand Down Expand Up @@ -70,7 +71,7 @@ impl<'a> Runner<'a> {

Ok(Runner {
cache,
node: None,
node: Arc::new(ActionNode::None),
emitter,
project,
stderr: Term::buffered_stderr(),
Expand Down Expand Up @@ -545,15 +546,14 @@ impl<'a> Runner<'a> {
let primary_longest_width = context.primary_targets.iter().map(|t| t.id.len()).max();
let is_primary = context.primary_targets.contains(&self.task.target);
let is_real_ci = is_ci() && !is_test_env();
let is_persistent =
self.node.as_ref().is_some_and(|n| n.is_persistent()) || self.task.is_persistent();
let is_persistent = self.node.is_persistent() || self.task.is_persistent();
let output;
let error;

// When a task is configured as local (no caching), or the interactive flag is passed,
// we don't "capture" stdout/stderr (which breaks stdin) and let it stream natively.
let is_interactive = (!self.task.options.cache && context.primary_targets.len() == 1)
|| self.node.as_ref().is_some_and(|n| n.is_interactive())
|| self.node.is_interactive()
|| self.task.is_interactive();

// When the primary target, always stream the output for a better developer experience.
Expand Down
8 changes: 6 additions & 2 deletions nextgen/action-graph/src/action_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use moon_task::Target;
use serde::Serialize;
use std::hash::{Hash, Hasher};

#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize)]
#[serde(tag = "action", content = "params")]
pub enum ActionNode {
#[default]
None,

/// Install tool dependencies in the workspace root.
InstallDeps { runtime: Runtime },

Expand Down Expand Up @@ -40,7 +43,7 @@ impl ActionNode {
Self::RunTask { runtime, .. } => runtime,
Self::SetupTool { runtime } => runtime,
Self::SyncProject { runtime, .. } => runtime,
Self::SyncWorkspace => unreachable!(),
_ => unreachable!(),
}
}

Expand Down Expand Up @@ -97,6 +100,7 @@ impl ActionNode {
format!("Sync{runtime}Project({project})")
}
Self::SyncWorkspace => "SyncWorkspace".into(),
Self::None => "None".into(),
}
}
}
Expand Down
Loading

0 comments on commit 9e5473e

Please sign in to comment.