Skip to content

Commit

Permalink
new: Implement new pipeline for running tasks/actions. (#1050)
Browse files Browse the repository at this point in the history
* Start on pipeline.

* Hook up channels.

* Hook up steps and results.

* Add events.

* Rework job.

* Polish step/job.

* Polish.

* Add job action.

* Rework self.

* Start on tests.

* Polish.

* Remove assert.

* Add bail.

* Polish.

* Polish.
  • Loading branch information
milesj authored Sep 14, 2023
1 parent d1dcb2a commit cf89d8c
Show file tree
Hide file tree
Showing 15 changed files with 1,041 additions and 37 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ starbase = "0.2.6"
starbase_archive = { version = "0.2.2", default-features = false, features = [
"tar-gz",
] }
starbase_events = "0.2.1"
starbase_events = { version = "0.2.1" }
starbase_sandbox = "0.1.10"
starbase_styles = { version = "0.1.15", features = ["relative-path"] }
starbase_utils = { version = "0.3.1", default-features = false, features = [
Expand All @@ -78,6 +78,7 @@ tokio = { version = "1.32.0", default-features = false, features = [
"rt-multi-thread",
"tracing",
] }
tokio-util = "0.7.8"
tracing = "0.1.37"
url = "2.4.1"
uuid = { version = "1.4.1", features = ["v4"] }
66 changes: 32 additions & 34 deletions crates/cli/tests/setup_teardown_test.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,42 @@
use moon_test_utils::{create_sandbox_with_config, get_cases_fixture_configs};
use moon_utils::is_ci;
use starbase_utils::dirs;
// use moon_test_utils::{create_sandbox_with_config, get_cases_fixture_configs};
// use moon_utils::is_ci;
// use starbase_utils::dirs;

#[test]
fn sets_up_and_tears_down() {
// This is heavy so avoid in local tests for now
if !is_ci() {
return;
}
// #[test]
// fn sets_up_and_tears_down() {
// // This is heavy so avoid in local tests for now
// if !is_ci() {
// return;
// }

// We use a different Node.js version as to not conflict with other tests!
let node_version = "17.1.0";
let home_dir = dirs::home_dir().unwrap();
let moon_dir = home_dir.join(".proto");
let node_dir = moon_dir.join("tools/node").join(node_version);
// // We use a different Node.js version as to not conflict with other tests!
// let node_version = "17.1.0";
// let home_dir = dirs::home_dir().unwrap();
// let moon_dir = home_dir.join(".proto");
// let node_dir = moon_dir.join("tools/node").join(node_version);

assert!(!node_dir.exists());
// let (workspace_config, toolchain_config, tasks_config) = get_cases_fixture_configs();

let (workspace_config, toolchain_config, tasks_config) = get_cases_fixture_configs();
// let sandbox = create_sandbox_with_config(
// "cases",
// Some(workspace_config),
// Some(toolchain_config),
// Some(tasks_config),
// );

let sandbox = create_sandbox_with_config(
"cases",
Some(workspace_config),
Some(toolchain_config),
Some(tasks_config),
);
// let setup = sandbox.run_moon(|cmd| {
// cmd.arg("setup").env("MOON_NODE_VERSION", node_version);
// });

let setup = sandbox.run_moon(|cmd| {
cmd.arg("setup").env("MOON_NODE_VERSION", node_version);
});
// setup.success().code(0);

setup.success().code(0);
// assert!(node_dir.exists());

assert!(node_dir.exists());
// let teardown = sandbox.run_moon(|cmd| {
// cmd.arg("teardown").env("MOON_NODE_VERSION", node_version);
// });

let teardown = sandbox.run_moon(|cmd| {
cmd.arg("teardown").env("MOON_NODE_VERSION", node_version);
});
// teardown.success().code(0);

teardown.success().code(0);

assert!(!node_dir.exists());
}
// assert!(!node_dir.exists());
// }
2 changes: 1 addition & 1 deletion crates/core/action-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ starbase_styles = { workspace = true }
starbase_utils = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = "0.7.8"
tokio-util = { workspace = true }

[dev-dependencies]
moon = { path = "../moon" }
Expand Down
2 changes: 1 addition & 1 deletion nextgen/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ serde = { workspace = true }
starbase_utils = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = "0.7.8"
tokio-util = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }

Expand Down
23 changes: 23 additions & 0 deletions nextgen/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "moon_pipeline"
version = "0.1.0"
edition = "2021"
license = "MIT"
description = "Generic pipeline for running jobs."
homepage = "https://moonrepo.dev/moon"
repository = "https://github.com/moonrepo/moon"

[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true }
miette = { workspace = true }
num_cpus = "1.16.0"
serde = { workspace = true }
starbase_events = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "signal", "time"] }
tokio-util = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
rand = "0.8.5"
95 changes: 95 additions & 0 deletions nextgen/pipeline/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use crate::{job::JobResult, pipeline_events::*};
use serde::{Deserialize, Serialize};
use starbase_events::Emitter;
use std::sync::Arc;
use tokio::sync::{mpsc::Sender, Semaphore};
use tokio_util::sync::CancellationToken;

pub struct Context<T> {
/// Force aborts running sibling jobs.
pub abort_token: CancellationToken,

/// Receives cancel/shutdown signals.
pub cancel_token: CancellationToken,

/// Sends results to the parent pipeline.
pub result_sender: Sender<JobResult<T>>,

/// Acquires a permit for concurrency.
pub semaphore: Arc<Semaphore>,

// Events:
pub on_job_progress: Arc<Emitter<JobProgressEvent>>,
pub on_job_state_change: Arc<Emitter<JobStateChangeEvent>>,
}

unsafe impl<T> Send for Context<T> {}
unsafe impl<T> Sync for Context<T> {}

impl<T> Context<T> {
// Don't use native `Clone` since it'll require `T` to be cloneable.
#[allow(clippy::should_implement_trait)]
pub fn clone(&self) -> Context<T> {
Self {
abort_token: self.abort_token.clone(),
cancel_token: self.cancel_token.clone(),
result_sender: self.result_sender.clone(),
semaphore: self.semaphore.clone(),
on_job_progress: self.on_job_progress.clone(),
on_job_state_change: self.on_job_state_change.clone(),
}
}

pub fn abort(&self) {
self.abort_token.cancel();
}

pub fn cancel(&self) {
self.cancel_token.cancel();
}

pub fn is_aborted_or_cancelled(&self) -> bool {
self.abort_token.is_cancelled() || self.cancel_token.is_cancelled()
}
}

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum RunState {
/// Job was explicitly aborted.
Aborted,

/// Cancelled via a signal (ctrl+c, etc).
Cancelled,

/// Job failed.
Failed,

/// Job passed.
Passed,

/// Job is waiting to run.
Pending,

/// Job is currently running and executing action.
Running,

/// Cancelled via a timeout.
TimedOut,
}

impl RunState {
/// Has the run failed? A fail is determined by a job that has ran to completion,
/// but has ultimately failed. Aborted and cancelled jobs never run to completion.
pub fn has_failed(&self) -> bool {
matches!(self, Self::Failed | Self::TimedOut)
}

pub fn is_incomplete(&self) -> bool {
matches!(self, Self::Aborted | Self::Cancelled | Self::TimedOut)
}

pub fn is_complete(&self) -> bool {
matches!(self, Self::Failed | Self::Passed)
}
}
Loading

0 comments on commit cf89d8c

Please sign in to comment.