Skip to content

Commit

Permalink
feat(ctl): glue worker and deployer
Browse files Browse the repository at this point in the history
  • Loading branch information
lffg committed Jun 21, 2024
1 parent 6921181 commit 21f724c
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 55 deletions.
32 changes: 19 additions & 13 deletions ctl/src/balancer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use hyper_util::{
};
use proto::{
common::{instance::InstanceId, service::ServiceId},
well_known::{PROXY_FORWARDED_HEADER_NAME, PROXY_INSTANCE_HEADER_NAME},
well_known::{PROXY_FORWARDED_HEADER_NAME, PROXY_INSTANCE_HEADER_NAME, WORKER_PROXY_PORT},
};
use tracing::{instrument, trace, warn};
use utils::http::{self, OptionExt as _, ResultExt as _};

#[derive(Default)]
Expand Down Expand Up @@ -58,11 +59,11 @@ impl BalancerState {
)
}

pub fn next(&self, service: &ServiceId) -> (InstanceId, IpAddr) {
pub fn next(&self, service: &ServiceId) -> Option<(InstanceId, IpAddr)> {
let map = self.addrs.lock().unwrap();
let bag = map.get(service).unwrap();
let bag = map.get(service)?;
let count = bag.count.fetch_add(1, Ordering::Relaxed);
bag.instances[count % bag.instances.len()]
Some(bag.instances[count % bag.instances.len()])
}
}

Expand All @@ -72,43 +73,48 @@ pub struct BalancerHandle {

impl BalancerHandle {
#[allow(dead_code)]
pub fn add_instance(&mut self, id: ServiceId, at: (InstanceId, IpAddr)) {
pub fn add_instance(&self, id: ServiceId, instance_id: InstanceId, addr: IpAddr) {
let mut map = self.addrs.lock().unwrap();
let bag = map.entry(id).or_default();
bag.instances.push(at);
bag.instances.push((instance_id, addr));
}

#[allow(dead_code)]
pub fn drop_instance(&mut self, id: &ServiceId, at: (InstanceId, IpAddr)) {
pub fn drop_instance(&self, id: &ServiceId, instance_id: InstanceId) {
let mut map = self.addrs.lock().unwrap();
let Some(bag) = map.get_mut(id) else {
warn!("attempted to drop instance from unknown service id");
return;
};
bag.instances
.retain(|(inst, addr)| inst == &at.0 && addr == &at.1);
// Remove the instance (keep all except this one)
bag.instances.retain(|(inst, _)| inst != &instance_id);
}
}

#[instrument(skip_all)]
pub async fn proxy(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(balancer): State<BalancerState>,
mut req: Request,
) -> http::Result<impl IntoResponse> {
let service = extract_service_id(&mut req)?;
let service_id = extract_service_id(&mut req)?;

let (instance, server_addr) = balancer.next(&service);
let (instance_id, server_addr) = balancer
.next(&service_id)
.or_http_error(StatusCode::NOT_FOUND, "service not found")?;
trace!(%service_id, %instance_id, %server_addr, "received and balanced user request");

*req.uri_mut() = {
let uri = req.uri();
let mut parts = uri.clone().into_parts();
parts.authority = Authority::from_str(&server_addr.to_string()).ok();
parts.authority = Authority::from_str(&format!("{server_addr}:{WORKER_PROXY_PORT}")).ok();
parts.scheme = Some(Scheme::HTTP);
Uri::from_parts(parts).unwrap()
};

req.headers_mut().insert(
PROXY_INSTANCE_HEADER_NAME,
HeaderValue::from_str(&instance.to_string()).unwrap(),
HeaderValue::from_str(&instance_id.to_string()).unwrap(),
);
req.headers_mut().insert(
PROXY_FORWARDED_HEADER_NAME,
Expand Down
40 changes: 35 additions & 5 deletions ctl/src/deployer/instance.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::net::IpAddr;
use std::{net::IpAddr, sync::Arc};

use proto::{
common::instance::{self, InstanceId, InstanceSpec},
common::{
instance::{self, InstanceId, InstanceSpec},
service::ServiceId,
},
ctl::deployer::DeploymentId,
well_known::{MAX_INSTANCE_DEPLOY_RETRIES, MAX_INSTANCE_TERMINATION_RETRIES},
worker::runner::{DeployInstanceRes, TerminateInstanceRes},
};
use tracing::{instrument, warn};
use tracing::{instrument, trace, warn};
use utils::fmt::ElideDebug;

use crate::deployer::Deployer;
Expand Down Expand Up @@ -48,7 +51,7 @@ pub fn next(d: &mut Deployer, current: StateCtx, t: Transition) -> StateCtx {
}

(Deploying { .. }, t::Status(s::Started)) => {
//
propagate_to_balancer(d, &current, Balancer::Include);
current.trans_into(Started)
}

Expand Down Expand Up @@ -77,20 +80,24 @@ pub fn next(d: &mut Deployer, current: StateCtx, t: Transition) -> StateCtx {

(Started, t::Status(s::Terminated { .. })) => {
warn!("instance unexpectedly terminated");
propagate_to_balancer(d, &current, Balancer::Remove);
current.trans_into(UnexpectedTerminated)
}

(Started, t::Status(s::Crashed { .. })) => {
warn!("instance unexpectedly crashed");
propagate_to_balancer(d, &current, Balancer::Remove);
current.trans_into(UnexpectedCrashed)
}

(Started, t::Status(s::Killed { .. })) => {
warn!("instance was killed");
propagate_to_balancer(d, &current, Balancer::Remove);
current.trans_into(UnexpectedCrashed)
}

(Started, t::Terminate) => {
propagate_to_balancer(d, &current, Balancer::Remove);
schedule_instance_termination(d, &current);
current.trans_into(Terminating {
attempt: INITIAL_ATTEMPT,
Expand Down Expand Up @@ -127,16 +134,23 @@ pub struct StateCtx {
id: InstanceId,
/// The address of the worker in which this instance lives.
worker_addr: IpAddr,
service_id: Arc<ServiceId>,
deployment_id: DeploymentId,
}

impl StateCtx {
pub fn new_init(id: InstanceId, worker_addr: IpAddr, deployment_id: DeploymentId) -> Self {
pub fn new_init(
id: InstanceId,
worker_addr: IpAddr,
deployment_id: DeploymentId,
service_id: Arc<ServiceId>,
) -> Self {
StateCtx {
state: State::Init,
id,
worker_addr,
deployment_id,
service_id,
}
}

Expand Down Expand Up @@ -284,3 +298,19 @@ fn schedule_instance_termination_reattempt(
current.trans_into(State::FailedToTerminate)
}
}

#[derive(Debug, Copy, Clone)]
enum Balancer {
Include,
Remove,
}

fn propagate_to_balancer(d: &mut Deployer, ctx: &StateCtx, action: Balancer) {
trace!(?action, "propagating changes to balancer");
let s_id = ctx.service_id.as_ref().clone();
let addr = ctx.worker_addr;
match action {
Balancer::Include => d.h.balancer.add_instance(s_id, ctx.id, addr),
Balancer::Remove => d.h.balancer.drop_instance(&s_id, ctx.id),
}
}
33 changes: 24 additions & 9 deletions ctl/src/deployer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
};
use tracing::{debug, instrument, warn};
use tracing::{instrument, trace, warn};
use uuid::Uuid;

use crate::{
balancer::BalancerHandle,
deployer::instance::{TerminalKind, Transition},
worker_mgr::WorkerMgrHandle,
};
Expand All @@ -40,13 +41,15 @@ pub struct Deployer {

struct DeployerHandles {
deployer_handle: DeployerHandle,
balancer: BalancerHandle,
worker_mgr: WorkerMgrHandle,
worker_client: WorkerClient,
}

impl Deployer {
#[must_use]
pub fn new(
balancer: BalancerHandle,
worker_mgr: WorkerMgrHandle,
worker_client: WorkerClient,
) -> (Deployer, DeployerHandle) {
Expand All @@ -56,6 +59,7 @@ impl Deployer {
rx,
h: Arc::new(DeployerHandles {
deployer_handle: handle.clone(),
balancer,
worker_mgr,
worker_client,
}),
Expand Down Expand Up @@ -114,18 +118,24 @@ impl Deployer {
}

async fn handle_deploy_service(&mut self, spec: ServiceSpec) -> eyre::Result<DeployServiceRes> {
debug!(?spec, "deploying service");
trace!(?spec, "deploying service");
let workers = self.h.worker_mgr.query_workers().await;
if workers.is_empty() {
bail!("no workers on cluster pool");
}
let instances = alloc::rand_many(&workers, spec.concurrency);
let deployment_id = DeploymentId(Uuid::now_v7());
let service_id = Arc::new(spec.service_id.clone());

let instances = instances
// For each allocated instance, schedule a deploy.
.inspect(|&(instance_id, worker_addr)| {
self.add_instance_init_state(instance_id, worker_addr, deployment_id);
self.add_instance_init_state(
instance_id,
worker_addr,
deployment_id,
service_id.clone(),
);

let spec = InstanceSpec::from_service_spec_cloned(&spec, instance_id).into();
self.trans_instance_state(instance_id, Transition::Deploy { spec });
Expand All @@ -146,10 +156,15 @@ impl Deployer {
_ = self;
}

fn add_instance_init_state(&mut self, id: InstanceId, worker_addr: IpAddr, d_id: DeploymentId) {
let opt = self
.instance_statems
.insert(id, instance::StateCtx::new_init(id, worker_addr, d_id));
fn add_instance_init_state(
&mut self,
id: InstanceId,
worker_addr: IpAddr,
d_id: DeploymentId,
s_id: Arc<ServiceId>,
) {
let s = instance::StateCtx::new_init(id, worker_addr, d_id, s_id);
let opt = self.instance_statems.insert(id, s);

// We have just generated a new ID (in Self::handle_deploy_service), so
// this case shouldn't be possible.
Expand All @@ -163,9 +178,9 @@ impl Deployer {
return;
};

debug!(state = ?statem.state(), "transitioned from");
trace!(state = ?statem.state(), "transitioned from");
let next = instance::next(self, statem, t);
debug!(state = ?next.state(), "transitioned to");
trace!(state = ?next.state(), "transitioned to");

match next.state().kind() {
TerminalKind::NonTerminal => {
Expand Down
5 changes: 3 additions & 2 deletions ctl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> eyre::Result<()> {
worker_mgr.run().await;
});

let (balancer, _balancer_handle) = BalancerState::new();
let (balancer, balancer_handle) = BalancerState::new();
bag.spawn(async move {
let app = balancer::proxy
.with_state(balancer)
Expand All @@ -54,7 +54,8 @@ async fn main() -> eyre::Result<()> {
axum::serve(balancer_listener, app).await.unwrap();
});

let (deployer, deployer_handle) = Deployer::new(worker_mgr_handle.clone(), worker_client);
let (deployer, deployer_handle) =
Deployer::new(balancer_handle, worker_mgr_handle.clone(), worker_client);
bag.spawn(async move {
deployer.run().await;
});
Expand Down
18 changes: 11 additions & 7 deletions ctl/src/worker_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,27 @@ impl WorkerMgr {
}
}

#[instrument(skip_all)]
async fn handle_msg(&mut self, msg: Msg) {
trace!(?msg, "got msg");
match msg {
Msg::Hello(addr, reply) => {
_ = reply.send(self.handle_hello(addr));
Msg::Hello(worker_addr, reply) => {
trace!(?worker_addr, "got hello");
_ = reply.send(self.handle_hello(worker_addr));
}
Msg::Bye(addr) => {
self.handle_bye(addr);
Msg::Bye(worker_addr) => {
trace!(?worker_addr, "got bye");
self.handle_bye(worker_addr);
}
Msg::PushMetrics(a, m, reply) => {
_ = reply.send(self.handle_push_metrics(a, m));
Msg::PushMetrics(worker_addr, m, reply) => {
trace!(?worker_addr, "got metrics");
_ = reply.send(self.handle_push_metrics(worker_addr, m));
}
Msg::QueryWorkers(reply) => {
let workers = self.workers.values().cloned().collect();
_ = reply.send(workers);
}
Msg::Tick(instant) => {
trace!("got tick");
self.handle_tick(instant).await;
}
}
Expand Down
8 changes: 8 additions & 0 deletions proto/src/common/service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt;

use serde::{Deserialize, Serialize};

/// The service ID (i.e., its name).
Expand All @@ -6,6 +8,12 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ServiceId(pub String);

impl fmt::Display for ServiceId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

/// The service image.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ServiceImage(pub String);
Expand Down
5 changes: 3 additions & 2 deletions tests/containers/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import { createServer } from "node:http";

const server = createServer((req, res) => {
const fwd = req.headers["X-Tuc-Fwd-For"] || "<no `X-Tuc-Fwd-For`>";
res.end(`hello, world! (fwd by ${fwd})`);
const fwd = req.headers["x-tuc-fwd-for"] || "no X-Tuc-Fwd-For";
const inst = req.headers["x-tuc-inst"] || "no X-Tuc-Inst";
res.end(`hello, world! (fwd by <${fwd}>) (inst <${inst}>)`);
});

const port = process.env.PORT;
Expand Down
Loading

0 comments on commit 21f724c

Please sign in to comment.