diff --git a/Cargo.toml b/Cargo.toml
index dad136dda4d..e23ee8a7bbf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@ members = [
 # nebari = { path = "../nebari/nebari", version = "0.3" }
 # nebari = { git = "https://github.com/khonsulabs/nebari.git", branch = "main" }
 # arc-bytes = { path = "../shared-buffer" }
+circulate = { path = "../circulate" }
 
 # [patch."https://github.com/khonsulabs/custodian.git"]
 # custodian-password = { path = "../custodian/password" }
diff --git a/crates/bonsaidb-client/src/client.rs b/crates/bonsaidb-client/src/client.rs
index 00a2557d1e2..c4a2475f888 100644
--- a/crates/bonsaidb-client/src/client.rs
+++ b/crates/bonsaidb-client/src/client.rs
@@ -17,6 +17,7 @@ use async_trait::async_trait;
 #[cfg(feature = "password-hashing")]
 use bonsaidb_core::connection::{Authenticated, Authentication};
 use bonsaidb_core::{
+    arc_bytes::OwnedBytes,
     connection::{Database, StorageConnection},
     custom_api::{CustomApi, CustomApiResult},
     networking::{
@@ -828,7 +829,7 @@ async fn process_response_payload(
     if sender
         .send(std::sync::Arc::new(bonsaidb_core::circulate::Message {
             topic,
-            payload: payload.into_vec(),
+            payload: OwnedBytes::from(payload.0),
         }))
         .is_err()
     {
diff --git a/crates/bonsaidb-client/src/client/remote_database/pubsub.rs b/crates/bonsaidb-client/src/client/remote_database/pubsub.rs
index d9a8e5bf70f..e7182280e5e 100644
--- a/crates/bonsaidb-client/src/client/remote_database/pubsub.rs
+++ b/crates/bonsaidb-client/src/client/remote_database/pubsub.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bonsaidb_core::{
-    arc_bytes::serde::Bytes,
+    arc_bytes::OwnedBytes,
     circulate::Message,
     custom_api::CustomApi,
     networking::{DatabaseRequest, DatabaseResponse, Request, Response},
@@ -51,13 +51,31 @@ where
         payload: &P,
     ) -> Result<(), bonsaidb_core::Error> {
         let payload = pot::to_vec(&payload)?;
+        self.publish_raw(topic, payload).await
+    }
+
+    async fn publish_to_all<P: Serialize + Sync>(
+        &self,
+        topics: Vec<String>,
+        payload: &P,
+    ) -> Result<(), bonsaidb_core::Error> {
+        let payload = pot::to_vec(&payload)?;
+        self.publish_raw_to_all(topics, payload).await
+    }
+
+    async fn publish_raw<S: Into<String> + Send, P: Into<OwnedBytes> + Send>(
+        &self,
+        topic: S,
+        payload: P,
+    ) -> Result<(), bonsaidb_core::Error> {
+        let payload = payload.into();
         match self
             .client
             .send_request(Request::Database {
                 database: self.name.to_string(),
                 request: DatabaseRequest::Publish {
                     topic: topic.into(),
-                    payload: Bytes::from(payload),
+                    payload,
                 },
             })
             .await?
@@ -70,20 +88,17 @@ where
         }
     }
 
-    async fn publish_to_all<P: Serialize + Sync>(
+    async fn publish_raw_to_all<P: Into<OwnedBytes> + Send>(
         &self,
         topics: Vec<String>,
-        payload: &P,
+        payload: P,
     ) -> Result<(), bonsaidb_core::Error> {
-        let payload = pot::to_vec(&payload)?;
+        let payload = payload.into();
         match self
             .client
             .send_request(Request::Database {
                 database: self.name.to_string(),
-                request: DatabaseRequest::PublishToAll {
-                    topics,
-                    payload: Bytes::from(payload),
-                },
+                request: DatabaseRequest::PublishToAll { topics, payload },
             })
             .await?
         {
diff --git a/crates/bonsaidb-core/src/networking.rs b/crates/bonsaidb-core/src/networking.rs
index c6a5fc2ded4..391df08dafc 100644
--- a/crates/bonsaidb-core/src/networking.rs
+++ b/crates/bonsaidb-core/src/networking.rs
@@ -1,4 +1,4 @@
-use arc_bytes::serde::Bytes;
+use arc_bytes::{serde::Bytes, OwnedBytes};
 use derive_where::derive_where;
 use schema::SchemaName;
 use serde::{Deserialize, Serialize};
@@ -230,7 +230,7 @@ pub enum DatabaseRequest {
         /// The topic to publish to.
         topic: String,
         /// The payload to publish.
-        payload: Bytes,
+        payload: OwnedBytes,
     },
     /// Publishes `payload` to all subscribers of all `topics`.
     #[cfg_attr(feature = "actionable-traits", actionable(protection = "custom"))]
@@ -238,7 +238,7 @@ pub enum DatabaseRequest {
     PublishToAll {
         /// The topics to publish to.
         topics: Vec<String>,
         /// The payload to publish.
-        payload: Bytes,
+        payload: OwnedBytes,
     },
     /// Subscribes `subscriber_id` to messages for `topic`.
     #[cfg_attr(feature = "actionable-traits", actionable(protection = "simple"))]
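Context for the payload type swap above: `Bytes` is arc-bytes' serde wrapper around a plain `Vec<u8>`, while `OwnedBytes` is the crate's reference-counted `'static` buffer, so handing one message to many subscribers clones an `Arc` rather than copying the bytes. A minimal sketch of that property (assuming arc-bytes' `From<Vec<u8>>`, `Clone`, and `Deref<Target = [u8]>` impls, which is how the type is used throughout this diff):

    use arc_bytes::OwnedBytes;

    fn main() {
        // Build the payload once...
        let payload = OwnedBytes::from(b"hello subscribers".to_vec());

        // ...then hand it to each subscriber; clones share the same buffer.
        let for_subscriber_a = payload.clone();
        let for_subscriber_b = payload.clone();

        assert_eq!(&*for_subscriber_a, &b"hello subscribers"[..]);
        assert_eq!(&*for_subscriber_a, &*for_subscriber_b);
    }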
diff --git a/crates/bonsaidb-core/src/pubsub.rs b/crates/bonsaidb-core/src/pubsub.rs
index c75ee1ce0f9..b5bf29cde28 100644
--- a/crates/bonsaidb-core/src/pubsub.rs
+++ b/crates/bonsaidb-core/src/pubsub.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use arc_bytes::OwnedBytes;
 use async_trait::async_trait;
 use circulate::{flume, Message, Relay};
 use serde::Serialize;
@@ -22,12 +23,26 @@ pub trait PubSub {
         payload: &P,
     ) -> Result<(), Error>;
 
+    /// Publishes a `payload` without any extra processing to all subscribers of `topic`.
+    async fn publish_raw<S: Into<String> + Send, P: Into<OwnedBytes> + Send>(
+        &self,
+        topic: S,
+        payload: P,
+    ) -> Result<(), Error>;
+
     /// Publishes a `payload` to all subscribers of all `topics`.
     async fn publish_to_all<P: Serialize + Sync>(
         &self,
         topics: Vec<String>,
         payload: &P,
     ) -> Result<(), Error>;
+
+    /// Publishes a `payload` without any extra processing to all subscribers of all `topics`.
+    async fn publish_raw_to_all<P: Into<OwnedBytes> + Send>(
+        &self,
+        topics: Vec<String>,
+        payload: P,
+    ) -> Result<(), Error>;
 }
 
 /// A subscriber to one or more topics.
@@ -69,6 +84,24 @@ impl PubSub for Relay {
         self.publish_to_all(topics, payload).await?;
         Ok(())
     }
+
+    async fn publish_raw<S: Into<String> + Send, P: Into<OwnedBytes> + Send>(
+        &self,
+        topic: S,
+        payload: P,
+    ) -> Result<(), Error> {
+        self.publish_raw(topic, payload).await;
+        Ok(())
+    }
+
+    async fn publish_raw_to_all<P: Into<OwnedBytes> + Send>(
+        &self,
+        topics: Vec<String>,
+        payload: P,
+    ) -> Result<(), Error> {
+        self.publish_raw_to_all(topics, payload).await;
+        Ok(())
+    }
 }
 
 #[async_trait]
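The new raw methods split serialization from delivery: `publish` still pot-encodes its payload on every call, while `publish_raw`/`publish_raw_to_all` accept anything `Into<OwnedBytes>`. A hedged usage sketch against the trait above (`db` stands in for any `PubSub` implementor; the `Vec<u8>: Into<OwnedBytes>` conversion is assumed from arc-bytes):

    use bonsaidb_core::pubsub::PubSub;

    async fn broadcast<P: PubSub>(db: &P) -> Result<(), bonsaidb_core::Error> {
        // `publish` serializes with pot for you...
        db.publish("alerts", &"disk almost full").await?;

        // ...while the raw variants let you serialize once (or publish bytes
        // that were never pot-encoded) and reuse the buffer across topics.
        let bytes = pot::to_vec(&"disk almost full")?;
        db.publish_raw_to_all(vec!["alerts".to_string(), "ops".to_string()], bytes)
            .await?;
        Ok(())
    }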
diff --git a/crates/bonsaidb-jobs/Cargo.toml b/crates/bonsaidb-jobs/Cargo.toml
index c9165dac411..fde153a285e 100644
--- a/crates/bonsaidb-jobs/Cargo.toml
+++ b/crates/bonsaidb-jobs/Cargo.toml
@@ -15,6 +15,9 @@ rust-version = "1.58"
 [dependencies]
 bonsaidb-core = { version = "0.2.0", path = "../bonsaidb-core" }
 serde = { version = "1", features = ["derive"] }
+thiserror = "1"
+tokio = { version = "=1.16.1", default-features = false, features = ["sync"] }
+flume = "0.10"
 
 [dev-dependencies]
 tokio = { version = "=1.16.1", features = ["full"] }
diff --git a/crates/bonsaidb-jobs/src/job.rs b/crates/bonsaidb-jobs/src/job.rs
new file mode 100644
index 00000000000..5cd68544bb0
--- /dev/null
+++ b/crates/bonsaidb-jobs/src/job.rs
@@ -0,0 +1,297 @@
+use std::{fmt::Display, marker::PhantomData, sync::Arc};
+
+use bonsaidb_core::{
+    actionable::async_trait,
+    arc_bytes::serde::Bytes,
+    connection::Connection,
+    document::CollectionDocument,
+    keyvalue::Timestamp,
+    pubsub::{PubSub, Subscriber},
+    schema::{Schematic, SerializedCollection},
+    transmog::{Format, OwnedDeserializer},
+};
+use serde::{Deserialize, Serialize};
+use tokio::sync::watch;
+
+use crate::schema;
+
+pub(crate) fn define_collections(schematic: &mut Schematic) -> Result<(), bonsaidb_core::Error> {
+    schematic.define_collection::<schema::Job>()
+}
+
+pub struct Job<Q: Queueable>(CollectionDocument<schema::Job>, PhantomData<Q>);
+
+impl<Q: Queueable> From<CollectionDocument<schema::Job>> for Job<Q> {
+    fn from(doc: CollectionDocument<schema::Job>) -> Self {
+        Self(doc, PhantomData)
+    }
+}
+
+impl<Q: Queueable> Job<Q> {
+    pub async fn update<Database: Connection>(
+        &mut self,
+        database: &Database,
+    ) -> Result<bool, bonsaidb_core::Error> {
+        if let Some(doc) = schema::Job::get(self.0.header.id, database).await? {
+            self.0 = doc;
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    }
+
+    pub async fn wait_for_result<Database: Connection + PubSub>(
+        &mut self,
+        database: &Database,
+    ) -> JobResult<Q> {
+        loop {
+            let subscriber = database.create_subscriber().await?;
+            subscriber.subscribe_to(job_topic(self.0.header.id)).await?;
+            // Check that the job hasn't completed before we could create the subscriber
+            self.update(database).await?;
+            return if let Some(result) = &self.0.contents.result {
+                <Q::Format as OwnedDeserializer<Result<Option<Q::Output>, Q::Error>>>::deserialize_owned(
+                    &Q::format(),
+                    result,
+                )
+            } else {
+                // Wait for the subscriber to be notified
+                match subscriber.receiver().recv_async().await {
+                    Ok(message) => {
+                        <Q::Format as OwnedDeserializer<Result<Option<Q::Output>, Q::Error>>>::deserialize_owned(
+                            &Q::format(),
+                            &message.payload,
+                        )
+                    }
+                    Err(_) => continue,
+                }
+            }
+            .map_err(|err| bonsaidb_core::Error::Serialization(err.to_string()))?;
+        }
+    }
+
+    pub fn progress(&self) -> &Progress {
+        &self.0.contents.progress
+    }
+
+    pub fn enqueued_at(&self) -> Timestamp {
+        self.0.contents.enqueued_at
+    }
+
+    pub fn cancelled_at(&self) -> Option<Timestamp> {
+        self.0.contents.cancelled_at
+    }
+}
+
+#[allow(type_alias_bounds)]
+type JobResult<Q: Queueable> = Result<Option<Q::Output>, Q::Error>;
+
+#[async_trait]
+pub trait Queueable: Sized + Send + Sync + std::fmt::Debug {
+    type Format: bonsaidb_core::transmog::OwnedDeserializer<Self>
+        + bonsaidb_core::transmog::OwnedDeserializer<Result<Option<Self::Output>, Self::Error>>;
+    type Output: Send + Sync;
+    type Error: From<bonsaidb_core::Error> + Send + Sync;
+
+    fn format() -> Self::Format;
+}
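For orientation, a sketch of what implementing `Queueable` might look like. `Thumbnail` is a hypothetical job type, not part of this diff, and the example assumes `transmog_pot::Pot` (re-exported by bonsaidb-core) satisfies both `OwnedDeserializer` bounds, which it should for any serde-compatible types:

    use bonsaidb_core::transmog_pot::Pot;
    use bonsaidb_jobs::job::Queueable;
    use serde::{Deserialize, Serialize};

    // Hypothetical job payload; not part of this diff.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Thumbnail {
        pub source_path: String,
        pub max_dimension: u32,
    }

    impl Queueable for Thumbnail {
        type Format = Pot;
        // The path of the rendered thumbnail.
        type Output = String;
        type Error = bonsaidb_core::Error;

        fn format() -> Self::Format {
            Pot::default()
        }
    }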
+
+#[async_trait]
+pub trait Executor {
+    type Job: Queueable;
+
+    async fn execute(
+        &mut self,
+        job: Self::Job,
+        progress: &mut ProgressReporter,
+    ) -> Result<<Self::Job as Queueable>::Output, <Self::Job as Queueable>::Error>;
+
+    async fn execute_with_progress<Database: Connection + PubSub>(
+        &mut self,
+        job: &mut CollectionDocument<schema::Job>,
+        database: &Database,
+    ) -> Result<Option<<Self::Job as Queueable>::Output>, <Self::Job as Queueable>::Error> {
+        let (mut executor_handle, mut job_handle) = ProgressReporter::new();
+        let payload = Self::Job::format()
+            .deserialize_owned(&job.contents.payload)
+            .unwrap();
+        let mut task = self.execute(payload, &mut executor_handle);
+
+        let result = loop {
+            tokio::select! {
+                output = &mut task => break output.map(Some),
+                progress = job_handle.receiver.changed() => {
+                    progress.unwrap();
+                    // TODO throttle progress changes
+                    job.contents.progress = job_handle.receiver.borrow_and_update().clone();
+
+                    match job.update(database).await {
+                        Ok(()) => {}
+                        Err(bonsaidb_core::Error::DocumentConflict(..)) => {
+                            if let Some(updated_job) = schema::Job::get(job.header.id, database).await? {
+                                *job = updated_job;
+                                job_handle.cancel.send(job.contents.cancelled_at).unwrap();
+                            } else {
+                                break Ok(None);
+                            }
+                        }
+                        Err(other) => break Err(<Self::Job as Queueable>::Error::from(other)),
+                    }
+                }
+            }
+        };
+
+        let result_bytes = Bytes::from(
+            <Self::Job as Queueable>::format()
+                .serialize(&result)
+                .map_err(|err| bonsaidb_core::Error::Serialization(err.to_string()))?,
+        );
+
+        job.contents.result = Some(result_bytes.clone());
+        job.contents.returned_at = Some(Timestamp::now());
+        job.update(database).await?;
+
+        database
+            .publish_raw(job_topic(job.header.id), result_bytes.0)
+            .await?;
+
+        result
+    }
+}
+
+fn job_topic(id: u64) -> String {
+    format!("BONSAIDB_JOB_{}_RESULT", id)
+}
+
+#[derive(Default, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
+pub struct Progress {
+    pub updated_at: Timestamp,
+    pub message: Option<Arc<String>>,
+    pub step: ProgressStep,
+    pub total_steps: u64,
+}
+
+#[derive(Default, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
+pub struct ProgressStep {
+    pub name: Option<Arc<String>>,
+    pub index: u64,
+    pub completion: StepCompletion,
+}
+
+#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
+pub enum StepCompletion {
+    Indeterminate,
+    Percent(u8),
+    Count { index: u64, total_steps: u64 },
+    Complete,
+}
+
+impl Default for StepCompletion {
+    fn default() -> Self {
+        Self::Indeterminate
+    }
+}
+
+#[derive(Debug)]
+pub struct ProgressReporter {
+    current: Progress,
+    sender: watch::Sender<Progress>,
+    cancel: watch::Receiver<Option<Timestamp>>,
+}
+
+struct ProgressReceiver {
+    receiver: watch::Receiver<Progress>,
+    cancel: watch::Sender<Option<Timestamp>>,
+}
+
+impl ProgressReporter {
+    fn new() -> (Self, ProgressReceiver) {
+        let (sender, receiver) = watch::channel(Progress::default());
+        let (cancel_sender, cancel_receiver) = watch::channel(None);
+        (
+            Self {
+                sender,
+                cancel: cancel_receiver,
+                current: Progress::default(),
+            },
+            ProgressReceiver {
+                receiver,
+                cancel: cancel_sender,
+            },
+        )
+    }
+
+    pub fn cancelled_at(&mut self) -> Option<Timestamp> {
+        *self.cancel.borrow_and_update()
+    }
+
+    pub fn set_message(&mut self, message: impl Display) {
+        let message = message.to_string();
+        if self.current.message.as_deref() != Some(&message) {
+            self.current.message = Some(Arc::new(message));
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+
+    pub fn clear_message(&mut self) {
+        if self.current.message.is_some() {
+            self.current.message = None;
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+
+    pub fn set_total_steps(&mut self, steps: u64) {
+        if self.current.total_steps != steps {
+            self.current.total_steps = steps;
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+
+    pub fn set_step(&mut self, step: u64) {
+        if self.current.step.index != step {
+            self.current.step.index = step;
+            self.current.step.name = None;
+            self.current.step.completion = StepCompletion::Indeterminate;
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+
+    pub fn set_step_with_name(&mut self, step: u64, name: impl Display) {
+        if self.current.step.index != step {
+            self.current.step.index = step;
+            self.current.step.name = Some(Arc::new(name.to_string()));
+            self.current.step.completion = StepCompletion::Indeterminate;
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+
+    pub fn set_step_completion(&mut self, completion: StepCompletion) {
+        if self.current.step.completion != completion {
+            self.current.step.completion = completion;
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+
+    pub fn set_step_percent_complete(&mut self, percent: f32) {
+        // `percent` is a 0.0..=1.0 fraction; scale to a 0..=100 percentage.
+        let percent = StepCompletion::Percent((percent.clamp(0., 1.) * 100.).floor() as u8);
+        if self.current.step.completion != percent {
+            self.current.step.completion = percent;
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+
+    pub fn set_step_progress(&mut self, index: u64, total_steps: u64) {
+        let progress = StepCompletion::Count { index, total_steps };
+        if self.current.step.completion != progress {
+            self.current.step.completion = progress;
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+
+    pub fn complete_step(&mut self) {
+        if self.current.step.completion != StepCompletion::Complete {
+            self.current.step.completion = StepCompletion::Complete;
+            self.sender.send(self.current.clone()).unwrap();
+        }
+    }
+}
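And a sketch of the worker side. `ThumbnailExecutor` is hypothetical; it shows the intended interplay between `execute`, the `ProgressReporter` handle, and cooperative cancellation via `cancelled_at`, reusing the `Thumbnail` sketch from the `Queueable` example:

    use bonsaidb_core::actionable::async_trait;
    use bonsaidb_jobs::job::{Executor, ProgressReporter};

    // Hypothetical executor; not part of this diff.
    pub struct ThumbnailExecutor;

    #[async_trait]
    impl Executor for ThumbnailExecutor {
        type Job = Thumbnail;

        async fn execute(
            &mut self,
            job: Thumbnail,
            progress: &mut ProgressReporter,
        ) -> Result<String, bonsaidb_core::Error> {
            progress.set_total_steps(2);

            progress.set_step_with_name(0, "decoding source");
            // ... decode `job.source_path` here ...
            progress.complete_step();

            progress.set_step_with_name(1, "rendering");
            // Long-running work should periodically poll `progress.cancelled_at()`
            // and return early once it reports `Some(_)`.
            progress.set_step_percent_complete(0.5);
            // ... scale and encode here ...
            progress.complete_step();

            Ok(format!("{}.thumb.jpg", job.source_path))
        }
    }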
diff --git a/crates/bonsaidb-jobs/src/lib.rs b/crates/bonsaidb-jobs/src/lib.rs
index 3579ff4d9eb..55d45bffbc9 100644
--- a/crates/bonsaidb-jobs/src/lib.rs
+++ b/crates/bonsaidb-jobs/src/lib.rs
@@ -1,36 +1,14 @@
-use bonsaidb_core::{
-    arc_bytes::serde::Bytes,
-    keyvalue::Timestamp,
-    schema::{Authority, Collection, CollectionName, SchemaName, Schematic},
-};
-use serde::{Deserialize, Serialize};
+use bonsaidb_core::schema::Schematic;
 
 pub fn define_collections(schematic: &mut Schematic) -> Result<(), bonsaidb_core::Error> {
-    schematic.define_collection::<Queue>()?;
-    schematic.define_collection::<Job>()?;
+    queue::define_collections(schematic)?;
+    job::define_collections(schematic)?;
 
     Ok(())
 }
 
-#[derive(Collection, Debug, Serialize, Deserialize)]
-#[collection(name = "queues", authority = "bonsaidb", core = bonsaidb_core)]
-pub struct Queue {
-    pub owner: QueueOwner,
-    pub name: String,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub enum QueueOwner {
-    Collection(CollectionName),
-    Authority(Authority),
-    Schema(SchemaName),
-    Backend,
-}
-
-#[derive(Collection, Debug, Serialize, Deserialize)]
-#[collection(name = "jobs", authority = "bonsaidb", core = bonsaidb_core)]
-pub struct Job {
-    pub queue_id: u64,
-    pub payload: Bytes,
-    pub enqueued_at: Timestamp,
-}
+pub mod job;
+pub mod orchestrator;
+pub mod queue;
+mod schema;
+pub mod worker;
diff --git a/crates/bonsaidb-jobs/src/orchestrator.rs b/crates/bonsaidb-jobs/src/orchestrator.rs
new file mode 100644
index 00000000000..3d0abc163e9
--- /dev/null
+++ b/crates/bonsaidb-jobs/src/orchestrator.rs
@@ -0,0 +1,235 @@
+use std::collections::{HashMap, VecDeque};
+
+use bonsaidb_core::{
+    arc_bytes::serde::Bytes,
+    connection::Connection,
+    document::CollectionDocument,
+    keyvalue::{KeyValue, Timestamp},
+    pubsub::PubSub,
+    schema::SerializedCollection,
+    transmog::Format,
+};
+use tokio::sync::oneshot;
+
+use crate::{
+    job::{Job, Progress, Queueable},
+    queue::{self, QueueId, QueueName},
+    schema::{self, job::PendingJobs, Queue},
+    worker::{Worker, WorkerConfig},
+};
+
+#[derive(Clone, Debug)]
+pub struct Orchestrator<S>
+where
+    S: Strategy,
+{
+    sender: flume::Sender<Command<S>>,
+}
+
+impl<S> Orchestrator<S>
+where
+    S: Strategy,
+{
+    pub fn spawn<Database>(database: Database, strategy: S) -> Self
+    where
+        Database: Connection + PubSub + KeyValue + 'static,
+    {
+        let (sender, receiver) = flume::unbounded();
+        tokio::task::spawn(Backend::run(receiver, database, strategy));
+        Self { sender }
+    }
+
+    pub async fn enqueue<Queue: Into<QueueId> + Send, Payload: Queueable>(
+        &self,
+        queue: Queue,
+        job: &Payload,
+    ) -> Result<Job<Payload>, queue::Error> {
+        let bytes = Payload::format()
+            .serialize(job)
+            .map_err(|err| bonsaidb_core::Error::Serialization(err.to_string()))?;
+        let (sender, receiver) = oneshot::channel();
+        self.sender.send(Command::Enqueue {
+            queue: queue.into(),
+            payload: Bytes::from(bytes),
+            result: sender,
+        })?;
+        let job = receiver.await??;
+
+        Ok(Job::from(job))
+    }
+}
+
+enum Command<S: Strategy> {
+    Enqueue {
+        queue: QueueId,
+        payload: Bytes,
+        result: oneshot::Sender<Result<CollectionDocument<schema::Job>, queue::Error>>,
+    },
+    RegisterWorker {
+        config: S::WorkerConfig,
+        result: oneshot::Sender<Result<Worker, queue::Error>>,
+    },
+}
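Putting the client-facing pieces together, a hedged end-to-end sketch: it assumes a queue with id `1` already exists (see the `queue` module below), reuses the hypothetical `Thumbnail` job from earlier, and requires a `Clone` connection type:

    use bonsaidb_core::{connection::Connection, keyvalue::KeyValue, pubsub::PubSub};
    use bonsaidb_jobs::orchestrator::{Orchestrator, PriorityFifo};

    async fn run_job<Db>(db: Db) -> Result<(), Box<dyn std::error::Error>>
    where
        Db: Connection + PubSub + KeyValue + Clone + 'static,
    {
        let orchestrator = Orchestrator::spawn(db.clone(), PriorityFifo);

        // Serializes the payload with `Thumbnail::format()` and stores a
        // `schema::Job` document through the backend task.
        let mut job = orchestrator
            .enqueue(
                1_u64, // `Into<QueueId>` accepts a raw queue id or a `QueueName`
                &Thumbnail {
                    source_path: "a.png".into(),
                    max_dimension: 128,
                },
            )
            .await?;

        // Subscribes to the job's result topic and blocks until a worker
        // publishes the serialized result.
        let output = job.wait_for_result(&db).await?;
        println!("thumbnail written to {output:?}");
        Ok(())
    }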
+
+pub struct Backend<Database, S>
+where
+    Database: Connection + PubSub + KeyValue,
+    S: Strategy,
+{
+    receiver: flume::Receiver<Command<S>>,
+    database: Database,
+    queues_by_name: HashMap<QueueName, CollectionDocument<schema::Queue>>,
+    queues: HashMap<u64, VecDeque<CollectionDocument<schema::Job>>>,
+    workers: HashMap<u64, S::WorkerConfig>,
+    strategy: S,
+}
+
+impl<Database, S> Backend<Database, S>
+where
+    Database: Connection + PubSub + KeyValue,
+    S: Strategy,
+{
+    async fn run(
+        receiver: flume::Receiver<Command<S>>,
+        database: Database,
+        strategy: S,
+    ) -> Result<(), bonsaidb_core::Error> {
+        let mut queues_by_name = HashMap::new();
+        let mut queues = HashMap::new();
+
+        for queue in Queue::all(&database).await? {
+            queues_by_name.insert(queue.contents.name.clone(), queue);
+        }
+
+        for (_, job) in database
+            .view::<PendingJobs>()
+            .query_with_collection_docs()
+            .await?
+            .documents
+        {
+            let queue = queues
+                .entry(job.contents.queue_id)
+                .or_insert_with(VecDeque::default);
+            queue.push_back(job);
+        }
+
+        Self {
+            receiver,
+            database,
+            queues_by_name,
+            queues,
+            workers: HashMap::new(),
+            strategy,
+        }
+        .orchestrate()
+        .await
+    }
+
+    async fn orchestrate(&mut self) -> Result<(), bonsaidb_core::Error> {
+        while let Ok(command) = self.receiver.recv_async().await {
+            match command {
+                Command::Enqueue {
+                    queue,
+                    payload,
+                    result,
+                } => {
+                    drop(result.send(self.enqueue(queue, payload).await));
+                }
+                Command::RegisterWorker { config, result } => {
+                    todo!()
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn enqueue(
+        &mut self,
+        queue: QueueId,
+        payload: Bytes,
+    ) -> Result<CollectionDocument<schema::Job>, queue::Error> {
+        let queue_id = queue.as_id(&self.database).await?;
+        let job = schema::Job {
+            queue_id,
+            payload,
+            enqueued_at: Timestamp::now(),
+            progress: Progress::default(),
+            result: None,
+            returned_at: None,
+            cancelled_at: None,
+        }
+        .push_into(&self.database)
+        .await?;
+        let entries = self.queues.entry(job.contents.queue_id).or_default();
+        let insert_at = match entries.binary_search_by(|existing_job| {
+            existing_job
+                .contents
+                .enqueued_at
+                .cmp(&job.contents.enqueued_at)
+        }) {
+            Ok(index) => index,
+            Err(index) => index,
+        };
+        entries.insert(insert_at, job.clone());
+        Ok(job)
+    }
+
+    pub fn queue(&mut self, queue: u64) -> Option<&mut VecDeque<CollectionDocument<schema::Job>>> {
+        self.queues.get_mut(&queue)
+    }
+
+    pub fn queue_by_name(
+        &mut self,
+        queue: &QueueName,
+    ) -> Option<&mut VecDeque<CollectionDocument<schema::Job>>> {
+        let id = self.queues_by_name.get(queue)?.header.id;
+        self.queue(id)
+    }
+}
+
+pub trait Strategy: Sized + Send + Sync + 'static {
+    type WorkerConfig: Send + Sync;
+
+    fn dequeue_for_worker<Database: Connection + PubSub + KeyValue>(
+        &mut self,
+        worker: &Self::WorkerConfig,
+        backend: &mut Backend<Database, Self>,
+    ) -> Option<CollectionDocument<schema::Job>>;
+}
+
+pub struct PriorityFifo;
+
+impl Strategy for PriorityFifo {
+    type WorkerConfig = WorkerConfig;
+
+    fn dequeue_for_worker<Database: Connection + PubSub + KeyValue>(
+        &mut self,
+        worker: &Self::WorkerConfig,
+        backend: &mut Backend<Database, Self>,
+    ) -> Option<CollectionDocument<schema::Job>> {
+        for tier in &worker.tiers {
+            if let Some((queue_with_oldest_job, _)) = tier
+                .0
+                .iter()
+                .filter_map(|q| {
+                    backend
+                        .queue_by_name(q)
+                        .and_then(|jobs| jobs.front().map(|j| (q, j.clone())))
+                })
+                // The oldest job is the one with the smallest `enqueued_at`.
+                .min_by(|(_, q1_front), (_, q2_front)| {
+                    q1_front
+                        .contents
+                        .enqueued_at
+                        .cmp(&q2_front.contents.enqueued_at)
+                })
+            {
+                return backend
+                    .queue_by_name(queue_with_oldest_job)
+                    .unwrap()
+                    .pop_front();
+            }
+        }
+
+        None
+    }
+}
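`PriorityFifo` walks a worker's tiers in order, taking the oldest job across all queues in a tier before ever consulting the next tier. A sketch of the matching configuration (the types come from `worker.rs`, later in this diff; the queue names are hypothetical):

    use bonsaidb_jobs::{
        queue::{QueueName, QueueOwner},
        worker::{JobTier, WorkerConfig},
    };

    fn worker_config() -> WorkerConfig {
        WorkerConfig {
            tiers: vec![
                // Tier one: always drained first.
                JobTier(vec![
                    QueueName::new(QueueOwner::Backend, "urgent"),
                    QueueName::new(QueueOwner::Backend, "interactive"),
                ]),
                // Tier two: only reached when tier one is empty.
                JobTier(vec![QueueName::new(QueueOwner::Backend, "bulk")]),
            ],
        }
    }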
diff --git a/crates/bonsaidb-jobs/src/queue.rs b/crates/bonsaidb-jobs/src/queue.rs
new file mode 100644
index 00000000000..84f50ec4154
--- /dev/null
+++ b/crates/bonsaidb-jobs/src/queue.rs
@@ -0,0 +1,224 @@
+use std::fmt::Display;
+
+use bonsaidb_core::{
+    connection::Connection,
+    document::CollectionDocument,
+    schema::{Authority, CollectionName, InsertError, SchemaName, Schematic, SerializedCollection},
+};
+use serde::{Deserialize, Serialize};
+
+use crate::schema::{
+    self,
+    queue::{ByOwnerAndName, ViewExt},
+};
+
+pub(crate) fn define_collections(schematic: &mut Schematic) -> Result<(), bonsaidb_core::Error> {
+    schematic.define_collection::<schema::Queue>()
+}
+
+pub struct Queue(CollectionDocument<schema::Queue>);
+
+impl Queue {
+    pub async fn find<
+        Owner: Into<QueueOwner> + Send,
+        Name: Into<String> + Send,
+        Database: Connection,
+    >(
+        owner: Owner,
+        name: Name,
+        database: Database,
+    ) -> Result<Option<Self>, bonsaidb_core::Error> {
+        let owner = owner.into();
+        let name = name.into();
+        let existing = database
+            .view::<ByOwnerAndName>()
+            .find_queue(&owner, &name)
+            .query_with_collection_docs()
+            .await?;
+        Ok(existing
+            .documents
+            .into_iter()
+            .next()
+            .map(|(_, doc)| Self(doc)))
+    }
+
+    pub async fn create<
+        Owner: Into<QueueOwner> + Send,
+        Name: Into<String> + Send,
+        Database: Connection,
+    >(
+        owner: Owner,
+        name: Name,
+        database: Database,
+    ) -> Result<Self, bonsaidb_core::Error> {
+        schema::Queue {
+            name: QueueName::new(owner, name),
+        }
+        .push_into(&database)
+        .await
+        .map(Self)
+        .map_err(|err| err.error)
+    }
+
+    pub const fn id(&self) -> u64 {
+        self.0.header.id
+    }
+
+    pub const fn name(&self) -> &QueueName {
+        &self.0.contents.name
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq)]
+pub enum QueueOwner {
+    Collection(CollectionName),
+    Authority(Authority),
+    Schema(SchemaName),
+    Backend,
+}
+
+impl Display for QueueOwner {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            QueueOwner::Collection(collection) => write!(f, "collection.{}", collection),
+            QueueOwner::Authority(authority) => write!(f, "authority.{}", authority),
+            QueueOwner::Schema(schema) => write!(f, "schema.{}", schema),
+            QueueOwner::Backend => f.write_str("backend"),
+        }
+    }
+}
+
+impl From<CollectionName> for QueueOwner {
+    fn from(name: CollectionName) -> Self {
+        Self::Collection(name)
+    }
+}
+
+impl From<Authority> for QueueOwner {
+    fn from(name: Authority) -> Self {
+        Self::Authority(name)
+    }
+}
+
+impl From<SchemaName> for QueueOwner {
+    fn from(name: SchemaName) -> Self {
+        Self::Schema(name)
+    }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
+pub struct QueueName {
+    pub owner: QueueOwner,
+    pub name: String,
+}
+
+impl QueueName {
+    pub fn new<Owner: Into<QueueOwner> + Send, Name: Into<String> + Send>(
+        owner: Owner,
+        name: Name,
+    ) -> Self {
+        Self {
+            owner: owner.into(),
+            name: name.into(),
+        }
+    }
+
+    pub fn format(owner: &QueueOwner, name: &str) -> String {
+        let mut string = String::new();
+        Self::format_into(owner, name, &mut string).unwrap();
+        string
+    }
+
+    pub fn format_into(
+        owner: &QueueOwner,
+        name: &str,
+        mut writer: impl std::fmt::Write,
+    ) -> Result<(), std::fmt::Error> {
+        write!(writer, "{}.{}", owner, name)
+    }
+}
+
+impl Display for QueueName {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}.{}", self.owner, self.name)
+    }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum QueueId {
+    Id(u64),
+    Name(QueueName),
+}
+
+impl QueueId {
+    pub async fn as_id<Database: Connection>(&self, database: &Database) -> Result<u64, Error> {
+        match self {
+            QueueId::Id(id) => Ok(*id),
+            QueueId::Name(name) => {
+                let existing = database
+                    .view::<ByOwnerAndName>()
+                    .find_queue(&name.owner, &name.name)
+                    .query()
+                    .await?;
+                Ok(existing
+                    .into_iter()
+                    .next()
+                    .map(|mapping| mapping.source.id.deserialize())
+                    .transpose()?
+                    .ok_or(Error::NotFound)?)
+            }
+        }
+    }
+}
+
+impl From<u64> for QueueId {
+    fn from(id: u64) -> Self {
+        Self::Id(id)
+    }
+}
+
+impl<'a> From<&'a Queue> for QueueId {
+    fn from(queue: &'a Queue) -> Self {
+        Self::Name(queue.name().clone())
+    }
+}
+
+impl From<QueueName> for QueueId {
+    fn from(name: QueueName) -> Self {
+        Self::Name(name)
+    }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("queue not found")]
+    NotFound,
+    #[error("database error: {0}")]
+    Database(#[from] bonsaidb_core::Error),
+    #[error("internal communication failure")]
+    InternalCommunication,
+}
+
+impl From<tokio::sync::oneshot::error::RecvError> for Error {
+    fn from(_: tokio::sync::oneshot::error::RecvError) -> Self {
+        Self::InternalCommunication
+    }
+}
+
+impl<T> From<flume::SendError<T>> for Error {
+    fn from(_: flume::SendError<T>) -> Self {
+        Self::InternalCommunication
+    }
+}
+
+impl From<flume::RecvError> for Error {
+    fn from(_: flume::RecvError) -> Self {
+        Self::InternalCommunication
+    }
+}
+
+impl<T> From<InsertError<T>> for Error {
+    fn from(err: InsertError<T>) -> Self {
+        Self::Database(err.error)
+    }
+}
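A usage sketch for the queue handle (hypothetical names; note that `find` and `create` take the connection by value, so a `Clone` connection is assumed). Because the `ByOwnerAndName` view is unique, `create` fails with a conflict if another caller wins the race:

    use bonsaidb_core::connection::Connection;
    use bonsaidb_jobs::queue::{Queue, QueueOwner};

    async fn ensure_queue<Db: Connection + Clone>(db: &Db) -> Result<Queue, bonsaidb_core::Error> {
        if let Some(queue) = Queue::find(QueueOwner::Backend, "bulk", db.clone()).await? {
            Ok(queue)
        } else {
            Queue::create(QueueOwner::Backend, "bulk", db.clone()).await
        }
    }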
diff --git a/crates/bonsaidb-jobs/src/schema/job.rs b/crates/bonsaidb-jobs/src/schema/job.rs
new file mode 100644
index 00000000000..f967e7be5a3
--- /dev/null
+++ b/crates/bonsaidb-jobs/src/schema/job.rs
@@ -0,0 +1,36 @@
+use bonsaidb_core::{
+    arc_bytes::serde::Bytes,
+    document::{CollectionDocument, Emit},
+    keyvalue::Timestamp,
+    schema::{Collection, CollectionViewSchema, View, ViewMapResult},
+};
+use serde::{Deserialize, Serialize};
+
+use crate::job::Progress;
+
+#[derive(Collection, Clone, Debug, Serialize, Deserialize)]
+#[collection(name = "jobs", authority = "bonsaidb", core = bonsaidb_core)]
+pub struct Job {
+    pub queue_id: u64,
+    pub payload: Bytes,
+    pub enqueued_at: Timestamp,
+    pub progress: Progress,
+    pub returned_at: Option<Timestamp>,
+    pub cancelled_at: Option<Timestamp>,
+    pub result: Option<Bytes>,
+}
+
+#[derive(View, Debug, Clone)]
+#[view(name = "pending", key = Timestamp, collection = Job, core = bonsaidb_core)]
+pub struct PendingJobs;
+
+impl CollectionViewSchema for PendingJobs {
+    type View = Self;
+
+    fn map(
+        &self,
+        document: CollectionDocument<<Self::View as View>::Collection>,
+    ) -> ViewMapResult<Self::View> {
+        document.header.emit_key(document.contents.enqueued_at)
+    }
+}
diff --git a/crates/bonsaidb-jobs/src/schema/mod.rs b/crates/bonsaidb-jobs/src/schema/mod.rs
new file mode 100644
index 00000000000..94bb4ed5c09
--- /dev/null
+++ b/crates/bonsaidb-jobs/src/schema/mod.rs
@@ -0,0 +1,3 @@
+pub mod job;
+pub mod queue;
+pub use self::{job::Job, queue::Queue};
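Since `schema` is a private module, this view is only queried inside the crate; the orchestrator's startup scan is essentially the following sketch, keyed ascending by `enqueued_at`:

    use bonsaidb_core::connection::Connection;

    use crate::schema::job::PendingJobs;

    async fn pending_jobs<Db: Connection>(db: &Db) -> Result<usize, bonsaidb_core::Error> {
        // `Backend::run` uses `query_with_collection_docs()` on this same view
        // to rebuild its in-memory queues after a restart.
        // Note: the map function currently emits every job, completed or not;
        // filtering to truly pending jobs would need a key-range or map change.
        Ok(db.view::<PendingJobs>().query().await?.len())
    }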
diff --git a/crates/bonsaidb-jobs/src/schema/queue.rs b/crates/bonsaidb-jobs/src/schema/queue.rs
new file mode 100644
index 00000000000..91ce952b90b
--- /dev/null
+++ b/crates/bonsaidb-jobs/src/schema/queue.rs
@@ -0,0 +1,46 @@
+use bonsaidb_core::{
+    connection::{self, Connection},
+    document::{CollectionDocument, Emit},
+    schema::{Collection, CollectionViewSchema, View, ViewMapResult},
+};
+use serde::{Deserialize, Serialize};
+
+use crate::queue::{QueueName, QueueOwner};
+
+#[derive(Collection, Debug, Serialize, Deserialize)]
+#[collection(name = "queues", authority = "bonsaidb", core = bonsaidb_core)]
+pub struct Queue {
+    pub name: QueueName,
+}
+
+#[derive(View, Debug, Clone)]
+#[view(name = "by-name", collection = Queue, key = String, core = bonsaidb_core)]
+pub struct ByOwnerAndName;
+
+impl CollectionViewSchema for ByOwnerAndName {
+    type View = Self;
+
+    fn unique(&self) -> bool {
+        true
+    }
+
+    fn map(
+        &self,
+        document: CollectionDocument<<Self::View as View>::Collection>,
+    ) -> ViewMapResult<Self::View> {
+        document.header.emit_key(document.contents.name.to_string())
+    }
+}
+
+pub trait ViewExt: Sized {
+    fn find_queue(self, owner: &QueueOwner, name: &str) -> Self;
+}
+
+impl<'a, Cn> ViewExt for connection::View<'a, Cn, ByOwnerAndName>
+where
+    Cn: Connection,
+{
+    fn find_queue(self, owner: &QueueOwner, name: &str) -> Self {
+        self.with_key(QueueName::format(owner, name))
+    }
+}
diff --git a/crates/bonsaidb-jobs/src/worker.rs b/crates/bonsaidb-jobs/src/worker.rs
new file mode 100644
index 00000000000..78059e4e73d
--- /dev/null
+++ b/crates/bonsaidb-jobs/src/worker.rs
@@ -0,0 +1,9 @@
+use crate::queue::QueueName;
+
+pub struct Worker {}
+
+pub struct WorkerConfig {
+    pub tiers: Vec<JobTier>,
+}
+
+pub struct JobTier(pub Vec<QueueName>);
diff --git a/crates/bonsaidb-local/src/database/pubsub.rs b/crates/bonsaidb-local/src/database/pubsub.rs
index 132ebf219f9..fb160a6751a 100644
--- a/crates/bonsaidb-local/src/database/pubsub.rs
+++ b/crates/bonsaidb-local/src/database/pubsub.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 pub use bonsaidb_core::circulate::Relay;
 use bonsaidb_core::{
+    arc_bytes::OwnedBytes,
     circulate,
     pubsub::{self, database_topic, PubSub},
     Error,
@@ -50,6 +51,39 @@ impl PubSub for super::Database {
             .await?;
         Ok(())
     }
+
+    async fn publish_raw<S: Into<String> + Send, P: Into<OwnedBytes> + Send>(
+        &self,
+        topic: S,
+        payload: P,
+    ) -> Result<(), bonsaidb_core::Error> {
+        self.data
+            .storage
+            .relay()
+            .publish_raw(database_topic(&self.data.name, &topic.into()), payload)
+            .await;
+        Ok(())
+    }
+
+    async fn publish_raw_to_all<P: Into<OwnedBytes> + Send>(
+        &self,
+        topics: Vec<String>,
+        payload: P,
+    ) -> Result<(), bonsaidb_core::Error> {
+        let payload = payload.into();
+        self.data
+            .storage
+            .relay()
+            .publish_raw_to_all(
+                topics
+                    .iter()
+                    .map(|topic| database_topic(&self.data.name, topic))
+                    .collect(),
+                payload,
+            )
+            .await;
+        Ok(())
+    }
 }
 
 /// A subscriber for `PubSub` messages.
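Round-trip sketch for the raw path against any `PubSub` database (the local `Database` above, or the client and server implementations elsewhere in this diff): raw payloads arrive byte-for-byte, with no pot framing. `Vec<u8>: Into<OwnedBytes>` is assumed from arc-bytes:

    use bonsaidb_core::pubsub::{PubSub, Subscriber};

    async fn raw_round_trip<P: PubSub>(db: &P) -> Result<(), bonsaidb_core::Error> {
        let subscriber = db.create_subscriber().await?;
        subscriber.subscribe_to("metrics").await?;

        db.publish_raw("metrics", b"cpu:42".to_vec()).await?;

        let message = subscriber
            .receiver()
            .recv_async()
            .await
            .expect("relay shut down");
        assert_eq!(&*message.payload, &b"cpu:42"[..]);
        Ok(())
    }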
diff --git a/crates/bonsaidb-server/src/server.rs b/crates/bonsaidb-server/src/server.rs
index e7343d1d51b..6043a3e9008 100644
--- a/crates/bonsaidb-server/src/server.rs
+++ b/crates/bonsaidb-server/src/server.rs
@@ -16,7 +16,7 @@ use async_lock::{Mutex, RwLock};
 use async_trait::async_trait;
 use bonsaidb_core::{
     admin::User,
-    arc_bytes::serde::Bytes,
+    arc_bytes::{serde::Bytes, OwnedBytes},
     circulate::{Message, Relay, Subscriber},
     connection::{self, AccessPolicy, Connection, QueryKey, Range, Sort, StorageConnection},
     custom_api::{CustomApi, CustomApiResult},
@@ -795,20 +795,30 @@ impl<B: Backend> CustomServer<B> {
         Ok(permissions)
     }
 
-    async fn publish_message(&self, database: &str, topic: &str, payload: Vec<u8>) {
+    async fn publish_message(
+        &self,
+        database: &str,
+        topic: &str,
+        payload: impl Into<OwnedBytes> + Send,
+    ) {
         self.data
             .relay
             .publish_message(Message {
                 topic: database_topic(database, topic),
-                payload,
+                payload: payload.into(),
             })
             .await;
     }
 
-    async fn publish_serialized_to_all(&self, database: &str, topics: &[String], payload: Vec<u8>) {
+    async fn publish_raw_to_all(
+        &self,
+        database: &str,
+        topics: &[String],
+        payload: impl Into<OwnedBytes> + Send,
+    ) {
         self.data
             .relay
-            .publish_serialized_to_all(
+            .publish_raw_to_all(
                 topics
                     .iter()
                     .map(|topic| database_topic(database, topic))
@@ -1795,7 +1805,7 @@ impl<'s, B: Backend> bonsaidb_core::networking::PublishHandler for DatabaseDispatcher<'s, B> {
     async fn resource_name<'a>(
         &'a self,
         topic: &'a String,
-        _payload: &'a Bytes,
+        _payload: &'a OwnedBytes,
     ) -> Result<ResourceName<'a>, Error> {
         Ok(pubsub_topic_resource_name(&self.name, topic))
     }
@@ -1808,11 +1818,11 @@ async fn handle_protected(
         &self,
         _permissions: &Permissions,
         topic: String,
-        payload: Bytes,
+        payload: OwnedBytes,
     ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
         self.server_dispatcher
             .server
-            .publish_message(&self.name, &topic, payload.into_vec())
+            .publish_message(&self.name, &topic, payload)
             .await;
         Ok(Response::Ok)
     }
@@ -1824,7 +1834,7 @@ impl<'s, B: Backend> bonsaidb_core::networking::PublishToAllHandler for DatabaseDispatcher<'s, B> {
         &self,
         permissions: &Permissions,
         topics: &Vec<String>,
-        _payload: &Bytes,
+        _payload: &OwnedBytes,
     ) -> Result<(), Error> {
         for topic in topics {
             let topic_name = pubsub_topic_resource_name(&self.name, topic);
@@ -1844,11 +1854,11 @@ async fn handle_protected(
         &self,
         _permissions: &Permissions,
         topics: Vec<String>,
-        payload: Bytes,
+        payload: OwnedBytes,
     ) -> Result<Response<CustomApiResult<B::CustomApi>>, Error> {
         self.server_dispatcher
             .server
-            .publish_serialized_to_all(&self.name, &topics, payload.into_vec())
+            .publish_raw_to_all(&self.name, &topics, payload)
             .await;
         Ok(Response::Ok)
     }
diff --git a/crates/bonsaidb-server/src/server/database.rs b/crates/bonsaidb-server/src/server/database.rs
index ab4a97bba85..2939f148871 100644
--- a/crates/bonsaidb-server/src/server/database.rs
+++ b/crates/bonsaidb-server/src/server/database.rs
@@ -2,6 +2,7 @@ use std::{ops::Deref, sync::Arc};
 
 use async_trait::async_trait;
 use bonsaidb_core::{
+    arc_bytes::OwnedBytes,
     circulate::Message,
     connection::{AccessPolicy, QueryKey, Range, Sort},
     document::{AnyDocumentId, OwnedDocument},
@@ -57,7 +58,29 @@ impl<B: Backend> PubSub for ServerDatabase<B> {
         payload: &P,
     ) -> Result<(), bonsaidb_core::Error> {
         self.server
-            .publish_serialized_to_all(self.db.name(), &topics, pot::to_vec(payload)?)
+            .publish_raw_to_all(self.db.name(), &topics, pot::to_vec(payload)?)
             .await;
         Ok(())
     }
+
+    async fn publish_raw<S: Into<String> + Send, P: Into<OwnedBytes> + Send>(
+        &self,
+        topic: S,
+        payload: P,
+    ) -> Result<(), bonsaidb_core::Error> {
+        self.server
+            .publish_message(self.db.name(), &topic.into(), payload)
+            .await;
+        Ok(())
+    }
+
+    async fn publish_raw_to_all<P: Into<OwnedBytes> + Send>(
+        &self,
+        topics: Vec<String>,
+        payload: P,
+    ) -> Result<(), bonsaidb_core::Error> {
+        self.server
+            .publish_raw_to_all(self.db.name(), &topics, payload)
+            .await;
+        Ok(())
+    }
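One closing note on wire compatibility: `DatabaseRequest::Publish` and `PublishToAll` changed payload types, but both arc-bytes types should present as plain byte sequences to serde, leaving existing clients unaffected. A sketch of that assumption (worth verifying against arc-bytes before relying on it):

    use arc_bytes::{serde::Bytes, OwnedBytes};

    fn main() {
        let old = pot::to_vec(&Bytes::from(vec![1, 2, 3])).unwrap();
        let new = pot::to_vec(&OwnedBytes::from(vec![1, 2, 3])).unwrap();
        // If both serialize via `serialize_bytes`, the encoded requests match.
        assert_eq!(old, new);
    }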