diff --git a/ctl/src/balancer/mod.rs b/ctl/src/balancer/mod.rs index 8d9d68c..4326ada 100644 --- a/ctl/src/balancer/mod.rs +++ b/ctl/src/balancer/mod.rs @@ -24,8 +24,9 @@ use hyper_util::{ }; use proto::{ common::{instance::InstanceId, service::ServiceId}, - well_known::{PROXY_FORWARDED_HEADER_NAME, PROXY_INSTANCE_HEADER_NAME}, + well_known::{PROXY_FORWARDED_HEADER_NAME, PROXY_INSTANCE_HEADER_NAME, WORKER_PROXY_PORT}, }; +use tracing::{instrument, trace, warn}; use utils::http::{self, OptionExt as _, ResultExt as _}; #[derive(Default)] @@ -58,11 +59,11 @@ impl BalancerState { ) } - pub fn next(&self, service: &ServiceId) -> (InstanceId, IpAddr) { + pub fn next(&self, service: &ServiceId) -> Option<(InstanceId, IpAddr)> { let map = self.addrs.lock().unwrap(); - let bag = map.get(service).unwrap(); + let bag = map.get(service)?; let count = bag.count.fetch_add(1, Ordering::Relaxed); - bag.instances[count % bag.instances.len()] + Some(bag.instances[count % bag.instances.len()]) } } @@ -72,43 +73,48 @@ pub struct BalancerHandle { impl BalancerHandle { #[allow(dead_code)] - pub fn add_instance(&mut self, id: ServiceId, at: (InstanceId, IpAddr)) { + pub fn add_instance(&self, id: ServiceId, instance_id: InstanceId, addr: IpAddr) { let mut map = self.addrs.lock().unwrap(); let bag = map.entry(id).or_default(); - bag.instances.push(at); + bag.instances.push((instance_id, addr)); } #[allow(dead_code)] - pub fn drop_instance(&mut self, id: &ServiceId, at: (InstanceId, IpAddr)) { + pub fn drop_instance(&self, id: &ServiceId, instance_id: InstanceId) { let mut map = self.addrs.lock().unwrap(); let Some(bag) = map.get_mut(id) else { + warn!("attempted to drop instance from unknown service id"); return; }; - bag.instances - .retain(|(inst, addr)| inst == &at.0 && addr == &at.1); + // Remove the instance (keep all except this one) + bag.instances.retain(|(inst, _)| inst != &instance_id); } } +#[instrument(skip_all)] pub async fn proxy( ConnectInfo(addr): ConnectInfo, State(balancer): State, mut req: Request, ) -> http::Result { - let service = extract_service_id(&mut req)?; + let service_id = extract_service_id(&mut req)?; - let (instance, server_addr) = balancer.next(&service); + let (instance_id, server_addr) = balancer + .next(&service_id) + .or_http_error(StatusCode::NOT_FOUND, "service not found")?; + trace!(%service_id, %instance_id, %server_addr, "received and balanced user request"); *req.uri_mut() = { let uri = req.uri(); let mut parts = uri.clone().into_parts(); - parts.authority = Authority::from_str(&server_addr.to_string()).ok(); + parts.authority = Authority::from_str(&format!("{server_addr}:{WORKER_PROXY_PORT}")).ok(); parts.scheme = Some(Scheme::HTTP); Uri::from_parts(parts).unwrap() }; req.headers_mut().insert( PROXY_INSTANCE_HEADER_NAME, - HeaderValue::from_str(&instance.to_string()).unwrap(), + HeaderValue::from_str(&instance_id.to_string()).unwrap(), ); req.headers_mut().insert( PROXY_FORWARDED_HEADER_NAME, diff --git a/ctl/src/deployer/instance.rs b/ctl/src/deployer/instance.rs index 474ca30..6bad058 100644 --- a/ctl/src/deployer/instance.rs +++ b/ctl/src/deployer/instance.rs @@ -1,12 +1,15 @@ -use std::net::IpAddr; +use std::{net::IpAddr, sync::Arc}; use proto::{ - common::instance::{self, InstanceId, InstanceSpec}, + common::{ + instance::{self, InstanceId, InstanceSpec}, + service::ServiceId, + }, ctl::deployer::DeploymentId, well_known::{MAX_INSTANCE_DEPLOY_RETRIES, MAX_INSTANCE_TERMINATION_RETRIES}, worker::runner::{DeployInstanceRes, TerminateInstanceRes}, }; -use tracing::{instrument, warn}; +use tracing::{instrument, trace, warn}; use utils::fmt::ElideDebug; use crate::deployer::Deployer; @@ -48,7 +51,7 @@ pub fn next(d: &mut Deployer, current: StateCtx, t: Transition) -> StateCtx { } (Deploying { .. }, t::Status(s::Started)) => { - // + propagate_to_balancer(d, ¤t, Balancer::Include); current.trans_into(Started) } @@ -77,20 +80,24 @@ pub fn next(d: &mut Deployer, current: StateCtx, t: Transition) -> StateCtx { (Started, t::Status(s::Terminated { .. })) => { warn!("instance unexpectedly terminated"); + propagate_to_balancer(d, ¤t, Balancer::Remove); current.trans_into(UnexpectedTerminated) } (Started, t::Status(s::Crashed { .. })) => { warn!("instance unexpectedly crashed"); + propagate_to_balancer(d, ¤t, Balancer::Remove); current.trans_into(UnexpectedCrashed) } (Started, t::Status(s::Killed { .. })) => { warn!("instance was killed"); + propagate_to_balancer(d, ¤t, Balancer::Remove); current.trans_into(UnexpectedCrashed) } (Started, t::Terminate) => { + propagate_to_balancer(d, ¤t, Balancer::Remove); schedule_instance_termination(d, ¤t); current.trans_into(Terminating { attempt: INITIAL_ATTEMPT, @@ -127,16 +134,23 @@ pub struct StateCtx { id: InstanceId, /// The address of the worker in which this instance lives. worker_addr: IpAddr, + service_id: Arc, deployment_id: DeploymentId, } impl StateCtx { - pub fn new_init(id: InstanceId, worker_addr: IpAddr, deployment_id: DeploymentId) -> Self { + pub fn new_init( + id: InstanceId, + worker_addr: IpAddr, + deployment_id: DeploymentId, + service_id: Arc, + ) -> Self { StateCtx { state: State::Init, id, worker_addr, deployment_id, + service_id, } } @@ -284,3 +298,19 @@ fn schedule_instance_termination_reattempt( current.trans_into(State::FailedToTerminate) } } + +#[derive(Debug, Copy, Clone)] +enum Balancer { + Include, + Remove, +} + +fn propagate_to_balancer(d: &mut Deployer, ctx: &StateCtx, action: Balancer) { + trace!(?action, "propagating changes to balancer"); + let s_id = ctx.service_id.as_ref().clone(); + let addr = ctx.worker_addr; + match action { + Balancer::Include => d.h.balancer.add_instance(s_id, ctx.id, addr), + Balancer::Remove => d.h.balancer.drop_instance(&s_id, ctx.id), + } +} diff --git a/ctl/src/deployer/mod.rs b/ctl/src/deployer/mod.rs index 90f38b2..970b35a 100644 --- a/ctl/src/deployer/mod.rs +++ b/ctl/src/deployer/mod.rs @@ -14,10 +14,11 @@ use tokio::{ sync::{mpsc, oneshot}, task::JoinSet, }; -use tracing::{debug, instrument, warn}; +use tracing::{instrument, trace, warn}; use uuid::Uuid; use crate::{ + balancer::BalancerHandle, deployer::instance::{TerminalKind, Transition}, worker_mgr::WorkerMgrHandle, }; @@ -40,6 +41,7 @@ pub struct Deployer { struct DeployerHandles { deployer_handle: DeployerHandle, + balancer: BalancerHandle, worker_mgr: WorkerMgrHandle, worker_client: WorkerClient, } @@ -47,6 +49,7 @@ struct DeployerHandles { impl Deployer { #[must_use] pub fn new( + balancer: BalancerHandle, worker_mgr: WorkerMgrHandle, worker_client: WorkerClient, ) -> (Deployer, DeployerHandle) { @@ -56,6 +59,7 @@ impl Deployer { rx, h: Arc::new(DeployerHandles { deployer_handle: handle.clone(), + balancer, worker_mgr, worker_client, }), @@ -114,18 +118,24 @@ impl Deployer { } async fn handle_deploy_service(&mut self, spec: ServiceSpec) -> eyre::Result { - debug!(?spec, "deploying service"); + trace!(?spec, "deploying service"); let workers = self.h.worker_mgr.query_workers().await; if workers.is_empty() { bail!("no workers on cluster pool"); } let instances = alloc::rand_many(&workers, spec.concurrency); let deployment_id = DeploymentId(Uuid::now_v7()); + let service_id = Arc::new(spec.service_id.clone()); let instances = instances // For each allocated instance, schedule a deploy. .inspect(|&(instance_id, worker_addr)| { - self.add_instance_init_state(instance_id, worker_addr, deployment_id); + self.add_instance_init_state( + instance_id, + worker_addr, + deployment_id, + service_id.clone(), + ); let spec = InstanceSpec::from_service_spec_cloned(&spec, instance_id).into(); self.trans_instance_state(instance_id, Transition::Deploy { spec }); @@ -146,10 +156,15 @@ impl Deployer { _ = self; } - fn add_instance_init_state(&mut self, id: InstanceId, worker_addr: IpAddr, d_id: DeploymentId) { - let opt = self - .instance_statems - .insert(id, instance::StateCtx::new_init(id, worker_addr, d_id)); + fn add_instance_init_state( + &mut self, + id: InstanceId, + worker_addr: IpAddr, + d_id: DeploymentId, + s_id: Arc, + ) { + let s = instance::StateCtx::new_init(id, worker_addr, d_id, s_id); + let opt = self.instance_statems.insert(id, s); // We have just generated a new ID (in Self::handle_deploy_service), so // this case shouldn't be possible. @@ -163,9 +178,9 @@ impl Deployer { return; }; - debug!(state = ?statem.state(), "transitioned from"); + trace!(state = ?statem.state(), "transitioned from"); let next = instance::next(self, statem, t); - debug!(state = ?next.state(), "transitioned to"); + trace!(state = ?next.state(), "transitioned to"); match next.state().kind() { TerminalKind::NonTerminal => { diff --git a/ctl/src/main.rs b/ctl/src/main.rs index 8368e05..a1aee4b 100644 --- a/ctl/src/main.rs +++ b/ctl/src/main.rs @@ -45,7 +45,7 @@ async fn main() -> eyre::Result<()> { worker_mgr.run().await; }); - let (balancer, _balancer_handle) = BalancerState::new(); + let (balancer, balancer_handle) = BalancerState::new(); bag.spawn(async move { let app = balancer::proxy .with_state(balancer) @@ -54,7 +54,8 @@ async fn main() -> eyre::Result<()> { axum::serve(balancer_listener, app).await.unwrap(); }); - let (deployer, deployer_handle) = Deployer::new(worker_mgr_handle.clone(), worker_client); + let (deployer, deployer_handle) = + Deployer::new(balancer_handle, worker_mgr_handle.clone(), worker_client); bag.spawn(async move { deployer.run().await; }); diff --git a/ctl/src/worker_mgr.rs b/ctl/src/worker_mgr.rs index a7d976c..81853f5 100644 --- a/ctl/src/worker_mgr.rs +++ b/ctl/src/worker_mgr.rs @@ -58,23 +58,27 @@ impl WorkerMgr { } } + #[instrument(skip_all)] async fn handle_msg(&mut self, msg: Msg) { - trace!(?msg, "got msg"); match msg { - Msg::Hello(addr, reply) => { - _ = reply.send(self.handle_hello(addr)); + Msg::Hello(worker_addr, reply) => { + trace!(?worker_addr, "got hello"); + _ = reply.send(self.handle_hello(worker_addr)); } - Msg::Bye(addr) => { - self.handle_bye(addr); + Msg::Bye(worker_addr) => { + trace!(?worker_addr, "got bye"); + self.handle_bye(worker_addr); } - Msg::PushMetrics(a, m, reply) => { - _ = reply.send(self.handle_push_metrics(a, m)); + Msg::PushMetrics(worker_addr, m, reply) => { + trace!(?worker_addr, "got metrics"); + _ = reply.send(self.handle_push_metrics(worker_addr, m)); } Msg::QueryWorkers(reply) => { let workers = self.workers.values().cloned().collect(); _ = reply.send(workers); } Msg::Tick(instant) => { + trace!("got tick"); self.handle_tick(instant).await; } } diff --git a/proto/src/common/service.rs b/proto/src/common/service.rs index 11689d2..1521dce 100644 --- a/proto/src/common/service.rs +++ b/proto/src/common/service.rs @@ -1,3 +1,5 @@ +use std::fmt; + use serde::{Deserialize, Serialize}; /// The service ID (i.e., its name). @@ -6,6 +8,12 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct ServiceId(pub String); +impl fmt::Display for ServiceId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + /// The service image. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct ServiceImage(pub String); diff --git a/tests/containers/index.mjs b/tests/containers/index.mjs index 7a4ad1f..acb3304 100644 --- a/tests/containers/index.mjs +++ b/tests/containers/index.mjs @@ -3,8 +3,9 @@ import { createServer } from "node:http"; const server = createServer((req, res) => { - const fwd = req.headers["X-Tuc-Fwd-For"] || ""; - res.end(`hello, world! (fwd by ${fwd})`); + const fwd = req.headers["x-tuc-fwd-for"] || "no X-Tuc-Fwd-For"; + const inst = req.headers["x-tuc-inst"] || "no X-Tuc-Inst"; + res.end(`hello, world! (fwd by <${fwd}>) (inst <${inst}>)`); }); const port = process.env.PORT; diff --git a/utils/src/http.rs b/utils/src/http.rs index bba05c9..9fcc9ff 100644 --- a/utils/src/http.rs +++ b/utils/src/http.rs @@ -16,11 +16,22 @@ pub struct Error { impl Error { /// Constructs a new error with a public message and a status code. + #[track_caller] pub fn public(status: StatusCode, msg: impl Into>) -> Self { let msg = msg.into(); - let inner = eyre::Report::msg(msg.clone()); + Self::public_with(eyre::Report::msg(msg.clone()), status, msg) + } + + /// Constructs a new error with a inner error, a public message, and a + /// status code. + pub fn public_with( + inner: impl Into, + status: StatusCode, + msg: impl Into>, + ) -> Self { + let msg = msg.into(); Self { - inner, + inner: inner.into(), public: Some((status, msg)), } } @@ -38,8 +49,9 @@ impl IntoResponse for Error { }, }); + let error = self.inner; if StatusCode::INTERNAL_SERVER_ERROR <= status { - error!(error = ?self.inner, "server error http response"); + error!(?error, %status, "server error http response"); } (status, Json(json)).into_response() } @@ -60,6 +72,7 @@ impl ResultExt for Result where E: Into, { + #[track_caller] fn http_error(self, status: StatusCode, msg: impl Into>) -> Result { match self { Ok(ok) => Ok(ok), @@ -80,6 +93,7 @@ pub trait OptionExt { } impl OptionExt for Option { + #[track_caller] fn or_http_error( self, status: StatusCode, diff --git a/worker/src/monitor/pusher.rs b/worker/src/monitor/pusher.rs index a197c87..3ff66a1 100644 --- a/worker/src/monitor/pusher.rs +++ b/worker/src/monitor/pusher.rs @@ -4,13 +4,13 @@ use chrono::Utc; use eyre::Context as _; use proto::{clients::CtlClient, ctl::worker::PushMetricsStatus}; use tokio::time::sleep; -use tracing::{debug, error}; +use tracing::{error, trace}; use crate::{args::WorkerArgs, monitor::collector::MetricsCollector}; pub async fn start_pusher(args: Arc, ctl_client: CtlClient) -> eyre::Result<()> { let mut metrics_report: MetricsCollector = MetricsCollector::new(); - debug!("pusher started"); + trace!("pusher started"); // Try to join the cluster ctl_client @@ -19,7 +19,7 @@ pub async fn start_pusher(args: Arc, ctl_client: CtlClient) -> eyre: .wrap_err("worker failed to join the cluster")?; loop { - debug!("sending metrics"); + trace!("sending metrics"); let metrics = metrics_report.get_metrics(); let now = Utc::now(); diff --git a/worker/src/proxy.rs b/worker/src/proxy.rs index dda1c00..ecd9c0c 100644 --- a/worker/src/proxy.rs +++ b/worker/src/proxy.rs @@ -21,17 +21,20 @@ use hyper_util::{ }; use proto::{common::instance::InstanceId, well_known::PROXY_INSTANCE_HEADER_NAME}; use reqwest::StatusCode; +use tracing::{instrument, trace}; use utils::http::{self, OptionExt as _, ResultExt as _}; +#[instrument(skip_all)] pub async fn proxy( State(proxy): State, mut req: Request, ) -> http::Result { - let id = extract_instance_id(&mut req)?; + let instance_id = extract_instance_id(&mut req)?; + trace!(%instance_id, "received user request"); let maybe_port = { let read_map = proxy.ports.read().unwrap(); - read_map.get(&id).copied() + read_map.get(&instance_id).copied() }; let port = maybe_port .ok_or_else(|| eyre::eyre!("requested instance doesn't exist at requested worker")) @@ -97,7 +100,7 @@ fn extract_instance_id(req: &mut Request) -> http::Result { // i'm so sorry let inner = req .headers_mut() - .remove(PROXY_INSTANCE_HEADER_NAME) + .get(PROXY_INSTANCE_HEADER_NAME) .or_http_error(StatusCode::BAD_REQUEST, "missing instance id from gw")? .to_str() .ok() diff --git a/worker/src/runner/container_rt.rs b/worker/src/runner/container_rt.rs index 7e0f958..659fab6 100644 --- a/worker/src/runner/container_rt.rs +++ b/worker/src/runner/container_rt.rs @@ -14,7 +14,7 @@ use proto::{ common::instance::{InstanceId, InstanceSpec, Status}, well_known::GRACEFUL_SHUTDOWN_DEADLINE, }; -use tracing::{debug, error, instrument}; +use tracing::{error, instrument, trace}; use super::RunnerHandle; @@ -36,7 +36,7 @@ impl ContainerRuntime { handle: RunnerHandle, ) { let container_name = Self::create_container_name(spec.instance_id); - debug!(?spec, container_name, "running instance lifecycle"); + trace!(?spec, container_name, "running instance lifecycle"); if let Err(error) = self .create_and_run(&spec, port, container_name.clone()) @@ -51,7 +51,7 @@ impl ContainerRuntime { } // TODO: Add health check to verify whether the service is running - debug!("container running"); + trace!("container running"); handle .report_instance_status(spec.instance_id, Status::Started) .await; @@ -62,7 +62,7 @@ impl ContainerRuntime { .expect("infallible operation") { ExitStatus::Terminated => { - debug!("container terminated"); + trace!("container terminated"); handle .report_instance_status(spec.instance_id, Status::Terminated) .await; @@ -102,10 +102,10 @@ impl ContainerRuntime { name: String, ) -> eyre::Result<()> { let create_response = self.create_container(spec, port, name.clone()).await?; - debug!("successfully `create` operation"); + trace!("successfully `create` operation"); self.run_container(create_response).await?; - debug!("successfully `run` operation"); + trace!("successfully `run` operation"); Ok(()) } diff --git a/worker/src/runner/mod.rs b/worker/src/runner/mod.rs index e402526..f58954b 100644 --- a/worker/src/runner/mod.rs +++ b/worker/src/runner/mod.rs @@ -15,7 +15,7 @@ use tokio::{ sync::{mpsc, oneshot}, task, }; -use tracing::{debug, error}; +use tracing::{error, trace}; mod container_rt; use crate::proxy::ProxyHandle; @@ -106,7 +106,7 @@ impl Runner { let ctl_client = self.ctl_client.clone(); tokio::spawn(async move { - debug!(?instance_id, ?status, "reporting status"); + trace!(?instance_id, ?status, "reporting status"); if let Err(error) = ctl_client.report_instance_status(instance_id, status).await { error!(?error, "failed to report instance status"); }