Skip to content

Commit

Permalink
feat(discovery): initial impl
Browse files Browse the repository at this point in the history
  • Loading branch information
lffg committed Apr 12, 2024
1 parent 63216e1 commit e067766
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ pedantic = "warn"
wildcard_imports = "allow"
module_name_repetitions = "allow"
cast_precision_loss = "allow"
unused_async = "allow"
1 change: 1 addition & 0 deletions ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ setup.workspace = true
axum.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
91 changes: 91 additions & 0 deletions ctl/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::{collections::HashMap, net::SocketAddr};

use proto::common::node::Metrics;
use tokio::sync::{mpsc, oneshot};

pub struct Discovery {
rx: mpsc::Receiver<Msg>,
// TODO: Add more information on workers
workers: HashMap<SocketAddr, Metrics>,
}

impl Discovery {
#[must_use]
pub fn new() -> (Discovery, DiscoveryHandle) {
let (tx, rx) = mpsc::channel(10);
let actor = Discovery {
rx,
workers: HashMap::default(),
};
let handle = DiscoveryHandle(tx);
(actor, handle)
}

pub async fn run(mut self) {
while let Some(msg) = self.rx.recv().await {
// Attention to back pressure.
self.handle_msg(msg).await;
}
}

async fn handle_msg(&mut self, msg: Msg) {
match msg {
Msg::WorkerAdd(addr, metrics) => {
self.workers.insert(addr, metrics);
}
Msg::WorkerDrop(addr) => {
self.workers.remove(&addr);
}
Msg::WorkerQuery(reply) => {
let entries = self
.workers
.iter()
.map(|(&addr, metrics)| WorkerDetails {
addr,
metrics: metrics.clone(),
})
.collect();
_ = reply.send(entries);
}
}
}
}

#[derive(Clone)]
pub struct DiscoveryHandle(mpsc::Sender<Msg>);

impl DiscoveryHandle {
async fn send(&self, msg: Msg) {
_ = self.0.send(msg).await;
}

#[allow(dead_code)] // TODO: Remove
pub async fn worker_add(&self, addr: SocketAddr, metrics: Metrics) {
self.send(Msg::WorkerAdd(addr, metrics)).await;
}

#[allow(dead_code)] // TODO: Remove
pub async fn worker_drop(&self, addr: SocketAddr) {
self.send(Msg::WorkerDrop(addr)).await;
}

#[allow(dead_code)] // TODO: Remove
pub async fn worker_query(&self) -> Vec<WorkerDetails> {
let (tx, rx) = oneshot::channel();
self.send(Msg::WorkerQuery(tx)).await;
rx.await.expect("actor must be alive")
}
}

#[derive(Debug)]
enum Msg {
WorkerAdd(SocketAddr, Metrics),
WorkerDrop(SocketAddr),
WorkerQuery(oneshot::Sender<Vec<WorkerDetails>>),
}

#[derive(Debug)]
pub struct WorkerDetails {
pub addr: SocketAddr,
pub metrics: Metrics,
}
14 changes: 13 additions & 1 deletion ctl/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use tracing::info;

use crate::discovery::Discovery;

mod discovery;
mod http;

#[tokio::main]
Expand All @@ -8,6 +11,15 @@ async fn main() {

info!("started controller");

let http_handle = tokio::spawn(async { http::run_server().await });
let (discovery, _discovery_handle) = Discovery::new();
let discovery_actor_handle = tokio::spawn(async move {
discovery.run().await;
});

let http_handle = tokio::spawn(async {
http::run_server().await;
});

discovery_actor_handle.await.unwrap();
http_handle.await.unwrap();
}
2 changes: 1 addition & 1 deletion proto/src/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum NodeKind {
Worker,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metrics {
/// The average CPU usage.
pub cpu_usage: f64,
Expand Down

0 comments on commit e067766

Please sign in to comment.