From 9d21d6046446b45f325aabfd9ba4c87e6e32bde1 Mon Sep 17 00:00:00 2001 From: fuxiaohei Date: Fri, 23 Aug 2024 11:32:43 +0800 Subject: [PATCH] sdk: add ExecutionCtx to handle asyncio task --- .github/workflows/tests.yaml | 3 +- Cargo.lock | 36 +++-- Cargo.toml | 3 +- lib/sdk-macro/src/http_handler.rs | 73 +++++++-- lib/sdk-macro/src/lib.rs | 78 +++++++++- lib/sdk/src/execution_ctx.rs | 147 ++++++++++++++++++ lib/sdk/src/http_service.rs | 162 +++++++++++++++++++- lib/sdk/src/lib.rs | 8 +- lib/wasm-host/build.rs | 1 + lib/wasm-host/src/hostcall/asyncio.rs | 170 +++++++++++++++++++++ lib/wasm-host/src/hostcall/context.rs | 74 ++++++--- lib/wasm-host/src/hostcall/mod.rs | 2 + lib/wasm-host/src/worker.rs | 69 ++++++++- lib/wasm-host/wit/deps/asyncio/asyncio.wit | 17 +++ lib/wasm-host/wit/deps/asyncio/context.wit | 8 + lib/wasm-host/wit/deps/asyncio/types.wit | 6 + lib/wasm-host/wit/http-handler.wit | 1 + lib/wasm-host/wit/http-service.wit | 1 + tests/test_examples.sh | 5 + tests/wait-until/Cargo.toml | 13 ++ tests/wait-until/src/lib.rs | 31 ++++ 21 files changed, 843 insertions(+), 65 deletions(-) create mode 100644 lib/sdk/src/execution_ctx.rs create mode 100644 lib/wasm-host/src/hostcall/asyncio.rs create mode 100644 lib/wasm-host/wit/deps/asyncio/asyncio.wit create mode 100644 lib/wasm-host/wit/deps/asyncio/context.wit create mode 100644 lib/wasm-host/wit/deps/asyncio/types.wit create mode 100644 tests/test_examples.sh create mode 100644 tests/wait-until/Cargo.toml create mode 100644 tests/wait-until/src/lib.rs diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 0cb6c97d..9438abc5 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -36,5 +36,4 @@ jobs: shared-key: "build" - name: test hello-wasm run: | - cargo build -p hello-wasm --target wasm32-wasi --release - cargo test -p land-wasm-host --release \ No newline at end of file + bash ./tests/test_examples.sh \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 07971093..ec2305b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1892,7 +1892,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hello-wasm" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "http", @@ -2306,7 +2306,7 @@ dependencies = [ [[package]] name = "land-cli" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "clap", @@ -2323,7 +2323,7 @@ dependencies = [ [[package]] name = "land-common" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "base64 0.22.1", @@ -2342,7 +2342,7 @@ dependencies = [ [[package]] name = "land-core" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "chrono", @@ -2370,7 +2370,7 @@ dependencies = [ [[package]] name = "land-dao" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "async-trait", @@ -2392,7 +2392,7 @@ dependencies = [ [[package]] name = "land-sdk" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "http", @@ -2404,7 +2404,7 @@ dependencies = [ [[package]] name = "land-sdk-macro" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "http", @@ -2414,7 +2414,7 @@ dependencies = [ [[package]] name = "land-server" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "axum", @@ -2442,7 +2442,7 @@ dependencies = [ [[package]] name = "land-vars" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "chrono", @@ -2454,7 +2454,7 @@ dependencies = [ [[package]] name = "land-wasm-gen" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "tracing", @@ -2467,7 +2467,7 @@ dependencies = [ [[package]] name = "land-wasm-host" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "async-trait", @@ -2489,7 +2489,7 @@ dependencies = [ [[package]] name = "land-wasm-server" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "axum", @@ -2508,7 +2508,7 @@ dependencies = [ [[package]] name = "land-worker" -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" dependencies = [ "anyhow", "clap", @@ -5511,6 +5511,16 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "65dd7eed29412da847b0f78bcec0ac98588165988a8cfe41d4ea1d429f8ccfff" +[[package]] +name = "wait-until" +version = "0.5.0-rc.6" +dependencies = [ + "anyhow", + "http", + "land-sdk", + "wit-bindgen", +] + [[package]] name = "walkdir" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index 49c159b9..2ed9602e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.5.0-rc.2" +version = "0.5.0-rc.6" edition = "2021" authors = ["fuxiaohei "] @@ -18,6 +18,7 @@ members = [ "lib/wasm-host", "lib/wasm-server", "tests/hello-wasm", + "tests/wait-until", ] default-members = ["land-cli"] resolver = "2" diff --git a/lib/sdk-macro/src/http_handler.rs b/lib/sdk-macro/src/http_handler.rs index 3dfd0706..4360a93c 100644 --- a/lib/sdk-macro/src/http_handler.rs +++ b/lib/sdk-macro/src/http_handler.rs @@ -182,6 +182,57 @@ pub mod land { pub mod exports { #[allow(dead_code)] pub mod land { + #[allow(dead_code)] + pub mod asyncio { + #[allow(dead_code, clippy::all)] + pub mod context { + #[used] + #[doc(hidden)] + static __FORCE_SECTION_REF: fn() = super::super::super::super::__link_custom_section_describing_imports; + use super::super::super::super::_rt; + #[doc(hidden)] + #[allow(non_snake_case)] + pub unsafe fn _export_is_pending_cabi() -> i32 { + #[cfg(target_arch = "wasm32")] _rt::run_ctors_once(); + let result0 = T::is_pending(); + match result0 { + true => 1, + false => 0, + } + } + #[doc(hidden)] + #[allow(non_snake_case)] + pub unsafe fn _export_select_cabi() -> i32 { + #[cfg(target_arch = "wasm32")] _rt::run_ctors_once(); + let result0 = T::select(); + match result0 { + true => 1, + false => 0, + } + } + pub trait Guest { + /// is ctx pending + fn is_pending() -> bool; + /// select one task to run, if no task is ready, return false + fn select() -> bool; + } + #[doc(hidden)] + #[macro_export] + macro_rules! __export_land_asyncio_context_cabi { + ($ty:ident with_types_in $($path_to_types:tt)*) => { + const _ : () = { #[export_name = + "land:asyncio/context#is-pending"] unsafe extern "C" fn + export_is_pending() -> i32 { $($path_to_types)*:: + _export_is_pending_cabi::<$ty > () } #[export_name = + "land:asyncio/context#select"] unsafe extern "C" fn + export_select() -> i32 { $($path_to_types)*:: + _export_select_cabi::<$ty > () } }; + }; + } + #[doc(hidden)] + pub use __export_land_asyncio_context_cabi; + } + } #[allow(dead_code)] pub mod http { #[allow(dead_code, clippy::all)] @@ -462,13 +513,16 @@ macro_rules! __export_http_handler_impl { ($ty:ident with_types_in $($path_to_types_root:tt)*) => { $($path_to_types_root)*:: exports::land::http::incoming::__export_land_http_incoming_cabi!($ty - with_types_in $($path_to_types_root)*:: exports::land::http::incoming); const _ : - () = { #[cfg(target_arch = "wasm32")] #[link_section = + with_types_in $($path_to_types_root)*:: exports::land::http::incoming); + $($path_to_types_root)*:: + exports::land::asyncio::context::__export_land_asyncio_context_cabi!($ty + with_types_in $($path_to_types_root)*:: exports::land::asyncio::context); const _ + : () = { #[cfg(target_arch = "wasm32")] #[link_section = "component-type:wit-bindgen:0.30.0:http-handler:imports and exports"] - #[doc(hidden)] pub static __WIT_BINDGEN_COMPONENT_TYPE : [u8; 692] = * + #[doc(hidden)] pub static __WIT_BINDGEN_COMPONENT_TYPE : [u8; 751] = * b"\ -\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\xb1\x04\x01A\x02\x01\ -A\x06\x01B\x16\x01{\x04\0\x0bstatus-code\x03\0\0\x01s\x04\0\x06method\x03\0\x02\x01\ +\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\xec\x04\x01A\x02\x01\ +A\x08\x01B\x16\x01{\x04\0\x0bstatus-code\x03\0\0\x01s\x04\0\x06method\x03\0\x02\x01\ o\x02ss\x01p\x04\x04\0\x07headers\x03\0\x05\x01s\x04\0\x03uri\x03\0\x07\x01y\x04\ \0\x0bbody-handle\x03\0\x09\x01k\x0a\x01r\x04\x06method\x03\x03uri\x08\x07header\ s\x06\x04body\x0b\x04\0\x07request\x03\0\x0c\x01r\x03\x06status\x01\x07headers\x06\ @@ -479,10 +533,11 @@ eout\0\0\x0binvalid-url\0\0\x17destination-not-allowed\0\0\x11too-many-requests\ edirect\x13\x04\0\x0frequest-options\x03\0\x14\x03\x01\x0fland:http/types\x05\0\x02\ \x03\0\0\x07request\x02\x03\0\0\x08response\x01B\x06\x02\x03\x02\x01\x01\x04\0\x07\ request\x03\0\0\x02\x03\x02\x01\x02\x04\0\x08response\x03\0\x02\x01@\x01\x03req\x01\ -\0\x03\x04\0\x0ehandle-request\x01\x04\x04\x01\x12land:http/incoming\x05\x03\x04\ -\x01\x18land:worker/http-handler\x04\0\x0b\x12\x01\0\x0chttp-handler\x03\0\0\0G\x09\ -producers\x01\x0cprocessed-by\x02\x0dwit-component\x070.215.0\x10wit-bindgen-rus\ -t\x060.30.0"; +\0\x03\x04\0\x0ehandle-request\x01\x04\x04\x01\x12land:http/incoming\x05\x03\x01\ +B\x03\x01@\0\0\x7f\x04\0\x0ais-pending\x01\0\x04\0\x06select\x01\0\x04\x01\x14la\ +nd:asyncio/context\x05\x04\x04\x01\x18land:worker/http-handler\x04\0\x0b\x12\x01\ +\0\x0chttp-handler\x03\0\0\0G\x09producers\x01\x0cprocessed-by\x02\x0dwit-compon\ +ent\x070.215.0\x10wit-bindgen-rust\x060.30.0"; }; }; } diff --git a/lib/sdk-macro/src/lib.rs b/lib/sdk-macro/src/lib.rs index dade3964..43a9adf8 100644 --- a/lib/sdk-macro/src/lib.rs +++ b/lib/sdk-macro/src/lib.rs @@ -38,6 +38,7 @@ static HTTP_SRC_INCLUDE: AtomicBool = AtomicBool::new(false); pub fn http_main(_attr: TokenStream, item: TokenStream) -> TokenStream { let func = syn::parse_macro_input!(item as syn::ItemFn); let func_name = func.sig.ident.clone(); + let func_args_len = func.sig.inputs.len(); let src_http_handler = if HTTP_SRC_INCLUDE.load(std::sync::atomic::Ordering::Relaxed) { String::new() @@ -52,6 +53,7 @@ pub fn http_main(_attr: TokenStream, item: TokenStream) -> TokenStream { let iface_impl = quote!( use exports::land::http::incoming; + use exports::land::asyncio::context; struct WorkerHttpImpl; @@ -92,6 +94,39 @@ pub fn http_main(_attr: TokenStream, item: TokenStream) -> TokenStream { } } + ); + + // if func args len is 1, it means that the function has one argument, no ExecutionCtx + // so context::Guest should not be used + let mut async_impl = quote!( + impl context::Guest for WorkerHttpImpl { + fn is_pending() -> bool{ + return false + } + + fn select() -> bool { + return false + } + } + ); + if func_args_len == 2 { + async_impl = quote!( + impl context::Guest for WorkerHttpImpl { + fn is_pending() -> bool{ + let ctx = ExecutionCtx::get(); + ctx.is_pending() + } + + fn select() -> bool { + let mut ctx = ExecutionCtx::get(); + ctx.execute(); + ctx.is_pending() + } + } + ); + } + + let mut iface_impl2 = quote!( impl incoming::Guest for WorkerHttpImpl { fn handle_request(req: incoming::Request) -> incoming::Response { #func @@ -120,11 +155,50 @@ pub fn http_main(_attr: TokenStream, item: TokenStream) -> TokenStream { } } } + ); - export!(WorkerHttpImpl); + // if func args len is 2, it means that the function has two arguments, + // the first one is the request, the second one is the context + if func_args_len == 2 { + iface_impl2 = quote!( + impl incoming::Guest for WorkerHttpImpl { + fn handle_request(req: incoming::Request) -> incoming::Response { + #func + + // get execution context + let mut ctx = ExecutionCtx::get(); + // convert wasm_request to sdk_request + let sdk_request: Request = req.try_into().unwrap(); + let sdk_response = match #func_name(sdk_request, ctx){ + Ok(r) => r, + Err(e) => { + land_sdk::http::error_response( + http::StatusCode::INTERNAL_SERVER_ERROR, + e.to_string(), + ) + } + }; + + let sdk_response_body_handle = sdk_response.body().body_handle(); + // convert sdk_response to wasm_response + match sdk_response.try_into() { + Ok(r) => r, + Err(_e) => incoming::Response { + status: 500, + headers: vec![], + body: Some(sdk_response_body_handle), + }, + } + } + } + ); + } + let iface_impl3 = quote!( + export!(WorkerHttpImpl); ); let user_code_comment = "// User code start"; - let value = format!("{iface}\n\n{user_code_comment}\n\n{iface_impl}"); + let value = + format!("{iface}\n\n{user_code_comment}\n\n{iface_impl}\n\n{async_impl}\n\n{iface_impl2}\n\n{iface_impl3}"); value.parse().unwrap() } diff --git a/lib/sdk/src/execution_ctx.rs b/lib/sdk/src/execution_ctx.rs new file mode 100644 index 00000000..10b2cb58 --- /dev/null +++ b/lib/sdk/src/execution_ctx.rs @@ -0,0 +1,147 @@ +use super::http_service::land::asyncio::asyncio; +use std::sync::{Arc, Mutex}; + +type WaitUntilHandler = Box; + +struct Inner { + pub handlers: Vec<(u32, WaitUntilHandler)>, +} + +impl Inner { + pub fn new() -> Self { + Self { handlers: vec![] } + } + pub fn wait_until(&mut self, f: WaitUntilHandler) { + let seq_id = asyncio::new().unwrap(); + self.handlers.push((seq_id, f)); + } + pub fn execute(&mut self) { + let current = self.handlers.pop(); + if let Some((seq_id, handler)) = current { + handler(); + asyncio::finish(seq_id); + } else { + // if nothing pop, check is-pending to wait sleep timer tasks + if asyncio::is_pending() { + asyncio::wait(); + } + } + } + pub fn is_pending(&self) -> bool { + !self.handlers.is_empty() || asyncio::is_pending() + } +} + +/// `ExecutionCtx` is context to handle asyncio tasks +/// It used to add functions after http request done +#[derive(Clone)] +pub struct ExecutionCtx { + inner: Arc>, +} + +impl Default for ExecutionCtx { + fn default() -> Self { + Self::new() + } +} + +lazy_static::lazy_static! { + static ref CTX: Mutex = Mutex::new(ExecutionCtx::new()); +} + +impl ExecutionCtx { + /// `get_ctx` gets global execution ctx instance + pub fn get() -> ExecutionCtx { + CTX.lock().unwrap().clone() + } + /// `new` create new exection ctx instance + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(Inner::new())), + } + } + /// `wait_until` add function to asyncio task + /// after http request done, it will be executed + /// + /// # Example + /// + /// ```no_run + /// use land_sdk::http::{fetch, Body, Error, Request, RequestOptions, Response}; + /// use land_sdk::{http_main, ExecutionCtx}; + + /// #[http_main] + /// pub fn handle_request(req: Request, mut ctx: ExecutionCtx) -> Result { + /// // read uri and method from request + /// let url = req.uri().clone(); + /// let method = req.method().to_string().to_uppercase(); + /// + /// ctx.wait_until(|| { + /// // this fetch behavior will execute after http request done + /// let fetch_request = http::Request::builder() + /// .method("GET") + /// .uri("https://www.rust-lang.org/") + /// .body(Body::from("")) + /// .unwrap(); + /// let fetch_response = fetch(fetch_request, RequestOptions::default()).unwrap(); + /// println!("wait until fetch: {:?}", fetch_response); + /// }); + /// + /// // build response + /// Ok(http::Response::builder() + /// .status(200) + /// .header("X-Request-Url", url.to_string()) + /// .header("X-Request-Method", method) + /// .body(Body::from("Hello Runtime.land!!")) + /// .unwrap()) + /// } + /// ``` + /// + pub fn wait_until(&mut self, f: F) + where + F: Fn() + 'static + Send, + { + self.inner.lock().unwrap().wait_until(Box::new(f)); + } + /// `execute` calls one asyncio task + /// after execute, it will be removed from asyncio task list + /// then it should check is_pending to check if there is any asyncio task pending + pub fn execute(&mut self) { + self.inner.lock().unwrap().execute(); + } + /// `is_pending` check if there is any asyncio task pending + pub fn is_pending(&self) -> bool { + self.inner.lock().unwrap().is_pending() + } + /// `sleep` sleep for `ms` milliseconds in hostcall tokio spawn task + pub fn sleep(&self, ms: u32) -> u32 { + asyncio::sleep(ms).unwrap() + } +} + +/* +#[cfg(test)] +mod execution_ctx_test { + use super::ExecutionCtx; + + fn test() { + let mut ctx = ExecutionCtx::new(); + ctx.wait_until(|| { + println!("sleep 1s..."); + std::thread::sleep(std::time::Duration::from_secs(1)); + println!("sleep 1s done!"); + }); + + ctx.wait_until(|| { + println!("sleep 2s..."); + std::thread::sleep(std::time::Duration::from_secs(2)); + println!("sleep 2s done!"); + }); + + let ctx2 = ctx.clone(); + ctx.wait_until(move || { + ctx2.sleep(1500); + println!("sleep 1.5s done!"); + }); + } +} +*/ diff --git a/lib/sdk/src/http_service.rs b/lib/sdk/src/http_service.rs index f1428729..537da98c 100644 --- a/lib/sdk/src/http_service.rs +++ b/lib/sdk/src/http_service.rs @@ -1,5 +1,148 @@ #[allow(dead_code)] pub mod land { + #[allow(dead_code)] + pub mod asyncio { + #[allow(dead_code, clippy::all)] + pub mod types { + #[used] + #[doc(hidden)] + static __FORCE_SECTION_REF: fn() = super::super::super::__link_custom_section_describing_imports; + /// async io task handle + pub type Handle = u32; + } + #[allow(dead_code, clippy::all)] + pub mod asyncio { + #[used] + #[doc(hidden)] + static __FORCE_SECTION_REF: fn() = super::super::super::__link_custom_section_describing_imports; + use super::super::super::_rt; + pub type Handle = super::super::super::land::asyncio::types::Handle; + #[allow(unused_unsafe, clippy::all)] + /// new async task handle + pub fn new() -> Result { + unsafe { + #[repr(align(4))] + struct RetArea([::core::mem::MaybeUninit; 8]); + let mut ret_area = RetArea([::core::mem::MaybeUninit::uninit(); 8]); + let ptr0 = ret_area.0.as_mut_ptr().cast::(); + #[cfg(target_arch = "wasm32")] + #[link(wasm_import_module = "land:asyncio/asyncio")] + extern "C" { + #[link_name = "new"] + fn wit_import(_: *mut u8); + } + #[cfg(not(target_arch = "wasm32"))] + fn wit_import(_: *mut u8) { + unreachable!() + } + wit_import(ptr0); + let l1 = i32::from(*ptr0.add(0).cast::()); + match l1 { + 0 => { + let e = { + let l2 = *ptr0.add(4).cast::(); + l2 as u32 + }; + Ok(e) + } + 1 => { + let e = (); + Err(e) + } + _ => _rt::invalid_enum_discriminant(), + } + } + } + #[allow(unused_unsafe, clippy::all)] + /// new sleep time task handle and sleep + pub fn sleep(ms: u32) -> Result { + unsafe { + #[repr(align(4))] + struct RetArea([::core::mem::MaybeUninit; 8]); + let mut ret_area = RetArea([::core::mem::MaybeUninit::uninit(); 8]); + let ptr0 = ret_area.0.as_mut_ptr().cast::(); + #[cfg(target_arch = "wasm32")] + #[link(wasm_import_module = "land:asyncio/asyncio")] + extern "C" { + #[link_name = "sleep"] + fn wit_import(_: i32, _: *mut u8); + } + #[cfg(not(target_arch = "wasm32"))] + fn wit_import(_: i32, _: *mut u8) { + unreachable!() + } + wit_import(_rt::as_i32(&ms), ptr0); + let l1 = i32::from(*ptr0.add(0).cast::()); + match l1 { + 0 => { + let e = { + let l2 = *ptr0.add(4).cast::(); + l2 as u32 + }; + Ok(e) + } + 1 => { + let e = (); + Err(e) + } + _ => _rt::invalid_enum_discriminant(), + } + } + } + #[allow(unused_unsafe, clippy::all)] + /// finish async task + pub fn finish(handle: Handle) { + unsafe { + #[cfg(target_arch = "wasm32")] + #[link(wasm_import_module = "land:asyncio/asyncio")] + extern "C" { + #[link_name = "finish"] + fn wit_import(_: i32); + } + #[cfg(not(target_arch = "wasm32"))] + fn wit_import(_: i32) { + unreachable!() + } + wit_import(_rt::as_i32(handle)); + } + } + #[allow(unused_unsafe, clippy::all)] + /// is-pending checks if the task is pending + pub fn is_pending() -> bool { + unsafe { + #[cfg(target_arch = "wasm32")] + #[link(wasm_import_module = "land:asyncio/asyncio")] + extern "C" { + #[link_name = "is-pending"] + fn wit_import() -> i32; + } + #[cfg(not(target_arch = "wasm32"))] + fn wit_import() -> i32 { + unreachable!() + } + let ret = wit_import(); + _rt::bool_lift(ret as u8) + } + } + #[allow(unused_unsafe, clippy::all)] + /// wait for async task to finish + pub fn wait() { + unsafe { + #[cfg(target_arch = "wasm32")] + #[link(wasm_import_module = "land:asyncio/asyncio")] + extern "C" { + #[link_name = "wait"] + fn wit_import(); + } + #[cfg(not(target_arch = "wasm32"))] + fn wit_import() { + unreachable!() + } + wit_import(); + } + } + } + } #[allow(dead_code)] pub mod http { #[allow(dead_code, clippy::all)] @@ -955,9 +1098,9 @@ mod _rt { #[cfg(target_arch = "wasm32")] #[link_section = "component-type:wit-bindgen:0.30.0:http-service-with-all-of-its-exports-removed:encoded world"] #[doc(hidden)] -pub static __WIT_BINDGEN_COMPONENT_TYPE: [u8; 1168] = *b"\ -\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\xed\x07\x01A\x02\x01\ -A\x0b\x01B\x16\x01{\x04\0\x0bstatus-code\x03\0\0\x01s\x04\0\x06method\x03\0\x02\x01\ +pub static __WIT_BINDGEN_COMPONENT_TYPE: [u8; 1359] = *b"\ +\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\xac\x09\x01A\x02\x01\ +A\x10\x01B\x16\x01{\x04\0\x0bstatus-code\x03\0\0\x01s\x04\0\x06method\x03\0\x02\x01\ o\x02ss\x01p\x04\x04\0\x07headers\x03\0\x05\x01s\x04\0\x03uri\x03\0\x07\x01y\x04\ \0\x0bbody-handle\x03\0\x09\x01k\x0a\x01r\x04\x06method\x03\x03uri\x08\x07header\ s\x06\x04body\x0b\x04\0\x07request\x03\0\x0c\x01r\x03\x06status\x01\x07headers\x06\ @@ -979,10 +1122,15 @@ B\x0b\x02\x03\x02\x01\x03\x04\0\x07request\x03\0\0\x02\x03\x02\x01\x04\x04\0\x08 response\x03\0\x02\x02\x03\x02\x01\x05\x04\0\x0drequest-error\x03\0\x04\x02\x03\x02\ \x01\x06\x04\0\x0frequest-options\x03\0\x06\x01j\x01\x03\x01\x05\x01@\x02\x03req\ \x01\x07options\x07\0\x08\x04\0\x0csend-request\x01\x09\x03\x01\x12land:http/fet\ -ching\x05\x07\x04\x018land:worker/http-service-with-all-of-its-exports-removed\x04\ -\0\x0b2\x01\0,http-service-with-all-of-its-exports-removed\x03\0\0\0G\x09produce\ -rs\x01\x0cprocessed-by\x02\x0dwit-component\x070.215.0\x10wit-bindgen-rust\x060.\ -30.0"; +ching\x05\x07\x01B\x02\x01y\x04\0\x06handle\x03\0\0\x03\x01\x12land:asyncio/type\ +s\x05\x08\x02\x03\0\x03\x06handle\x01B\x0d\x02\x03\x02\x01\x09\x04\0\x06handle\x03\ +\0\0\x01j\x01\x01\0\x01@\0\0\x02\x04\0\x03new\x01\x03\x01@\x01\x02msy\0\x02\x04\0\ +\x05sleep\x01\x04\x01@\x01\x06handle\x01\x01\0\x04\0\x06finish\x01\x05\x01@\0\0\x7f\ +\x04\0\x0ais-pending\x01\x06\x01@\0\x01\0\x04\0\x04wait\x01\x07\x03\x01\x14land:\ +asyncio/asyncio\x05\x0a\x04\x018land:worker/http-service-with-all-of-its-exports\ +-removed\x04\0\x0b2\x01\0,http-service-with-all-of-its-exports-removed\x03\0\0\0\ +G\x09producers\x01\x0cprocessed-by\x02\x0dwit-component\x070.215.0\x10wit-bindge\ +n-rust\x060.30.0"; #[inline(never)] #[doc(hidden)] pub fn __link_custom_section_describing_imports() { diff --git a/lib/sdk/src/lib.rs b/lib/sdk/src/lib.rs index cfde1ad5..9235f3c3 100644 --- a/lib/sdk/src/lib.rs +++ b/lib/sdk/src/lib.rs @@ -30,10 +30,14 @@ #![warn(missing_docs)] mod body; -mod http_service; +mod execution_ctx; mod fetch; pub mod http; +mod http_service; pub mod router; +/// export execution ctx +pub use execution_ctx::ExecutionCtx; + /// Re-export macro from sdk-macro -pub use land_sdk_macro::http_main; \ No newline at end of file +pub use land_sdk_macro::http_main; diff --git a/lib/wasm-host/build.rs b/lib/wasm-host/build.rs index 0b47737f..edb869e4 100644 --- a/lib/wasm-host/build.rs +++ b/lib/wasm-host/build.rs @@ -5,6 +5,7 @@ fn main() { println!("cargo:rerun-if-changed=build.rs"); println!("cargo:rerun-if-changed=wit/*.wit"); println!("cargo:rerun-if-changed=wit/deps/http/*.wit"); + println!("cargo:rerun-if-changed=wit/deps/asyncio/*.wit"); build_wit_guest_code(); copy_guest_code_to_sdk(); diff --git a/lib/wasm-host/src/hostcall/asyncio.rs b/lib/wasm-host/src/hostcall/asyncio.rs new file mode 100644 index 00000000..b4d82464 --- /dev/null +++ b/lib/wasm-host/src/hostcall/asyncio.rs @@ -0,0 +1,170 @@ +use super::{ + host::land::asyncio::{asyncio, types::Handle}, + HostContext, +}; +use std::{ + collections::HashMap, + sync::{atomic::AtomicU32, Arc}, +}; +use tokio::sync::{Mutex, Notify}; +use tracing::debug; + +#[derive(Clone, Debug, PartialEq)] +pub enum Status { + Pending, + // Running, + // Canceled, + Finished, +} + +#[derive(Clone, Debug)] +struct Task { + status: Status, +} + +#[derive(Debug)] +struct Inner { + pub seq_id: AtomicU32, + pub tasks: HashMap, + pub notify: Arc, +} + +impl Inner { + pub fn new(notify: Arc) -> Self { + Self { + seq_id: AtomicU32::new(1), + tasks: HashMap::new(), + notify, + } + } +} + +#[derive(Clone, Debug)] +pub struct Context { + notify: Arc, + inner: Arc>, +} + +impl Context { + pub fn new() -> Self { + let notify = Arc::new(Notify::new()); + Self { + inner: Arc::new(Mutex::new(Inner::new(notify.clone()))), + notify, + } + } + pub async fn set_finish(&mut self, seq_id: u32) { + let mut inner = self.inner.lock().await; + let task = inner.tasks.get_mut(&seq_id); + if let Some(task) = task { + if task.status == Status::Pending { + // println!("asyncio->set_finish: {}", seq_id); + debug!("asyncio->set_finish: {}", seq_id); + task.status = Status::Finished; + } + // notify to wake up other function to check is_pending + inner.notify.notify_one(); + } + } + /// wait one task done + /// it a task is done, it wakes up to check is_pending + pub async fn wait(&self) { + self.notify.notified().await + } + /// is_pending check if there is any task pending + pub async fn is_pending(&self) -> bool { + let inner = self.inner.lock().await; + return inner + .tasks + .values() + .any(|task| task.status == Status::Pending); + } +} + +#[async_trait::async_trait] +impl asyncio::Host for Context { + async fn new(&mut self) -> Result { + let mut inner = self.inner.lock().await; + let seq_id = inner + .seq_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let task = Task { + status: Status::Pending, + }; + // println!("asyncio->new: {}", seq_id); + debug!("asyncio->new: {}", seq_id); + inner.tasks.insert(seq_id, task); + Ok(seq_id) + } + async fn sleep(&mut self, ms: u32) -> Result { + let seq_id = self.new().await?; + // println!("asyncio->sleep: {}, {}ms", seq_id, ms); + debug!("asyncio->sleep: {}, {}ms", seq_id, ms); + + let mut ctx2 = self.clone(); + tokio::task::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(ms as u64)).await; + ctx2.set_finish(seq_id).await; + // println!("asyncio->sleep->done: {}, {}ms", seq_id, ms); + debug!("asyncio->sleep->done: {}, {}ms", seq_id, ms); + }); + Ok(seq_id) + } + async fn finish(&mut self, handle: u32) { + // println!("asyncio->finish: {}", handle); + debug!("asyncio->finish: {}", handle); + self.set_finish(handle).await; + } + async fn is_pending(&mut self) -> bool { + self.is_pending().await + } + async fn wait(&mut self) { + self.wait().await; + } +} + +#[async_trait::async_trait] +impl asyncio::Host for HostContext { + async fn new(&mut self) -> Result { + self.asyncio_ctx.new().await + } + async fn sleep(&mut self, ms: u32) -> Result { + self.asyncio_ctx.sleep(ms).await + } + async fn finish(&mut self, handle: u32) { + self.asyncio_ctx.finish(handle).await; + } + async fn is_pending(&mut self) -> bool { + self.asyncio_ctx.is_pending().await + } + async fn wait(&mut self) { + self.asyncio_ctx.wait().await; + } +} + +#[cfg(test)] +mod asyncio_test { + use crate::hostcall::{asyncio::Context, host::land::asyncio::asyncio::Host}; + + #[tokio::test] + async fn test_sleep() { + let mut ctx = Context::new(); + let _ = ctx.sleep(1500).await; + let _ = ctx.sleep(1000).await; + let mut index = 0; + loop { + ctx.wait().await; + index += 1; + let is_pending = ctx.is_pending().await; + println!("is_pending: {}, index:{}", is_pending, index); + if index == 2 { + assert!(!is_pending) + } else { + assert!(is_pending) + } + if !is_pending { + break; + } + } + } +} diff --git a/lib/wasm-host/src/hostcall/context.rs b/lib/wasm-host/src/hostcall/context.rs index 3dcb63db..60a019fe 100644 --- a/lib/wasm-host/src/hostcall/context.rs +++ b/lib/wasm-host/src/hostcall/context.rs @@ -1,4 +1,5 @@ use super::{ + asyncio, body_impl::{new_channel, Sender}, host::land::http::body::BodyError, }; @@ -9,21 +10,17 @@ use std::{collections::HashMap, sync::atomic::AtomicU32}; // READ_DEFAULT_SIZE is the default read size in once read if not specified const READ_DEFAULT_SIZE: u32 = 128 * 1024; -pub struct HostContext { - // body related +/// BodyContext is used to store body related data +struct BodyContext { body_seq_id: AtomicU32, body_map: HashMap, body_buffer_map: HashMap>, body_stream_map: HashMap, body_sender_map: HashMap, body_sender_closed: HashMap, - - // elapsed time need - created_at: tokio::time::Instant, } -impl HostContext { - /// new context +impl BodyContext { pub fn new() -> Self { Self { body_seq_id: AtomicU32::new(1), @@ -32,31 +29,54 @@ impl HostContext { body_stream_map: HashMap::new(), body_sender_map: HashMap::new(), body_sender_closed: HashMap::new(), + } + } +} + +pub struct HostContext { + // body related + body_ctx: BodyContext, + + // asyncio context + pub(crate) asyncio_ctx: asyncio::Context, + + // elapsed time need + created_at: tokio::time::Instant, +} + +impl HostContext { + /// new context + pub fn new() -> Self { + Self { + body_ctx: BodyContext::new(), + asyncio_ctx: asyncio::Context::new(), created_at: tokio::time::Instant::now(), } } /// new_body creates new empty body and returns handle id pub fn new_empty_body(&mut self) -> u32 { - self.body_seq_id + self.body_ctx + .body_seq_id .fetch_add(1, std::sync::atomic::Ordering::SeqCst) } /// set_body sets body by id, it will return handle id pub fn set_body(&mut self, id: u32, body: Body) -> u32 { let handle = if id < 1 { - self.body_seq_id + self.body_ctx + .body_seq_id .fetch_add(1, std::sync::atomic::Ordering::SeqCst) } else { id }; - self.body_map.insert(handle, body); + self.body_ctx.body_map.insert(handle, body); handle } /// take_body takes body by id, it will remove body from map pub fn take_body(&mut self, id: u32) -> Option { - self.body_map.remove(&id) + self.body_ctx.body_map.remove(&id) } /// read_body reads body by id @@ -66,23 +86,28 @@ impl HostContext { size: u32, ) -> Result<(Vec, bool), BodyError> { let read_size = if size == 0 { READ_DEFAULT_SIZE } else { size }; - let mut current_buffer = self.body_buffer_map.remove(&handle).unwrap_or_default(); + let mut current_buffer = self + .body_ctx + .body_buffer_map + .remove(&handle) + .unwrap_or_default(); // if buffer is over the read size, split it and return the read part if current_buffer.len() > read_size as usize { let (read, rest) = current_buffer.split_at(read_size as usize); - self.body_buffer_map.insert(handle, rest.to_vec()); + self.body_ctx.body_buffer_map.insert(handle, rest.to_vec()); return Ok((read.to_vec(), false)); } // if handle is Body, move it to BodyStream to read chunk - if let Some(body) = self.body_map.remove(&handle) { + if let Some(body) = self.body_ctx.body_map.remove(&handle) { let stream = body.into_data_stream(); - self.body_stream_map.insert(handle, stream); + self.body_ctx.body_stream_map.insert(handle, stream); } // if handle is not in BodyStream, return InvalidHandle let stream = self + .body_ctx .body_stream_map .get_mut(&handle) .ok_or(BodyError::InvalidHandle)?; @@ -105,7 +130,7 @@ impl HostContext { current_buffer.extend_from_slice(&chunk); if current_buffer.len() > read_size as usize { let (read, rest) = current_buffer.split_at(read_size as usize); - self.body_buffer_map.insert(handle, rest.to_vec()); + self.body_ctx.body_buffer_map.insert(handle, rest.to_vec()); return Ok((read.to_vec(), false)); } } @@ -120,17 +145,18 @@ impl HostContext { /// set_sender_closed makes the body sender is closed. fn set_sender_closed(&mut self, handle: u32) { - if self.body_sender_map.contains_key(&handle) { + if self.body_ctx.body_sender_map.contains_key(&handle) { // call finish to notify receiver - let sender = self.body_sender_map.remove(&handle).unwrap(); + let sender = self.body_ctx.body_sender_map.remove(&handle).unwrap(); let _ = sender.finish(); } - self.body_sender_closed.insert(handle, true); + self.body_ctx.body_sender_closed.insert(handle, true); } /// write_body is used to write data to body pub async fn write_body(&mut self, handle: u32, data: Vec) -> Result { let closed = self + .body_ctx .body_sender_closed .get(&handle) .copied() @@ -141,14 +167,14 @@ impl HostContext { let data_len = data.len() as u64; // if Sender exist, write data to sender - if self.body_sender_map.contains_key(&handle) { - let sender = self.body_sender_map.get_mut(&handle).unwrap(); + if self.body_ctx.body_sender_map.contains_key(&handle) { + let sender = self.body_ctx.body_sender_map.get_mut(&handle).unwrap(); sender.write(Bytes::from(data))?; return Ok(data_len); } // if exist in body map, return ReadOnly error - if self.body_map.contains_key(&handle) { + if self.body_ctx.body_map.contains_key(&handle) { return Err(BodyError::ReadOnly); } @@ -162,7 +188,7 @@ impl HostContext { pub fn new_writable_body(&mut self) -> u32 { let (sender, body) = new_channel(); let handle = self.set_body(0, body); - self.body_sender_map.insert(handle, sender); + self.body_ctx.body_sender_map.insert(handle, sender); handle } @@ -200,7 +226,7 @@ mod context_test { assert!(end == true); } else { if index == 31 { - // last chunk is 101*3%10 = + // last chunk is 101*3%10 = assert_eq!(3, data.len()); } else { // common chunk is 10 diff --git a/lib/wasm-host/src/hostcall/mod.rs b/lib/wasm-host/src/hostcall/mod.rs index 0db91924..2e841bec 100644 --- a/lib/wasm-host/src/hostcall/mod.rs +++ b/lib/wasm-host/src/hostcall/mod.rs @@ -1,3 +1,4 @@ +mod asyncio; mod body; mod body_impl; mod client; @@ -13,3 +14,4 @@ pub use guest::HttpHandlerPre; pub use host::HttpService; impl host::land::http::types::Host for HostContext {} +impl host::land::asyncio::types::Host for HostContext {} diff --git a/lib/wasm-host/src/worker.rs b/lib/wasm-host/src/worker.rs index b32f8ae3..ccd5cbc6 100644 --- a/lib/wasm-host/src/worker.rs +++ b/lib/wasm-host/src/worker.rs @@ -125,6 +125,32 @@ impl Worker { .call_handle_request(&mut store, &req) .await?; let body = store.data_mut().take_body(resp.body.unwrap()).unwrap(); + + // check async task is pending + let is_pending = exports + .land_asyncio_context() + .call_is_pending(&mut store) + .await?; + debug!("async task is pending: {}", is_pending); + if is_pending { + tokio::task::spawn(async move { + let now = tokio::time::Instant::now(); + loop { + let res = exports + .land_asyncio_context() + .call_select(&mut store) + .await + .unwrap(); + debug!("async task is done, res: {:?}", res); + if !res { + break; + } + } + debug!("async task is done, cost:{:.2?}", now.elapsed()); + // println!("async task is done, cost:{:.2?}", now.elapsed()); + }); + } + Ok((resp, body)) } } @@ -134,11 +160,11 @@ mod worker_test { use crate::{hostcall::Request, Context, Worker}; #[tokio::test] - async fn run_wasm() { - let test_hello_file = "../../target/wasm32-wasi/release/hello_wasm.wasm"; - land_wasm_gen::componentize_wasm(&test_hello_file).expect("componentize wasm failed"); + async fn run_hello_wasm() { + let test_wasm_file = "../../target/wasm32-wasi/release/hello_wasm.wasm"; + land_wasm_gen::componentize_wasm(&test_wasm_file).expect("componentize wasm failed"); - let worker = Worker::new(test_hello_file, false) + let worker = Worker::new(test_wasm_file, false) .await .expect("load worker failed"); let request = Request { @@ -159,7 +185,40 @@ mod worker_test { } } let body_handle = resp.1; - let body =axum::body::to_bytes(body_handle,9999).await.unwrap(); + let body = axum::body::to_bytes(body_handle, 9999).await.unwrap(); assert_eq!(body, b"Hello Runtime.land!!".to_vec()); } + + #[tokio::test] + async fn run_wait_until() { + let test_wasm_file = "../../target/wasm32-wasi/release/wait_until.wasm"; + land_wasm_gen::componentize_wasm(&test_wasm_file).expect("componentize wasm failed"); + + let worker = Worker::new(test_wasm_file, false) + .await + .expect("load worker failed"); + let request = Request { + method: "GET".to_string(), + uri: "/".to_string(), + headers: vec![("X-Request-Id".to_string(), "123456".to_string())], + body: None, + }; + let context = Context::new(None); + let resp = worker + .handle_request(request, context) + .await + .expect("handle request failed"); + assert_eq!(resp.0.status, 200); + for (h, v) in resp.0.headers { + if h == "X-Request-Method" { + assert_eq!(v, "GET"); + } + } + let body_handle = resp.1; + let body = axum::body::to_bytes(body_handle, 9999).await.unwrap(); + assert_eq!(body, b"Hello Runtime.land!!".to_vec()); + + // wait until + tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; + } } diff --git a/lib/wasm-host/wit/deps/asyncio/asyncio.wit b/lib/wasm-host/wit/deps/asyncio/asyncio.wit new file mode 100644 index 00000000..4d93227d --- /dev/null +++ b/lib/wasm-host/wit/deps/asyncio/asyncio.wit @@ -0,0 +1,17 @@ +package land:asyncio; + +interface asyncio{ + use types.{handle}; + + // new async task handle + new: func() -> result; + // new sleep time task handle and sleep + sleep: func(ms: u32) -> result; + // finish async task + finish: func(handle: handle); + + // is-pending checks if the task is pending + is-pending: func() -> bool; + // wait for async task to finish + wait: func(); +} \ No newline at end of file diff --git a/lib/wasm-host/wit/deps/asyncio/context.wit b/lib/wasm-host/wit/deps/asyncio/context.wit new file mode 100644 index 00000000..c56c591c --- /dev/null +++ b/lib/wasm-host/wit/deps/asyncio/context.wit @@ -0,0 +1,8 @@ +package land:asyncio; + +interface context { + // is ctx pending + is-pending: func() -> bool; + // select one task to run, if no task is ready, return false + select: func() -> bool; +} \ No newline at end of file diff --git a/lib/wasm-host/wit/deps/asyncio/types.wit b/lib/wasm-host/wit/deps/asyncio/types.wit new file mode 100644 index 00000000..19729c34 --- /dev/null +++ b/lib/wasm-host/wit/deps/asyncio/types.wit @@ -0,0 +1,6 @@ +package land:asyncio; + +interface types{ + // async io task handle + type handle = u32; +} \ No newline at end of file diff --git a/lib/wasm-host/wit/http-handler.wit b/lib/wasm-host/wit/http-handler.wit index 71292cc7..3be22950 100644 --- a/lib/wasm-host/wit/http-handler.wit +++ b/lib/wasm-host/wit/http-handler.wit @@ -2,4 +2,5 @@ package land:worker; world http-handler { export land:http/incoming; + export land:asyncio/context; } \ No newline at end of file diff --git a/lib/wasm-host/wit/http-service.wit b/lib/wasm-host/wit/http-service.wit index ab09b4d3..ed23edb9 100644 --- a/lib/wasm-host/wit/http-service.wit +++ b/lib/wasm-host/wit/http-service.wit @@ -3,4 +3,5 @@ package land:worker; world http-service { import land:http/body; import land:http/fetching; + import land:asyncio/asyncio; } \ No newline at end of file diff --git a/tests/test_examples.sh b/tests/test_examples.sh new file mode 100644 index 00000000..48125e76 --- /dev/null +++ b/tests/test_examples.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +cargo build -p hello-wasm --target wasm32-wasi --release +cargo build -p wait-until --target wasm32-wasi --release +cargo test -p land-wasm-host worker_test -- --nocapture \ No newline at end of file diff --git a/tests/wait-until/Cargo.toml b/tests/wait-until/Cargo.toml new file mode 100644 index 00000000..b9bd278d --- /dev/null +++ b/tests/wait-until/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "wait-until" +version = { workspace = true } +edition = "2021" + +[dependencies] +anyhow = { workspace = true } +http = { workspace = true } +land-sdk = { path = "../../lib/sdk" } +wit-bindgen = { workspace = true } + +[lib] +crate-type = ["cdylib"] diff --git a/tests/wait-until/src/lib.rs b/tests/wait-until/src/lib.rs new file mode 100644 index 00000000..6d9206ce --- /dev/null +++ b/tests/wait-until/src/lib.rs @@ -0,0 +1,31 @@ +use land_sdk::http::{Body, Error, Request, Response}; +use land_sdk::{http_main, ExecutionCtx}; + +#[http_main] +pub fn handle_request(req: Request, mut ctx: ExecutionCtx) -> Result { + // read uri and method from request + let url = req.uri().clone(); + let method = req.method().to_string().to_uppercase(); + + ctx.sleep(1500); + + ctx.wait_until(|| { + println!("sleep 1s..."); + std::thread::sleep(std::time::Duration::from_secs(1)); + println!("sleep 1s done!"); + }); + + ctx.wait_until(|| { + println!("sleep 2s..."); + std::thread::sleep(std::time::Duration::from_secs(2)); + println!("sleep 2s done!"); + }); + + // build response + Ok(http::Response::builder() + .status(200) + .header("X-Request-Url", url.to_string()) + .header("X-Request-Method", method) + .body(Body::from("Hello Runtime.land!!")) + .unwrap()) +}