queue
levkk committed Oct 18, 2024
1 parent 36ff28c commit 7e3f06a
Showing 3 changed files with 164 additions and 0 deletions.
51 changes: 51 additions & 0 deletions docs/docs/background-jobs/cron.md
@@ -0,0 +1,51 @@
# Cron jobs

Cron jobs, or scheduled jobs, are background jobs that are performed automatically based on a schedule. For example, if you want to send a newsletter to your users every week, you can create a background job and schedule it to run weekly using the built-in cron.

## Defining scheduled jobs

A scheduled job is a regular [background job](../), for example:

```rust
use rwf::prelude::*;
use rwf::job::{Error as JobError};
use serde::{Serialize, Deserialize};

#[derive(Default, Debug, Serialize, Deserialize)]
struct WeeklyNewsletter;

#[async_trait]
impl Job for WeeklyNewsletter {
    /// Code in this function will be executed in
    /// the background.
    async fn execute(&self, _args: serde_json::Value) -> Result<(), JobError> {
        // Send the newsletter to all users.
        Ok(())
    }
}
```

To run a job on a schedule, you need to add it in two places:

- The list of jobs the worker can run
- The crontab (or the clock, as we call it)

```rust
// Crontab
let schedule = vec![
    WeeklyNewsletter::default()
        .schedule(
            serde_json::Value::Null,
            "0 0 * * 0",
        ), // Every Sunday at midnight
];

// Background jobs
let jobs = vec![
    WeeklyNewsletter::default().job()
];

let worker = Worker::new(jobs)
    .clock(schedule);

worker.start().await?;
```
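
The schedule string `"0 0 * * 0"` appears to follow the conventional five-field cron layout: minute, hour, day of month, month, and day of week (with `0` meaning Sunday). The sketch below is not Rwf's parser. It is a minimal, standalone illustration of how such an expression can be matched against a point in time, and it only supports `*` and plain numbers. The names `CronTime`, `field_matches`, and `cron_matches` are hypothetical.

```rust
/// A point in time, reduced to the five fields a cron expression looks at.
#[derive(Debug, Clone, Copy)]
struct CronTime {
    minute: u32,       // 0-59
    hour: u32,         // 0-23
    day_of_month: u32, // 1-31
    month: u32,        // 1-12
    day_of_week: u32,  // 0-6, where 0 is Sunday
}

/// Returns true if a single field (`*` or a plain number) matches `value`.
fn field_matches(field: &str, value: u32) -> bool {
    field == "*" || field.parse::<u32>().map_or(false, |n| n == value)
}

/// Returns true if `expr` (five whitespace-separated fields) matches `time`.
/// Ranges, lists, and steps are deliberately left out of this sketch.
fn cron_matches(expr: &str, time: CronTime) -> bool {
    let fields: Vec<&str> = expr.split_whitespace().collect();
    if fields.len() != 5 {
        return false;
    }
    let values = [
        time.minute,
        time.hour,
        time.day_of_month,
        time.month,
        time.day_of_week,
    ];
    // Every field has to match its corresponding time component.
    fields
        .iter()
        .zip(values.iter())
        .all(|(field, value)| field_matches(field, *value))
}
```

With this sketch, `cron_matches("0 0 * * 0", t)` is true only when `t` is 00:00 on a Sunday, which is the "every Sunday at midnight" schedule used above.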
83 changes: 83 additions & 0 deletions docs/docs/background-jobs/index.md
@@ -1 +1,84 @@
# Jobs overview

Background jobs, also known as asynchronous jobs, are code that can run independently of the main HTTP request/response life cycle. Executing code in background jobs allows you to perform useful work without making the client wait for the job to finish. Examples of background jobs are sending emails or talking to third-party APIs.

Rwf has its own background job queue and workers that can perform those jobs.

## Defining jobs

A background job is any Rust struct that implements the [`Job`](https://docs.rs/rwf/latest/rwf/job/model/trait.Job.html) trait. The only trait method a job needs to implement is [`async fn execute`](https://docs.rs/rwf/latest/rwf/job/model/trait.Job.html#tymethod.execute), which accepts the job arguments encoded as JSON.

For example, if we want to send a welcome email to every user who signs up for our web app, we can do so as a background job:

```rust
use rwf::prelude::*;
use rwf::job::{Error as JobError};
use serde::{Serialize, Deserialize};

#[derive(Default, Debug, Serialize, Deserialize)]
struct WelcomeEmail {
    email: String,
    user_name: String,
}

#[async_trait]
impl Job for WelcomeEmail {
    /// Code in this function will be executed in the background.
    async fn execute(&self, args: serde_json::Value) -> Result<(), JobError> {
        let args: WelcomeEmail = serde_json::from_value(args)?;

        // Send the email to the user
        // with the given email address.

        Ok(())
    }
}
```

## Spawning workers

Once we have background jobs, we need to create background workers that execute those jobs as they are sent to the queue. Workers run on separate Tokio tasks rather than dedicated OS threads. Workers can be spawned from anywhere in the code, but this is typically done from the `main` function:

```rust
use rwf::job::Worker;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a worker that can run the `WelcomeEmail` job.
    let worker = Worker::new(vec![
        WelcomeEmail::default().job()
    ]);

    // Start the worker on its own Tokio task.
    worker.start().await?;

    // Put the main task to sleep indefinitely.
    sleep(Duration::MAX).await;

    Ok(())
}
```

### Sharing processes

Workers can be spawned inside the app without having to create a separate binary application. Since most jobs will be running async code, Tokio will effectively load balance foreground (HTTP requests/responses) and background workloads.

To spawn a worker inside the web app, use the code above without the `sleep`. The [`Worker::start`](https://docs.rs/rwf/latest/rwf/job/worker/struct.Worker.html#method.start) method returns almost immediately, since it only spawns a worker on a separate Tokio task.

## Scheduling jobs

With the background jobs defined and the workers running, we can start scheduling jobs to run in the background. A job can be scheduled to run from anywhere in the code by calling the [`Job::execute_async`](https://docs.rs/rwf/latest/rwf/job/model/trait.Job.html#method.execute_async) method:

```rust
let email = WelcomeEmail {
    email: "[email protected]".to_string(),
    user_name: "Alice".to_string(),
};

// Convert the job to a JSON value.
let args = serde_json::to_value(&email)?;

// Schedule the job to run in the background
// as soon as possible.
email.execute_async(args).await?;
```

The `execute_async` method creates a record of the job in the queue and returns immediately without doing the actual work. This makes the method very quick, so you can schedule multiple jobs inside a controller without a noticeable effect on endpoint latency.
30 changes: 30 additions & 0 deletions docs/docs/background-jobs/queue-guarantees.md
@@ -0,0 +1,30 @@
# Job queue guarantees

The background queue is stored in the database, so jobs will not get lost. Workers will attempt to run a job at least once. Even if workers crash, when they are restarted, any running jobs will be rescheduled.

Because of this guarantee, a job may be executed more than once. Jobs should therefore strive to be idempotent: executing the same job multiple times should have the same effect as executing it once.
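
One common way to make a job idempotent is to record which units of work have already been completed and turn repeat runs into no-ops. The sketch below illustrates the idea with an in-memory set. `SentEmails` and `send_welcome` are hypothetical names, not part of Rwf, and a real application would keep this record in durable storage such as the database.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory record of which welcome emails were already sent.
#[derive(Default)]
struct SentEmails {
    sent: HashSet<String>,
}

impl SentEmails {
    /// Sends the welcome email at most once per address.
    /// Returns true if an email was sent, false if this was a repeat run.
    fn send_welcome(&mut self, email: &str) -> bool {
        // `insert` returns false when the address is already present,
        // so a retried job does nothing the second time.
        if !self.sent.insert(email.to_string()) {
            return false;
        }
        // The actual email delivery would happen here.
        true
    }
}
```

Calling `send_welcome` twice with the same address sends one email: the first call returns `true` and the second returns `false`.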

## Performance

The job queue uses PostgreSQL's `FOR UPDATE SKIP LOCKED` mechanism, which has been shown to support high-concurrency job queues.
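
With `SKIP LOCKED`, a query that wants to lock a row skips rows already locked by another transaction instead of waiting for them, so concurrent workers do not block each other on the same job. The following is an in-memory analogy of that behavior, not the query Rwf runs. `Row` and `claim_next` are hypothetical names, and the `locked` flag stands in for a row lock held by another worker.

```rust
/// A queue row; `locked` stands in for a row lock held by some worker.
#[derive(Debug)]
struct Row {
    id: u32,
    locked: bool,
}

/// Claims the first row that is not locked, skipping over locked rows
/// rather than waiting for them to be released.
fn claim_next(rows: &mut [Row]) -> Option<u32> {
    let row = rows.iter_mut().find(|row| !row.locked)?;
    row.locked = true;
    Some(row.id)
}
```

If row 1 is already locked by another worker, the next call to `claim_next` returns row 2 right away instead of blocking on row 1.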

## Polling

Workers poll the queue every second. If there are no jobs, the worker goes to sleep and polls again in one second. If a job is available, it will be executed immediately. Once the job completes, the worker will attempt to fetch the next job immediately, restarting this cycle.
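
The cycle described above can be sketched as a small state machine. This is a simplified, synchronous model using an in-memory queue, not Rwf's worker implementation. `Step`, `poll_once`, and `run_worker` are hypothetical names, and the sketch records a `Slept` step instead of actually sleeping for a second.

```rust
use std::collections::VecDeque;

/// What the worker did on one trip through the loop.
#[derive(Debug, PartialEq)]
enum Step {
    /// A job was fetched and executed.
    Ran(String),
    /// The queue was empty; the worker waits before polling again.
    Slept,
}

/// Performs one iteration of the polling loop against an in-memory queue.
fn poll_once(queue: &mut VecDeque<String>) -> Step {
    match queue.pop_front() {
        // A job is available: run it, then poll again right away.
        Some(job) => Step::Ran(job),
        // Nothing to do: sleep for the polling interval.
        None => Step::Slept,
    }
}

/// Runs the loop for a fixed number of iterations and records each step.
/// A real worker would loop indefinitely and sleep on `Step::Slept`.
fn run_worker(queue: &mut VecDeque<String>, iterations: usize) -> Vec<Step> {
    (0..iterations).map(|_| poll_once(queue)).collect()
}
```

With two queued jobs and four iterations, the worker runs both jobs back to back and then sleeps twice.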

## Concurrency

By default, a worker executes one job at a time. This makes it easy to control background concurrency without complex throttling mechanisms. If you want to execute many jobs concurrently, you can spawn as many workers as you wish. Each worker will poll the queue for jobs once a second.

To spawn more workers, call [`Worker::spawn`](https://docs.rs/rwf/latest/rwf/job/worker/struct.Worker.html#method.spawn) once for each additional worker you want, for example:

```rust
let worker = Worker::new(vec![])
    .start()
    .await?
    .spawn()
    .spawn()
    .spawn();
```

The above code will spawn 4 workers in total.
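
To illustrate why "one job per worker, many workers" gives straightforward concurrency control, here is a standalone sketch that uses OS threads and an in-memory queue in place of Tokio tasks and the database. It is an analogy under those assumptions, not Rwf's implementation, and `run_workers` is a hypothetical name.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs `workers` threads against a shared in-memory queue and returns
/// the jobs each worker executed, in the order it executed them.
fn run_workers(jobs: Vec<u32>, workers: usize) -> Vec<Vec<u32>> {
    let queue = Arc::new(Mutex::new(VecDeque::from(jobs)));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut done = Vec::new();
                loop {
                    // Hold the lock only long enough to claim one job, so
                    // no two workers can claim the same job.
                    let job = queue.lock().unwrap().pop_front();
                    match job {
                        // Each worker handles a single job at a time.
                        Some(job) => done.push(job),
                        // Queue drained: this sketch stops instead of polling.
                        None => break,
                    }
                }
                done
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect()
}
```

The number of jobs in flight never exceeds the number of workers, and every job is claimed by exactly one of them. How the jobs are distributed between workers depends on thread scheduling.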
