From 2361d353b8ec570a3f11f1d9e9a2c0105cc05e19 Mon Sep 17 00:00:00 2001 From: j-mendez Date: Wed, 29 Nov 2023 18:07:42 -0500 Subject: [PATCH] chore(crate): init of crate --- .github/workflows/rust.yml | 29 +++ .gitignore | 1 + Cargo.lock | 507 +++++++++++++++++++++++++++++++++++++ Cargo.toml | 19 ++ README.md | 34 +++ examples/cargo.toml | 28 ++ examples/example.rs | 40 +++ src/lib.rs | 399 +++++++++++++++++++++++++++++ 8 files changed, 1057 insertions(+) create mode 100644 .github/workflows/rust.yml create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 examples/cargo.toml create mode 100644 examples/example.rs create mode 100644 src/lib.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..9ed0ad2 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,29 @@ +name: Rust +on: + push: + branches: [main] + pull_request: + branches: [main] + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/cache@v3 + id: cache + with: + path: | + ~/.cargo/bin/ + ~/.cargo/registry/index/ + ~/.cargo/registry/cache/ + ~/.cargo/git/db/ + target/ + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - name: Build + run: cargo build --verbose --release + - name: Run tests + run: cargo test --verbose --all-features diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..ffdca5b --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,507 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "async-trait" +version = "0.1.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async_job" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "cron", + "lazy_static", + "log", + "tokio", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bumpalo" +version = "3.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" + +[[package]] +name = "cron" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff76b51e4c068c52bfd2866e1567bee7c567ae8f24ada09fd4307019e25eab7" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + +[[package]] +name = "gimli" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" + +[[package]] +name = "hermit-abi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" + +[[package]] +name = "iana-time-zone" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "js-sys" +version = "0.3.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.150" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" + +[[package]] +name = "lock_api" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "memchr" +version = "2.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] +name = "num-traits" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "object" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" + +[[package]] +name = "proc-macro2" +version = "1.0.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "smallvec" +version = "1.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" + +[[package]] +name = "syn" +version = "2.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +dependencies = [ + "backtrace", + "num_cpus", + "parking_lot", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "wasm-bindgen" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" + +[[package]] +name = "windows-core" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9e029c6 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "async_job" +version = "0.1.0" +edition = "2021" +description = "Simple async cron jobs for Rust" +repository = "https://github.com/spider-rs/async_job" +readme = "README.md" +keywords = ["crawler", "spider"] +categories = ["web-programming"] +license = "MIT" +documentation = "https://docs.rs/async_job" + +[dependencies] +async-trait = "0.1.74" +chrono = "0.4.31" +cron = "0.12.0" +lazy_static = "1.4.0" +log = "0.4.20" +tokio = { version = "^1.34.0", features = [ "rt-multi-thread", "macros", "time", "parking_lot" ] } diff --git a/README.md b/README.md new file mode 100644 index 0000000..05b0be9 --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +# async_job + +A simple trait that allows you to attach cron jobs to anything in Rust. + +## Getting Started + +1. `cargo add async_job` + +## Usage + +```rs + +use async_job::{Job, Runner, Schedule, async_trait}; + +use std::thread; +use std::time::Duration; + + struct ExampleJob; + + #[async_trait] + impl Job for ExampleJob { + fn schedule(&self) -> Option { + Some("1/5 * * * * *".parse().unwrap()) + } + // run any async or sync task here with mutation capabilities + async fn handle(&mut self) { + println!("Hello, I am a cron job running at: {}", self.now()); + } +} +``` + +## Examples + +Run the example with `cargo run --example example` diff --git a/examples/cargo.toml b/examples/cargo.toml new file mode 100644 index 0000000..7c7c556 --- /dev/null +++ b/examples/cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "async_job_examples" +version = "1.50.14" +authors = ["j-mendez "] +description = "Async job trait for Rust." +repository = "https://github.com/spider-rs/spider" +readme = "README.md" +keywords = ["async_job", "cron", "async_task"] +categories = ["web-programming"] +license = "MIT" +documentation = "https://docs.rs/async_job" +publish = false +edition = "2018" + +[badges] +maintenance = { status = "as-is" } + +[dev-dependencies] +tokio = { version = "^1.34.0", features = [ "rt-multi-thread", "macros", "time", "parking_lot" ] } + +[dependencies.async_job] +version = "0.0.1" +path = "../async_job" +features = [] + +[[example]] +name = "example" +path = "example.rs" diff --git a/examples/example.rs b/examples/example.rs new file mode 100644 index 0000000..169d8a5 --- /dev/null +++ b/examples/example.rs @@ -0,0 +1,40 @@ +//! `cargo run --example example` +extern crate async_job; + +use async_job::{async_trait, Job, Runner, Schedule}; + +use tokio; + +use std::thread; +use std::time::Duration; + +struct ExampleJob; + +#[async_trait] +impl Job for ExampleJob { + fn schedule(&self) -> Option { + Some("1/5 * * * * *".parse().unwrap()) + } + async fn handle(&mut self) { + println!("Hello, I am a cron job running at: {}", self.now()); + } +} + +async fn run() { + let mut runner = Runner::new(); + + println!("Adding ExampleJob to the Runner"); + runner = runner.add(Box::new(ExampleJob)); + + println!("Starting the Runner for 20 seconds"); + runner = runner.run().await; + thread::sleep(Duration::from_millis(20 * 1000)); + + println!("Stopping the Runner"); + runner.stop().await; +} + +#[tokio::main] +async fn main() { + run().await; +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..486765e --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,399 @@ +//! # async_job: a simple async cron runner +//! +//! Use the `Job` trait to create your cron job struct, pass it to the `Runner` and then start it via `run()` method. +//! Runner will spawn new thread where it will start looping through the jobs and will run their handle +//! method once the scheduled time is reached. +//! +//! If your OS has enough threads to spare each job will get its own thread to execute, if not it will be +//! executed in the same thread as the loop but will hold the loop until the job is finished. +//! +//! Please look at the [**`Job trait`**](./trait.Job.html) documentation for more information. +//! +//! ## Example +//! ``` +//! use async_job::{Job, Runner, Schedule, async_trait}; +//! use std::thread; +//! use std::time::Duration; +//! use tokio; +//! +//! struct ExampleJob; +//! +//! #[async_trait] +//! impl Job for ExampleJob { +//! fn schedule(&self) -> Option { +//! Some("1/5 * * * * *".parse().unwrap()) +//! } +//! async fn handle(&mut self) { +//! println!("Hello, I am a cron job running at: {}", self.now()); +//! } +//! } +//! +//! async fn run() { +//! let mut runner = Runner::new(); +//! +//! println!("Adding ExampleJob to the Runner"); +//! runner = runner.add(Box::new(ExampleJob)); +//! +//! println!("Starting the Runner for 20 seconds"); +//! runner = runner.run().await; +//! thread::sleep(Duration::from_millis(20 * 1000)); +//! +//! println!("Stopping the Runner"); +//! runner.stop().await; +//! } +//! +//! #[tokio::main] +//! async fn main() { +//! run().await; +//! } +//! ``` +//! +//! Output: +//! ```shell +//! Adding ExampleJob to the Runner +//! Starting the Runner for 20 seconds +//! Hello, I am a cron job running at: 2021-01-31 03:06:25.908475 UTC +//! Hello, I am a cron job running at: 2021-01-31 03:06:30.912637 UTC +//! Hello, I am a cron job running at: 2021-01-31 03:06:35.926938 UTC +//! Hello, I am a cron job running at: 2021-01-31 03:06:40.962138 UTC +//! Stopping the Runner +//! ``` +extern crate chrono; +extern crate cron; + +pub use async_trait::async_trait; +use chrono::{DateTime, Duration, Utc}; +pub use cron::Schedule; +use lazy_static::lazy_static; +use log::{debug, error, info}; +use std::panic; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, Arc, Mutex, +}; +use tokio::task::JoinHandle; + +lazy_static! { + /// Singleton instance of a tracker that won't allow + /// same job to run again while its already running + /// unless you specificly allow the job to run in + /// parallel with itself + pub static ref TRACKER: Mutex = Mutex::new(Tracker::new()); +} + +#[async_trait] +/// A cron job that runs for a website. +pub trait Job: Send + Sync { + /// Default implementation of is_active method will + /// make this job always active + fn is_active(&self) -> bool { + true + } + + /// In case your job takes longer to finish and it's scheduled + /// to start again (while its still running), default behaviour + /// will skip the next run while one instance is already running. + /// (if your OS has enough threads, and is spawning a thread for next job) + /// + /// To override this behaviour and enable it to run in parallel + /// with other instances of self, return `true` on this instance. + fn allow_parallel_runs(&self) -> bool { + false + } + + /// Define the run schedule for your job + fn schedule(&self) -> Option; + + /// This is where your jobs magic happens, define the action that + /// will happen once the cron start running your job + /// + /// If this method panics, your entire job will panic and that may + /// or may not make the whole runner panic. Handle your errors + /// properly and don't let it panic. + async fn handle(&mut self); + + /// Decide wheather or not to start running your job + fn should_run(&self) -> bool { + if self.is_active() { + match self.schedule() { + Some(schedule) => { + for item in schedule.upcoming(Utc).take(1) { + let difference = item - Utc::now(); + if difference <= Duration::milliseconds(100) { + return true; + } + } + } + _ => (), + } + } + + false + } + + /// Simple output that will return current time so you don't have to do so + /// in your job if you wish to display the time of the run. + fn now(&self) -> DateTime { + Utc::now() + } +} + +/// Struct for marking jobs running +pub struct Tracker(Vec); + +impl Default for Tracker { + fn default() -> Self { + Self::new() + } +} + +impl Tracker { + /// Return new instance of running + pub fn new() -> Self { + Tracker(vec![]) + } + + /// Check if id of the job is marked as running + pub fn running(&self, id: &usize) -> bool { + self.0.contains(id) + } + + /// Set job id as running + pub fn start(&mut self, id: &usize) -> usize { + if !self.running(id) { + self.0.push(*id); + } + self.0.len() + } + + /// Unmark the job from running + pub fn stop(&mut self, id: &usize) -> usize { + if self.running(id) { + match self.0.iter().position(|&r| r == *id) { + Some(i) => self.0.remove(i), + None => 0, + }; + } + self.0.len() + } +} + +/// Runner that will hold all the jobs and will start up the execution +/// and eventually will stop it. +pub struct Runner { + /// the current jobs + pub jobs: Vec>, + /// the task that is running the handle + pub thread: Option>, + /// is the task running or not + pub running: bool, + /// channel sending message + pub tx: Option>>, + /// tracker to determine crons working + pub working: Arc, +} + +impl Default for Runner { + fn default() -> Self { + Self::new() + } +} + +impl Runner { + /// Create new runner + pub fn new() -> Self { + Runner { + jobs: vec![], + thread: None, + running: false, + tx: None, + working: Arc::new(AtomicBool::new(false)), + } + } + + /// Add jobs into the runner + /// + /// **panics** if you try to push a job onto already started runner + #[allow(clippy::should_implement_trait)] + pub fn add(mut self, job: Box) -> Self { + if self.running { + panic!("Cannot push job onto runner once the runner is started!"); + } + self.jobs.push(job); + self + } + + /// Number of jobs ready to start running + pub fn jobs_to_run(&self) -> usize { + self.jobs.len() + } + + /// Start the loop and job execution + pub async fn run(self) -> Self { + if self.jobs.is_empty() { + return self; + } + + let working = Arc::new(AtomicBool::new(false)); + let (thread, tx) = spawn(self, working.clone()).await; + + Self { + thread, + jobs: vec![], + running: true, + tx, + working, + } + } + + /// Stop the spawned runner + pub async fn stop(&mut self) { + if !self.running { + return; + } + if let Some(thread) = self.thread.take() { + if let Some(tx) = &self.tx { + match tx.send(Ok(())) { + Ok(_) => (), + Err(e) => error!("Could not send stop signal to cron runner thread: {}", e), + }; + } + thread.abort() + } + } + + /// Lets us know if the cron worker is running + pub fn is_running(&self) -> bool { + self.running + } + + /// Lets us know if the worker is in the process of executing a job currently + pub fn is_working(&self) -> bool { + self.working.load(Ordering::Relaxed) + } +} + +/// Spawn the thread for the runner and return its sender to stop it +async fn spawn( + runner: Runner, + working: Arc, +) -> (Option>, Option>>) { + let (tx, rx): (Sender>, Receiver>) = mpsc::channel(); + + let handler = tokio::spawn(async move { + let mut jobs = runner.jobs; + + loop { + if rx.try_recv().is_ok() { + info!("Stopping the cron runner thread"); + break; + } + + for (id, job) in jobs.iter_mut().enumerate() { + let no = (id + 1).to_string(); + + if job.should_run() + && (job.allow_parallel_runs() || !TRACKER.lock().unwrap().running(&id)) + { + TRACKER.lock().unwrap().start(&id); + + let now = Utc::now(); + debug!( + "START: {} --- {}", + format!("cron-job-thread-{}", no), + now.format("%H:%M:%S%.f") + ); + working.store(true, Ordering::Relaxed); + + // keep the work on the same task for now. + job.handle().await; + + working.store(TRACKER.lock().unwrap().stop(&id) != 0, Ordering::Relaxed); + + debug!( + "FINISH: {} --- {}", + format!("cron-job-thread-{}", no), + now.format("%H:%M:%S%.f") + ); + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + }); + + (Some(handler), Some(tx)) +} + +#[cfg(test)] +mod tests { + use super::{Job, Runner}; + use async_trait::async_trait; + use cron::Schedule; + use std::str::FromStr; + struct SomeJob; + + #[async_trait] + impl Job for SomeJob { + fn schedule(&self) -> Option { + Some(Schedule::from_str("0 * * * * *").unwrap()) + } + + async fn handle(&mut self) {} + } + struct AnotherJob; + #[async_trait] + impl Job for AnotherJob { + fn schedule(&self) -> Option { + Some(Schedule::from_str("0 * * * * *").unwrap()) + } + + async fn handle(&mut self) {} + } + #[tokio::test] + async fn create_job() { + let mut some_job = SomeJob; + + assert_eq!(some_job.handle().await, ()); + } + + #[tokio::test] + async fn test_adding_jobs_to_runner() { + let some_job = SomeJob; + let another_job = AnotherJob; + + let runner = Runner::new() + .add(Box::new(some_job)) + .add(Box::new(another_job)); + + assert_eq!(runner.jobs_to_run(), 2); + } + + #[tokio::test] + async fn test_jobs_are_empty_after_runner_starts() { + let some_job = SomeJob; + let another_job = AnotherJob; + + let runner = Runner::new() + .add(Box::new(some_job)) + .add(Box::new(another_job)) + .run() + .await; + + assert_eq!(runner.jobs_to_run(), 0); + } + + #[tokio::test] + async fn test_stopping_the_runner() { + let some_job = SomeJob; + let another_job = AnotherJob; + + let mut runner = Runner::new() + .add(Box::new(some_job)) + .add(Box::new(another_job)) + .run() + .await; + + assert_eq!(runner.stop().await, ()); + } +}