Skip to content

Commit

Permalink
feat(ctl/worker_mgr): implement basic worker mgr
Browse files Browse the repository at this point in the history
  • Loading branch information
lffg committed Jun 20, 2024
1 parent 67282d4 commit e54b6af
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 45 deletions.
24 changes: 23 additions & 1 deletion ctl/src/args.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
use std::time::Duration;

use clap::Parser;

#[derive(Debug, Parser)]
pub struct CtlArgs {}
pub struct CtlArgs {
// NOTE: the `///` text below is attached to a clap `#[arg]` field, so keep it
// readable as end-user documentation.
/// Interval after which a worker that hasn't sent any metrics *can be*
/// considered dead, after which it will be removed from the controller's
/// workers pool.
///
/// Notice that this interval MUST be greater than the value configured for
/// **each** worker's `--metrics_report_interval` parameter.
///
/// Time in seconds. Should be greater than 1.
#[arg(
long,
default_value = "10",
value_parser = parse_duration
)]
pub worker_liveness_timeout: Duration,
}

fn parse_duration(arg: &str) -> eyre::Result<Duration> {
let s = arg.parse()?;
Ok(Duration::from_secs(s))
}
3 changes: 2 additions & 1 deletion ctl/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use axum::{routing::post, Router};

use crate::discovery::DiscoveryHandle;
use crate::{discovery::DiscoveryHandle, worker_mgr::WorkerMgrHandle};

pub mod deployer;
pub mod worker;

/// Shared state handed to `mk_app` and extracted by the HTTP handlers through
/// axum's `State` extractor.
#[derive(Clone)]
pub struct HttpState {
/// Handle to the discovery component.
pub discovery: DiscoveryHandle,
/// Handle used to send requests to the worker manager.
pub worker_mgr: WorkerMgrHandle,
}

pub fn mk_app(state: HttpState) -> Router {
Expand Down
41 changes: 30 additions & 11 deletions ctl/src/http/worker.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
use axum::Json;
use std::net::SocketAddr;

use axum::{
extract::{ConnectInfo, State},
Json,
};
use proto::ctl::worker::{
ByeReq, ByeRes, HelloReq, HelloRes, PushWorkerMetricsReq, PushWorkerMetricsRes,
};
use tracing::info;

pub async fn hello(Json(payload): Json<HelloReq>) -> Json<HelloRes> {
info!("{payload:#?}");
Json(HelloRes {})
use crate::http::HttpState;

/// Handles a worker `hello` request.
///
/// The worker is identified solely by the IP address of the incoming
/// connection (the port is discarded); the request body carries no data. The
/// status reported by the worker manager is returned to the caller as-is.
pub async fn hello(
    ConnectInfo(peer): ConnectInfo<SocketAddr>,
    State(state): State<HttpState>,
    Json(HelloReq {}): Json<HelloReq>,
) -> Json<HelloRes> {
    let status = state.worker_mgr.hello(peer.ip()).await;
    Json(HelloRes { status })
}

pub async fn bye(Json(payload): Json<ByeReq>) -> Json<ByeRes> {
info!("{payload:#?}");
Json(ByeRes {})
pub async fn bye(Json(ByeReq {}): Json<ByeReq>) -> Json<ByeRes> {
todo!();
// Json(ByeRes {})
}

pub async fn push_metrics(Json(payload): Json<PushWorkerMetricsReq>) -> Json<PushWorkerMetricsRes> {
info!("{payload:#?}");
Json(PushWorkerMetricsRes {})
/// Handles a metrics report pushed by a worker.
///
/// The worker is identified by the IP address of the incoming connection. The
/// reported metrics are forwarded to the worker manager and its status is
/// returned to the caller.
pub async fn push_metrics(
    ConnectInfo(peer): ConnectInfo<SocketAddr>,
    State(state): State<HttpState>,
    Json(payload): Json<PushWorkerMetricsReq>,
) -> Json<PushWorkerMetricsRes> {
    // The worker-side timestamp is deliberately ignored.
    let PushWorkerMetricsReq {
        metrics,
        recorded_at: _,
    } = payload;
    let status = state.worker_mgr.push_metrics(peer.ip(), metrics).await;
    Json(PushWorkerMetricsRes { status })
}
15 changes: 11 additions & 4 deletions ctl/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
net::{IpAddr, Ipv4Addr},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};

Expand All @@ -9,11 +9,12 @@ use tokio::task::JoinSet;
use tracing::info;
use utils::server::mk_listener;

use crate::{args::CtlArgs, discovery::Discovery, http::HttpState};
use crate::{args::CtlArgs, discovery::Discovery, http::HttpState, worker_mgr::WorkerMgr};

mod args;
mod discovery;
mod http;
mod worker_mgr;

/// The unspecified IPv4 address (`0.0.0.0`).
const ANY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));

Expand All @@ -34,11 +35,17 @@ async fn main() -> eyre::Result<()> {
discovery.run().await;
});

let (worker_mgr, worker_mgr_handle) = WorkerMgr::new(args.worker_liveness_timeout);
bag.spawn(async move {
worker_mgr.run().await;
});

bag.spawn(async move {
let state = HttpState {
discovery: discovery_handle.clone(),
discovery: discovery_handle,
worker_mgr: worker_mgr_handle,
};
let app = http::mk_app(state);
let app = http::mk_app(state).into_make_service_with_connect_info::<SocketAddr>();
info!("ctl http listening at {ANY_IP}:{CTL_HTTP_PORT}");
axum::serve(http_listener, app).await.unwrap();
});
Expand Down
186 changes: 186 additions & 0 deletions ctl/src/worker_mgr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
#![allow(dead_code)]

use std::{
collections::{hash_map::Entry, HashMap},
net::IpAddr,
time::{Duration, Instant},
};

use proto::{
common::node::Metrics,
ctl::worker::{HelloStatus, PushMetricsStatus},
};
use tokio::{
select,
sync::{mpsc, oneshot},
time,
};
use tracing::{info, instrument, trace, warn};

/// Actor that owns the controller's pool of known workers.
///
/// Other tasks never touch the pool directly; they send messages through a
/// [`WorkerMgrHandle`], which are processed one at a time by `run`.
pub struct WorkerMgr {
/// Receiving end of the actor's mailbox.
rx: mpsc::Receiver<Msg>,
/// A handle to this actor's own mailbox.
handle: WorkerMgrHandle,
/// Registered workers, keyed by IP address.
workers: HashMap<IpAddr, WorkerDetails>,
/// Period of the liveness check, and also the age of a worker's
/// `collected_at` at or beyond which that check treats the worker as dead.
liveness_timeout: Duration,
}

/// Snapshot of what the worker manager knows about a single worker.
#[derive(Debug, Clone)]
pub struct WorkerDetails {
/// IP address the worker is registered under.
pub addr: IpAddr,
/// Most recently pushed metrics (`Metrics::default()` until the first push).
pub metrics: Metrics,
/// When the worker registered or last pushed metrics, whichever is later.
pub collected_at: Instant,
}

impl WorkerMgr {
    /// Creates the worker manager actor and a handle for talking to it.
    ///
    /// `liveness_timeout` is used both as the period of the liveness check and
    /// as the maximum time a worker may go without reporting before it is
    /// removed. Nothing happens until [`WorkerMgr::run`] is awaited.
    #[must_use]
    pub fn new(liveness_timeout: Duration) -> (WorkerMgr, WorkerMgrHandle) {
        let (tx, rx) = mpsc::channel(16);
        let handle = WorkerMgrHandle(tx);
        let actor = WorkerMgr {
            rx,
            handle: handle.clone(),
            workers: HashMap::default(),
            liveness_timeout,
        };
        (actor, handle)
    }

    /// Runs the actor loop: processes mailbox messages and, every
    /// `liveness_timeout`, checks for workers that stopped reporting.
    ///
    /// The loop has no exit condition, so this future never completes.
    ///
    /// # Panics
    ///
    /// `tokio::time::interval` panics if `liveness_timeout` is zero.
    pub async fn run(mut self) {
        let mut interval = time::interval(self.liveness_timeout);
        loop {
            select! {
                Some(msg) = self.rx.recv() => {
                    // Attention to back pressure.
                    self.handle_msg(msg).await;
                }
                inst = interval.tick() => {
                    self.handle_msg(Msg::Tick(inst.into_std())).await;
                }
            }
        }
    }

    /// Dispatches a single message to its handler.
    ///
    /// Failures to deliver a reply are ignored on purpose: they only mean the
    /// requester stopped waiting for the answer.
    async fn handle_msg(&mut self, msg: Msg) {
        trace!(?msg, "got msg");
        match msg {
            Msg::Hello(addr, reply) => {
                _ = reply.send(self.handle_hello(addr));
            }
            Msg::Bye(addr) => {
                self.handle_bye(addr);
            }
            Msg::PushMetrics(a, m, reply) => {
                _ = reply.send(self.handle_push_metrics(a, m));
            }
            Msg::QueryWorkers(reply) => {
                let workers = self.workers.values().cloned().collect();
                _ = reply.send(workers);
            }
            Msg::Tick(instant) => {
                self.handle_tick(instant);
            }
        }
    }

    /// Registers a worker, unless one is already registered under `addr`.
    #[instrument(skip(self))]
    fn handle_hello(&mut self, addr: IpAddr) -> HelloStatus {
        match self.workers.entry(addr) {
            Entry::Occupied(_) => {
                warn!("unnecessarily hello operation");
                HelloStatus::AlreadyRegistered
            }
            Entry::Vacant(entry) => {
                info!("worker joined");
                entry.insert(WorkerDetails {
                    addr,
                    metrics: Metrics::default(),
                    collected_at: Instant::now(),
                });
                HelloStatus::Ok
            }
        }
        // TODO: Notify interested parties
    }

    /// Removes the worker registered under `addr`, if any.
    #[instrument(skip(self))]
    fn handle_bye(&mut self, addr: IpAddr) {
        // Only claim a removal when one actually happened.
        if self.workers.remove(&addr).is_some() {
            info!("removed worker from ctl pool");
        } else {
            warn!("worker wasn't registered");
        }
    }

    /// Stores the latest metrics of a registered worker and refreshes its
    /// liveness timestamp. Unknown workers are told they have been removed.
    #[instrument(skip(self, metrics))]
    fn handle_push_metrics(&mut self, addr: IpAddr, metrics: Metrics) -> PushMetricsStatus {
        let Some(details) = self.workers.get_mut(&addr) else {
            warn!("received metrics from removed worker");
            return PushMetricsStatus::Removed;
        };
        details.metrics = metrics;
        details.collected_at = Instant::now();
        PushMetricsStatus::Ack
    }

    /// Removes every worker whose last report is at least `liveness_timeout`
    /// older than `instant`.
    ///
    /// A worker whose `collected_at` is later than `instant` is considered
    /// alive.
    ///
    /// Expired workers are removed directly rather than by sending `Bye`
    /// messages through the actor's own handle: this actor is the only
    /// consumer of its bounded mailbox, so awaiting a send into it from inside
    /// the actor would block forever as soon as the mailbox is full.
    fn handle_tick(&mut self, instant: Instant) {
        let timeout = self.liveness_timeout;
        // Collect first: the pool cannot be mutated while it is being iterated.
        let expired: Vec<IpAddr> = self
            .workers
            .values()
            .filter(|worker| {
                instant
                    .checked_duration_since(worker.collected_at)
                    .is_some_and(|elapsed| elapsed >= timeout)
            })
            .map(|worker| worker.addr)
            .collect();
        // worker is most possibly dead, remove it
        for addr in expired {
            self.handle_bye(addr);
        }
    }
}

/// Cloneable handle wrapping the sending side of the `WorkerMgr` mailbox.
#[derive(Clone)]
pub struct WorkerMgrHandle(mpsc::Sender<Msg>);

impl WorkerMgrHandle {
    /// Enqueues `msg` in the actor's mailbox, waiting for free capacity.
    ///
    /// A failed send is silently ignored.
    async fn send(&self, msg: Msg) {
        let _ = self.0.send(msg).await;
    }

    /// Builds a request around a fresh one-shot reply channel, enqueues it and
    /// waits for the actor's answer.
    ///
    /// # Panics
    ///
    /// Panics if the reply channel is closed without an answer.
    async fn send_wait<F, R>(&self, f: F) -> R
    where
        F: FnOnce(oneshot::Sender<R>) -> Msg,
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = f(reply_tx);
        self.send(request).await;
        reply_rx.await.expect("actor must be alive")
    }

    /// Registers the worker at `addr` and returns the resulting status.
    pub async fn hello(&self, addr: IpAddr) -> HelloStatus {
        self.send_wait(|reply| Msg::Hello(addr, reply)).await
    }

    /// Requests removal of the worker at `addr` without waiting for the
    /// request to be processed.
    pub async fn bye(&self, addr: IpAddr) {
        let request = Msg::Bye(addr);
        self.send(request).await;
    }

    /// Reports `metrics` for the worker at `addr` and returns the resulting
    /// status.
    pub async fn push_metrics(&self, addr: IpAddr, metrics: Metrics) -> PushMetricsStatus {
        self.send_wait(|reply| Msg::PushMetrics(addr, metrics, reply))
            .await
    }

    /// Returns a snapshot of every worker currently in the pool.
    pub async fn query_workers(&self) -> Vec<WorkerDetails> {
        self.send_wait(|reply| Msg::QueryWorkers(reply)).await
    }
}

/// Messages processed by the `WorkerMgr` actor.
#[derive(Debug)]
enum Msg {
/// Register the worker at the given address; the outcome is sent on the
/// reply channel.
Hello(IpAddr, oneshot::Sender<HelloStatus>),
/// Remove the worker at the given address; no reply is sent.
Bye(IpAddr),
/// Store new metrics for the worker at the given address; the outcome is
/// sent on the reply channel.
PushMetrics(IpAddr, Metrics, oneshot::Sender<PushMetricsStatus>),
/// Reply with a copy of the details of every registered worker.
QueryWorkers(oneshot::Sender<Vec<WorkerDetails>>),
/// Periodic liveness check, carrying the instant at which the timer fired.
Tick(Instant),
}
8 changes: 3 additions & 5 deletions proto/src/clients/ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use crate::{
DeployServiceReq, DeployServiceRes, RedeploymentPolicy, ReportDeployInstanceStatusReq,
ReportDeployInstanceStatusRes, TerminateServiceReq, TerminateServiceRes,
},
worker::{
ByeRes, HelloReq, HelloRes, PortsMap, PushWorkerMetricsReq, PushWorkerMetricsRes,
},
worker::{ByeRes, HelloReq, HelloRes, PushWorkerMetricsReq, PushWorkerMetricsRes},
},
well_known::CTL_HTTP_PORT,
};
Expand All @@ -42,8 +40,8 @@ impl CtlClient {
format!("{base}{path}", base = self.base_url)
}

pub async fn hello(&self, ports: PortsMap) -> eyre::Result<HelloRes> {
let body = HelloReq { ports };
/// Sends an empty `HelloReq` to the `/worker/hello` endpoint and returns the
/// decoded response.
pub async fn hello(&self) -> eyre::Result<HelloRes> {
    let endpoint = self.url("/worker/hello");
    self.client.send(endpoint, &HelloReq {}).await
}

Expand Down
2 changes: 1 addition & 1 deletion proto/src/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum NodeKind {
Worker,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Metrics {
/// The average CPU usage.
pub cpu_usage: f64,
Expand Down
Loading

0 comments on commit e54b6af

Please sign in to comment.