diff --git a/Cargo.lock b/Cargo.lock index 1df48d9..4c39f3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -277,6 +277,7 @@ dependencies = [ "setup", "tokio", "tracing", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2a0307d..23c3c60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,3 +42,4 @@ pedantic = "warn" wildcard_imports = "allow" module_name_repetitions = "allow" cast_precision_loss = "allow" +unused_async = "allow" diff --git a/ctl/Cargo.toml b/ctl/Cargo.toml index 38447e7..92c62e4 100644 --- a/ctl/Cargo.toml +++ b/ctl/Cargo.toml @@ -13,3 +13,4 @@ setup.workspace = true axum.workspace = true tokio.workspace = true tracing.workspace = true +uuid.workspace = true diff --git a/ctl/src/discovery/mod.rs b/ctl/src/discovery/mod.rs new file mode 100644 index 0000000..fbf8705 --- /dev/null +++ b/ctl/src/discovery/mod.rs @@ -0,0 +1,91 @@ +use std::{collections::HashMap, net::SocketAddr}; + +use proto::common::node::Metrics; +use tokio::sync::{mpsc, oneshot}; + +pub struct Discovery { + rx: mpsc::Receiver, + // TODO: Add more information on workers + workers: HashMap, +} + +impl Discovery { + #[must_use] + pub fn new() -> (Discovery, DiscoveryHandle) { + let (tx, rx) = mpsc::channel(10); + let actor = Discovery { + rx, + workers: HashMap::default(), + }; + let handle = DiscoveryHandle(tx); + (actor, handle) + } + + pub async fn run(mut self) { + while let Some(msg) = self.rx.recv().await { + // Attention to back pressure. + self.handle_msg(msg).await; + } + } + + async fn handle_msg(&mut self, msg: Msg) { + match msg { + Msg::WorkerAdd(addr, metrics) => { + self.workers.insert(addr, metrics); + } + Msg::WorkerDrop(addr) => { + self.workers.remove(&addr); + } + Msg::WorkerQuery(reply) => { + let entries = self + .workers + .iter() + .map(|(&addr, metrics)| WorkerDetails { + addr, + metrics: metrics.clone(), + }) + .collect(); + _ = reply.send(entries); + } + } + } +} + +#[derive(Clone)] +pub struct DiscoveryHandle(mpsc::Sender); + +impl DiscoveryHandle { + async fn send(&self, msg: Msg) { + _ = self.0.send(msg).await; + } + + #[allow(dead_code)] // TODO: Remove + pub async fn worker_add(&self, addr: SocketAddr, metrics: Metrics) { + self.send(Msg::WorkerAdd(addr, metrics)).await; + } + + #[allow(dead_code)] // TODO: Remove + pub async fn worker_drop(&self, addr: SocketAddr) { + self.send(Msg::WorkerDrop(addr)).await; + } + + #[allow(dead_code)] // TODO: Remove + pub async fn worker_query(&self) -> Vec { + let (tx, rx) = oneshot::channel(); + self.send(Msg::WorkerQuery(tx)).await; + rx.await.expect("actor must be alive") + } +} + +#[derive(Debug)] +enum Msg { + WorkerAdd(SocketAddr, Metrics), + WorkerDrop(SocketAddr), + WorkerQuery(oneshot::Sender>), +} + +#[derive(Debug)] +pub struct WorkerDetails { + pub addr: SocketAddr, + pub metrics: Metrics, +} diff --git a/ctl/src/main.rs b/ctl/src/main.rs index 821d21e..929a80b 100644 --- a/ctl/src/main.rs +++ b/ctl/src/main.rs @@ -1,5 +1,8 @@ use tracing::info; +use crate::discovery::Discovery; + +mod discovery; mod http; #[tokio::main] @@ -8,6 +11,15 @@ async fn main() { info!("started controller"); - let http_handle = tokio::spawn(async { http::run_server().await }); + let (discovery, _discovery_handle) = Discovery::new(); + let discovery_actor_handle = tokio::spawn(async move { + discovery.run().await; + }); + + let http_handle = tokio::spawn(async { + http::run_server().await; + }); + + discovery_actor_handle.await.unwrap(); http_handle.await.unwrap(); } diff --git a/proto/src/common/node.rs b/proto/src/common/node.rs index 3cf8159..aeea744 100644 --- a/proto/src/common/node.rs +++ b/proto/src/common/node.rs @@ -20,7 +20,7 @@ pub enum NodeKind { Worker, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct Metrics { /// The average CPU usage. pub cpu_usage: f64,