Skip to content

Commit

Permalink
finish basic
Browse files Browse the repository at this point in the history
  • Loading branch information
lffg committed Jun 21, 2024
1 parent 477dac7 commit cef4069
Show file tree
Hide file tree
Showing 13 changed files with 484 additions and 175 deletions.
16 changes: 16 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# High-level

- Distribute the control plane through multiple controller nodes (we currently
  only support one controller node per cluster).

# Mid-level

- Sysadmin notifier.
- Persist node states across crashes.
- Support more service redeployment policies.
- Add correlation IDs and error correlation IDs.

# Refactors

- Rename "deploy_instance" operation to "start_instance". This way, we keep it
consistent with, e.g., the instance state machine.
2 changes: 1 addition & 1 deletion ctl/src/deployer/alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ pub fn rand_many(
}

/// Randomly allocates a single instance from the provided pool of `workers`.
pub fn rand_single(workers: &[WorkerDetails]) -> (InstanceId, IpAddr) {
pub fn _rand_single(workers: &[WorkerDetails]) -> (InstanceId, IpAddr) {
rand_many(workers, 1).next().unwrap()
}
240 changes: 156 additions & 84 deletions ctl/src/deployer/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use std::net::IpAddr;

use proto::{
common::instance::{self, InstanceId, InstanceSpec},
ctl::deployer::RedeploymentPolicy,
well_known::{MAX_INSTANCE_DEPLOY_RETRIES, MAX_INSTANCE_TERMINATION_RETRIES}, worker::runner::DeployInstanceRes,
ctl::deployer::DeploymentId,
well_known::{MAX_INSTANCE_DEPLOY_RETRIES, MAX_INSTANCE_TERMINATION_RETRIES},
worker::runner::{DeployInstanceRes, TerminateInstanceRes},
};
use tracing::warn;
use tracing::{instrument, warn};
use utils::fmt::ElideDebug;

use crate::deployer::Deployer;

Expand All @@ -17,45 +19,41 @@ const INITIAL_ATTEMPT: u8 = 1;
/// Before returning the next state, this function may also schedule some
/// background worker task that will *eventually* produce another transition
/// message.
pub fn next(current: StateCtx, t: Transition, d: &mut Deployer) -> StateCtx {
#[
// Notice that through this span we log eventual errors.
instrument(skip(d))
]
pub fn next(d: &mut Deployer, current: StateCtx, t: Transition) -> StateCtx {
use instance::Status as s;
use State::*;
use Transition as t;
match (current.state, t) {
(Init, t::Deploy { worker_addr, spec }) => current.trans_into(Deploying {
attempt: INITIAL_ATTEMPT,
worker_addr,
spec,
}),

(
Deploying {
attempt,
worker_addr,
match (current.state.clone(), t) {
(Init, t::Deploy { spec }) => {
schedule_instance_deployment(d, &current, spec.get().clone());
current.trans_into(Deploying {
attempt: INITIAL_ATTEMPT,
spec,
},
t::Status(s::FailedToStart { error }),
) => {
warn!(?error, "failed deploy attempt #{attempt}");
if attempt <= MAX_INSTANCE_DEPLOY_RETRIES {
let new_attempt = attempt + 1;
current.trans_into(Deploying {
attempt: new_attempt,
worker_addr,
spec,
})
} else {
current.trans_into(FailedToStart)
}
})
}

(Deploying { attempt, spec }, t::FailedToDeploy(_error)) => {
warn!("failed to deploy (deployment attempt #{attempt}");
schedule_instance_deployment_reattempt(d, current, attempt, spec.get().clone())
}

(Deploying { attempt, spec }, t::Status(s::FailedToStart { error: _ })) => {
warn!("failed to start (deployment attempt #{attempt})");
schedule_instance_deployment_reattempt(d, current, attempt, spec.get().clone())
}

(Deploying { .. }, t::Status(s::Started)) => {
// TODO
//
current.trans_into(Started)
}

(Deploying { .. }, t::Terminate(_)) => {
// TODO
(Deploying { .. }, t::Terminate) => {
//
current.trans_into(PreTerminating)
}

Expand All @@ -66,129 +64,156 @@ pub fn next(current: StateCtx, t: Transition, d: &mut Deployer) -> StateCtx {
})
}

(PreTerminating, t::FailedToDeploy(_error)) => {
warn!("failed to deploy instance");
current.trans_into(NeverStarted)
}

(PreTerminating, t::Status(s::FailedToStart { .. })) => {
warn!("failed to start instance");
// TODO
current.trans_into(NeverStarted)
}

(Started, t::Status(s::Terminated { .. })) => {
// TODO
warn!("instance unexpectedly terminated");
current.trans_into(UnexpectedTerminated)
}

(Started, t::Status(s::Crashed { .. })) => {
// TODO
warn!("instance unexpectedly crashed");
current.trans_into(UnexpectedCrashed)
}

(Started, t::Status(s::Killed { .. })) => {
// TODO
warn!("instance was killed");
current.trans_into(UnexpectedCrashed)
}

(Started, t::Terminate(_)) => {
// TODO
(Started, t::Terminate) => {
schedule_instance_termination(d, &current);
current.trans_into(Terminating {
attempt: INITIAL_ATTEMPT,
})
}

(Terminating { attempt }, t::FailedToTerminate) => {
if attempt <= MAX_INSTANCE_TERMINATION_RETRIES {
let new_attempt = attempt + 1;
current.trans_into(Terminating {
attempt: new_attempt,
})
} else {
current.trans_into(FailedToTerminate)
}
(Terminating { attempt }, t::FailedToTerminate(_error)) => {
warn!("failed to terminate (termination attempt #{attempt})");
schedule_instance_termination_reattempt(d, current.clone(), attempt)
}

(Terminating { .. }, t::Status(s::Terminated)) => {
// TODO
//
current.trans_into(Terminated)
}

(Terminating { .. }, t::Status(s::Crashed { .. })) => {
// TODO
warn!("instance crashed");
current.trans_into(Crashed)
}

(Terminating { .. }, t::Status(s::Killed { .. })) => {
// TODO
warn!("instance was killed");
current.trans_into(Crashed)
}

(s, t) => panic!("unexpected state transition `{t:?}` for current state `{s:?}`"),
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StateCtx {
state: State,
id: InstanceId,
/// The address of the worker in which this instance lives.
worker_addr: IpAddr,
id: InstanceId,
deployment_id: DeploymentId,
}

impl StateCtx {
fn trans_into(mut self, next: State) -> StateCtx {
self.state = next;
self
pub fn new_init(id: InstanceId, worker_addr: IpAddr, deployment_id: DeploymentId) -> Self {
StateCtx {
state: State::Init,
id,
worker_addr,
deployment_id,
}
}

fn trans_into_with_addr(mut self, next: State, next_addr: IpAddr) -> StateCtx {
pub fn state(&self) -> &State {
&self.state
}

#[allow(dead_code)]
pub fn deployment_id(&self) -> DeploymentId {
self.deployment_id
}

fn trans_into(mut self, next: State) -> StateCtx {
self.state = next;
self.worker_addr = next_addr;
self
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum State {
Init,
Deploying { attempt: u8, spec: InstanceSpec },
Deploying {
attempt: u8,
spec: ElideDebug<InstanceSpec>,
},
FailedToStart,
PreTerminating,
NeverStarted,
Started,
UnexpectedTerminated,
UnexpectedCrashed,
Terminating { attempt: u8 },
Terminating {
attempt: u8,
},
Terminated,
Crashed,
FailedToTerminate,
}

impl State {
pub fn is_terminal(&self) -> bool {
#[allow(clippy::match_same_arms)]
pub fn kind(&self) -> TerminalKind {
use TerminalKind::*;
match self {
State::Init => false,
State::Deploying { .. } => false,
State::FailedToStart => true,
State::PreTerminating => false,
State::NeverStarted => true,
State::Started => false,
State::UnexpectedTerminated => true,
State::UnexpectedCrashed => true,
State::Terminating { .. } => false,
State::Terminated => true,
State::Crashed => true,
State::FailedToTerminate => true,
State::Init => NonTerminal,
State::Deploying { .. } => NonTerminal,
State::FailedToStart => UnsuccessfulTerminal,
State::PreTerminating => NonTerminal,
State::NeverStarted => UnsuccessfulTerminal,
State::Started => NonTerminal,
State::UnexpectedTerminated => UnsuccessfulTerminal,
State::UnexpectedCrashed => UnsuccessfulTerminal,
State::Terminating { .. } => NonTerminal,
State::Terminated => SuccessfulTerminal,
State::Crashed => UnsuccessfulTerminal,
State::FailedToTerminate => UnsuccessfulTerminal,
}
}
}

/// Describes whether a state machine state is terminal or not, and if a
/// terminal state is (or not) successful.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[allow(clippy::enum_variant_names)]
pub enum TerminalKind {
    /// The state machine may still transition out of this state.
    NonTerminal,
    /// A final state reached through the expected lifecycle
    /// (e.g. `State::Terminated`).
    SuccessfulTerminal,
    /// A final state reached through a failure (e.g. a crash, a failed
    /// start, or a failed termination).
    UnsuccessfulTerminal,
}

#[derive(Debug)]
pub enum Transition {
Deploy {
worker_addr: IpAddr,
spec: InstanceSpec,
},
Terminate {
worker_addr: IpAddr,
id: InstanceId,
spec: ElideDebug<InstanceSpec>,
},
#[allow(dead_code)]
Terminate,
Status(instance::Status),
// XX: For now, `FailedToTerminate` doesn't live in `instance::Status` since
// we are not sure how to handle those corner error cases. In the future, we
Expand All @@ -197,18 +222,65 @@ pub enum Transition {
// must be able to recover from potential Docker failures (which are the
// only sources of termination failures that the Tucano system can
// encounter).
FailedToTerminate,
FailedToTerminate(eyre::Report),
FailedToDeploy(eyre::Report),
}

fn deploy_instance(ctx: &StateCtx, spec: InstanceSpec, d: &mut Deployer) {
fn schedule_instance_deployment(d: &mut Deployer, ctx: &StateCtx, spec: InstanceSpec) {
let worker_addr = ctx.worker_addr;
d.instance_task(ctx.id, move |h| async move {
let result = h.worker_client
.deploy_instance(worker_addr, spec)
.await;
let result = h.worker_client.deploy_instance(worker_addr, spec).await;
match result {
Ok(DeployInstanceRes {}) => Transition::
Ok(DeployInstanceRes {}) => None,
Err(error) => Some(Transition::FailedToDeploy(error)),
}
todo!()
});
}

/// Handles a failed deployment attempt: while the retry budget allows it,
/// schedules another deployment and stays in `Deploying` with an incremented
/// attempt counter; otherwise transitions to the terminal `FailedToStart`.
fn schedule_instance_deployment_reattempt(
    d: &mut Deployer,
    current: StateCtx,
    attempt: u8,
    spec: InstanceSpec,
) -> StateCtx {
    // Retry budget exhausted: give up on this instance.
    if attempt > MAX_INSTANCE_DEPLOY_RETRIES {
        return current.trans_into(State::FailedToStart);
    }

    schedule_instance_deployment(d, &current, spec.clone());
    current.trans_into(State::Deploying {
        attempt: attempt + 1,
        spec: spec.into(),
    })
}

/// Schedules a background task that asks the worker hosting this instance to
/// terminate it.
///
/// Only a failed call produces an immediate transition
/// ([`Transition::FailedToTerminate`]); on success no transition is emitted.
fn schedule_instance_termination(d: &mut Deployer, ctx: &StateCtx) {
    let addr = ctx.worker_addr;
    let instance_id = ctx.id;
    d.instance_task(ctx.id, move |h| async move {
        // `err()` discards the empty `TerminateInstanceRes` on success,
        // yielding `None`; an error is wrapped into a transition.
        h.worker_client
            .terminate_instance(addr, instance_id)
            .await
            .err()
            .map(Transition::FailedToTerminate)
    });
}

/// Handles a failed termination attempt: while the retry budget allows it,
/// schedules another termination and stays in `Terminating` with an
/// incremented attempt counter; otherwise transitions to the terminal
/// `FailedToTerminate`.
fn schedule_instance_termination_reattempt(
    d: &mut Deployer,
    current: StateCtx,
    attempt: u8,
) -> StateCtx {
    // Retry budget exhausted: give up on terminating this instance.
    if attempt > MAX_INSTANCE_TERMINATION_RETRIES {
        return current.trans_into(State::FailedToTerminate);
    }

    schedule_instance_termination(d, &current);
    current.trans_into(State::Terminating {
        attempt: attempt + 1,
    })
}
Loading

0 comments on commit cef4069

Please sign in to comment.