Skip to content

Commit

Permalink
Merge branch 'main' into feat/worker/monitor/pusher
Browse files Browse the repository at this point in the history
  • Loading branch information
lemosep authored Apr 23, 2024
2 parents 042af35 + 75c120f commit 36ce172
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 7 deletions.
12 changes: 10 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ setup.path = "crates/setup"
worker.path = "worker"
# External deps (keep alphabetically sorted)
axum = "0.7"
bty = { version = "0.1.0-pre.1", features = ["uuid"] }
bty = { version = "0.2", features = ["uuid-v7"] }
clap = { version = "4.5", features = ["derive"] }
chrono = { version = "0.4", default-features = false, features = [
"std",
Expand All @@ -34,11 +34,12 @@ tokio = { version = "1.36", features = [
] }
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1", features = ["serde", "v4"] }
uuid = { version = "1", features = ["serde", "v7"] }

[workspace.lints.clippy]
all = "warn"
pedantic = "warn"
wildcard_imports = "allow"
module_name_repetitions = "allow"
cast_precision_loss = "allow"
unused_async = "allow"
1 change: 1 addition & 0 deletions ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ setup.workspace = true
axum.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
152 changes: 152 additions & 0 deletions ctl/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::{collections::HashMap, net::SocketAddr};

use proto::{
common::node::Metrics,
ctl::deployer::{DeployId, DeployStatus, RevisionId},
};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, instrument};

/// Actor state tracking the workers and deploys known to the controller.
///
/// Messages arrive over `rx` and are applied one at a time by `run`.
pub struct Discovery {
/// Receiving half of the channel created in `Discovery::new`.
rx: mpsc::Receiver<Msg>,
// TODO: Add more information on workers
/// Metrics stored per worker address.
workers: HashMap<SocketAddr, Metrics>,
/// Deploy records keyed by the id generated when the deploy is scheduled.
deploys: HashMap<DeployId, DeployDetails>,
}

impl Discovery {
    /// Creates the actor together with the handle used to send it messages.
    ///
    /// Both halves share a bounded channel with room for 16 pending messages.
    /// The actor does nothing until [`Discovery::run`] is awaited.
    #[must_use]
    pub fn new() -> (Discovery, DiscoveryHandle) {
        let (tx, rx) = mpsc::channel(16);
        let actor = Discovery {
            rx,
            workers: HashMap::default(),
            deploys: HashMap::default(),
        };
        let handle = DiscoveryHandle(tx);
        (actor, handle)
    }

    /// Consumes the actor and applies incoming messages one at a time.
    ///
    /// The loop ends once `recv` yields `None`, i.e. when the channel is
    /// closed and drained.
    pub async fn run(mut self) {
        while let Some(msg) = self.rx.recv().await {
            // Attention to back pressure.
            self.handle_msg(msg).await;
        }
    }

    /// Applies a single message to the actor state.
    ///
    /// # Panics
    ///
    /// Panics if a freshly generated deploy id is already registered.
    #[instrument(skip(self))]
    async fn handle_msg(&mut self, msg: Msg) {
        match msg {
            Msg::WorkerAdd(addr, metrics) => {
                // Inserting over an existing address replaces its metrics.
                self.workers.insert(addr, metrics);
            }
            Msg::WorkerDrop(addr) => {
                self.workers.remove(&addr);
            }
            Msg::WorkerQuery(reply) => {
                let entries = self
                    .workers
                    .iter()
                    .map(|(&addr, metrics)| WorkerDetails {
                        addr,
                        metrics: metrics.clone(),
                    })
                    .collect();
                // The requester may have stopped waiting; nothing to do then.
                _ = reply.send(entries);
            }
            Msg::DeploySchedule(revision_id, reply) => {
                let deploy_id = DeployId::now_v7();
                let details = DeployDetails {
                    revision_id,
                    status: WorkerDeployStatus::Scheduled,
                };
                // A single insert both stores the record and reports whether
                // the id was already taken, avoiding a second hash lookup.
                let previous = self.deploys.insert(deploy_id, details);
                assert!(
                    previous.is_none(),
                    "freshly generated deploy id must be unique"
                );
                _ = reply.send(deploy_id);
            }
            Msg::DeployPushStatus(deploy_id, worker_addr, status) => {
                let Some(details) = self.deploys.get_mut(&deploy_id) else {
                    debug!(?deploy_id, "status pushed for unknown deploy");
                    return;
                };
                details.status = WorkerDeployStatus::Deployed(worker_addr, status);
            }
        }
    }
}

/// Cloneable front-end used to send requests to a [`Discovery`] actor.
#[derive(Clone)]
pub struct DiscoveryHandle(mpsc::Sender<Msg>);

impl DiscoveryHandle {
    /// Forwards `msg` to the actor; the outcome of the send is ignored.
    async fn send(&self, msg: Msg) {
        let Self(sender) = self;
        let _ = sender.send(msg).await;
    }

    /// Asks the actor to store `metrics` for the worker at `addr`.
    #[allow(dead_code)] // TODO: Remove
    pub async fn add_worker(&self, addr: SocketAddr, metrics: Metrics) {
        let request = Msg::WorkerAdd(addr, metrics);
        self.send(request).await;
    }

    /// Asks the actor to forget the worker at `addr`.
    #[allow(dead_code)] // TODO: Remove
    pub async fn drop_worker(&self, addr: SocketAddr) {
        let request = Msg::WorkerDrop(addr);
        self.send(request).await;
    }

    /// Fetches the workers currently known to the actor.
    ///
    /// # Panics
    ///
    /// Panics if no reply arrives because the reply channel was closed.
    #[allow(dead_code)] // TODO: Remove
    pub async fn query_worker(&self) -> Vec<WorkerDetails> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Msg::WorkerQuery(reply_tx);
        self.send(request).await;
        let answer = reply_rx.await;
        answer.expect("actor must be alive")
    }

    /// Schedules a deploy for `revision_id` and returns the id assigned to it.
    ///
    /// # Panics
    ///
    /// Panics if no reply arrives because the reply channel was closed.
    #[allow(dead_code)] // TODO: Remove
    pub async fn schedule_deploy(&self, revision_id: RevisionId) -> DeployId {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Msg::DeploySchedule(revision_id, reply_tx);
        self.send(request).await;
        let answer = reply_rx.await;
        answer.expect("actor must be alive")
    }

    /// Reports the `status` of `deploy_id` as observed on `worker_addr`.
    #[allow(dead_code)] // TODO: Remove
    pub async fn push_deploy_status(
        &self,
        deploy_id: DeployId,
        worker_addr: SocketAddr,
        status: DeployStatus,
    ) {
        let request = Msg::DeployPushStatus(deploy_id, worker_addr, status);
        self.send(request).await;
    }
}

/// Requests understood by the `Discovery` actor.
#[derive(Debug)]
enum Msg {
/// Store the given metrics under the worker address.
WorkerAdd(SocketAddr, Metrics),
/// Remove the entry for the worker address, if any.
WorkerDrop(SocketAddr),
/// Reply with the address and metrics of every stored worker.
WorkerQuery(oneshot::Sender<Vec<WorkerDetails>>),

/// Record a new deploy for the revision and reply with its generated id.
DeploySchedule(RevisionId, oneshot::Sender<DeployId>),
/// Set the status of an existing deploy as reported for a worker address.
DeployPushStatus(DeployId, SocketAddr, DeployStatus),
}

/// Record kept by the `Discovery` actor for each scheduled deploy.
#[derive(Debug)]
pub struct DeployDetails {
/// Revision the deploy was scheduled for.
pub revision_id: RevisionId,
/// Current state of the deploy; starts as `WorkerDeployStatus::Scheduled`.
pub status: WorkerDeployStatus,
}

/// State stored in `DeployDetails::status` for a deploy.
#[derive(Debug)]
pub enum WorkerDeployStatus {
/// Deployment is scheduled (not yet in progress).
Scheduled,
/// Service is being deployed or running at a given node.
Deployed(SocketAddr, DeployStatus),
}

/// One worker entry as returned in reply to a worker query.
#[derive(Debug)]
pub struct WorkerDetails {
/// Address the worker is stored under.
pub addr: SocketAddr,
/// Copy of the metrics stored for the worker.
pub metrics: Metrics,
}
14 changes: 13 additions & 1 deletion ctl/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use tracing::info;

use crate::discovery::Discovery;

mod discovery;
mod http;

#[tokio::main]
Expand All @@ -8,6 +11,15 @@ async fn main() {

info!("started controller");

let http_handle = tokio::spawn(async { http::run_server().await });
let (discovery, _discovery_handle) = Discovery::new();
let discovery_actor_handle = tokio::spawn(async move {
discovery.run().await;
});

let http_handle = tokio::spawn(async {
http::run_server().await;
});

discovery_actor_handle.await.unwrap();
http_handle.await.unwrap();
}
2 changes: 1 addition & 1 deletion proto/src/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum NodeKind {
Worker,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metrics {
/// The average CPU usage.
pub cpu_usage: f64,
Expand Down
25 changes: 24 additions & 1 deletion proto/src/ctl/deployer.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,40 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::common::service::{ServiceName, ServiceSpec};

// NOTE(review): presumably `bty::brand!` turns each alias below into a
// distinct id type backed by `Uuid` — confirm against the `bty` docs.
bty::brand!(
pub type RevisionId = Uuid;

pub type DeployId = Uuid;
);

/// Stage of a deployment, from the build in progress to the service's final state.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum DeployStatus {
/// The deployment process is in progress (e.g. running the build script).
InProgress,
/// The deployment is finished and the service is running.
Running,
/// The service has gracefully stopped.
Stopped,
/// The service build script has failed.
BuildFailed,
/// The service has abruptly crashed.
Crashed,
}

/// Starts a new deploy in the system.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployReq {
/// Identifier of the revision to deploy.
pub revision_id: RevisionId,
/// Specification of the service to deploy.
pub service_spec: ServiceSpec,
}

/// Response for [`DeployReq`].
#[derive(Debug, Serialize, Deserialize)]
pub struct DeployRes {
/// Revision this response refers to.
// NOTE(review): presumably echoes `DeployReq::revision_id` — confirm.
pub revision_id: RevisionId,
/// Identifiers of the deploys covered by this response.
// NOTE(review): presumably one id per deploy created for the request — confirm.
pub deploy_ids: Vec<DeployId>,
}

/// Stops a given service from running in the system.
Expand Down

0 comments on commit 36ce172

Please sign in to comment.