From cef4069233b297b468a607c7ab18764c4063669a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luiz=20Felipe=20Gon=C3=A7alves?= Date: Fri, 21 Jun 2024 03:18:27 -0300 Subject: [PATCH] finish basic --- TODO.md | 16 +++ ctl/src/deployer/alloc.rs | 2 +- ctl/src/deployer/instance.rs | 240 +++++++++++++++++++++++------------ ctl/src/deployer/mod.rs | 146 +++++++++++++++++---- ctl/src/http/deployer.rs | 44 ++++--- ctl/src/http/mod.rs | 3 +- ctl/src/main.rs | 20 ++- ctl/src/worker_mgr.rs | 1 - docs/state.md | 44 ------- docs/statem.md | 91 +++++++++++++ proto/src/common/instance.rs | 21 ++- utils/src/fmt.rs | 30 +++++ utils/src/lib.rs | 1 + 13 files changed, 484 insertions(+), 175 deletions(-) create mode 100644 TODO.md delete mode 100644 docs/state.md create mode 100644 docs/statem.md create mode 100644 utils/src/fmt.rs diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..3fed6d2 --- /dev/null +++ b/TODO.md @@ -0,0 +1,16 @@ +# High-level + +- Distribute control-plane through multiple controller nodes (we currently only + support one controlling node per cluster). + +# Mid-level + +- Sysadmin notifier. +- Persist node states across crashes. +- Support more service redeployment policies. +- Add correlation IDs and error correlation IDs. + +# Refactors + +- Rename "deploy_instance" operation to "start_instance". This way, we keep it + consistent with, e.g., the instance state machine. diff --git a/ctl/src/deployer/alloc.rs b/ctl/src/deployer/alloc.rs index ae76cf1..af8c31d 100644 --- a/ctl/src/deployer/alloc.rs +++ b/ctl/src/deployer/alloc.rs @@ -20,6 +20,6 @@ pub fn rand_many( } /// Randomly allocates a single instance from the provided pool of `workers`. -pub fn rand_single(workers: &[WorkerDetails]) -> (InstanceId, IpAddr) { +pub fn _rand_single(workers: &[WorkerDetails]) -> (InstanceId, IpAddr) { rand_many(workers, 1).next().unwrap() } diff --git a/ctl/src/deployer/instance.rs b/ctl/src/deployer/instance.rs index bd5256c..474ca30 100644 --- a/ctl/src/deployer/instance.rs +++ b/ctl/src/deployer/instance.rs @@ -2,10 +2,12 @@ use std::net::IpAddr; use proto::{ common::instance::{self, InstanceId, InstanceSpec}, - ctl::deployer::RedeploymentPolicy, - well_known::{MAX_INSTANCE_DEPLOY_RETRIES, MAX_INSTANCE_TERMINATION_RETRIES}, worker::runner::DeployInstanceRes, + ctl::deployer::DeploymentId, + well_known::{MAX_INSTANCE_DEPLOY_RETRIES, MAX_INSTANCE_TERMINATION_RETRIES}, + worker::runner::{DeployInstanceRes, TerminateInstanceRes}, }; -use tracing::warn; +use tracing::{instrument, warn}; +use utils::fmt::ElideDebug; use crate::deployer::Deployer; @@ -17,45 +19,41 @@ const INITIAL_ATTEMPT: u8 = 1; /// Before returning the next state, this function may also schedule some /// background worker task that will *eventually* produce another transition /// message. -pub fn next(current: StateCtx, t: Transition, d: &mut Deployer) -> StateCtx { +#[ + // Notice that through this span we log eventual errors. + instrument(skip(d)) +] +pub fn next(d: &mut Deployer, current: StateCtx, t: Transition) -> StateCtx { use instance::Status as s; use State::*; use Transition as t; - match (current.state, t) { - (Init, t::Deploy { worker_addr, spec }) => current.trans_into(Deploying { - attempt: INITIAL_ATTEMPT, - worker_addr, - spec, - }), - ( - Deploying { - attempt, - worker_addr, + match (current.state.clone(), t) { + (Init, t::Deploy { spec }) => { + schedule_instance_deployment(d, ¤t, spec.get().clone()); + current.trans_into(Deploying { + attempt: INITIAL_ATTEMPT, spec, - }, - t::Status(s::FailedToStart { error }), - ) => { - warn!(?error, "failed deploy attempt #{attempt}"); - if attempt <= MAX_INSTANCE_DEPLOY_RETRIES { - let new_attempt = attempt + 1; - current.trans_into(Deploying { - attempt: new_attempt, - worker_addr, - spec, - }) - } else { - current.trans_into(FailedToStart) - } + }) + } + + (Deploying { attempt, spec }, t::FailedToDeploy(_error)) => { + warn!("failed to deploy (deployment attempt #{attempt}"); + schedule_instance_deployment_reattempt(d, current, attempt, spec.get().clone()) + } + + (Deploying { attempt, spec }, t::Status(s::FailedToStart { error: _ })) => { + warn!("failed to start (deployment attempt #{attempt})"); + schedule_instance_deployment_reattempt(d, current, attempt, spec.get().clone()) } (Deploying { .. }, t::Status(s::Started)) => { - // TODO + // current.trans_into(Started) } - (Deploying { .. }, t::Terminate(_)) => { - // TODO + (Deploying { .. }, t::Terminate) => { + // current.trans_into(PreTerminating) } @@ -66,56 +64,56 @@ pub fn next(current: StateCtx, t: Transition, d: &mut Deployer) -> StateCtx { }) } + (PreTerminating, t::FailedToDeploy(_error)) => { + warn!("failed to deploy instance"); + current.trans_into(NeverStarted) + } + (PreTerminating, t::Status(s::FailedToStart { .. })) => { + warn!("failed to start instance"); // TODO current.trans_into(NeverStarted) } (Started, t::Status(s::Terminated { .. })) => { - // TODO + warn!("instance unexpectedly terminated"); current.trans_into(UnexpectedTerminated) } (Started, t::Status(s::Crashed { .. })) => { - // TODO + warn!("instance unexpectedly crashed"); current.trans_into(UnexpectedCrashed) } (Started, t::Status(s::Killed { .. })) => { - // TODO + warn!("instance was killed"); current.trans_into(UnexpectedCrashed) } - (Started, t::Terminate(_)) => { - // TODO + (Started, t::Terminate) => { + schedule_instance_termination(d, ¤t); current.trans_into(Terminating { attempt: INITIAL_ATTEMPT, }) } - (Terminating { attempt }, t::FailedToTerminate) => { - if attempt <= MAX_INSTANCE_TERMINATION_RETRIES { - let new_attempt = attempt + 1; - current.trans_into(Terminating { - attempt: new_attempt, - }) - } else { - current.trans_into(FailedToTerminate) - } + (Terminating { attempt }, t::FailedToTerminate(_error)) => { + warn!("failed to terminate (termination attempt #{attempt})"); + schedule_instance_termination_reattempt(d, current.clone(), attempt) } (Terminating { .. }, t::Status(s::Terminated)) => { - // TODO + // current.trans_into(Terminated) } (Terminating { .. }, t::Status(s::Crashed { .. })) => { - // TODO + warn!("instance crashed"); current.trans_into(Crashed) } (Terminating { .. }, t::Status(s::Killed { .. })) => { - // TODO + warn!("instance was killed"); current.trans_into(Crashed) } @@ -123,72 +121,99 @@ pub fn next(current: StateCtx, t: Transition, d: &mut Deployer) -> StateCtx { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StateCtx { state: State, + id: InstanceId, /// The address of the worker in which this instance lives. worker_addr: IpAddr, - id: InstanceId, + deployment_id: DeploymentId, } impl StateCtx { - fn trans_into(mut self, next: State) -> StateCtx { - self.state = next; - self + pub fn new_init(id: InstanceId, worker_addr: IpAddr, deployment_id: DeploymentId) -> Self { + StateCtx { + state: State::Init, + id, + worker_addr, + deployment_id, + } } - fn trans_into_with_addr(mut self, next: State, next_addr: IpAddr) -> StateCtx { + pub fn state(&self) -> &State { + &self.state + } + + #[allow(dead_code)] + pub fn deployment_id(&self) -> DeploymentId { + self.deployment_id + } + + fn trans_into(mut self, next: State) -> StateCtx { self.state = next; - self.worker_addr = next_addr; self } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum State { Init, - Deploying { attempt: u8, spec: InstanceSpec }, + Deploying { + attempt: u8, + spec: ElideDebug, + }, FailedToStart, PreTerminating, NeverStarted, Started, UnexpectedTerminated, UnexpectedCrashed, - Terminating { attempt: u8 }, + Terminating { + attempt: u8, + }, Terminated, Crashed, FailedToTerminate, } impl State { - pub fn is_terminal(&self) -> bool { + #[allow(clippy::match_same_arms)] + pub fn kind(&self) -> TerminalKind { + use TerminalKind::*; match self { - State::Init => false, - State::Deploying { .. } => false, - State::FailedToStart => true, - State::PreTerminating => false, - State::NeverStarted => true, - State::Started => false, - State::UnexpectedTerminated => true, - State::UnexpectedCrashed => true, - State::Terminating { .. } => false, - State::Terminated => true, - State::Crashed => true, - State::FailedToTerminate => true, + State::Init => NonTerminal, + State::Deploying { .. } => NonTerminal, + State::FailedToStart => UnsuccessfulTerminal, + State::PreTerminating => NonTerminal, + State::NeverStarted => UnsuccessfulTerminal, + State::Started => NonTerminal, + State::UnexpectedTerminated => UnsuccessfulTerminal, + State::UnexpectedCrashed => UnsuccessfulTerminal, + State::Terminating { .. } => NonTerminal, + State::Terminated => SuccessfulTerminal, + State::Crashed => UnsuccessfulTerminal, + State::FailedToTerminate => UnsuccessfulTerminal, } } } +/// Describes whether a state machine state is terminal or not, and if a +/// terminal state is (or not) successful. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[allow(clippy::enum_variant_names)] +pub enum TerminalKind { + NonTerminal, + SuccessfulTerminal, + UnsuccessfulTerminal, +} + #[derive(Debug)] pub enum Transition { Deploy { - worker_addr: IpAddr, - spec: InstanceSpec, - }, - Terminate { - worker_addr: IpAddr, - id: InstanceId, + spec: ElideDebug, }, + #[allow(dead_code)] + Terminate, Status(instance::Status), // XX: For now, `FailedToTerminate` doesn't live in `instance::Status` since // we are not sure how to handle those corner error cases. In the future, we @@ -197,18 +222,65 @@ pub enum Transition { // must be able to recover from potential Docker failures (which are the // only sources of termination failures that the Tucano system can // encounter). - FailedToTerminate, + FailedToTerminate(eyre::Report), + FailedToDeploy(eyre::Report), } -fn deploy_instance(ctx: &StateCtx, spec: InstanceSpec, d: &mut Deployer) { +fn schedule_instance_deployment(d: &mut Deployer, ctx: &StateCtx, spec: InstanceSpec) { let worker_addr = ctx.worker_addr; d.instance_task(ctx.id, move |h| async move { - let result = h.worker_client - .deploy_instance(worker_addr, spec) - .await; + let result = h.worker_client.deploy_instance(worker_addr, spec).await; match result { - Ok(DeployInstanceRes {}) => Transition:: + Ok(DeployInstanceRes {}) => None, + Err(error) => Some(Transition::FailedToDeploy(error)), } - todo!() }); } + +fn schedule_instance_deployment_reattempt( + d: &mut Deployer, + current: StateCtx, + attempt: u8, + spec: InstanceSpec, +) -> StateCtx { + if attempt <= MAX_INSTANCE_DEPLOY_RETRIES { + schedule_instance_deployment(d, ¤t, spec.clone()); + + let new_attempt = attempt + 1; + current.trans_into(State::Deploying { + attempt: new_attempt, + spec: spec.into(), + }) + } else { + current.trans_into(State::FailedToStart) + } +} + +fn schedule_instance_termination(d: &mut Deployer, ctx: &StateCtx) { + let worker_addr = ctx.worker_addr; + let id = ctx.id; + d.instance_task(ctx.id, move |h| async move { + let result = h.worker_client.terminate_instance(worker_addr, id).await; + match result { + Ok(TerminateInstanceRes {}) => None, + Err(error) => Some(Transition::FailedToTerminate(error)), + } + }); +} + +fn schedule_instance_termination_reattempt( + d: &mut Deployer, + current: StateCtx, + attempt: u8, +) -> StateCtx { + if attempt <= MAX_INSTANCE_TERMINATION_RETRIES { + schedule_instance_termination(d, ¤t); + + let new_attempt = attempt + 1; + current.trans_into(State::Terminating { + attempt: new_attempt, + }) + } else { + current.trans_into(State::FailedToTerminate) + } +} diff --git a/ctl/src/deployer/mod.rs b/ctl/src/deployer/mod.rs index 1d2aa2d..8378f1e 100644 --- a/ctl/src/deployer/mod.rs +++ b/ctl/src/deployer/mod.rs @@ -1,36 +1,40 @@ -use std::{collections::HashMap, future::Future, os::unix::net::SocketAddr, sync::Arc}; +use std::{collections::HashMap, future::Future, net::IpAddr, sync::Arc}; -use chrono::{DateTime, Utc}; use proto::{ clients::WorkerClient, common::{ - instance::{InstanceId, InstanceSpec}, + instance::{self as proto_instance, InstanceId, InstanceSpec}, service::{ServiceId, ServiceSpec}, }, - ctl::deployer::DeploymentId, + ctl::deployer::{DeployServiceRes, DeploymentId}, }; use tokio::{ select, sync::{mpsc, oneshot}, task::JoinSet, }; -use tracing::{error, info, instrument}; +use tracing::{error, instrument, warn}; +use uuid::Uuid; -use crate::{deployer::instance::Transition, worker_mgr::WorkerMgrHandle}; +use crate::{ + deployer::instance::{TerminalKind, Transition}, + worker_mgr::WorkerMgrHandle, +}; mod alloc; mod instance; pub struct Deployer { rx: mpsc::Receiver, - tasks: JoinSet<()>, h: Arc, + /// Set of deployer-related background-running tasks. + tasks: JoinSet<()>, + /// Pending deployment state machine contexts. + _deployment_statems: HashMap, + /// Instance state machine contexts. + instance_statems: HashMap, /// Whether the deployer actor is terminating. terminating: bool, - // data records - services: HashMap, - instances: HashMap, - deployments: HashMap, } struct DeployerHandles { @@ -54,15 +58,15 @@ impl Deployer { worker_mgr, worker_client, }), - terminating: false, tasks: JoinSet::new(), - services: HashMap::new(), - instances: HashMap::new(), - deployments: HashMap::new(), + _deployment_statems: HashMap::new(), + instance_statems: HashMap::new(), + terminating: false, }; (actor, handle) } + #[allow(clippy::match_same_arms)] pub async fn run(mut self) { loop { select! { @@ -86,29 +90,79 @@ impl Deployer { } } + #[instrument(skip_all)] async fn handle_msg(&mut self, msg: Msg) { - info!(?msg, "deployer msg"); match msg { Msg::DeployService(spec, reply) => { - _ = reply.send(self.deploy_service(spec).await); + _ = reply.send(self.handle_deploy_service(spec).await); } Msg::TerminateService(id, reply) => { - _ = reply.send(self.terminate_service(id).await); + self.handle_terminate_service(&id); + _ = reply.send(Ok(())); + } + Msg::ReportInstanceStatus(id, status) => { + self.trans_instance_state(id, instance::Transition::Status(status)); + } + Msg::InstanceTransition(id, t) => { + self.trans_instance_state(id, t); } - Msg::InstanceTransition(_, _) => todo!(), } } - async fn deploy_service(&mut self, spec: ServiceSpec) -> eyre::Result<()> { + async fn handle_deploy_service(&mut self, spec: ServiceSpec) -> eyre::Result { let workers = self.h.worker_mgr.query_workers().await; let instances = alloc::rand_many(&workers, spec.concurrency); + let deployment_id = DeploymentId(Uuid::now_v7()); + + let instances = instances + // For each allocated instance, schedule a deploy. + .inspect(|&(instance_id, worker_addr)| { + self.add_instance_init_state(instance_id, worker_addr, deployment_id); + + let spec = InstanceSpec::from_service_spec_cloned(&spec, instance_id).into(); + self.trans_instance_state(instance_id, Transition::Deploy { spec }); + }) + .collect(); - for (instance, addr) in instances {} - Ok(()) + Ok(DeployServiceRes { + deployment_id, + instances, + }) } - async fn terminate_service(&mut self, id: ServiceId) -> eyre::Result<()> { - Ok(()) + fn handle_terminate_service(&mut self, _id: &ServiceId) { + _ = self; + } + + fn _lffg_todo_deploy_service(&mut self) { + _ = self; + } + + fn add_instance_init_state(&mut self, id: InstanceId, worker_addr: IpAddr, d_id: DeploymentId) { + let opt = self + .instance_statems + .insert(id, instance::StateCtx::new_init(id, worker_addr, d_id)); + + // We have just generated a new ID (in Self::handle_deploy_service), so + // this case shouldn't be possible. + assert!(opt.is_none()); + } + + fn trans_instance_state(&mut self, id: InstanceId, t: instance::Transition) { + let Some(statem) = self.instance_statems.remove(&id) else { + warn!("tried to transition nonexistent instance machine"); + return; + }; + + let next = instance::next(self, statem, t); + match next.state().kind() { + TerminalKind::NonTerminal => { + self.instance_statems.insert(id, next); + } + // If the new state is terminal, we don't need to waste memory by + // keeping track of it, so we don't add it again. + TerminalKind::SuccessfulTerminal | TerminalKind::UnsuccessfulTerminal => (), + } } } @@ -140,7 +194,6 @@ impl Deployer { pub struct DeployerHandle(mpsc::Sender); impl DeployerHandle { - /// Sends a message. async fn send(&self, msg: Msg) { _ = self.0.send(msg).await; } @@ -154,16 +207,54 @@ impl DeployerHandle { self.send(f(tx)).await; rx.await.expect("actor must be alive") } + + pub async fn deploy_service(&self, spec: ServiceSpec) -> eyre::Result { + self.send_wait(|r| Msg::DeployService(spec, r)).await + } + + pub async fn terminate_service(&self, id: ServiceId) -> eyre::Result<()> { + self.send_wait(|r| Msg::TerminateService(id, r)).await + } + + pub async fn report_instance_status(&self, id: InstanceId, status: proto_instance::Status) { + self.send(Msg::ReportInstanceStatus(id, status)).await; + } } #[derive(Debug)] enum Msg { - DeployService(ServiceSpec, oneshot::Sender>), + DeployService(ServiceSpec, oneshot::Sender>), TerminateService(ServiceId, oneshot::Sender>), + ReportInstanceStatus(InstanceId, proto_instance::Status), // Internal messages InstanceTransition(InstanceId, Transition), } +/* +================================================================================ +(TODO: DATA RECORDS) + + // data records { +services: HashMap, +instances: HashMap, +deployments: HashMap, + } + +lista de serviços rodando + +show (service id) +-> instâncias rodando (e os respectivos deployments) +ShowResponse { + deployments: Vec<(DeploymentId, Vec<(InstanceId, InstanceState)>)> +} + +struct DeploymentStateCtx { + service_id: ServiceId, + id: DeploymentId, + // instance_deployment_results: Vec<_> + // state: State, +} + #[derive(Default)] pub struct ServiceInfo { pub instances: Vec, @@ -180,4 +271,7 @@ impl ServiceInfo { pub struct DeploymentInfo { pub id: DeploymentId, pub at: DateTime, + pub alive_instances: InstanceId, } +================================================================================ +*/ diff --git a/ctl/src/http/deployer.rs b/ctl/src/http/deployer.rs index 3dd582a..c07e161 100644 --- a/ctl/src/http/deployer.rs +++ b/ctl/src/http/deployer.rs @@ -3,26 +3,40 @@ use proto::ctl::deployer::{ DeployServiceReq, DeployServiceRes, ReportDeployInstanceStatusReq, ReportDeployInstanceStatusRes, TerminateServiceReq, TerminateServiceRes, }; +use utils::http; use crate::http::HttpState; -pub async fn report_instance_status( - State(_state): State, - Json(_payload): Json, -) -> Json { - todo!(); -} - pub async fn deploy_service( - State(_state): State, - Json(_payload): Json, -) -> Json { - todo!(); + State(state): State, + Json(DeployServiceReq { + service_spec, + // TODO: Use redeployment policy + redeployment_policy: _, + }): Json, +) -> http::Result> { + let res = state.deployer.deploy_service(service_spec).await?; + Ok(Json(res)) } pub async fn terminate_service( - State(_state): State, - Json(_payload): Json, -) -> Json { - todo!(); + State(state): State, + Json(TerminateServiceReq { service_id }): Json, +) -> http::Result> { + state.deployer.terminate_service(service_id).await?; + Ok(Json(TerminateServiceRes {})) +} + +pub async fn report_instance_status( + State(state): State, + Json(ReportDeployInstanceStatusReq { + instance_id, + status, + }): Json, +) -> Json { + state + .deployer + .report_instance_status(instance_id, status) + .await; + Json(ReportDeployInstanceStatusRes {}) } diff --git a/ctl/src/http/mod.rs b/ctl/src/http/mod.rs index fce696c..217e9eb 100644 --- a/ctl/src/http/mod.rs +++ b/ctl/src/http/mod.rs @@ -1,6 +1,6 @@ use axum::{routing::post, Router}; -use crate::worker_mgr::WorkerMgrHandle; +use crate::{deployer::DeployerHandle, worker_mgr::WorkerMgrHandle}; pub mod deployer; pub mod worker_mgr; @@ -8,6 +8,7 @@ pub mod worker_mgr; #[derive(Clone)] pub struct HttpState { pub worker_mgr: WorkerMgrHandle, + pub deployer: DeployerHandle, } pub fn mk_app(state: HttpState) -> Router { diff --git a/ctl/src/main.rs b/ctl/src/main.rs index 07303a5..76ab7a8 100644 --- a/ctl/src/main.rs +++ b/ctl/src/main.rs @@ -5,12 +5,18 @@ use std::{ use axum::handler::Handler; use clap::Parser; -use proto::well_known::{CTL_BALANCER_PORT, CTL_HTTP_PORT}; +use proto::{ + clients::WorkerClient, + well_known::{CTL_BALANCER_PORT, CTL_HTTP_PORT}, +}; use tokio::task::JoinSet; use tracing::info; use utils::server::mk_listener; -use crate::{args::CtlArgs, balancer::BalancerState, http::HttpState, worker_mgr::WorkerMgr}; +use crate::{ + args::CtlArgs, balancer::BalancerState, deployer::Deployer, http::HttpState, + worker_mgr::WorkerMgr, +}; mod args; mod balancer; @@ -27,6 +33,10 @@ async fn main() -> eyre::Result<()> { let args = Arc::new(CtlArgs::parse()); info!(?args, "started ctl"); + let worker_client = WorkerClient::new(); + + let worker_client = WorkerClient::new(); + let balancer_listener = mk_listener(ANY_IP, CTL_BALANCER_PORT).await?; let http_listener = mk_listener(ANY_IP, CTL_HTTP_PORT).await?; @@ -46,9 +56,15 @@ async fn main() -> eyre::Result<()> { axum::serve(balancer_listener, app).await.unwrap(); }); + let (deployer, deployer_handle) = Deployer::new(worker_mgr_handle.clone(), worker_client); + bag.spawn(async move { + deployer.run().await; + }); + bag.spawn(async move { let state = HttpState { worker_mgr: worker_mgr_handle, + deployer: deployer_handle, }; let app = http::mk_app(state).into_make_service_with_connect_info::(); info!("ctl http listening at {ANY_IP}:{CTL_HTTP_PORT}"); diff --git a/ctl/src/worker_mgr.rs b/ctl/src/worker_mgr.rs index 74def49..a7d976c 100644 --- a/ctl/src/worker_mgr.rs +++ b/ctl/src/worker_mgr.rs @@ -169,7 +169,6 @@ impl WorkerMgrHandle { self.send_wait(|r| Msg::PushMetrics(addr, metrics, r)).await } - #[allow(dead_code)] pub async fn query_workers(&self) -> Vec { self.send_wait(Msg::QueryWorkers).await } diff --git a/docs/state.md b/docs/state.md deleted file mode 100644 index fefac84..0000000 --- a/docs/state.md +++ /dev/null @@ -1,44 +0,0 @@ -```mermaid -flowchart TD - init([init]) - init -->|make deploy| deploying - - deploying([deploying]) - deploying -->|status::FailedToStart| start_fail_dec - deploying -->|status::Started| started - deploying --->|terminate request| pre_terminating - - start_fail_dec{ } - start_fail_dec -->|attempt N <= 5| deploying - start_fail_dec -->|attempt 5 < N| failed_to_start - - failed_to_start[[failed to start]] - - started([started]) - started -->|status::Terminated| unexpected_terminated - started -->|status::Crashed| unexpected_crashed - started -->|terminate request| terminating - - unexpected_terminated[[unexpected terminated]] - unexpected_crashed[[unexpected crashed]] - - pre_terminating([pre terminating]) - pre_terminating -->|status::Started| terminating - pre_terminating -->|status::FailedToStart| never_started - - never_started[[never started]] - - terminating([terminating]) - terminating -->|status::Terminated| terminated - terminating -->|status::Crashed| crashed - terminating -->|FailedToTerminate| term_fail_dec - - term_fail_dec{ } - term_fail_dec -->|attempt N <= 5| terminating - term_fail_dec -->|attempt 5 < N| failed_to_terminate - - failed_to_terminate[[failed to terminate]] - - terminated[[terminated]] - crashed[[crashed]] -``` diff --git a/docs/statem.md b/docs/statem.md new file mode 100644 index 0000000..f4a6859 --- /dev/null +++ b/docs/statem.md @@ -0,0 +1,91 @@ +# Service (TODO) + +```mermaid +flowchart TD + init([init]) + init -->|make deploy| deploying + + deploying([deploying]) + deploying -->|instance deploy ok| part_ok + deploying -->|instance deploy err| part_err + + part_ok([partial deploying ok]) + part_ok -->|instance deploy ok| part_ok + part_ok -->|no more instances| r_ok + part_ok -->|instance deploy err| part_err + + part_err([partial deploying err]) + part_err -->|instance deploy ok| part_err + part_err -->|instance deploy err| part_err + part_err -->|no more instances| r_err + + r_ok{{complete running deploy}} + r_ok -->|terminate deploy| terminating + + r_err{{incomplete running deploy}} + r_err -->|terminate deploy| terminating + + terminating([terminating deploy]) + terminating -->|instance terminated ok| part_term_ok + terminating -->|instance terminated err| part_term_err + + part_term_ok([partial terminating ok]) + part_term_ok -->|instance terminated ok| part_term_ok + part_term_ok -->|no more instances| term_ok + part_term_ok -->|instance terminated err| part_term_err + + part_term_err([partial terminating err]) + part_term_err -->|instance terminated ok| part_term_err + part_term_err -->|instance terminated err| part_term_err + part_term_err -->|no more instances| term_err + + term_ok[[complete termination]] + term_err[[incomplete termination]] +``` + +# Instance + +```mermaid +flowchart TD + init([init]) + init -->|make deploy| deploying + + deploying([deploying]) + deploying -->|status::FailedToStart| start_fail_dec + deploying -->|status::Started| started + deploying --->|terminate request| pre_terminating + + start_fail_dec{ } + start_fail_dec -->|attempt N <= 5| deploying + start_fail_dec -->|attempt 5 < N| failed_to_start + + failed_to_start[[failed to start]] + + started([started]) + started -->|status::Terminated| unexpected_terminated + started -->|status::Crashed| unexpected_crashed + started -->|terminate request| terminating + + unexpected_terminated[[unexpected terminated]] + unexpected_crashed[[unexpected crashed]] + + pre_terminating([pre terminating]) + pre_terminating -->|status::Started| terminating + pre_terminating -->|status::FailedToStart| never_started + + never_started[[never started]] + + terminating([terminating]) + terminating -->|status::Terminated| terminated + terminating -->|status::Crashed| crashed + terminating -->|FailedToTerminate| term_fail_dec + + term_fail_dec{ } + term_fail_dec -->|attempt N <= 5| terminating + term_fail_dec -->|attempt 5 < N| failed_to_terminate + + failed_to_terminate[[failed to terminate]] + + terminated[[terminated]] + crashed[[crashed]] +``` diff --git a/proto/src/common/instance.rs b/proto/src/common/instance.rs index 6bbc282..77bc2cd 100644 --- a/proto/src/common/instance.rs +++ b/proto/src/common/instance.rs @@ -3,7 +3,7 @@ use std::fmt; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::common::service::{ResourceConfig, ServiceImage}; +use crate::common::service::{ResourceConfig, ServiceImage, ServiceSpec}; #[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct InstanceId(pub Uuid); @@ -30,6 +30,25 @@ pub struct InstanceSpec { pub resource_config: ResourceConfig, } +impl InstanceSpec { + #[must_use] + pub fn from_service_spec_cloned(spec: &ServiceSpec, instance_id: InstanceId) -> Self { + let ServiceSpec { + service_id: _, + image, + public, + concurrency: _, + resource_config, + } = spec; + InstanceSpec { + instance_id, + image: image.clone(), + public: *public, + resource_config: resource_config.clone(), + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub enum Status { /// The instance has successfully started. diff --git a/utils/src/fmt.rs b/utils/src/fmt.rs new file mode 100644 index 0000000..ea77b18 --- /dev/null +++ b/utils/src/fmt.rs @@ -0,0 +1,30 @@ +use std::fmt; + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct ElideDebug(pub T); + +impl From for ElideDebug { + fn from(inner: T) -> Self { + ElideDebug(inner) + } +} + +impl ElideDebug { + pub fn get(&self) -> &T { + &self.0 + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.0 + } + + pub fn into(self) -> T { + self.0 + } +} + +impl fmt::Debug for ElideDebug { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "<...>") + } +} diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 8f4afd2..e3aaa6b 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -1,3 +1,4 @@ +pub mod fmt; pub mod http; pub mod server; pub mod setup;