Skip to content

Commit

Permalink
new: Implement new pipeline for running tasks/actions. (#1050)
Browse files Browse the repository at this point in the history
* Start on pipeline.

* Hook up channels.

* Hook up steps and results.

* Add events.

* Rework job.

* Polish step/job.

* Polish.

* Add job action.

* Rework self.

* Start on tests.

* Polish.

* Remove assert.

* Add bail.

* Polish.

* Polish.
  • Loading branch information
milesj committed Sep 25, 2023
1 parent 92f3fd6 commit 5c59573
Show file tree
Hide file tree
Showing 14 changed files with 1,009 additions and 3 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ starbase = "0.2.6"
starbase_archive = { version = "0.2.2", default-features = false, features = [
"tar-gz",
] }
starbase_events = "0.2.1"
starbase_events = { version = "0.2.1" }
starbase_sandbox = "0.1.10"
starbase_styles = { version = "0.1.15", features = ["relative-path"] }
starbase_utils = { version = "0.3.1", default-features = false, features = [
Expand All @@ -78,6 +78,7 @@ tokio = { version = "1.32.0", default-features = false, features = [
"rt-multi-thread",
"tracing",
] }
tokio-util = "0.7.8"
tracing = "0.1.37"
url = "2.4.1"
uuid = { version = "1.4.1", features = ["v4"] }
2 changes: 1 addition & 1 deletion crates/core/action-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ starbase_styles = { workspace = true }
starbase_utils = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = "0.7.8"
tokio-util = { workspace = true }

[dev-dependencies]
moon = { path = "../moon" }
Expand Down
2 changes: 1 addition & 1 deletion nextgen/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ serde = { workspace = true }
starbase_utils = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = "0.7.8"
tokio-util = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }

Expand Down
23 changes: 23 additions & 0 deletions nextgen/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "moon_pipeline"
version = "0.1.0"
edition = "2021"
license = "MIT"
description = "Generic pipeline for running jobs."
homepage = "https://moonrepo.dev/moon"
repository = "https://github.com/moonrepo/moon"

[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true }
miette = { workspace = true }
num_cpus = "1.16.0"
serde = { workspace = true }
starbase_events = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "signal", "time"] }
tokio-util = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
rand = "0.8.5"
95 changes: 95 additions & 0 deletions nextgen/pipeline/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use crate::{job::JobResult, pipeline_events::*};
use serde::{Deserialize, Serialize};
use starbase_events::Emitter;
use std::sync::Arc;
use tokio::sync::{mpsc::Sender, Semaphore};
use tokio_util::sync::CancellationToken;

pub struct Context<T> {
/// Force aborts running sibling jobs.
pub abort_token: CancellationToken,

/// Receives cancel/shutdown signals.
pub cancel_token: CancellationToken,

/// Sends results to the parent pipeline.
pub result_sender: Sender<JobResult<T>>,

/// Acquires a permit for concurrency.
pub semaphore: Arc<Semaphore>,

// Events:
pub on_job_progress: Arc<Emitter<JobProgressEvent>>,
pub on_job_state_change: Arc<Emitter<JobStateChangeEvent>>,
}

unsafe impl<T> Send for Context<T> {}
unsafe impl<T> Sync for Context<T> {}

impl<T> Context<T> {
// Don't use native `Clone` since it'll require `T` to be cloneable.
#[allow(clippy::should_implement_trait)]
pub fn clone(&self) -> Context<T> {
Self {
abort_token: self.abort_token.clone(),
cancel_token: self.cancel_token.clone(),
result_sender: self.result_sender.clone(),
semaphore: self.semaphore.clone(),
on_job_progress: self.on_job_progress.clone(),
on_job_state_change: self.on_job_state_change.clone(),
}
}

pub fn abort(&self) {
self.abort_token.cancel();
}

pub fn cancel(&self) {
self.cancel_token.cancel();
}

pub fn is_aborted_or_cancelled(&self) -> bool {
self.abort_token.is_cancelled() || self.cancel_token.is_cancelled()
}
}

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum RunState {
/// Job was explicitly aborted.
Aborted,

/// Cancelled via a signal (ctrl+c, etc).
Cancelled,

/// Job failed.
Failed,

/// Job passed.
Passed,

/// Job is waiting to run.
Pending,

/// Job is currently running and executing action.
Running,

/// Cancelled via a timeout.
TimedOut,
}

impl RunState {
/// Has the run failed? A fail is determined by a job that has ran to completion,
/// but has ultimately failed. Aborted and cancelled jobs never run to completion.
pub fn has_failed(&self) -> bool {
matches!(self, Self::Failed | Self::TimedOut)
}

pub fn is_incomplete(&self) -> bool {
matches!(self, Self::Aborted | Self::Cancelled | Self::TimedOut)
}

pub fn is_complete(&self) -> bool {
matches!(self, Self::Failed | Self::Passed)
}
}
Loading

0 comments on commit 5c59573

Please sign in to comment.