diff --git a/docs/docs/background-jobs/cron.md b/docs/docs/background-jobs/cron.md new file mode 100644 index 00000000..f654e376 --- /dev/null +++ b/docs/docs/background-jobs/cron.md @@ -0,0 +1,51 @@ +# Cron jobs + +Cron jobs, or scheduled jobs, are background jobs that are performed automatically based on a schedule. For example, if you want to send a newsletter to your users every week, you can create a background job and schedule it to run weekly using the built-in cron. + +## Defining scheduled jobs + +A scheduled job is a regular [background job](../), for example: + +```rust +use rwf::prelude::*; +use rwf::job::{Error as JobError}; + +#[derive(Default, Debug, Serialize, Deserialize)] +struct WeeklyNewsletter; + +#[async_trait] +impl Job for WeeklyNewsletter { + /// Code in this function will be executed in + /// the background. + async fn execute(&self, _args: serde_json::Value) -> Result<(), JobError> { + // Send the newsletter to all users. + Ok(()) + } +} +``` + +To run a job on a schedule, you need to add it in two places: + +- The list of jobs the worker can run +- The crontab (or the clock, as we call it) + +```rust +// Crontab +let schedule = vec![ + WeeklyNewsletter::default() + .schedule( + serde_json::Value::Null, + "0 0 * * 0", + ), // Every Sunday at midnight +]; + +// Background jobs +let jobs = vec![ + WeeklyNewsletter::default().job() +]; + +let worker = Worker::new(jobs) + .clock(schedule); + +worker.start().await?; +``` diff --git a/docs/docs/background-jobs/index.md b/docs/docs/background-jobs/index.md index e0b6f3c3..132ee7c7 100644 --- a/docs/docs/background-jobs/index.md +++ b/docs/docs/background-jobs/index.md @@ -1 +1,84 @@ # Jobs overview + +Background jobs, also known as asynchronous jobs, are code that can run independently of the main HTTP request/response life cycle. Executing code in background jobs allows you to perform useful work without making the client wait for the job to finish. Examples of background jobs are sending emails or talking to third-party APIs. + +Rwf has its own background job queue and workers that can perform those jobs. + +## Defining jobs + +A background job is any Rust struct that implements the [`Job`](https://docs.rs/rwf/latest/rwf/job/model/trait.Job.html) trait. The only trait method the job needs to implement is the [`async fn execute`](https://docs.rs/rwf/latest/rwf/job/model/trait.Job.html#tymethod.execute) method which accepts job arguments encoded with JSON. + +For example, if we wanted to send a welcome email to all users that sign up for your web app, we can do so as a background job: + +```rust +use rwf::prelude::*; +use rwf::job::{Error as JobError}; +use serde::{Serialize, Deserialize}; + +#[derive(Default, Debug, Serialize, Deserialize)] +struct WelcomeEmail { + email: String, + user_name: String, +} + +#[async_trait] +impl Job for WelcomeEmail { + /// Code in this function will be executed in the background. + async fn execute(&self, args: serde_json::Value) -> Result<(), JobError> { + let args: WelcomeEmail = serde_json::from_value(args)?; + + // Send the email to the user + // with the given email address. + + Ok(()) + } +} +``` + +## Spawning workers + +Once we have background jobs, we need to create background workers that will run in separate threads (Tokio tasks, in reality), and execute those jobs as they are sent to the queue. Spawning workers can be done from anywhere in the code, but typically done so from the `main` function: + +```rust +use rwf::job::Worker; +use tokio::time::{sleep, Duration}; + +#[tokio::main] +async fn main() -> Result<(), Error> { + // Create a new worker with 4 threads. + let worker = Worker::new(vec![ + WelcomeEmail::default().job() + ]) + + worker.start().await?; + + // Put the main task to sleep indefinitely. + sleep(Duration::MAX).await; +} +``` + +### Sharing processes + +Workers can be spawned inside the app without having to create a separate binary application. Since most jobs will be running async code, Tokio will effectively load balance foreground (HTTP requests/responses) and background workloads. + +To spawn a worker inside the web app, use the code above without the `sleep`. The [`Worker::start`](https://docs.rs/rwf/latest/rwf/job/worker/struct.Worker.html#method.start) method returns almost immediately, since it only spawns a worker on a separate Tokio task. + +## Scheduling jobs + +With the background jobs defined and the workers running, we can start scheduling jobs to run in the background. A job can be scheduled to run from anywhere in the code by calling the [`Job::execute_async`](https://docs.rs/rwf/latest/rwf/job/model/trait.Job.html#method.execute_async) method: + +```rust +let email = WelcomeEmail { + email: "new-user@example.com".to_string(), + user_name: "Alice".to_string(), +}; + +// Convert the job to a JSON value. +let args = serde_json::to_value(&email)?; + +// Schedule the job to run in the background +// as soon as possible. +email.execute_async(args).await?; +``` + +The `execute_async` method creates a record of the job in the queue and returns immediately without doing the actual work. This makes this method very quick so you can schedule multiple jobs inside a controller without it having noticeable effect on endpoint latency. diff --git a/docs/docs/background-jobs/queue-guarantees.md b/docs/docs/background-jobs/queue-guarantees.md new file mode 100644 index 00000000..4de2387e --- /dev/null +++ b/docs/docs/background-jobs/queue-guarantees.md @@ -0,0 +1,30 @@ +# Job queue guarantees + +The background queue is stored in the database, so jobs will not get lost. Workers will attempt to run a job at least once. Even if workers crash, when they are restarted, any running jobs will be rescheduled. + +Because of this guarantee, jobs should strive to be idempotent: the same job can be executed multiple times. + +## Performance + +The job queue is using PostgreSQL's `FOR UPDATE SKIP LOCKED` mechanism, which has been shown to support high concurrency job queues. + +## Polling + +Workers poll the queue every second. If there are no jobs, the worker goes to sleep and polls again in one second. If a job is available, it will be executed immediately. Once the job completes, the worker will attempt to fetch the next job immediately, restarting this cycle. + +## Concurrency + +By default, a worker executes one job at a time. This allows to control for background concurrency easily, without complex throttling mechanisms. If you want to execute many jobs concurrently, you can spawn as many workers as you wish. Each worker will poll the queue for jobs once a second. + +To spawn more workers, call [`Worker::spawn`](https://docs.rs/rwf/latest/rwf/job/worker/struct.Worker.html#method.spawn) as many times as you wish to have workers, for example: + +```rust +let worker = Worker::new(vec![]) + .start() + .await? + .spawn() + .spawn() + .spawn(); +``` + +The above code will spawn 4 workers in total.