From e54b6af7e4a543e341db1ca4b5571dae55850621 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Luiz=20Felipe=20Gon=C3=A7alves?=
Date: Thu, 20 Jun 2024 19:12:02 -0300
Subject: [PATCH] feat(ctl/worker_mgr): implement basic worker mgr
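
The ctl now tracks its pool of live workers in a dedicated `WorkerMgr`
actor: a worker announces itself with a hello request, pushes metrics
periodically, and is evicted once `--worker_liveness_timeout` elapses
without a report. HTTP handlers and other tasks reach the actor through
the cloneable `WorkerMgrHandle` (mpsc for fire-and-forget messages,
oneshot for request/reply).

As a sketch of the intended use of the new handle API (illustrative
only; `pick_least_loaded` is not part of this patch), a scheduler task
could snapshot the pool and rank workers by their reported CPU usage:

    async fn pick_least_loaded(mgr: &WorkerMgrHandle) -> Option<IpAddr> {
        // Snapshot the current pool through the actor handle.
        let workers = mgr.query_workers().await;
        workers
            .into_iter()
            .min_by(|a, b| a.metrics.cpu_usage.total_cmp(&b.metrics.cpu_usage))
            .map(|w| w.addr)
    }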
---
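Notes (reviewer sketch; this region between the `---` marker and the
diff is not committed by `git am`): with the defaults, a worker reports
every 5s and the ctl sweeps every 10s, so a worker that goes silent is
evicted between one and two sweep periods (10-20s) after its last
report. An integration-style test along these lines could pin that
behavior down; it is a sketch only, assuming tokio's `test-util`
feature and the items of ctl/src/worker_mgr.rs in scope:

    #[tokio::test(start_paused = true)]
    async fn evicts_silent_worker() {
        let (mgr, handle) = WorkerMgr::new(Duration::from_secs(10));
        tokio::spawn(mgr.run());

        let addr: IpAddr = "10.0.0.1".parse().unwrap();
        assert!(matches!(handle.hello(addr).await, HelloStatus::Ok));

        // Two sweep periods without any metrics: the worker must be gone.
        tokio::time::sleep(Duration::from_secs(25)).await;
        assert!(handle.query_workers().await.is_empty());
    }
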
 ctl/src/args.rs              |  24 ++++-
 ctl/src/http/mod.rs          |   3 +-
 ctl/src/http/worker.rs       |  41 +++++---
 ctl/src/main.rs              |  15 ++-
 ctl/src/worker_mgr.rs        | 186 +++++++++++++++++++++++++++++++++++
 proto/src/clients/ctl.rs     |   8 +-
 proto/src/common/node.rs     |   2 +-
 proto/src/ctl/worker.rs      |  33 ++++---
 worker/src/args.rs           |   5 +-
 worker/src/main.rs           |   2 +-
 worker/src/monitor/pusher.rs |  30 +++++-
 11 files changed, 308 insertions(+), 41 deletions(-)
 create mode 100644 ctl/src/worker_mgr.rs

diff --git a/ctl/src/args.rs b/ctl/src/args.rs
index da8e51c..6ad4113 100644
--- a/ctl/src/args.rs
+++ b/ctl/src/args.rs
@@ -1,4 +1,26 @@
+use std::time::Duration;
+
 use clap::Parser;
 
 #[derive(Debug, Parser)]
-pub struct CtlArgs {}
+pub struct CtlArgs {
+    /// Interval after which a worker that hasn't sent any metrics *can be*
+    /// considered dead, at which point it will be removed from the
+    /// controller's worker pool.
+    ///
+    /// Notice that this interval MUST be greater than the value configured for
+    /// **each** worker's `--metrics_report_interval` parameter.
+    ///
+    /// Time in seconds. Should be greater than 1.
+    #[arg(
+        long,
+        default_value = "10",
+        value_parser = parse_duration
+    )]
+    pub worker_liveness_timeout: Duration,
+}
+
+fn parse_duration(arg: &str) -> eyre::Result<Duration> {
+    let s = arg.parse()?;
+    Ok(Duration::from_secs(s))
+}
diff --git a/ctl/src/http/mod.rs b/ctl/src/http/mod.rs
index 02d943b..5e130f8 100644
--- a/ctl/src/http/mod.rs
+++ b/ctl/src/http/mod.rs
@@ -1,6 +1,6 @@
 use axum::{routing::post, Router};
 
-use crate::discovery::DiscoveryHandle;
+use crate::{discovery::DiscoveryHandle, worker_mgr::WorkerMgrHandle};
 
 pub mod deployer;
 pub mod worker;
@@ -8,6 +8,7 @@ pub mod worker;
 #[derive(Clone)]
 pub struct HttpState {
     pub discovery: DiscoveryHandle,
+    pub worker_mgr: WorkerMgrHandle,
 }
 
 pub fn mk_app(state: HttpState) -> Router {
diff --git a/ctl/src/http/worker.rs b/ctl/src/http/worker.rs
index 203ab5f..ea7669c 100644
--- a/ctl/src/http/worker.rs
+++ b/ctl/src/http/worker.rs
@@ -1,20 +1,39 @@
-use axum::Json;
+use std::net::SocketAddr;
+
+use axum::{
+    extract::{ConnectInfo, State},
+    Json,
+};
 use proto::ctl::worker::{
     ByeReq, ByeRes, HelloReq, HelloRes, PushWorkerMetricsReq, PushWorkerMetricsRes,
 };
-use tracing::info;
 
-pub async fn hello(Json(payload): Json<HelloReq>) -> Json<HelloRes> {
-    info!("{payload:#?}");
-    Json(HelloRes {})
+use crate::http::HttpState;
+
+pub async fn hello(
+    ConnectInfo(addr): ConnectInfo<SocketAddr>,
+    State(state): State<HttpState>,
+    Json(HelloReq {}): Json<HelloReq>,
+) -> Json<HelloRes> {
+    let addr = addr.ip();
+    let status = state.worker_mgr.hello(addr).await;
+    Json(HelloRes { status })
 }
 
-pub async fn bye(Json(payload): Json<ByeReq>) -> Json<ByeRes> {
-    info!("{payload:#?}");
-    Json(ByeRes {})
+pub async fn bye(Json(ByeReq {}): Json<ByeReq>) -> Json<ByeRes> {
+    todo!();
+    // Json(ByeRes {})
 }
 
-pub async fn push_metrics(Json(payload): Json<PushWorkerMetricsReq>) -> Json<PushWorkerMetricsRes> {
-    info!("{payload:#?}");
-    Json(PushWorkerMetricsRes {})
+pub async fn push_metrics(
+    ConnectInfo(addr): ConnectInfo<SocketAddr>,
+    State(state): State<HttpState>,
+    Json(PushWorkerMetricsReq {
+        metrics,
+        recorded_at: _,
+    }): Json<PushWorkerMetricsReq>,
+) -> Json<PushWorkerMetricsRes> {
+    let addr = addr.ip();
+    let status = state.worker_mgr.push_metrics(addr, metrics).await;
+    Json(PushWorkerMetricsRes { status })
 }
diff --git a/ctl/src/main.rs b/ctl/src/main.rs
index fa2590e..7839fda 100644
--- a/ctl/src/main.rs
+++ b/ctl/src/main.rs
@@ -1,5 +1,5 @@
 use std::{
-    net::{IpAddr, Ipv4Addr},
+    net::{IpAddr, Ipv4Addr, SocketAddr},
     sync::Arc,
 };
 
@@ -9,11 +9,12 @@ use tokio::task::JoinSet;
 use tracing::info;
 use utils::server::mk_listener;
 
-use crate::{args::CtlArgs, discovery::Discovery, http::HttpState};
+use crate::{args::CtlArgs, discovery::Discovery, http::HttpState, worker_mgr::WorkerMgr};
 
 mod args;
 mod discovery;
 mod http;
+mod worker_mgr;
 
 const ANY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
 
@@ -34,11 +35,17 @@ async fn main() -> eyre::Result<()> {
         discovery.run().await;
     });
 
+    let (worker_mgr, worker_mgr_handle) = WorkerMgr::new(args.worker_liveness_timeout);
+    bag.spawn(async move {
+        worker_mgr.run().await;
+    });
+
     bag.spawn(async move {
         let state = HttpState {
-            discovery: discovery_handle.clone(),
+            discovery: discovery_handle,
+            worker_mgr: worker_mgr_handle,
         };
-        let app = http::mk_app(state);
+        let app = http::mk_app(state).into_make_service_with_connect_info::<SocketAddr>();
         info!("ctl http listening at {ANY_IP}:{CTL_HTTP_PORT}");
         axum::serve(http_listener, app).await.unwrap();
     });
diff --git a/ctl/src/worker_mgr.rs b/ctl/src/worker_mgr.rs
new file mode 100644
index 0000000..dc8bff1
--- /dev/null
+++ b/ctl/src/worker_mgr.rs
@@ -0,0 +1,186 @@
+#![allow(dead_code)]
+
+use std::{
+    collections::{hash_map::Entry, HashMap},
+    net::IpAddr,
+    time::{Duration, Instant},
+};
+
+use proto::{
+    common::node::Metrics,
+    ctl::worker::{HelloStatus, PushMetricsStatus},
+};
+use tokio::{
+    select,
+    sync::{mpsc, oneshot},
+    time,
+};
+use tracing::{info, instrument, trace, warn};
+
+pub struct WorkerMgr {
+    rx: mpsc::Receiver<Msg>,
+    handle: WorkerMgrHandle,
+    workers: HashMap<IpAddr, WorkerDetails>,
+    liveness_timeout: Duration,
+}
+
+#[derive(Debug, Clone)]
+pub struct WorkerDetails {
+    pub addr: IpAddr,
+    pub metrics: Metrics,
+    pub collected_at: Instant,
+}
+
+impl WorkerMgr {
+    #[must_use]
+    pub fn new(liveness_timeout: Duration) -> (WorkerMgr, WorkerMgrHandle) {
+        let (tx, rx) = mpsc::channel(16);
+        let handle = WorkerMgrHandle(tx);
+        let actor = WorkerMgr {
+            rx,
+            handle: handle.clone(),
+            workers: HashMap::default(),
+            liveness_timeout,
+        };
+        (actor, handle)
+    }
+
+    pub async fn run(mut self) {
+        let mut interval = time::interval(self.liveness_timeout);
+        loop {
+            select! {
+                Some(msg) = self.rx.recv() => {
+                    // Messages are handled inline, so a slow handler back-pressures senders.
+                    self.handle_msg(msg).await;
+                }
+                inst = interval.tick() => {
+                    self.handle_msg(Msg::Tick(inst.into_std())).await;
+                }
+            }
+        }
+    }
+
+    async fn handle_msg(&mut self, msg: Msg) {
+        trace!(?msg, "got msg");
+        match msg {
+            Msg::Hello(addr, reply) => {
+                _ = reply.send(self.handle_hello(addr));
+            }
+            Msg::Bye(addr) => {
+                self.handle_bye(addr);
+            }
+            Msg::PushMetrics(a, m, reply) => {
+                _ = reply.send(self.handle_push_metrics(a, m));
+            }
+            Msg::QueryWorkers(reply) => {
+                let workers = self.workers.values().cloned().collect();
+                _ = reply.send(workers);
+            }
+            Msg::Tick(instant) => {
+                self.handle_tick(instant).await;
+            }
+        }
+    }
+
+    #[instrument(skip(self))]
+    fn handle_hello(&mut self, addr: IpAddr) -> HelloStatus {
+        match self.workers.entry(addr) {
+            Entry::Occupied(_) => {
+                warn!("received hello from already-registered worker");
+                HelloStatus::AlreadyRegistered
+            }
+            Entry::Vacant(entry) => {
+                info!("worker joined");
+                entry.insert(WorkerDetails {
+                    addr,
+                    metrics: Metrics::default(),
+                    collected_at: Instant::now(),
+                });
+                HelloStatus::Ok
+            }
+        }
+        // TODO: Notify interested parties.
+    }
+
+    #[instrument(skip(self))]
+    fn handle_bye(&mut self, addr: IpAddr) {
+        if self.workers.remove(&addr).is_some() {
+            info!("removed worker from ctl pool");
+        } else {
+            warn!("bye for a worker that wasn't registered");
+        }
+    }
+
+    #[instrument(skip(self, metrics))]
+    fn handle_push_metrics(&mut self, addr: IpAddr, metrics: Metrics) -> PushMetricsStatus {
+        let Some(details) = self.workers.get_mut(&addr) else {
+            warn!("received metrics from removed worker");
+            return PushMetricsStatus::Removed;
+        };
+        details.metrics = metrics;
+        details.collected_at = Instant::now();
+        PushMetricsStatus::Ack
+    }
+
+    async fn handle_tick(&mut self, instant: Instant) {
+        // For the purposes of this routine, we assume that `instant` occurs
+        // AFTER every worker's `collected_at` instant.
+        for worker in self.workers.values() {
+            let maybe_elapsed = instant.checked_duration_since(worker.collected_at);
+            let Some(elapsed) = maybe_elapsed else {
+                // `collected_at` occurred after `instant`, so the worker is alive.
+                continue;
+            };
+            if elapsed < self.liveness_timeout {
+                // The elapsed time is within the timeout, so the worker is alive.
+                continue;
+            }
+            // The worker is most likely dead; send a bye on its behalf.
+            self.handle.send(Msg::Bye(worker.addr)).await;
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct WorkerMgrHandle(mpsc::Sender<Msg>);
+
+impl WorkerMgrHandle {
+    async fn send(&self, msg: Msg) {
+        _ = self.0.send(msg).await;
+    }
+
+    /// Sends a message and waits for a reply.
+    async fn send_wait<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce(oneshot::Sender<R>) -> Msg,
+    {
+        let (tx, rx) = oneshot::channel();
+        self.send(f(tx)).await;
+        rx.await.expect("actor must be alive")
+    }
+
+    pub async fn hello(&self, addr: IpAddr) -> HelloStatus {
+        self.send_wait(|r| Msg::Hello(addr, r)).await
+    }
+
+    pub async fn bye(&self, addr: IpAddr) {
+        self.send(Msg::Bye(addr)).await;
+    }
+
+    pub async fn push_metrics(&self, addr: IpAddr, metrics: Metrics) -> PushMetricsStatus {
+        self.send_wait(|r| Msg::PushMetrics(addr, metrics, r)).await
+    }
+
+    pub async fn query_workers(&self) -> Vec<WorkerDetails> {
+        self.send_wait(Msg::QueryWorkers).await
+    }
+}
+
+#[derive(Debug)]
+enum Msg {
+    Hello(IpAddr, oneshot::Sender<HelloStatus>),
+    Bye(IpAddr),
+    PushMetrics(IpAddr, Metrics, oneshot::Sender<PushMetricsStatus>),
+    QueryWorkers(oneshot::Sender<Vec<WorkerDetails>>),
+    Tick(Instant),
+}
diff --git a/proto/src/clients/ctl.rs b/proto/src/clients/ctl.rs
index 01a0938..c929630 100644
--- a/proto/src/clients/ctl.rs
+++ b/proto/src/clients/ctl.rs
@@ -14,9 +14,7 @@ use crate::{
             DeployServiceReq, DeployServiceRes, RedeploymentPolicy, ReportDeployInstanceStatusReq,
             ReportDeployInstanceStatusRes, TerminateServiceReq, TerminateServiceRes,
         },
-        worker::{
-            ByeRes, HelloReq, HelloRes, PortsMap, PushWorkerMetricsReq, PushWorkerMetricsRes,
-        },
+        worker::{ByeRes, HelloReq, HelloRes, PushWorkerMetricsReq, PushWorkerMetricsRes},
     },
     well_known::CTL_HTTP_PORT,
 };
@@ -42,8 +40,8 @@ impl CtlClient {
         format!("{base}{path}", base = self.base_url)
     }
 
-    pub async fn hello(&self, ports: PortsMap) -> eyre::Result<HelloRes> {
-        let body = HelloReq { ports };
+    pub async fn hello(&self) -> eyre::Result<HelloRes> {
+        let body = HelloReq {};
         self.client.send(self.url("/worker/hello"), &body).await
     }
 
diff --git a/proto/src/common/node.rs b/proto/src/common/node.rs
index e8a2831..ebbc180 100644
--- a/proto/src/common/node.rs
+++ b/proto/src/common/node.rs
@@ -15,7 +15,7 @@ pub enum NodeKind {
     Worker,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
 pub struct Metrics {
     /// The average CPU usage.
     pub cpu_usage: f64,
diff --git a/proto/src/ctl/worker.rs b/proto/src/ctl/worker.rs
index 9f34738..4320c73 100644
--- a/proto/src/ctl/worker.rs
+++ b/proto/src/ctl/worker.rs
@@ -4,20 +4,19 @@ use serde::{Deserialize, Serialize};
 use crate::common::node::Metrics;
 
 #[derive(Debug, Serialize, Deserialize)]
-pub struct HelloReq {
-    pub ports: PortsMap,
-}
+pub struct HelloReq {}
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct PortsMap {
-    #[serde(rename = "h")]
-    pub http: u16,
-    #[serde(rename = "p")]
-    pub proxy: u16,
+#[derive(Debug, Serialize, Deserialize)]
+pub struct HelloRes {
+    pub status: HelloStatus,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
-pub struct HelloRes {}
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum HelloStatus {
+    Ok,
+    AlreadyRegistered,
+}
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct ByeReq {}
@@ -40,4 +39,16 @@ pub struct PushWorkerMetricsReq {
 
 /// Response for [`PushWorkerMetricsReq`].
 #[derive(Debug, Serialize, Deserialize)]
-pub struct PushWorkerMetricsRes {}
+pub struct PushWorkerMetricsRes {
+    pub status: PushMetricsStatus,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+pub enum PushMetricsStatus {
+    /// Acknowledged.
+    Ack,
+    /// The worker has been removed from the cluster (at some point in the
+    /// past), so this metrics push is refused.
+    Removed,
+}
diff --git a/worker/src/args.rs b/worker/src/args.rs
index 9346e12..3b888c3 100644
--- a/worker/src/args.rs
+++ b/worker/src/args.rs
@@ -10,12 +10,15 @@ pub struct WorkerArgs {
 
     /// Interval at which metrics are pushed to the controller.
     ///
-    /// Time in seconds. Must be greater than 1.
+    /// Notice that this interval MUST be smaller than the value configured for
+    /// the controller's `--worker_liveness_timeout` parameter.
+    ///
+    /// Time in seconds. Should be greater than 1.
     #[arg(
         long,
         default_value = "5",
         value_parser = parse_duration
     )]
     pub metrics_report_interval: Duration,
 }
 
diff --git a/worker/src/main.rs b/worker/src/main.rs
index 868dc1f..5dcc2c3 100644
--- a/worker/src/main.rs
+++ b/worker/src/main.rs
@@ -67,7 +67,7 @@ async fn main() -> Result<()> {
         let args = Arc::clone(&args);
         let ctl_client = ctl_client.clone();
         async move {
-            pusher::start_pusher(args, ctl_client).await;
+            pusher::start_pusher(args, ctl_client).await.unwrap();
         }
     });
 
diff --git a/worker/src/monitor/pusher.rs b/worker/src/monitor/pusher.rs
index 63d6f24..a197c87 100644
--- a/worker/src/monitor/pusher.rs
+++ b/worker/src/monitor/pusher.rs
@@ -1,22 +1,42 @@
 use std::sync::Arc;
 
 use chrono::Utc;
-use proto::clients::CtlClient;
+use eyre::Context as _;
+use proto::{clients::CtlClient, ctl::worker::PushMetricsStatus};
 use tokio::time::sleep;
 use tracing::{debug, error};
 
 use crate::{args::WorkerArgs, monitor::collector::MetricsCollector};
 
-pub async fn start_pusher(args: Arc<WorkerArgs>, ctl_client: CtlClient) {
+pub async fn start_pusher(args: Arc<WorkerArgs>, ctl_client: CtlClient) -> eyre::Result<()> {
     let mut metrics_report: MetricsCollector = MetricsCollector::new();
     debug!("pusher started");
+
+    // Try to join the cluster before entering the report loop.
+    ctl_client
+        .hello()
+        .await
+        .wrap_err("worker failed to join the cluster")?;
+
     loop {
-        sleep(args.metrics_report_interval).await;
         debug!("sending metrics");
         let metrics = metrics_report.get_metrics();
         let now = Utc::now();
-        if let Err(error) = ctl_client.push_metrics(metrics, now).await {
-            error!(?error, "failed to send metrics to ctl");
+
+        let result = ctl_client
+            .push_metrics(metrics, now)
+            .await
+            .map(|r| r.status);
+        match result {
+            Ok(PushMetricsStatus::Ack) => (),
+            Ok(PushMetricsStatus::Removed) => {
+                eyre::bail!("worker was removed from cluster");
+            }
+            Err(error) => {
+                error!(?error, "failed to send metrics to ctl");
+            }
         }
+
+        sleep(args.metrics_report_interval).await;
     }
 }
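
Review postscript (not part of the patch): `start_pusher` now returns
`eyre::Result<()>` and worker/src/main.rs unwraps it, so a worker that
fails its initial hello, or that the ctl has already evicted
(`Removed`), crashes its pusher task. If crashing is not the desired
behavior, a supervising loop along these lines (illustrative only,
assuming the usual imports) could rejoin instead:

    loop {
        match pusher::start_pusher(Arc::clone(&args), ctl_client.clone()).await {
            Ok(()) => break,
            Err(error) => {
                // Back off briefly, then say hello again via start_pusher.
                tracing::warn!(?error, "pusher exited; rejoining in 5s");
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        }
    }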