Skip to content

Commit

Permalink
feat(worker): check all deploys data when worker starting and normal …
Browse files Browse the repository at this point in the history
…running
  • Loading branch information
fuxiaohei committed Apr 26, 2024
1 parent b837db4 commit 9da8744
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 47 deletions.
18 changes: 16 additions & 2 deletions crates/dao/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ pub enum DeployStatus {
#[strum(serialize_all = "lowercase")]
pub enum DeploymentStatus {
Active,
Deleted,
Deleted, // if a deployment is deleted, it will not be shown
Outdated, // if a deployment is outdated, it will be deleted
}

/// get_last_by_project gets the last deployment by project
Expand Down Expand Up @@ -185,7 +186,20 @@ pub async fn set_compiling(id: i32, project_id: i32) -> Result<()> {

/// set_success sets a deployment as success
pub async fn set_success(id: i32, project_id: i32) -> Result<()> {
set_status(id, project_id, DeployStatus::Success).await
set_status(id, project_id, DeployStatus::Success).await?;
// set old deployments as outdated
let db = DB.get().unwrap();
deployment::Entity::update_many()
.filter(deployment::Column::ProjectId.eq(project_id))
.filter(deployment::Column::Id.ne(id))
.col_expr(
deployment::Column::Status,
Expr::value(DeploymentStatus::Outdated.to_string()),
)
.col_expr(deployment::Column::UpdatedAt, Expr::value(now_time()))
.exec(db)
.await?;
Ok(())
}

/// set_uploaded sets a deployment as uploaded, waiting for deploying
Expand Down
16 changes: 15 additions & 1 deletion crates/wasm/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::Worker;
use crate::engine::MODULE_VERSION;
use anyhow::{anyhow, Result};
use lazy_static::lazy_static;
use moka::sync::Cache;
use once_cell::sync::OnceCell;
use std::time::Duration;
use tokio::time::Instant;
use tracing::{info, warn};
use tracing::{debug, info, warn};

/// FILE_DIR is the directory of wasm files
pub static FILE_DIR: OnceCell<String> = OnceCell::new();
Expand Down Expand Up @@ -50,3 +51,16 @@ pub async fn prepare_worker(key: &str, is_aot: bool) -> Result<Worker> {
}
Err(anyhow!("Invalid key"))
}

/// compile_aot compile aot wasm
pub async fn compile_aot(path: &str) -> Result<()> {
let suffix = format!(".wasm.{}.aot", MODULE_VERSION);
let aot_path = path.replace(".wasm", &suffix);
if std::path::Path::new(&aot_path).exists() {
debug!("AOT file already exists: {}", &aot_path);
return Ok(());
}
Worker::compile_aot(&path, &aot_path)?;
debug!("Compile AOT success: {}", &aot_path);
Ok(())
}
7 changes: 3 additions & 4 deletions crates/wasm/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::engine::MODULE_VERSION;
use crate::hostcall;
use anyhow::Result;
use axum::body::Body;
use tracing::debug;
Expand All @@ -7,9 +9,6 @@ use wasmtime::{
Engine, Store,
};

use crate::engine::MODULE_VERSION;
use crate::hostcall;

/// Worker is used to run wasm component
#[derive(Clone)]
pub struct Worker {
Expand Down Expand Up @@ -72,7 +71,7 @@ impl Worker {
})
}

fn compile_aot(src: &str, dst: &str) -> Result<()> {
pub fn compile_aot(src: &str, dst: &str) -> Result<()> {
let engine = super::engine::get("default")?;
let component = Component::from_file(&engine, src)?;
let bytes = Component::serialize(&component)?;
Expand Down
19 changes: 11 additions & 8 deletions crates/worker-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ static ENDPOINT_NAME: OnceCell<String> = OnceCell::new();
static AOT_ENABLED: OnceCell<bool> = OnceCell::new();
static METRICS_ENABLED: OnceCell<bool> = OnceCell::new();

pub async fn start(opts: Opts) -> Result<()> {
let hostname = if let Some(endpoint) = opts.endpoint_name {
endpoint
pub fn init_globals(opts: &Opts) -> Result<()> {
let hostname = if let Some(endpoint) = &opts.endpoint_name {
endpoint.clone()
} else {
hostname::get()
.unwrap_or("unknown".into())
Expand All @@ -73,29 +73,32 @@ pub async fn start(opts: Opts) -> Result<()> {
// create directory
std::fs::create_dir_all(&opts.dir).unwrap();

DEFAULT_WASM.set(opts.default_wasm).unwrap();
DEFAULT_WASM.set(opts.default_wasm.clone()).unwrap();
ENDPOINT_NAME.set(hostname).unwrap();
AOT_ENABLED.set(opts.wasm_aot).unwrap();
METRICS_ENABLED.set(opts.metrics).unwrap();

// set pool's local dir to load module file
land_wasm::pool::FILE_DIR.set(opts.dir).unwrap();
land_wasm::pool::FILE_DIR.set(opts.dir.clone()).unwrap();

// start wasmtime engines epoch calls
land_wasm::hostcall::init_clients();
land_wasm::init_engines()?;

Ok(())
}

pub async fn start(addr: SocketAddr) -> Result<()> {
// load default wasm
load_default_wasm().await?;

let app = Router::new()
.route("/", any(default_handler))
.route("/*path", any(default_handler))
.layer(TimeoutLayer::new(Duration::from_secs(10)))
.route_layer(axum::middleware::from_fn(middleware::middleware));
let make_service = app.into_make_service_with_connect_info::<SocketAddr>();
info!("Starting worker server on: {}", opts.addr);
let listener = tokio::net::TcpListener::bind(opts.addr).await?;
info!("Starting worker server on: {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, make_service).await?;
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions land-server/src/deployer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use tracing::warn;

mod deploying;
mod waiting;
pub use waiting::TaskValue;

/// run_background starts the background worker to handle the deployer's tasks.
pub fn run_background() {
Expand Down
19 changes: 11 additions & 8 deletions land-server/src/deployer/waiting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use land_dao::models::deployment::Model as DeploymentModel;
use serde::{Deserialize, Serialize};
use tracing::{debug, debug_span, info, warn, Instrument};

/// run_tasks deploy tasks
pub async fn run_tasks() -> Result<()> {
// 1.read waiting tasks from db
let dps =
Expand Down Expand Up @@ -111,7 +112,7 @@ async fn handle_deploy(dp: &DeploymentModel) -> Result<()> {
dp.id,
dp.project_id,
storage_file_name.clone(),
upload_data_md5,
upload_data_md5.clone(),
upload_data_size,
)
.await?;
Expand Down Expand Up @@ -139,6 +140,7 @@ async fn handle_deploy(dp: &DeploymentModel) -> Result<()> {
download_url: storage_settings.build_url(&storage_file_name)?,
wasm_path: storage_file_name,
task_id: dp.task_id.clone(),
checksum: upload_data_md5,
};
let task_content = serde_json::to_string(&task_value)?;
for worker in workers {
Expand All @@ -157,11 +159,12 @@ async fn handle_deploy(dp: &DeploymentModel) -> Result<()> {
}

#[derive(Serialize, Deserialize)]
struct TaskValue {
user_uuid: String,
project_uuid: String,
domain: String,
download_url: String,
wasm_path: String,
task_id: String,
pub struct TaskValue {
pub user_uuid: String,
pub project_uuid: String,
pub domain: String,
pub download_url: String,
pub wasm_path: String,
pub task_id: String,
pub checksum: String,
}
4 changes: 2 additions & 2 deletions land-server/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ async fn log_middleware(request: Request, next: Next) -> Result<Response, Status
remote = connect_info.to_string();
}

if path.starts_with("/api/v1") {
if path.starts_with("/api/v1/worker-api/alive") {
// high sequence url
// return Ok(next.run(request).await);
return Ok(next.run(request).await);
}

let method = request.method().clone().to_string();
Expand Down
42 changes: 40 additions & 2 deletions land-server/src/server/workerapi/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use super::ServerError;
use crate::deployer::TaskValue;
use anyhow::Result;
use axum::routing::get;
use axum::{response::IntoResponse, routing::post, Json, Router};
use land_common::IPInfo;
use land_dao::deployment::DeployStatus;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::info;

/// router returns the router for the worker api
pub fn router() -> Result<Router> {
let app = Router::new().route("/alive", post(alive));
let app = Router::new()
.route("/alive", post(alive))
.route("/deploys", get(deploys));
Ok(app)
}

Expand Down Expand Up @@ -37,10 +42,14 @@ async fn alive(Json(p): Json<Request>) -> Result<impl IntoResponse, ServerError>

let mut tasks_conf = vec![];
let tasks =
land_dao::deployment::list_tasks_by_ip(ipinfo.ip, Some(DeployStatus::Deploying)).await?;
land_dao::deployment::list_tasks_by_ip(ipinfo.ip.clone(), Some(DeployStatus::Deploying))
.await?;
for task in tasks {
tasks_conf.push(task.content);
}
if !tasks_conf.is_empty() {
info!(ip = ipinfo.ip, "Alive with {} tasks", tasks_conf.len())
}
Ok(Json(tasks_conf))
}

Expand All @@ -49,3 +58,32 @@ pub struct Request {
pub ip: IPInfo,
pub tasks: HashMap<String, String>,
}

/// deploys is the handler for the /deploys endpoint
async fn deploys() -> Result<impl IntoResponse, ServerError> {
let dps = land_dao::deployment::list_by_status(DeployStatus::Success).await?;
let mut tasks = vec![];
let (domain, _) = land_dao::settings::get_domain_settings().await?;
let storage_settings = land_dao::settings::get_storage().await?;
for dp in dps {
let task = TaskValue {
user_uuid: dp.user_uuid.clone(),
project_uuid: dp.project_uuid.clone(),
domain: format!("{}.{}", dp.domain, domain),
download_url: storage_settings.build_url(&dp.storage_path)?,
wasm_path: dp.storage_path.clone(),
task_id: dp.task_id.clone(),
checksum: dp.storage_md5,
};
tasks.push(task);
}
let content = serde_json::to_vec(&tasks)?;
let checksum = format!("{:x}", md5::compute(content));
Ok(Json(DeploysResp { checksum, tasks }))
}

#[derive(Serialize, Deserialize)]
struct DeploysResp {
checksum: String,
tasks: Vec<TaskValue>,
}
44 changes: 35 additions & 9 deletions land-worker/src/agent/conf.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use super::TaskValue;
use super::{TaskTotal, TaskValue};
use crate::agent::{CLIENT, DATA_DIR};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf};
use tracing::{debug, info};

/// handle_task handles the task
pub async fn handle_task(task: &TaskValue) -> Result<()> {
// 1. save wasm to file
let data_dir = DATA_DIR.lock().await;
async fn download_wasm(data_dir: &str, task: &TaskValue) -> Result<()> {
let wasm_path = format!("{}/{}", data_dir, task.wasm_path);
if !PathBuf::from(wasm_path.clone()).exists() {
// 1.1 download wasm from url
Expand All @@ -23,11 +20,10 @@ pub async fn handle_task(task: &TaskValue) -> Result<()> {
} else {
debug!("Wasm file already exists: {}", wasm_path);
}
Ok(())
}

// 2. load wasm into mem
land_wasm::pool::prepare_worker(&task.wasm_path, true).await?;

// 3. write traefik conf after wasm is ready
fn write_traefik_conf(data_dir: &str, task: &TaskValue) -> Result<()> {
let conf_file = format!(
"{}/traefik/{}.yaml",
data_dir,
Expand All @@ -41,7 +37,37 @@ pub async fn handle_task(task: &TaskValue) -> Result<()> {
let traefik_yaml = serde_yaml::to_string(&traefik_confs)?;
info!("Write traefik conf to file: {}", conf_file);
std::fs::write(conf_file, traefik_yaml)?;
Ok(())
}

/// handle_task handles the task
pub async fn handle_task(task: &TaskValue) -> Result<()> {
// 1. save wasm to file
let data_dir = DATA_DIR.lock().await;
download_wasm(data_dir.as_str(), task).await?;

// 2. load wasm into mem
land_wasm::pool::prepare_worker(&task.wasm_path, true).await?;

// 3. write traefik conf after wasm is ready
write_traefik_conf(data_dir.as_str(), task)?;

Ok(())
}

/// handle_total handles the total tasks
pub async fn handle_total(data_dir: &str, total: &TaskTotal) -> Result<()> {
for task in &total.tasks {
// 1. it downloads wasm module file
download_wasm(data_dir, task).await?;

// 2. not load wasm to memory, generate aot wasm to make sure loading quickly
let wasm_path = format!("{}/{}", data_dir, task.wasm_path);
land_wasm::pool::compile_aot(&wasm_path).await?;

// 3. write traefik conf
write_traefik_conf(data_dir, task)?;
}
Ok(())
}

Expand Down
Loading

0 comments on commit 9da8744

Please sign in to comment.