diff --git a/crates/cpp/tests/symmetric_stream/Cargo.lock b/crates/cpp/tests/symmetric_stream/Cargo.lock index 989d700e1..2d07da533 100644 --- a/crates/cpp/tests/symmetric_stream/Cargo.lock +++ b/crates/cpp/tests/symmetric_stream/Cargo.lock @@ -385,6 +385,7 @@ dependencies = [ "source", "wit-bindgen", "wit-bindgen-rt", + "wit-bindgen-symmetric-rt", ] [[package]] @@ -562,6 +563,14 @@ dependencies = [ "wit-bindgen-rust", ] +[[package]] +name = "wit-bindgen-symmetric-rt" +version = "0.36.0" +dependencies = [ + "futures", + "wit-bindgen", +] + [[package]] name = "wit-component" version = "0.221.2" diff --git a/crates/cpp/tests/symmetric_stream/stream/Cargo.toml b/crates/cpp/tests/symmetric_stream/stream/Cargo.toml index a54e7c8c9..d6bb64454 100644 --- a/crates/cpp/tests/symmetric_stream/stream/Cargo.toml +++ b/crates/cpp/tests/symmetric_stream/stream/Cargo.toml @@ -8,6 +8,7 @@ futures = "0.3.31" source = { version = "0.1.0", path = "../source" } wit-bindgen = { version = "0.36.0", path = "../../../../guest-rust" } wit-bindgen-rt = { version = "0.36.0", path = "../../../../guest-rust/rt" } +wit-bindgen-symmetric-rt = { version = "0.36.0", path = "../../../../symmetric_executor/rust-client" } [lib] crate-type = ["cdylib"] diff --git a/crates/cpp/tests/symmetric_stream/stream/src/stream_world.rs b/crates/cpp/tests/symmetric_stream/stream/src/stream_world.rs index 50b5b432f..4111ca7d7 100644 --- a/crates/cpp/tests/symmetric_stream/stream/src/stream_world.rs +++ b/crates/cpp/tests/symmetric_stream/stream/src/stream_world.rs @@ -12,15 +12,10 @@ pub mod test { super::super::super::__link_custom_section_describing_imports; use super::super::super::_rt; + use _rt::stream_and_future_support::WitStream; impl _rt::stream_and_future_support::StreamPayload for u32{ - fn new() -> u32 { - #[cfg(not(target_arch = "wasm32"))] - { - unreachable!(); - } - - #[cfg(target_arch = "wasm32")] + fn new() -> *mut WitStream { { #[link(wasm_import_module = "[import-payload]test:test/stream-source")] extern "C" { @@ -31,13 +26,7 @@ pub mod test { } } - async fn write(stream: u32, values: &[Self]) -> Option { - #[cfg(not(target_arch = "wasm32"))] - { - unreachable!(); - } - - #[cfg(target_arch = "wasm32")] + async fn write(stream: *mut WitStream, values: &[Self]) -> Option { { let address = values.as_ptr() as _; @@ -53,13 +42,7 @@ pub mod test { } } - async fn read(stream: u32, values: &mut [::core::mem::MaybeUninit::]) -> Option { - #[cfg(not(target_arch = "wasm32"))] - { - unreachable!(); - } - - #[cfg(target_arch = "wasm32")] + async fn read(stream: *mut WitStream, values: &mut [::core::mem::MaybeUninit::]) -> Option { { let address = values.as_mut_ptr() as _; #[link(wasm_import_module = "[import-payload]test:test/stream-source")] @@ -80,13 +63,7 @@ pub mod test { } } - fn cancel_write(writer: u32) { - #[cfg(not(target_arch = "wasm32"))] - { - unreachable!(); - } - - #[cfg(target_arch = "wasm32")] + fn cancel_write(writer: *mut WitStream) { { #[link(wasm_import_module = "[import-payload]test:test/stream-source")] extern "C" { @@ -97,13 +74,7 @@ pub mod test { } } - fn cancel_read(reader: u32) { - #[cfg(not(target_arch = "wasm32"))] - { - unreachable!(); - } - - #[cfg(target_arch = "wasm32")] + fn cancel_read(reader: *mut WitStream) { { #[link(wasm_import_module = "[import-payload]test:test/stream-source")] extern "C" { @@ -114,13 +85,7 @@ pub mod test { } } - fn close_writable(writer: u32) { - #[cfg(not(target_arch = "wasm32"))] - { - unreachable!(); - } - - #[cfg(target_arch = "wasm32")] + fn close_writable(writer: *mut WitStream) { { #[link(wasm_import_module = "[import-payload]test:test/stream-source")] extern "C" { @@ -131,13 +96,7 @@ pub mod test { } } - fn close_readable(reader: u32) { - #[cfg(not(target_arch = "wasm32"))] - { - unreachable!(); - } - - #[cfg(target_arch = "wasm32")] + fn close_readable(reader: *mut WitStream) { { #[link(wasm_import_module = "[import-payload]test:test/stream-source")] extern "C" { @@ -258,8 +217,9 @@ mod _rt { pin::Pin, task::{Context, Poll}, }, - wit_bindgen_rt::async_support::{self, Handle}, + wit_bindgen_symmetric_rt::async_support::{self, Handle}, }; + pub use wit_bindgen_symmetric_rt::async_support::Stream as WitStream; #[doc(hidden)] pub trait FuturePayload: Unpin + Sized + 'static { @@ -585,17 +545,17 @@ mod _rt { #[doc(hidden)] pub trait StreamPayload: Unpin + Sized + 'static { - fn new() -> u32; - async fn write(stream: u32, values: &[Self]) -> Option; - async fn read(stream: u32, values: &mut [MaybeUninit]) -> Option; - fn cancel_write(stream: u32); - fn cancel_read(stream: u32); - fn close_writable(stream: u32); - fn close_readable(stream: u32); + fn new() -> *mut WitStream; + async fn write(stream: *mut WitStream, values: &[Self]) -> Option; + async fn read(stream: *mut WitStream, values: &mut [MaybeUninit]) -> Option; + fn cancel_write(stream: *mut WitStream); + fn cancel_read(stream: *mut WitStream); + fn close_writable(stream: *mut WitStream); + fn close_readable(stream: *mut WitStream); } struct CancelWriteOnDrop { - handle: Option, + handle: Option<*mut WitStream>, _phantom: PhantomData, } @@ -621,7 +581,7 @@ mod _rt { /// Represents the writable end of a Component Model `stream`. pub struct StreamWriter { - handle: u32, + handle: *mut WitStream, future: Option + 'static>>>, _phantom: PhantomData, } @@ -765,7 +725,7 @@ mod _rt { } struct CancelReadOnDrop { - handle: Option, + handle: Option<*mut WitStream>, _phantom: PhantomData, } @@ -791,7 +751,7 @@ mod _rt { /// Represents the readable end of a Component Model `stream`. pub struct StreamReader { - handle: u32, + handle: *mut WitStream, future: Option>> + 'static>>>, _phantom: PhantomData, } @@ -816,7 +776,7 @@ mod _rt { impl StreamReader { #[doc(hidden)] - pub fn from_handle(handle: u32) -> Self { + pub fn from_handle(handle: *mut WitStream) -> Self { async_support::with_entry(handle, |entry| match entry { Entry::Vacant(entry) => { entry.insert(Handle::Read); @@ -843,7 +803,7 @@ mod _rt { } #[doc(hidden)] - pub fn into_handle(self) -> u32 { + pub fn into_handle(self) -> *mut WitStream { async_support::with_entry(self.handle, |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get() { diff --git a/crates/symmetric_executor/rust-client/src/async_support.rs b/crates/symmetric_executor/rust-client/src/async_support.rs index 618a9d718..b28753acd 100644 --- a/crates/symmetric_executor/rust-client/src/async_support.rs +++ b/crates/symmetric_executor/rust-client/src/async_support.rs @@ -33,6 +33,29 @@ pub enum Handle { Write, } +#[repr(C)] +pub struct StreamVtable { + // magic value for EOF(-1) and block(-MAX) + // asynchronous function, if this blocks wait for read ready event + pub read: fn(stream: *mut (), buf: *mut (), size: usize) -> isize, + pub close_read: fn(stream: *mut ()), + + pub write: fn(stream: *mut (), buf: *mut (), size: usize) -> isize, + pub close_write: fn(stream: *mut ()), + // post WASI 0.3, CPB + // pub allocate: fn(stream: *mut ()) -> (*mut (), isize), + // pub publish: fn(stream: *mut (), size: usize), +} + +#[repr(C)] +pub struct Stream { + vtable: *const StreamVtable, + read_ready_event_send: *mut (), + write_ready_event_send: *mut (), + read_addr: *mut (), + read_size: usize, +} + #[doc(hidden)] pub fn with_entry(_h: u32, _f: impl FnOnce(hash_map::Entry<'_, u32, Handle>) -> T) -> T { todo!()