Skip to content

Commit

Permalink
worker: add wasmtime engine epoch callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
fuxiaohei committed Apr 2, 2024
1 parent 97f25b9 commit f2b0423
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 25 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/worker-impl/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ impl Context {
pub fn set_body(&mut self, handle: u32, body: Body) -> u32 {
self.http_ctx.set_body(handle, body)
}
/// elapsed returns the duration since the request started
pub fn elapsed(&self) -> tokio::time::Duration {
self.http_ctx.elapsed()
}
}
65 changes: 60 additions & 5 deletions crates/worker-impl/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,81 @@
use lazy_static::lazy_static;
use std::{collections::HashMap, sync::Mutex};
use tracing::debug;
use wasmtime::{Config, Engine, InstanceAllocationStrategy, PoolingAllocationConfig};

// global engine hashmap with string key with sync mutex
lazy_static! {
pub static ref ENGINE_MAP: Mutex<HashMap<String, Engine>> = Mutex::new(HashMap::new());
}

// 10 ms to trigger epoch increment
pub const EPOCH_INC_INTERVAL: u64 = 10;

/// init_epoch_loop initialize the global ENGINE_MAP epoch callbacks
pub fn init_epoch_loop() {
// try use std to run this loop. not tokio
std::thread::spawn(|| {
increment_epoch_loop_inner();
});
}

/// increment_epoch_loop
fn increment_epoch_loop_inner() {
loop {
// if ENGINE_MAP is empty, sleep 3 seconds to wait for new engine
if ENGINE_MAP.lock().unwrap().is_empty() {
debug!("ENGINE_MAP is empty, sleep 3 seconds to wait for new engine");
std::thread::sleep(std::time::Duration::from_secs(3));
continue;
}

// iterate ENGINE_MAP to increment epoch for every EPOCH_INC_INTERVAL ms
for (_, engine) in ENGINE_MAP.lock().unwrap().iter() {
engine.increment_epoch();
}
// use std thread to sleep 3 seconds
std::thread::sleep(std::time::Duration::from_millis(EPOCH_INC_INTERVAL));
}
}

fn create_config() -> Config {
let mut config = Config::new();
config.wasm_component_model(true);
config.async_support(true);
config.epoch_interruption(true);

// SIMD support requires SSE3 and SSSE3 on x86_64.
// in docker container, it will cause error
// config.wasm_simd(false);

// const MB: usize = 1 << 20;
// let mut pooling_allocation_config = PoolingAllocationConfig::default();
// pooling_allocation_config.max_core_instance_size(MB);
// pooling_allocation_config.max_memories_per_component(128 * (MB as u32) / (64 * 1024));
let pooling_allocation_config = PoolingAllocationConfig::default();
let mut pooling_allocation_config = PoolingAllocationConfig::default();

// Core wasm programs have 1 memory
pooling_allocation_config.max_memories_per_module(1);
// Total memory size 128 MB, allow for up to memory_limit of linear memory. Wasm pages are 64k

const MB: usize = 1 << 20;
pooling_allocation_config.memory_pages(128 * (MB as u64) / (64 * 1024));

// Core wasm programs have 1 table
pooling_allocation_config.max_tables_per_module(1);

// Some applications create a large number of functions, in particular
// when compiled in debug mode or applications written in swift. Every
// function can end up in the table
pooling_allocation_config.table_elements(98765);

// Maximum number of slots in the pooling allocator to keep "warm", or those
// to keep around to possibly satisfy an affine allocation request or an
// instantiation of a module previously instantiated within the pool.
pooling_allocation_config.max_unused_warm_slots(100);

// Use a large pool, but one smaller than the default of 1000 to avoid runnign out of virtual
// memory space if multiple engines are spun up in a single process. We'll likely want to move
// to the on-demand allocator eventually for most purposes; see
// https://github.com/fastly/Viceroy/issues/255
pooling_allocation_config.total_core_instances(1000);

config.allocation_strategy(InstanceAllocationStrategy::Pooling(
pooling_allocation_config,
));
Expand Down
11 changes: 9 additions & 2 deletions crates/worker-impl/src/hostcall/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ static REDIRECT_MANUAL_POOL: OnceCell<Client> = OnceCell::new();
/// READ_DEFAULT_SIZE is the default read size in once read if not specified
const READ_DEFAULT_SIZE: u32 = 128 * 1024;

fn init_clients() {
/// init_clients is used to init http clients
pub fn init_clients() {
CLIENT_INIT_ONCE.call_once(|| {
REDIRECT_ERROR_POOL
.set(
Expand Down Expand Up @@ -55,21 +56,27 @@ pub struct HttpContext {
body_buffer_map: HashMap<u32, Vec<u8>>,
body_sender_map: HashMap<u32, Sender>,
body_sender_closed: HashMap<u32, bool>,
created_at: tokio::time::Instant,
}

impl HttpContext {
pub fn new() -> Self {
init_clients();
Self {
body_seq_id: AtomicU32::new(1),
body_map: HashMap::new(),
body_stream_map: HashMap::new(),
body_buffer_map: HashMap::new(),
body_sender_map: HashMap::new(),
body_sender_closed: HashMap::new(),
created_at: tokio::time::Instant::now(),
}
}

/// elapsed returns the elapsed time in milliseconds
pub fn elapsed(&self) -> tokio::time::Duration {
self.created_at.elapsed()
}

/// get_http_client returns http client based on redirect policy
pub fn get_http_client(r: RedirectPolicy) -> Client {
match r {
Expand Down
22 changes: 18 additions & 4 deletions crates/worker-impl/src/hostcall/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl Host for HttpContext {
request: Request,
options: RequestOptions,
) -> wasmtime::Result<Result<Response, RequestError>> {
let st = tokio::time::Instant::now();
debug!(method = request.method, uri = request.uri, "Fetch start");

// use client pool to reuse client
Expand Down Expand Up @@ -79,11 +80,14 @@ impl Host for HttpContext {
let mut resp_headers = vec![];
// if body is stream, header should not contain content-length, use Transfer-Encoding:chunk
let mut is_stream = true;
let mut content_length: usize = 0;
for (key, value) in fetch_response.headers() {
if key == "content-length" {
is_stream = false;
content_length = value.to_str().unwrap().parse().unwrap();
}
resp_headers.push((key.to_string(), value.to_str().unwrap().to_string()));
let header_value = String::from_utf8_lossy(value.as_bytes()).to_string();
resp_headers.push((key.to_string(), header_value));
}

let status = fetch_response.status().as_u16();
Expand All @@ -96,17 +100,27 @@ impl Host for HttpContext {
let body = Body::from(body);
self.set_body(0, body)
};
debug!(
method = request.method,
uri = request.uri,
"Fetch set body: {}, is_stream:{}, content_length:{}",
body_handle,
is_stream,
content_length,
);
let resp = Response {
status,
headers: resp_headers,
body: Some(body_handle),
};
let elasped = st.elapsed().as_millis();
debug!(
method = request.method,
uri = request.uri,
"Fetch response: {}, handle={}",
resp.status,
body_handle
status = resp.status,
handle = body_handle,
cost = elasped,
"Fetch done",
);
Ok(Ok(resp))
}
Expand Down
2 changes: 1 addition & 1 deletion crates/worker-impl/src/hostcall/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod fetch;
mod guest;
mod host;

pub use context::HttpContext;
pub use context::{init_clients, HttpContext};
pub use guest::exports::land::http::incoming::{Request, Response};
pub use guest::HttpHandler;
pub use host::HttpService;
Expand Down
3 changes: 2 additions & 1 deletion crates/worker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ mod context;
pub use context::Context;

mod engine;
pub use engine::init_epoch_loop;

mod worker;
pub use worker::Worker;

pub mod pool;

pub mod hostcall;
pub mod hostcall;
3 changes: 3 additions & 0 deletions crates/worker-impl/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Result;
use axum::body::Body;
use tracing::debug;
use wasmtime::UpdateDeadline;
use wasmtime::{
component::{Component, InstancePre, Linker},
Engine, Store,
Expand Down Expand Up @@ -108,6 +109,8 @@ impl Worker {
) -> Result<(hostcall::Response, Body)> {
// create store
let mut store = Store::new(&self.engine, context);
store.set_epoch_deadline(1);
store.epoch_deadline_callback(move |_store| Ok(UpdateDeadline::Yield(1)));

// get exports and call handle_request
let (exports, _instance) =
Expand Down
4 changes: 4 additions & 0 deletions crates/worker-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ pub async fn start(opts: Opts) -> Result<()> {
// set pool's local dir to load module file
land_worker_impl::pool::FILE_DIR.set(opts.dir).unwrap();

// start wasmtime engines epoch calls
land_worker_impl::hostcall::init_clients();
land_worker_impl::init_epoch_loop();

// load default wasm
load_default_wasm().await?;

Expand Down

0 comments on commit f2b0423

Please sign in to comment.