diff --git a/Cargo.lock b/Cargo.lock index 03dd7f2..40b7009 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1712,6 +1712,7 @@ dependencies = [ name = "worker" version = "0.1.0" dependencies = [ + "axum", "clap", "eyre", "proto", @@ -1720,4 +1721,5 @@ dependencies = [ "sysinfo", "tokio", "tracing", + "uuid", ] diff --git a/proto/src/common/instance.rs b/proto/src/common/instance.rs index ddac39b..2b242f4 100644 --- a/proto/src/common/instance.rs +++ b/proto/src/common/instance.rs @@ -3,7 +3,7 @@ use uuid::Uuid; use crate::common::service::ServiceImage; -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct InstanceId(Uuid); #[derive(Debug, Serialize, Deserialize)] diff --git a/proto/src/worker/runner.rs b/proto/src/worker/runner.rs index f5e8a17..8c07eb2 100644 --- a/proto/src/worker/runner.rs +++ b/proto/src/worker/runner.rs @@ -3,9 +3,27 @@ //! an instance on a given worker node. use serde::{Deserialize, Serialize}; +use uuid::Uuid; use crate::common::instance::{InstanceId, InstanceSpec}; +/// + +/// Starts a new deploy in the system +#[derive(Debug, Serialize, Deserialize)] +pub struct DeployInstanceReq { + pub id: DeoployReqId, + pub instance_spec: InstanceSpec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeployInstanceRes { + pub id: DeoployReqId, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeoployReqId(Uuid); + /// Starts a new deploy in the system. #[derive(Debug, Serialize, Deserialize)] pub struct DeployReq { diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 266ee19..d2806c7 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -7,12 +7,15 @@ edition.workspace = true workspace = true [dependencies] +# Internal deps setup.workspace = true proto.workspace = true - +# External deps (keep alphabetically sorted) +axum.workspace = true clap.workspace = true eyre.workspace = true reqwest.workspace = true sysinfo.workspace = true tokio.workspace = true tracing.workspace = true +uuid.workspace = true diff --git a/worker/src/http/mod.rs b/worker/src/http/mod.rs new file mode 100644 index 0000000..27d0603 --- /dev/null +++ b/worker/src/http/mod.rs @@ -0,0 +1,19 @@ +use axum::{routing::post, Router}; + +use crate::runner::RunnerHandle; + +mod runner; + +#[derive(Clone)] +pub struct HttpState { + pub runner: RunnerHandle, +} + +pub async fn run_server(state: HttpState) { + let app = Router::new() + .route("/instance/new", post(runner::new_instance)) + .with_state(state); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:6969").await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} diff --git a/worker/src/http/runner/mod.rs b/worker/src/http/runner/mod.rs new file mode 100644 index 0000000..2744061 --- /dev/null +++ b/worker/src/http/runner/mod.rs @@ -0,0 +1,11 @@ +use axum::{extract::State, Json}; +use proto::worker::runner::{DeployInstanceReq, DeployInstanceRes}; + +use crate::http::HttpState; + +pub async fn new_instance( + State(_state): State, + Json(_payload): Json, +) -> Json { + todo!(); +} diff --git a/worker/src/main.rs b/worker/src/main.rs index 7460120..d93fc30 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -1,12 +1,16 @@ use std::sync::Arc; use eyre::Result; +use http::HttpState; +use runner::Runner; use tracing::info; use crate::{args::WorkerArgs, monitor::pusher}; mod args; +mod http; mod monitor; +mod runner; #[tokio::main] async fn main() -> Result<()> { @@ -17,11 +21,28 @@ async fn main() -> Result<()> { let pusher_handle = tokio::spawn({ let args = Arc::clone(&args); - async { + async move { pusher::start_pusher(args).await; } }); + + let (runner, runner_handle) = Runner::new(); + let runner_actor_handle = tokio::spawn(async move { + runner.run().await; + }); + + let http_handle = tokio::spawn({ + let state = HttpState { + runner: runner_handle.clone(), + }; + async { + http::run_server(state).await; + } + }); + pusher_handle.await.unwrap(); + runner_actor_handle.await.unwrap(); + http_handle.await.unwrap(); Ok(()) } diff --git a/worker/src/runner/container_rt.rs b/worker/src/runner/container_rt.rs new file mode 100644 index 0000000..ac3bb83 --- /dev/null +++ b/worker/src/runner/container_rt.rs @@ -0,0 +1,17 @@ +use proto::common::instance::InstanceSpec; + +use super::RunnerHandle; +#[allow(clippy::unused_async)] +pub async fn spawn_instance(spec: InstanceSpec, port: u16, _handle: RunnerHandle) { + tokio::spawn(async move { + match run_instance(spec, port).await { + Ok(()) => todo!(), + Err(_) => todo!(), + } + }); + todo!() +} +#[allow(clippy::unused_async)] +async fn run_instance(_spec: InstanceSpec, _port: u16) -> eyre::Result<()> { + todo!(); +} diff --git a/worker/src/runner/mod.rs b/worker/src/runner/mod.rs new file mode 100644 index 0000000..193888b --- /dev/null +++ b/worker/src/runner/mod.rs @@ -0,0 +1,103 @@ +use std::collections::{HashMap, HashSet}; + +use eyre::{Context as _, Ok, Report}; +use proto::common::instance::{InstanceId, InstanceSpec}; +use tokio::{ + net::TcpListener, + sync::{mpsc, oneshot}, + task, +}; + +mod container_rt; + +pub struct Runner { + rx: mpsc::Receiver, + instances: HashMap, + ports: HashSet, + handle: RunnerHandle, +} + +impl Runner { + #[must_use] + pub fn new() -> (Runner, RunnerHandle) { + let (tx, rx) = mpsc::channel(16); + let handle = RunnerHandle(tx); + let actor = Runner { + rx, + instances: HashMap::default(), + ports: HashSet::default(), + handle: handle.clone(), + }; + (actor, handle) + } + + pub async fn run(mut self) { + while let Some(msg) = self.rx.recv().await { + self.handle_msg(msg).await; + } + } + + async fn handle_msg(&mut self, msg: Msg) { + match msg { + Msg::InstanceDeploy(spec, reply) => { + let res = self.instance_deploy(spec).await; + _ = reply.send(res); + } + Msg::InstanceTerminate(_id, _reply) => todo!(), + Msg::InstanceKill(_id, _report) => todo!(), + } + } + + async fn instance_deploy(&mut self, spec: InstanceSpec) -> eyre::Result<()> { + let port = self.get_port_for_instance(spec.instance_id).await?; + container_rt::spawn_instance(spec, port, self.handle.clone()).await; + Ok(()) + } + + async fn get_port_for_instance(&mut self, id: InstanceId) -> eyre::Result { + let port = loop { + let port = get_port().await?; + if !self.ports.contains(&port) { + break port; + } + }; + self.instances.insert(id, port); + self.ports.insert(port); + Ok(port) + } +} + +#[derive(Clone)] +pub struct RunnerHandle(mpsc::Sender); + +impl RunnerHandle { + #[allow(dead_code)] + pub async fn send(&self, msg: Msg) { + _ = self.0.send(msg).await; + } + + #[allow(dead_code)] + pub async fn deploy_instance(&self, spec: InstanceSpec) -> Result<(), Report> { + let (tx, rx) = oneshot::channel(); + self.send(Msg::InstanceDeploy(spec, tx)).await; + rx.await.unwrap() + } +} + +#[allow(clippy::enum_variant_names)] // remove this once more variants are added +#[allow(dead_code)] +pub enum Msg { + InstanceDeploy(InstanceSpec, oneshot::Sender>), + InstanceTerminate(InstanceId, oneshot::Sender>), + InstanceKill(InstanceId, oneshot::Sender>), +} + +async fn get_port() -> eyre::Result { + let listener = TcpListener::bind(("0.0.0.0", 0)) + .await + .wrap_err("failed to bind while deciding port")?; + let port = listener.local_addr().expect("must have local_addr").port(); + drop(listener); + task::yield_now().await; + Ok(port) +}