-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(worker-runner): Added runner actor & handle (#43)
Clippy is being annoying so I just jumped him up. But the clippy command works on my machine lol
- Loading branch information
Showing
9 changed files
with
198 additions
and
5 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
use axum::{routing::post, Router}; | ||
|
||
use crate::runner::RunnerHandle; | ||
|
||
mod runner; | ||
|
||
#[derive(Clone)] | ||
pub struct HttpState { | ||
pub runner: RunnerHandle, | ||
} | ||
|
||
pub async fn run_server(state: HttpState) { | ||
let app = Router::new() | ||
.route("/instance/new", post(runner::new_instance)) | ||
.with_state(state); | ||
|
||
let listener = tokio::net::TcpListener::bind("0.0.0.0:6969").await.unwrap(); | ||
axum::serve(listener, app).await.unwrap(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
use axum::{extract::State, Json}; | ||
use proto::worker::runner::{DeployInstanceReq, DeployInstanceRes}; | ||
|
||
use crate::http::HttpState; | ||
|
||
pub async fn new_instance( | ||
State(_state): State<HttpState>, | ||
Json(_payload): Json<DeployInstanceReq>, | ||
) -> Json<DeployInstanceRes> { | ||
todo!(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,47 @@ | ||
use std::sync::Arc; | ||
|
||
use eyre::Result; | ||
use http::HttpState; | ||
use runner::Runner; | ||
use tracing::info; | ||
|
||
use crate::{args::WorkerArgs, monitor::pusher}; | ||
|
||
mod args; | ||
mod http; | ||
mod monitor; | ||
mod runner; | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<()> { | ||
setup::tracing(); | ||
|
||
let args = Arc::new(WorkerArgs::parse()); | ||
let args = WorkerArgs::parse(); | ||
info!(?args, "started worker"); | ||
|
||
let pusher_handle = tokio::spawn({ | ||
let args = Arc::clone(&args); | ||
async move { | ||
pusher::start_pusher(Arc::new(args)).await; | ||
} | ||
}); | ||
|
||
let (runner, runner_handle) = Runner::new(); | ||
let runner_actor_handle = tokio::spawn(async move { | ||
runner.run().await; | ||
}); | ||
|
||
let http_handle = tokio::spawn({ | ||
let state = HttpState { | ||
runner: runner_handle.clone(), | ||
}; | ||
async { | ||
pusher::start_pusher(args).await; | ||
http::run_server(state).await; | ||
} | ||
}); | ||
|
||
pusher_handle.await.unwrap(); | ||
runner_actor_handle.await.unwrap(); | ||
http_handle.await.unwrap(); | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
use proto::common::instance::InstanceSpec; | ||
|
||
use super::RunnerHandle; | ||
#[allow(clippy::unused_async)] | ||
pub async fn spawn_instance(spec: InstanceSpec, port: u16, _handle: RunnerHandle) { | ||
tokio::spawn(async move { | ||
match run_instance(spec, port).await { | ||
Ok(()) => todo!(), | ||
Err(_) => todo!(), | ||
} | ||
}); | ||
todo!() | ||
} | ||
#[allow(clippy::unused_async)] | ||
async fn run_instance(_spec: InstanceSpec, _port: u16) -> eyre::Result<()> { | ||
todo!(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
use std::collections::{HashMap, HashSet}; | ||
|
||
use eyre::{Context as _, Ok, Report}; | ||
use proto::common::instance::{InstanceId, InstanceSpec}; | ||
use tokio::{ | ||
net::TcpListener, | ||
sync::{mpsc, oneshot}, | ||
task, | ||
}; | ||
|
||
mod container_rt; | ||
|
||
pub struct Runner { | ||
rx: mpsc::Receiver<Msg>, | ||
instances: HashMap<InstanceId, u16>, | ||
ports: HashSet<u16>, | ||
handle: RunnerHandle, | ||
} | ||
|
||
impl Runner { | ||
#[must_use] | ||
pub fn new() -> (Runner, RunnerHandle) { | ||
let (tx, rx) = mpsc::channel(16); | ||
let handle = RunnerHandle(tx); | ||
let actor = Runner { | ||
rx, | ||
instances: HashMap::default(), | ||
ports: HashSet::default(), | ||
handle: handle.clone(), | ||
}; | ||
(actor, handle) | ||
} | ||
|
||
pub async fn run(mut self) { | ||
while let Some(msg) = self.rx.recv().await { | ||
self.handle_msg(msg).await; | ||
} | ||
} | ||
|
||
async fn handle_msg(&mut self, msg: Msg) { | ||
match msg { | ||
Msg::InstanceDeploy(spec, reply) => { | ||
let res = self.instance_deploy(spec).await; | ||
_ = reply.send(res); | ||
} | ||
Msg::InstanceTerminate(_id, _reply) => todo!(), | ||
Msg::InstanceKill(_id, _report) => todo!(), | ||
} | ||
} | ||
|
||
async fn instance_deploy(&mut self, spec: InstanceSpec) -> eyre::Result<()> { | ||
let port = self.get_port_for_instance(spec.instance_id).await?; | ||
container_rt::spawn_instance(spec, port, self.handle.clone()).await; | ||
Ok(()) | ||
} | ||
|
||
async fn get_port_for_instance(&mut self, id: InstanceId) -> eyre::Result<u16> { | ||
let port = loop { | ||
let port = get_port().await?; | ||
if !self.ports.contains(&port) { | ||
break port; | ||
} | ||
}; | ||
self.instances.insert(id, port); | ||
self.ports.insert(port); | ||
Ok(port) | ||
} | ||
} | ||
|
||
#[derive(Clone)] | ||
pub struct RunnerHandle(mpsc::Sender<Msg>); | ||
|
||
impl RunnerHandle { | ||
#[allow(dead_code)] | ||
async fn send(&self, msg: Msg) { | ||
_ = self.0.send(msg).await; | ||
} | ||
|
||
#[allow(dead_code)] | ||
pub async fn deploy_instance(&self, spec: InstanceSpec) -> Result<(), Report> { | ||
let (tx, rx) = oneshot::channel(); | ||
self.send(Msg::InstanceDeploy(spec, tx)).await; | ||
rx.await.unwrap() | ||
} | ||
} | ||
|
||
#[allow(clippy::enum_variant_names)] // remove this once more variants are added | ||
#[allow(dead_code)] | ||
pub enum Msg { | ||
InstanceDeploy(InstanceSpec, oneshot::Sender<Result<(), Report>>), | ||
InstanceTerminate(InstanceId, oneshot::Sender<Result<(), Report>>), | ||
InstanceKill(InstanceId, oneshot::Sender<Result<(), Report>>), | ||
} | ||
|
||
async fn get_port() -> eyre::Result<u16> { | ||
let listener = TcpListener::bind(("0.0.0.0", 0)) | ||
.await | ||
.wrap_err("failed to bind while deciding port")?; | ||
let port = listener.local_addr().expect("must have local_addr").port(); | ||
drop(listener); | ||
task::yield_now().await; | ||
Ok(port) | ||
} |