diff --git a/Cargo.lock b/Cargo.lock index bbc1fb0b9..8ef19d83e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9710,9 +9710,9 @@ dependencies = [ [[package]] name = "test-r" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f222e9ef5e70ba864934fc58a46e076f5fefdec71b2f8638f3da28470361a270" +checksum = "096af9a5318c22b4f7bcf483eeacac44d831ae3ac78f4fab065be61c25713a10" dependencies = [ "ctor", "test-r-core", @@ -9722,9 +9722,9 @@ dependencies = [ [[package]] name = "test-r-core" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e3e1de4496823b3fd5c0b5411900988f79ca2b9c1e33c8a22a48e65934c65d0" +checksum = "be35981a41cf8814f5cf4c01cebdf1a32b5e3b2c77436db13dc6c6f6669485ab" dependencies = [ "anstream", "anstyle", @@ -9745,9 +9745,9 @@ dependencies = [ [[package]] name = "test-r-macro" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46b98fd96e98960e8b99b0840008563f8c528335307287ff6fa7636ac4d0840c" +checksum = "040d55dfc68c3a12628b74488faa4bf39487b32d506e0b03de0edeb468d152be" dependencies = [ "heck 0.5.0", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 096234cd8..a4316aa0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -180,7 +180,7 @@ testcontainers-modules = { version = "0.11.4", features = [ "redis", "minio", ] } -test-r = { version = "1.1.0", default-features = true } +test-r = { version = "1.2.0", default-features = true } thiserror = "2.0.6" tokio = { version = "1.42", features = [ "macros", diff --git a/golem-rib/src/function_name.rs b/golem-rib/src/function_name.rs index 41f1b09c4..3cf8efa0c 100644 --- a/golem-rib/src/function_name.rs +++ b/golem-rib/src/function_name.rs @@ -652,6 +652,28 @@ impl ParsedFunctionName { function, }) } + + pub fn is_constructor(&self) -> Option<&str> { + match &self.function { + ParsedFunctionReference::RawResourceConstructor { resource, .. } + | ParsedFunctionReference::IndexedResourceConstructor { resource, .. } => { + Some(resource) + } + _ => None, + } + } + + pub fn is_method(&self) -> Option<&str> { + match &self.function { + ParsedFunctionReference::RawResourceMethod { resource, .. } + | ParsedFunctionReference::IndexedResourceMethod { resource, .. } + | ParsedFunctionReference::RawResourceStaticMethod { resource, .. } + | ParsedFunctionReference::IndexedResourceStaticMethod { resource, .. } => { + Some(resource) + } + _ => None, + } + } } #[cfg(feature = "protobuf")] diff --git a/golem-worker-executor-base/src/durable_host/dynamic_linking.rs b/golem-worker-executor-base/src/durable_host/dynamic_linking.rs new file mode 100644 index 000000000..d47ce3958 --- /dev/null +++ b/golem-worker-executor-base/src/durable_host/dynamic_linking.rs @@ -0,0 +1,460 @@ +use crate::durable_host::wasm_rpc::{UrnExtensions, WasmRpcEntryPayload}; +use crate::durable_host::DurableWorkerCtx; +use crate::services::rpc::RpcDemand; +use crate::workerctx::{DynamicLinking, WorkerCtx}; +use anyhow::anyhow; +use async_trait::async_trait; +use golem_common::model::OwnedWorkerId; +use golem_wasm_rpc::wasmtime::{decode_param, encode_output}; +use golem_wasm_rpc::{HostWasmRpc, Uri, Value, WasmRpcEntry, WitValue}; +use rib::{ParsedFunctionName, ParsedFunctionReference, ParsedFunctionSite}; +use tracing::debug; +use wasmtime::component::types::ComponentItem; +use wasmtime::component::{Component, Linker, Resource, ResourceType, Type, Val}; +use wasmtime::{AsContextMut, Engine, StoreContextMut}; +use wasmtime_wasi::WasiView; + +// TODO: support multiple different dynamic linkers + +#[async_trait] +impl DynamicLinking for DurableWorkerCtx { + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()> { + let mut root = linker.root(); + + for (name, item) in component.component_type().imports(&engine) { + debug!("Import {name}: {item:?}"); + match item { + ComponentItem::ComponentFunc(_) => { + debug!("MUST LINK COMPONENT FUNC {name}"); + } + ComponentItem::CoreFunc(_) => { + debug!("MUST LINK CORE FUNC {name}"); + } + ComponentItem::Module(_) => { + debug!("MUST LINK MODULE {name}"); + } + ComponentItem::Component(_) => { + debug!("MUST LINK COMPONENT {name}"); + } + ComponentItem::ComponentInstance(ref inst) => { + if name == "auction:auction-stub/stub-auction" + || name == "auction:auction/api" + || name == "rpc:counters-stub/stub-counters" + || name == "rpc:counters/api" + || name == "rpc:ephemeral-stub/stub-ephemeral" + { + debug!("NAME == {name}"); + let mut instance = root.instance(name)?; + + for (ename, eitem) in inst.exports(&engine) { + let name = name.to_owned(); + let ename = ename.to_owned(); + debug!("Instance {name} export {ename}: {eitem:?}"); + + match eitem { + ComponentItem::ComponentFunc(fun) => { + let name2 = name.clone(); + let ename2 = ename.clone(); + instance.func_new_async( + // TODO: instrument async closure + &ename.clone(), + move |store, params, results| { + let name = name2.clone(); + let ename = ename2.clone(); + let param_types: Vec = fun.params().collect(); + let result_types: Vec = fun.results().collect(); + Box::new(async move { + Ctx::dynamic_function_call( + store, + &name, + &ename, + params, + ¶m_types, + results, + &result_types, + ) + .await?; + // TODO: failures here must be somehow handled + Ok(()) + }) + }, + )?; + debug!("LINKED {name} export {ename}"); + } + ComponentItem::CoreFunc(_) => {} + ComponentItem::Module(_) => {} + ComponentItem::Component(component) => { + debug!("MUST LINK COMPONENT {ename} {component:?}"); + } + ComponentItem::ComponentInstance(instance) => { + debug!("MUST LINK COMPONENT INSTANCE {ename} {instance:?}"); + } + ComponentItem::Type(_) => {} + ComponentItem::Resource(resource) => { + if ename != "pollable" { + // TODO: ?? this should be 'if it is not already linked' but not way to check that + debug!("LINKING RESOURCE {ename} {resource:?}"); + instance.resource( + &ename, + ResourceType::host::(), + Ctx::drop_linked_resource, + )?; + } + } + } + } + } else { + debug!("NAME NOT MATCHING: {name}"); + } + } + ComponentItem::Type(_) => {} + ComponentItem::Resource(_) => {} + } + } + + Ok(()) + } + + async fn dynamic_function_call( + mut store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type], + ) -> anyhow::Result<()> { + let mut store = store.as_context_mut(); + debug!( + "Instance {interface_name} export {function_name} called XXX {} params {} results", + params.len(), + results.len() + ); + + // TODO: add an enum with the call types (interface stub constructor, resource stub constructor, etc) + // TODO: and detect which one it is based on metadata + type info + + if (interface_name == "auction:auction-stub/stub-auction" + && function_name == "[constructor]api") + || (interface_name == "rpc:counters-stub/stub-counters" + && function_name == "[constructor]api") + { + // Simple stub interface constructor + + let target_worker_urn = params[0].clone(); + debug!("CREATING AUCTION STUB TARGETING WORKER {target_worker_urn:?}"); + // Record([("value", String("urn:worker:2a174805-bdd5-49e1-b1e8-124208123b4a/auction-5f0a94f1-1d14-4b65-8e6c-3a8fa3c24ea9"))]) + + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn).await?; + + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Interface { + demand, + remote_worker_id, + }), + })? + }; + results[0] = Val::Resource(handle.try_into_resource_any(store)?); + } else if (interface_name == "auction:auction-stub/stub-auction" + && function_name == "[constructor]running-auction") + || (interface_name == "rpc:counters-stub/stub-counters" + && function_name == "[constructor]counter") + { + // Resource stub constructor + + // First parameter is the target uri + // Rest of the parameters must be sent to the remote constructor + + let target_worker_urn = params[0].clone(); + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn.clone()).await?; + + // First creating a resource for invoking the constructor (to avoid having to make a special case) + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Interface { + demand, + remote_worker_id, + }), + })? + }; + let temp_handle = handle.rep(); + + let constructor_result = Self::remote_invoke( + &interface_name, + &function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + + // TODO: extract and clean up + let (resource_uri, resource_id) = if let Value::Tuple(values) = constructor_result { + if values.len() == 1 { + if let Value::Handle { uri, resource_id } = values.into_iter().next().unwrap() { + (Uri { value: uri }, resource_id) + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + } + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + } + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + }; + + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn).await?; + + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + + let temp_handle: Resource = Resource::new_own(temp_handle); + table.delete(temp_handle)?; // Removing the temporary handle + + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Resource { + demand, + remote_worker_id, + resource_uri, + resource_id, + }), + })? + }; + results[0] = Val::Resource(handle.try_into_resource_any(store)?); + } else if function_name.starts_with("[method]") { + // Simple stub interface method + debug!( + "{function_name} handle={:?}, rest={:?}", + params[0], + params.iter().skip(1).collect::>() + ); + + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + { + let mut wasi = store.data_mut().as_wasi_view(); + let entry = wasi.table().get(&handle)?; + let payload = entry.payload.downcast_ref::().unwrap(); + debug!("CALLING {function_name} ON {}", payload.remote_worker_id()); + } + + let result = Self::remote_invoke( + &interface_name, + &function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + Self::value_result_to_wasmtime_vals(result, results, result_types, &mut store).await?; + } + + Ok(()) + } + + fn drop_linked_resource(mut store: StoreContextMut<'_, Ctx>, rep: u32) -> anyhow::Result<()> { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + let entry: &WasmRpcEntry = table.get_any_mut(rep).unwrap().downcast_ref().unwrap(); // TODO: error handling + let payload = entry.payload.downcast_ref::().unwrap(); + debug!("DROPPING RESOURCE {payload:?}"); + if let WasmRpcEntryPayload::Resource { .. } = payload { + // TODO: remote drop + } + Ok(()) + } +} + +// TODO: these helpers probably should not be directly living in DurableWorkerCtx +impl DurableWorkerCtx { + async fn remote_invoke( + interface_name: &&str, + function_name: &&str, + params: &[Val], + param_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + handle: Resource, + ) -> anyhow::Result { + let stub_function_name = + ParsedFunctionName::parse(&format!("{interface_name}.{{{function_name}}}")) + .map_err(|err| anyhow!(err))?; // TODO: proper error + debug!("STUB FUNCTION NAME: {stub_function_name:?}"); + let target_function_name = ParsedFunctionName { + site: if interface_name.starts_with("auction") { + ParsedFunctionSite::PackagedInterface { + // TODO: this must come from component metadata linking information + namespace: "auction".to_string(), + package: "auction".to_string(), + interface: "api".to_string(), + version: None, + } + } else { + ParsedFunctionSite::PackagedInterface { + namespace: "rpc".to_string(), + package: "counters".to_string(), + interface: "api".to_string(), + version: None, + } + }, + function: if let Some(resource) = stub_function_name.is_constructor() { + ParsedFunctionReference::RawResourceConstructor { + resource: resource.to_string(), + } + } else { + match &stub_function_name.function { + ParsedFunctionReference::RawResourceMethod { resource, method } + if resource == "counter" => + { + ParsedFunctionReference::RawResourceMethod { + resource: resource.to_string(), + method: method + .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants + .unwrap() + .to_string(), + } + } + _ => ParsedFunctionReference::Function { + function: stub_function_name + .function + .resource_method_name() + .unwrap() // TODO: proper error + .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants + .unwrap() + .to_string(), + }, + } + }, + }; + + let mut wit_value_params = Vec::new(); + for (param, typ) in params.iter().zip(param_types).skip(1) { + let value: Value = encode_output(param, typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + let wit_value: WitValue = value.into(); + wit_value_params.push(wit_value); + } + + debug!( + "CALLING {function_name} as {target_function_name} with parameters {wit_value_params:?}", + ); + + // "auction:auction/api.{initialize}", + let wit_value_result = store + .data_mut() + .invoke_and_await(handle, target_function_name.to_string(), wit_value_params) + .await??; + + debug!("CALLING {function_name} RESULTED IN {:?}", wit_value_result); + + let value_result: Value = wit_value_result.into(); + Ok(value_result) + } + + async fn value_result_to_wasmtime_vals( + value_result: Value, + results: &mut [Val], + result_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + ) -> anyhow::Result<()> { + match value_result { + Value::Tuple(values) | Value::Record(values) => { + for (idx, (value, typ)) in values.iter().zip(result_types).enumerate() { + let result = decode_param(&value, &typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + results[idx] = result.val; + // TODO: do we have to do something with result.resources_to_drop here? + } + } + _ => { + return Err(anyhow!( + "Unexpected result value {value_result:?}, expected tuple or record" + )); + } + } + + Ok(()) + } +} + +// TODO: these helpers probably should not be directly living in DurableWorkerCtx +impl DurableWorkerCtx { + async fn create_rpc_target( + store: &mut StoreContextMut<'_, Ctx>, + target_worker_urn: Val, + ) -> anyhow::Result<(OwnedWorkerId, Box)> { + let worker_urn = match target_worker_urn { + Val::Record(ref record) => { + let mut target = None; + for (key, val) in record.iter() { + if key == "value" { + match val { + Val::String(s) => { + target = Some(s.clone()); + } + _ => {} + } + } + } + target + } + _ => None, + }; + + let (remote_worker_id, demand) = if let Some(location) = worker_urn { + let uri = Uri { + value: location.clone(), + }; + match uri.parse_as_golem_urn() { + Some((remote_worker_id, None)) => { + let remote_worker_id = store + .data_mut() + .generate_unique_local_worker_id(remote_worker_id) + .await?; + + let remote_worker_id = OwnedWorkerId::new( + &store.data().owned_worker_id().account_id, + &remote_worker_id, + ); + let demand = store.data().rpc().create_demand(&remote_worker_id).await; + (remote_worker_id, demand) + } + _ => { + return Err(anyhow!( + "Invalid URI: {}. Must be urn:worker:component-id/worker-name", + location + )) + } + } + } else { + return Err(anyhow!("Missing or invalid worker URN parameter")); // TODO: more details; + }; + Ok((remote_worker_id, demand)) + } +} diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index b42fdd74b..dee490a98 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -18,6 +18,7 @@ use crate::durable_host::http::serialized::SerializableHttpRequest; use crate::durable_host::io::{ManagedStdErr, ManagedStdIn, ManagedStdOut}; use crate::durable_host::replay_state::ReplayState; +use crate::durable_host::serialized::SerializableError; use crate::durable_host::sync_helper::{SyncHelper, SyncHelperPermit}; use crate::durable_host::wasm_rpc::UrnExtensions; use crate::error::GolemError; @@ -65,7 +66,6 @@ use golem_common::model::oplog::{ }; use golem_common::model::plugin::{PluginOwner, PluginScope}; use golem_common::model::regions::{DeletedRegions, OplogRegion}; -use golem_common::model::RetryConfig; use golem_common::model::{exports, PluginInstallationId}; use golem_common::model::{ AccountId, ComponentFilePath, ComponentFilePermissions, ComponentFileSystemNode, @@ -74,6 +74,7 @@ use golem_common::model::{ ScheduledAction, SuccessfulUpdateRecord, Timestamp, WorkerEvent, WorkerFilter, WorkerId, WorkerMetadata, WorkerResourceDescription, WorkerStatus, WorkerStatusRecord, }; +use golem_common::model::{RetryConfig, TargetWorkerId}; use golem_common::retries::get_delay; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::wasmtime::ResourceStore; @@ -116,6 +117,7 @@ mod sockets; pub mod wasm_rpc; mod durability; +mod dynamic_linking; mod replay_state; mod sync_helper; @@ -288,6 +290,10 @@ impl DurableWorkerCtx { &self.owned_worker_id.worker_id } + pub fn owned_worker_id(&self) -> &OwnedWorkerId { + &self.owned_worker_id + } + pub fn component_metadata(&self) -> &ComponentMetadata { &self.state.component_metadata } @@ -476,6 +482,32 @@ impl DurableWorkerCtx { } } } + + pub async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result { + match remote_worker_id.clone().try_into_worker_id() { + Some(worker_id) => Ok(worker_id), + None => { + let worker_id = Durability::::wrap( + self, + WrappedFunctionType::ReadLocal, + "golem::rpc::wasm-rpc::generate_unique_local_worker_id", + (), + |ctx| { + Box::pin(async move { + ctx.rpc() + .generate_unique_local_worker_id(remote_worker_id) + .await + }) + }, + ) + .await?; + Ok(worker_id) + } + } + } } impl> DurableWorkerCtx { diff --git a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs index 6853c11c7..748103e55 100644 --- a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs +++ b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs @@ -36,13 +36,14 @@ use golem_common::model::{ }; use golem_common::uri::oss::urn::{WorkerFunctionUrn, WorkerOrFunctionUrn}; use golem_wasm_rpc::golem::rpc::types::{ - FutureInvokeResult, HostFutureInvokeResult, Pollable, Uri, + FutureInvokeResult, HostFutureInvokeResult, Pollable, Uri, WasmRpc, }; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::{ - FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, ValueAndType, WasmRpcEntry, WitValue, + FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, Value, ValueAndType, WasmRpcEntry, WitValue, }; use std::any::Any; +use std::fmt::{Debug, Formatter}; use std::str::FromStr; use std::sync::Arc; use tracing::{error, warn}; @@ -60,14 +61,15 @@ impl HostWasmRpc for DurableWorkerCtx { match location.parse_as_golem_urn() { Some((remote_worker_id, None)) => { - let remote_worker_id = - generate_unique_local_worker_id(self, remote_worker_id).await?; + let remote_worker_id = self + .generate_unique_local_worker_id(remote_worker_id) + .await?; let remote_worker_id = OwnedWorkerId::new(&self.owned_worker_id.account_id, &remote_worker_id); let demand = self.rpc().create_demand(&remote_worker_id).await; let entry = self.table().push(WasmRpcEntry { - payload: Box::new(WasmRpcEntryPayload { + payload: Box::new(WasmRpcEntryPayload::Interface { demand, remote_worker_id, }), @@ -85,7 +87,7 @@ impl HostWasmRpc for DurableWorkerCtx { &mut self, self_: Resource, function_name: String, - function_params: Vec, + mut function_params: Vec, ) -> anyhow::Result> { record_host_function_call("golem::rpc::wasm-rpc", "invoke-and-await"); let args = self.get_arguments().await?; @@ -95,7 +97,26 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&self_)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); + + // TODO: do this in other variants too + match payload { + WasmRpcEntryPayload::Resource { + resource_uri, + resource_id, + .. + } => { + function_params.insert( + 0, + Value::Handle { + uri: resource_uri.value.to_string(), + resource_id: *resource_id, + } + .into(), + ); + } + _ => {} + } let current_idempotency_key = self .get_current_idempotency_key() @@ -226,7 +247,7 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&self_)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); let current_idempotency_key = self .get_current_idempotency_key() @@ -317,7 +338,7 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&this)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); let current_idempotency_key = self .get_current_idempotency_key() @@ -748,32 +769,6 @@ impl HostFutureInvokeResult for DurableWorkerCtx { #[async_trait] impl golem_wasm_rpc::Host for DurableWorkerCtx {} -async fn generate_unique_local_worker_id( - ctx: &mut DurableWorkerCtx, - remote_worker_id: TargetWorkerId, -) -> Result { - match remote_worker_id.clone().try_into_worker_id() { - Some(worker_id) => Ok(worker_id), - None => { - let worker_id = Durability::::wrap( - ctx, - WrappedFunctionType::ReadLocal, - "golem::rpc::wasm-rpc::generate_unique_local_worker_id", - (), - |ctx| { - Box::pin(async move { - ctx.rpc() - .generate_unique_local_worker_id(remote_worker_id) - .await - }) - }, - ) - .await?; - Ok(worker_id) - } - } -} - /// Tries to get a `ValueAndType` representation for the given `WitValue` parameters by querying the latest component metadata for the /// target component. /// If the query fails, or the expected function name is not in its metadata or the number of parameters does not match, then it returns an @@ -805,10 +800,63 @@ async fn try_get_typed_parameters( Vec::new() } -pub struct WasmRpcEntryPayload { - #[allow(dead_code)] - demand: Box, - remote_worker_id: OwnedWorkerId, +pub enum WasmRpcEntryPayload { + Interface { + #[allow(dead_code)] + demand: Box, + remote_worker_id: OwnedWorkerId, + }, + Resource { + #[allow(dead_code)] + demand: Box, + remote_worker_id: OwnedWorkerId, + resource_uri: Uri, + resource_id: u64, + }, +} + +impl Debug for WasmRpcEntryPayload { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Interface { + remote_worker_id, .. + } => f + .debug_struct("Interface") + .field("remote_worker_id", remote_worker_id) + .finish(), + Self::Resource { + remote_worker_id, + resource_uri, + resource_id, + .. + } => f + .debug_struct("Resource") + .field("remote_worker_id", remote_worker_id) + .field("resource_uri", resource_uri) + .field("resource_id", resource_id) + .finish(), + } + } +} + +impl WasmRpcEntryPayload { + pub fn remote_worker_id(&self) -> &OwnedWorkerId { + match self { + Self::Interface { + remote_worker_id, .. + } => remote_worker_id, + Self::Resource { + remote_worker_id, .. + } => remote_worker_id, + } + } + + pub fn demand(&self) -> &Box { + match self { + Self::Interface { demand, .. } => demand, + Self::Resource { demand, .. } => demand, + } + } } pub trait UrnExtensions { diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index a7de8b183..a8480dc97 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -20,6 +20,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use crate::durable_host::recover_stderr_logs; +use crate::durable_host::wasm_rpc::{UrnExtensions, WasmRpcEntryPayload}; use crate::error::{GolemError, WorkerOutOfMemory}; use crate::function_result_interpreter::interpret_function_results; use crate::invocation::{find_first_available_function, invoke_worker, InvokeResult}; @@ -30,6 +31,7 @@ use crate::model::{ use crate::services::component::ComponentMetadata; use crate::services::events::Event; use crate::services::oplog::{CommitLevel, Oplog, OplogOps}; +use crate::services::rpc::RpcDemand; use crate::services::worker_event::{WorkerEventService, WorkerEventServiceDefault}; use crate::services::{ All, HasActiveWorkers, HasAll, HasBlobStoreService, HasComponentService, HasConfig, HasEvents, @@ -57,15 +59,17 @@ use golem_common::model::{ }; use golem_common::retries::get_delay; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::Value; +use golem_wasm_rpc::{Uri, Value, WasmRpcEntry}; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{Mutex, MutexGuard, OwnedSemaphorePermit}; use tokio::task::JoinHandle; use tracing::{debug, error, info, span, warn, Instrument, Level}; -use wasmtime::component::Instance; +use wasmtime::component::types::ComponentItem; +use wasmtime::component::{Instance, Resource, ResourceType, Val}; use wasmtime::{AsContext, Store, UpdateDeadline}; +use wasmtime_wasi::WasiView; /// Represents worker that may be running or suspended. /// @@ -473,6 +477,9 @@ impl Worker { } } + /// Invokes the worker and awaits for a result. + /// + /// Successful result is a `TypeAnnotatedValue` encoding either a tuple or a record. pub async fn invoke_and_await( &self, idempotency_key: IdempotencyKey, @@ -1350,7 +1357,8 @@ impl RunningWorker { ) .await?; - let mut store = Store::new(&parent.engine(), context); + let engine = parent.engine(); + let mut store = Store::new(&engine, context); store.set_epoch_deadline(parent.config().limits.epoch_ticks); let worker_id_clone = worker_metadata.worker_id.clone(); store.epoch_deadline_callback(move |mut store| { @@ -1371,7 +1379,15 @@ impl RunningWorker { store.limiter_async(|ctx| ctx.resource_limiter()); - let instance_pre = parent.linker().instantiate_pre(&component).map_err(|e| { + // TODO MOVE SOMEWHERE ELSE + let mut linker = (*parent.linker()).clone(); // fresh linker + store.data_mut().link(&engine, &mut linker, &component)?; + + // TODO: check parent.linker() is not affected + + // TODO ^^^ + + let instance_pre = linker.instantiate_pre(&component).map_err(|e| { GolemError::worker_creation_failed( parent.owned_worker_id.worker_id(), format!( @@ -1551,13 +1567,13 @@ impl RunningWorker { store, &instance, ) - .await; + .await; match result { Ok(InvokeResult::Succeeded { - output, - consumed_fuel, - }) => { + output, + consumed_fuel, + }) => { let component_metadata = store.as_context().data().component_metadata(); @@ -1577,9 +1593,9 @@ impl RunningWorker { output, function_results, ) - .map_err(|e| GolemError::ValueMismatch { - details: e.join(", "), - }); + .map_err(|e| GolemError::ValueMismatch { + details: e.join(", "), + }); match result { Ok(result) => { @@ -1685,8 +1701,8 @@ impl RunningWorker { } } } - .instrument(span) - .await; + .instrument(span) + .await; if do_break { break; } @@ -1713,7 +1729,7 @@ impl RunningWorker { vec![ "golem:api/save-snapshot@1.1.0.{save}".to_string(), "golem:api/save-snapshot@0.2.0.{save}".to_string(), - ] + ], ) { store.data_mut().begin_call_snapshotting_function(); @@ -1950,9 +1966,9 @@ impl InvocationResult { OplogEntry::Error { error, .. } => { let stderr = recover_stderr_logs(services, owned_worker_id, oplog_idx).await; Err(FailedInvocationResult { trap_type: TrapType::Error(error), stderr }) - }, - OplogEntry::Interrupted { .. } => Err(FailedInvocationResult { trap_type: TrapType::Interrupt(InterruptKind::Interrupt), stderr: "".to_string()}), - OplogEntry::Exited { .. } => Err(FailedInvocationResult { trap_type: TrapType::Exit, stderr: "".to_string()}), + } + OplogEntry::Interrupted { .. } => Err(FailedInvocationResult { trap_type: TrapType::Interrupt(InterruptKind::Interrupt), stderr: "".to_string() }), + OplogEntry::Exited { .. } => Err(FailedInvocationResult { trap_type: TrapType::Exit, stderr: "".to_string() }), _ => panic!("Unexpected oplog entry pointed by invocation result at index {oplog_idx} for {owned_worker_id:?}") }; diff --git a/golem-worker-executor-base/src/workerctx.rs b/golem-worker-executor-base/src/workerctx.rs index a46ac4cf1..c666e2ddd 100644 --- a/golem-worker-executor-base/src/workerctx.rs +++ b/golem-worker-executor-base/src/workerctx.rs @@ -15,12 +15,6 @@ use std::collections::HashSet; use std::sync::{Arc, RwLock, Weak}; -use async_trait::async_trait; -use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::wasmtime::ResourceStore; -use golem_wasm_rpc::Value; -use wasmtime::{AsContextMut, ResourceLimiterAsync}; - use crate::error::GolemError; use crate::model::{ CurrentResourceLimits, ExecutionStatus, InterruptKind, LastError, ListDirectoryResult, @@ -44,13 +38,22 @@ use crate::services::{ worker_enumeration, HasAll, HasConfig, HasOplog, HasOplogService, HasWorker, }; use crate::worker::{RetryDecision, Worker}; +use async_trait::async_trait; use golem_common::model::component::ComponentOwner; use golem_common::model::oplog::WorkerResourceId; use golem_common::model::plugin::PluginScope; use golem_common::model::{ AccountId, ComponentFilePath, ComponentVersion, IdempotencyKey, OwnedWorkerId, - PluginInstallationId, WorkerId, WorkerMetadata, WorkerStatus, WorkerStatusRecord, + PluginInstallationId, TargetWorkerId, WorkerId, WorkerMetadata, WorkerStatus, + WorkerStatusRecord, }; +use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; +use golem_wasm_rpc::wasmtime::ResourceStore; +use golem_wasm_rpc::Value; +use wasmtime::component::{Component, Linker, Type, Val}; +use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync, StoreContextMut}; +use wasmtime_wasi::WasiView; +use wasmtime_wasi_http::WasiHttpView; /// WorkerCtx is the primary customization and extension point of worker executor. It is the context /// associated with each running worker, and it is responsible for initializing the WASM linker as @@ -66,6 +69,7 @@ pub trait WorkerCtx: + IndexedResourceStore + UpdateManagement + FileSystemReading + + DynamicLinking + Send + Sync + Sized @@ -132,6 +136,9 @@ pub trait WorkerCtx: >, ) -> Result; + fn as_wasi_view(&mut self) -> impl WasiView; + fn as_wasi_http_view(&mut self) -> impl WasiHttpView; + /// Get the public part of the worker context fn get_public_state(&self) -> &Self::PublicState; @@ -144,6 +151,9 @@ pub trait WorkerCtx: /// Get the worker ID associated with this worker context fn worker_id(&self) -> &WorkerId; + /// Get the owned worker ID associated with this worker context + fn owned_worker_id(&self) -> &OwnedWorkerId; + fn component_metadata(&self) -> &ComponentMetadata; /// The WASI exit API can use a special error to exit from the WASM execution. As this depends @@ -157,6 +167,12 @@ pub trait WorkerCtx: /// Gets an interface to the worker-proxy which can direct calls to other worker executors /// in the cluster fn worker_proxy(&self) -> Arc; + + // TODO: where do this belong + async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result; } /// The fuel management interface of a worker context is responsible for borrowing and returning @@ -400,3 +416,25 @@ pub trait FileSystemReading { ) -> Result; async fn read_file(&self, path: &ComponentFilePath) -> Result; } + +#[async_trait] +pub trait DynamicLinking { + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()>; + + async fn dynamic_function_call( + store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type] + ) -> anyhow::Result<()>; + + fn drop_linked_resource(store: StoreContextMut<'_, Ctx>, rep: u32) -> anyhow::Result<()>; +} diff --git a/golem-worker-executor-base/tests/common/mod.rs b/golem-worker-executor-base/tests/common/mod.rs index 0d5d83f1d..f15965677 100644 --- a/golem-worker-executor-base/tests/common/mod.rs +++ b/golem-worker-executor-base/tests/common/mod.rs @@ -5,7 +5,9 @@ use std::collections::HashSet; use golem_service_base::service::initial_component_files::InitialComponentFilesService; use golem_service_base::storage::blob::BlobStorage; use golem_wasm_rpc::wasmtime::ResourceStore; -use golem_wasm_rpc::{Uri, Value}; +use golem_wasm_rpc::{ + FutureInvokeResultEntry, HostWasmRpc, RpcError, Uri, Value, WasmRpcEntry, WitValue, +}; use golem_worker_executor_base::services::file_loader::FileLoader; use prometheus::Registry; @@ -18,8 +20,8 @@ use std::sync::{Arc, RwLock, Weak}; use golem_common::model::{ AccountId, ComponentFilePath, ComponentId, ComponentVersion, IdempotencyKey, OwnedWorkerId, - PluginInstallationId, ScanCursor, WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus, - WorkerStatusRecord, + PluginInstallationId, ScanCursor, TargetWorkerId, WorkerFilter, WorkerId, WorkerMetadata, + WorkerStatus, WorkerStatusRecord, }; use golem_service_base::config::{BlobStorageConfig, LocalFileSystemBlobStorageConfig}; use golem_worker_executor_base::error::GolemError; @@ -51,8 +53,8 @@ use golem_worker_executor_base::services::worker_event::WorkerEventService; use golem_worker_executor_base::services::{plugins, All, HasAll, HasConfig, HasOplogService}; use golem_worker_executor_base::wasi_host::create_linker; use golem_worker_executor_base::workerctx::{ - ExternalOperations, FileSystemReading, FuelManagement, IndexedResourceStore, InvocationHooks, - InvocationManagement, StatusManagement, UpdateManagement, WorkerCtx, + DynamicLinking, ExternalOperations, FileSystemReading, FuelManagement, IndexedResourceStore, + InvocationHooks, InvocationManagement, StatusManagement, UpdateManagement, WorkerCtx, }; use golem_worker_executor_base::Bootstrap; @@ -79,6 +81,7 @@ use golem_test_framework::components::shard_manager::ShardManager; use golem_test_framework::components::worker_executor_cluster::WorkerExecutorCluster; use golem_test_framework::config::TestDependencies; use golem_test_framework::dsl::to_worker_metadata; +use golem_wasm_rpc::golem::rpc::types::{FutureInvokeResult, WasmRpc}; use golem_worker_executor_base::preview2::golem; use golem_worker_executor_base::preview2::golem::api1_1_0; use golem_worker_executor_base::services::events::Events; @@ -94,8 +97,10 @@ use golem_worker_executor_base::services::worker_proxy::WorkerProxy; use golem_worker_executor_base::worker::{RetryDecision, Worker}; use tonic::transport::Channel; use tracing::{debug, info}; -use wasmtime::component::{Instance, Linker, ResourceAny}; -use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync}; +use wasmtime::component::{Component, Instance, Linker, Resource, ResourceAny, Type, Val}; +use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync, StoreContextMut}; +use wasmtime_wasi::WasiView; +use wasmtime_wasi_http::WasiHttpView; pub struct TestWorkerExecutor { _join_set: Option>>, @@ -683,6 +688,14 @@ impl WorkerCtx for TestWorkerCtx { Ok(Self { durable_ctx }) } + fn as_wasi_view(&mut self) -> impl WasiView { + self.durable_ctx.as_wasi_view() + } + + fn as_wasi_http_view(&mut self) -> impl WasiHttpView { + self.durable_ctx.as_wasi_http_view() + } + fn get_public_state(&self) -> &Self::PublicState { &self.durable_ctx.public_state } @@ -695,6 +708,10 @@ impl WorkerCtx for TestWorkerCtx { self.durable_ctx.worker_id() } + fn owned_worker_id(&self) -> &OwnedWorkerId { + self.durable_ctx.owned_worker_id() + } + fn component_metadata(&self) -> &ComponentMetadata { self.durable_ctx.component_metadata() } @@ -710,6 +727,15 @@ impl WorkerCtx for TestWorkerCtx { fn worker_proxy(&self) -> Arc { self.durable_ctx.worker_proxy() } + + async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result { + self.durable_ctx + .generate_unique_local_worker_id(remote_worker_id) + .await + } } #[async_trait] @@ -766,6 +792,87 @@ impl FileSystemReading for TestWorkerCtx { } } +#[async_trait] +impl HostWasmRpc for TestWorkerCtx { + async fn new(&mut self, location: Uri) -> anyhow::Result> { + self.durable_ctx.new(location).await + } + + async fn invoke_and_await( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .invoke_and_await(self_, function_name, function_params) + .await + } + + async fn invoke( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .invoke(self_, function_name, function_params) + .await + } + + async fn async_invoke_and_await( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .async_invoke_and_await(self_, function_name, function_params) + .await + } + + fn drop(&mut self, rep: Resource) -> anyhow::Result<()> { + self.durable_ctx.drop(rep) + } +} + +#[async_trait] +impl DynamicLinking for TestWorkerCtx { + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()> { + self.durable_ctx.link(engine, linker, component) + } + + async fn dynamic_function_call( + store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type], + ) -> anyhow::Result<()> { + DurableWorkerCtx::::dynamic_function_call( + store, + interface_name, + function_name, + params, + param_types, + results, + result_types + ) + .await + } + + fn drop_linked_resource(store: StoreContextMut<'_, TestWorkerCtx>, rep: u32) -> anyhow::Result<()> { + DurableWorkerCtx::::drop_linked_resource(store, rep) + } +} + #[async_trait] impl Bootstrap for ServerBootstrap { fn create_active_workers( diff --git a/golem-worker-executor-base/tests/lib.rs b/golem-worker-executor-base/tests/lib.rs index e5adf099d..bad3b0556 100644 --- a/golem-worker-executor-base/tests/lib.rs +++ b/golem-worker-executor-base/tests/lib.rs @@ -56,10 +56,13 @@ pub mod keyvalue; pub mod measure_test_component_mem; pub mod observability; pub mod rust_rpc; +pub mod rust_rpc_stubless; pub mod scalability; pub mod transactions; pub mod ts_rpc1; +pub mod ts_rpc1_stubless; pub mod ts_rpc2; +pub mod ts_rpc2_stubless; pub mod wasi; test_r::enable!(); @@ -76,14 +79,17 @@ tag_suite!(wasi, group3); tag_suite!(scalability, group4); tag_suite!(hot_update, group4); tag_suite!(rust_rpc, group4); +tag_suite!(rust_rpc_stubless, group4); tag_suite!(guest_languages2, group5); tag_suite!(ts_rpc1, group6); +tag_suite!(ts_rpc1_stubless, group6); tag_suite!(guest_languages3, group7); tag_suite!(ts_rpc2, group8); +tag_suite!(ts_rpc2_stubless, group8); #[derive(Clone)] pub struct WorkerExecutorPerTestDependencies { diff --git a/golem-worker-executor-base/tests/rust_rpc_stubless.rs b/golem-worker-executor-base/tests/rust_rpc_stubless.rs new file mode 100644 index 000000000..156b2364c --- /dev/null +++ b/golem-worker-executor-base/tests/rust_rpc_stubless.rs @@ -0,0 +1,768 @@ +// Copyright 2024 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use test_r::{inherit_test_dep, test}; + +use crate::common::{start, TestContext}; +use crate::{LastUniqueId, Tracing, WorkerExecutorTestDependencies}; +use assert2::check; +use golem_test_framework::dsl::{worker_error_message, TestDslUnsafe}; +use golem_wasm_rpc::Value; +use std::collections::HashMap; +use std::time::SystemTime; +use tracing::{debug, info}; + +inherit_test_dep!(WorkerExecutorTestDependencies); +inherit_test_dep!(LastUniqueId); +inherit_test_dep!(Tracing); + +#[test] +#[tracing::instrument] +#[ignore] +async fn auction_example_1( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let registry_component_id = executor.store_component("auction_registry").await; + let auction_component_id = executor.store_component("auction").await; + + let mut env = HashMap::new(); + env.insert( + "AUCTION_COMPONENT_ID".to_string(), + auction_component_id.to_string(), + ); + let registry_worker_id = executor + .start_worker_with(®istry_component_id, "auction-registry-1", vec![], env) + .await; + + let _ = executor.log_output(®istry_worker_id).await; + + let expiration = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let create_auction_result = executor + .invoke_and_await( + ®istry_worker_id, + "auction:registry/api.{create-auction}", + vec![ + Value::String("test-auction".to_string()), + Value::String("this is a test".to_string()), + Value::F32(100.0), + Value::U64(expiration + 600), + ], + ) + .await; + + let get_auctions_result = executor + .invoke_and_await( + ®istry_worker_id, + "auction:registry/api.{get-auctions}", + vec![], + ) + .await; + + drop(executor); + + println!("result: {:?}", create_auction_result); + println!("result: {:?}", get_auctions_result); + check!(create_auction_result.is_ok()); + + let auction_id = &create_auction_result.unwrap()[0]; + + check!( + get_auctions_result + == Ok(vec![Value::List(vec![Value::Record(vec![ + auction_id.clone(), + Value::String("test-auction".to_string()), + Value::String("this is a test".to_string()), + Value::F32(100.0), + Value::U64(expiration + 600) + ]),])]) + ); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn auction_example_2( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let registry_component_id = executor.store_component("auction_registry").await; + let auction_component_id = executor.store_component("auction").await; + + let mut env = HashMap::new(); + env.insert( + "AUCTION_COMPONENT_ID".to_string(), + auction_component_id.to_string(), + ); + let registry_worker_id = executor + .start_worker_with(®istry_component_id, "auction-registry-2", vec![], env) + .await; + + let _ = executor.log_output(®istry_worker_id).await; + + let expiration = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let create_auction_result = executor + .invoke_and_await( + ®istry_worker_id, + "auction:registry/api.{create-auction-res}", + vec![ + Value::String("test-auction".to_string()), + Value::String("this is a test".to_string()), + Value::F32(100.0), + Value::U64(expiration + 600), + ], + ) + .await; + + let get_auctions_result = executor + .invoke_and_await( + ®istry_worker_id, + "auction:registry/api.{get-auctions}", + vec![], + ) + .await; + + drop(executor); + + println!("result: {:?}", create_auction_result); + println!("result: {:?}", get_auctions_result); + check!(create_auction_result.is_ok()); + + let auction_id = &create_auction_result.unwrap()[0]; + + check!( + get_auctions_result + == Ok(vec![Value::List(vec![Value::Record(vec![ + auction_id.clone(), + Value::String("test-auction".to_string()), + Value::String("this is a test".to_string()), + Value::F32(100.0), + Value::U64(expiration + 600) + ]),])]) + ); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_1( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component("counters").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-1", vec![], env) + .await; + + let result = executor + .invoke_and_await(&caller_worker_id, "test1", vec![]) + .await; + + drop(executor); + + check!( + result + == Ok(vec![Value::List(vec![ + Value::Tuple(vec![Value::String("counter3".to_string()), Value::U64(3)]), + Value::Tuple(vec![Value::String("counter2".to_string()), Value::U64(3)]), + Value::Tuple(vec![Value::String("counter1".to_string()), Value::U64(3)]) + ])]) + ); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_2( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component("counters").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-2", vec![], env) + .await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test2", vec![]) + .await; + let result2 = executor + .invoke_and_await(&caller_worker_id, "test2", vec![]) + .await; + + drop(executor); + + check!(result1 == Ok(vec![Value::U64(1)])); + check!(result2 == Ok(vec![Value::U64(2)])); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_2_with_restart( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component("counters").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-2r", vec![], env) + .await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test2", vec![]) + .await; + + drop(executor); + let executor = start(deps, &context).await.unwrap(); + + let result2 = executor + .invoke_and_await(&caller_worker_id, "test2", vec![]) + .await; + + drop(executor); + + check!(result1 == Ok(vec![Value::U64(1)])); + check!(result2 == Ok(vec![Value::U64(2)])); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_3( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component("counters").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-3", vec![], env) + .await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test3", vec![]) + .await; + let result2 = executor + .invoke_and_await(&caller_worker_id, "test3", vec![]) + .await; + + drop(executor); + + check!(result1 == Ok(vec![Value::U64(1)])); + check!(result2 == Ok(vec![Value::U64(2)])); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_3_with_restart( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component("counters").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-3r", vec![], env) + .await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test3", vec![]) + .await; + + drop(executor); + let executor = start(deps, &context).await.unwrap(); + + let result2 = executor + .invoke_and_await(&caller_worker_id, "test3", vec![]) + .await; + + drop(executor); + + check!(result1 == Ok(vec![Value::U64(1)])); + check!(result2 == Ok(vec![Value::U64(2)])); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn context_inheritance( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component("counters").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + env.insert("TEST_CONFIG".to_string(), "123".to_string()); + let caller_worker_id = executor + .start_worker_with( + &caller_component_id, + "rpc-counters-4", + vec!["a".to_string(), "b".to_string(), "c".to_string()], + env, + ) + .await; + + let result = executor + .invoke_and_await(&caller_worker_id, "test4", vec![]) + .await; + + drop(executor); + + let result = result.unwrap(); + let result_tuple = match &result[0] { + Value::Tuple(result) => result, + _ => panic!("Unexpected result: {:?}", result), + }; + let args = match &result_tuple[0] { + Value::List(args) => args.clone(), + _ => panic!("Unexpected result: {:?}", result), + }; + let mut env = match &result_tuple[1] { + Value::List(env) => env + .clone() + .into_iter() + .map(|value| match value { + Value::Tuple(tuple) => match (&tuple[0], &tuple[1]) { + (Value::String(key), Value::String(value)) => (key.clone(), value.clone()), + _ => panic!("Unexpected result: {:?}", result), + }, + _ => panic!("Unexpected result: {:?}", result), + }) + .collect::>(), + _ => panic!("Unexpected result: {:?}", result), + }; + env.sort_by_key(|(k, _v)| k.clone()); + + check!( + args == vec![ + Value::String("a".to_string()), + Value::String("b".to_string()), + Value::String("c".to_string()) + ] + ); + check!( + env == vec![ + ( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string() + ), + ( + "GOLEM_COMPONENT_ID".to_string(), + counters_component_id.to_string() + ), + ("GOLEM_COMPONENT_VERSION".to_string(), "0".to_string()), + ( + "GOLEM_WORKER_NAME".to_string(), + "counters_test4".to_string() + ), + ("TEST_CONFIG".to_string(), "123".to_string()) + ] + ); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_5( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component("counters").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-5", vec![], env) + .await; + + executor.log_output(&caller_worker_id).await; + + let result = executor + .invoke_and_await(&caller_worker_id, "test5", vec![]) + .await; + + drop(executor); + + check!( + result + == Ok(vec![Value::List(vec![ + Value::U64(3), + Value::U64(3), + Value::U64(3), + ]),]) + ); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_5_with_restart( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + // using store_unique_component to avoid collision with counter_resource_test_5 + let counters_component_id = executor.store_unique_component("counters").await; + let caller_component_id = executor.store_unique_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-5r", vec![], env) + .await; + + executor.log_output(&caller_worker_id).await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test5", vec![]) + .await; + + drop(executor); + + let executor = start(deps, &context).await.unwrap(); + + let result2 = executor + .invoke_and_await(&caller_worker_id, "test5", vec![]) + .await; + + drop(executor); + + check!( + result1 + == Ok(vec![Value::List(vec![ + Value::U64(3), + Value::U64(3), + Value::U64(3), + ]),]) + ); + // The second call has the same result because new resources are created within test5() + check!( + result2 + == Ok(vec![Value::List(vec![ + Value::U64(3), + Value::U64(3), + Value::U64(3), + ]),]), + ); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn wasm_rpc_bug_32_test( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component("counters").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-bug32", vec![], env) + .await; + + let result = executor + .invoke_and_await( + &caller_worker_id, + "bug-wasm-rpc-i32", + vec![Value::Variant { + case_idx: 0, + case_value: None, + }], + ) + .await; + + drop(executor); + + check!( + result + == Ok(vec![Value::Variant { + case_idx: 0, + case_value: None, + }]) + ); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn error_message_invalid_uri( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let registry_component_id = executor.store_component("auction_registry").await; + + let mut env = HashMap::new(); + env.insert( + "AUCTION_COMPONENT_ID".to_string(), + "invalid-component-id".to_string(), + ); + let registry_worker_id = executor + .start_worker_with( + ®istry_component_id, + "auction-registry-invalid-uri", + vec![], + env, + ) + .await; + + let _ = executor.log_output(®istry_worker_id).await; + + let expiration = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let create_auction_result = executor + .invoke_and_await( + ®istry_worker_id, + "auction:registry/api.{create-auction}", + vec![ + Value::String("test-auction".to_string()), + Value::String("this is a test".to_string()), + Value::F32(100.0), + Value::U64(expiration + 600), + ], + ) + .await; + + drop(executor); + + debug!( + "Error message: {}", + worker_error_message(&create_auction_result.clone().err().unwrap()) + ); + check!(worker_error_message(&create_auction_result.err().unwrap()) + .contains("Invalid URI: urn:worker:invalid-component-id")); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn error_message_non_existing_target_component( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let registry_component_id = executor.store_component("auction_registry").await; + + let mut env = HashMap::new(); + env.insert( + "AUCTION_COMPONENT_ID".to_string(), + "FB2F8E32-7B94-4699-B6EC-82BCE80FF9F2".to_string(), // valid UUID, but not an existing component + ); + let registry_worker_id = executor + .start_worker_with( + ®istry_component_id, + "auction-registry-non-existing-target", + vec![], + env, + ) + .await; + + let _ = executor.log_output(®istry_worker_id).await; + + let expiration = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let create_auction_result = executor + .invoke_and_await( + ®istry_worker_id, + "auction:registry/api.{create-auction}", + vec![ + Value::String("test-auction".to_string()), + Value::String("this is a test".to_string()), + Value::F32(100.0), + Value::U64(expiration + 600), + ], + ) + .await; + + drop(executor); + + check!(worker_error_message(&create_auction_result.err().unwrap()) + .contains("Could not find any component with the given id")); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn ephemeral_worker_invocation_via_rpc1( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = TestContext::new(last_unique_id); + let executor = start(deps, &context).await.unwrap(); + + let ephemeral_component_id = executor.store_ephemeral_component("ephemeral").await; + let caller_component_id = executor.store_component("caller").await; + + let mut env = HashMap::new(); + env.insert( + "EPHEMERAL_COMPONENT_ID".to_string(), + ephemeral_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-ephemeral-1", vec![], env) + .await; + + let result = executor + .invoke_and_await(&caller_worker_id, "ephemeral-test1", vec![]) + .await + .unwrap(); + + drop(executor); + + info!("result is: {result:?}"); + + match result.into_iter().next() { + Some(Value::List(items)) => { + let pairs = items + .into_iter() + .filter_map(|item| match item { + Value::Tuple(values) if values.len() == 2 => { + let mut iter = values.into_iter(); + let key = iter.next(); + let value = iter.next(); + match (key, value) { + (Some(Value::String(key)), Some(Value::String(value))) => { + Some((key, value)) + } + _ => None, + } + } + _ => None, + }) + .collect::>(); + + check!(pairs.len() == 3); + let name1 = &pairs[0].0; + let value1 = &pairs[0].1; + let name2 = &pairs[1].0; + let value2 = &pairs[1].1; + let name3 = &pairs[2].0; + let value3 = &pairs[2].1; + + check!(name1 == name2); + check!(name2 != name3); + check!(value1 != value2); + check!(value2 != value3); + } + _ => panic!("Unexpected result value"), + } +} diff --git a/golem-worker-executor-base/tests/ts_rpc1_stubless.rs b/golem-worker-executor-base/tests/ts_rpc1_stubless.rs new file mode 100644 index 000000000..9c20fac73 --- /dev/null +++ b/golem-worker-executor-base/tests/ts_rpc1_stubless.rs @@ -0,0 +1,191 @@ +// Copyright 2024 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use test_r::{inherit_test_dep, test}; + +use crate::{common, LastUniqueId, Tracing, WorkerExecutorTestDependencies}; +use assert2::check; +use golem_test_framework::dsl::TestDslUnsafe; +use golem_wasm_rpc::Value; +use std::collections::HashMap; + +inherit_test_dep!(WorkerExecutorTestDependencies); +inherit_test_dep!(LastUniqueId); +inherit_test_dep!(Tracing); + +static COUNTER_COMPONENT_NAME: &str = "counter-ts"; +static CALLER_COMPONENT_NAME: &str = "caller-ts"; + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_1( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = common::TestContext::new(last_unique_id); + let executor = common::start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component(COUNTER_COMPONENT_NAME).await; + let caller_component_id = executor.store_component(CALLER_COMPONENT_NAME).await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-1", vec![], env) + .await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test1", vec![]) + .await; + let result2 = executor + .invoke_and_await(&caller_worker_id, "test1", vec![]) + .await; + + drop(executor); + + check!(result1 == Ok(vec![Value::U64(1)])); + check!(result2 == Ok(vec![Value::U64(2)])); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_1_with_restart( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = common::TestContext::new(last_unique_id); + let executor = common::start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component(COUNTER_COMPONENT_NAME).await; + let caller_component_id = executor.store_component(CALLER_COMPONENT_NAME).await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-1r", vec![], env) + .await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test1", vec![]) + .await; + + drop(executor); + let executor = common::start(deps, &context).await.unwrap(); + + let result2 = executor + .invoke_and_await(&caller_worker_id, "test1", vec![]) + .await; + + drop(executor); + + check!(result1 == Ok(vec![Value::U64(1)])); + check!(result2 == Ok(vec![Value::U64(2)])); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn context_inheritance( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = common::TestContext::new(last_unique_id); + let executor = common::start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component(COUNTER_COMPONENT_NAME).await; + let caller_component_id = executor.store_component(CALLER_COMPONENT_NAME).await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + env.insert("TEST_CONFIG".to_string(), "123".to_string()); + let caller_worker_id = executor + .start_worker_with( + &caller_component_id, + "rpc-counters-4", + vec!["a".to_string(), "b".to_string(), "c".to_string()], + env, + ) + .await; + + let result = executor + .invoke_and_await(&caller_worker_id, "test3", vec![]) + .await; + + drop(executor); + + let result = result.unwrap(); + let result_tuple = match &result[0] { + Value::Tuple(result) => result, + _ => panic!("Unexpected result: {:?}", result), + }; + let args = match &result_tuple[0] { + Value::List(args) => args.clone(), + _ => panic!("Unexpected result: {:?}", result), + }; + let mut env = match &result_tuple[1] { + Value::List(env) => env + .clone() + .into_iter() + .map(|value| match value { + Value::Tuple(tuple) => match (&tuple[0], &tuple[1]) { + (Value::String(key), Value::String(value)) => (key.clone(), value.clone()), + _ => panic!("Unexpected result: {:?}", result), + }, + _ => panic!("Unexpected result: {:?}", result), + }) + .collect::>(), + _ => panic!("Unexpected result: {:?}", result), + }; + env.sort_by_key(|(k, _v)| k.clone()); + + check!( + args == vec![ + Value::String("a".to_string()), + Value::String("b".to_string()), + Value::String("c".to_string()) + ] + ); + check!( + env == vec![ + ( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string() + ), + ( + "GOLEM_COMPONENT_ID".to_string(), + counters_component_id.to_string() + ), + ("GOLEM_COMPONENT_VERSION".to_string(), "0".to_string()), + ( + "GOLEM_WORKER_NAME".to_string(), + "counters_test4".to_string() + ), + ("TEST_CONFIG".to_string(), "123".to_string()) + ] + ); +} diff --git a/golem-worker-executor-base/tests/ts_rpc2_stubless.rs b/golem-worker-executor-base/tests/ts_rpc2_stubless.rs new file mode 100644 index 000000000..18f27e2af --- /dev/null +++ b/golem-worker-executor-base/tests/ts_rpc2_stubless.rs @@ -0,0 +1,104 @@ +// Copyright 2024 Golem Cloud +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use test_r::{inherit_test_dep, test}; + +use crate::{common, LastUniqueId, Tracing, WorkerExecutorTestDependencies}; +use assert2::check; +use golem_test_framework::dsl::TestDslUnsafe; +use golem_wasm_rpc::Value; +use std::collections::HashMap; + +inherit_test_dep!(WorkerExecutorTestDependencies); +inherit_test_dep!(LastUniqueId); +inherit_test_dep!(Tracing); + +static COUNTER_COMPONENT_NAME: &str = "counter-ts"; +static CALLER_COMPONENT_NAME: &str = "caller-ts"; + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_2( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = common::TestContext::new(last_unique_id); + let executor = common::start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component(COUNTER_COMPONENT_NAME).await; + let caller_component_id = executor.store_component(CALLER_COMPONENT_NAME).await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-2", vec![], env) + .await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test2", vec![]) + .await; + let result2 = executor + .invoke_and_await(&caller_worker_id, "test2", vec![]) + .await; + + drop(executor); + + check!(result1 == Ok(vec![Value::U64(1)])); + check!(result2 == Ok(vec![Value::U64(2)])); +} + +#[test] +#[tracing::instrument] +#[ignore] +async fn counter_resource_test_2_with_restart( + last_unique_id: &LastUniqueId, + deps: &WorkerExecutorTestDependencies, + _tracing: &Tracing, +) { + let context = common::TestContext::new(last_unique_id); + let executor = common::start(deps, &context).await.unwrap(); + + let counters_component_id = executor.store_component(COUNTER_COMPONENT_NAME).await; + let caller_component_id = executor.store_component(CALLER_COMPONENT_NAME).await; + + let mut env = HashMap::new(); + env.insert( + "COUNTERS_COMPONENT_ID".to_string(), + counters_component_id.to_string(), + ); + let caller_worker_id = executor + .start_worker_with(&caller_component_id, "rpc-counters-2r", vec![], env) + .await; + + let result1 = executor + .invoke_and_await(&caller_worker_id, "test2", vec![]) + .await; + + drop(executor); + let executor = common::start(deps, &context).await.unwrap(); + + let result2 = executor + .invoke_and_await(&caller_worker_id, "test2", vec![]) + .await; + + drop(executor); + + check!(result1 == Ok(vec![Value::U64(1)])); + check!(result2 == Ok(vec![Value::U64(2)])); +} diff --git a/test-components/auction-example/Cargo.lock b/test-components/auction-example/Cargo.lock index 670f07b5d..a8ac801e6 100644 --- a/test-components/auction-example/Cargo.lock +++ b/test-components/auction-example/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "aho-corasick" @@ -63,6 +63,38 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "camino" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e35af189006b9c0f00a064685c727031e3ed2d8020f7ba284d78cc2671bd36ea" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d886547e41f740c616ae73108f6eb70afe6d940c7bc697cb30f13daec073037" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -114,11 +146,34 @@ dependencies = [ "wasi", ] +[[package]] +name = "git-version" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad568aa3db0fcbc81f2f116137f263d7304f512a1209b35b85150d3ef88ad19" +dependencies = [ + "git-version-macro", +] + +[[package]] +name = "git-version-macro" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53010ccb100b96a67bc32c0175f0ed1426b31b655d562898e57325f81c023ac0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "golem-wasm-rpc" version = "0.0.0" dependencies = [ + "cargo_metadata", + "git-version", "prost-build", + "uuid", "wit-bindgen-rt", ] @@ -153,6 +208,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" + [[package]] name = "libc" version = "0.2.153" @@ -217,9 +278,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -367,31 +428,58 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "semver" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cb6eb87a131f756572d7fb904f6e7b68633f09cca868c5df1c4b8d1a694bbba" +dependencies = [ + "serde", +] + [[package]] name = "serde" -version = "1.0.196" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" +checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.196" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" +checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", "syn", ] +[[package]] +name = "serde_json" +version = "1.0.133" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + [[package]] name = "syn" -version = "2.0.48" +version = "2.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" +checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" dependencies = [ "proc-macro2", "quote", @@ -411,6 +499,26 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.12" @@ -419,9 +527,9 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "uuid" -version = "1.7.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ "getrandom", "serde", diff --git a/test-components/auction-example/build.sh b/test-components/auction-example/build.sh index 5300fb988..86ddb0cbe 100755 --- a/test-components/auction-example/build.sh +++ b/test-components/auction-example/build.sh @@ -7,4 +7,5 @@ cp target/wasm32-wasi/release/auction_stub.wasm target/wasm32-wasi/release/aucti wasm-tools compose -v target/wasm32-wasi/release/auction_registry.wasm -o target/wasm32-wasi/release/auction_registry_composed.wasm cp target/wasm32-wasi/release/auction_registry_composed.wasm .. +cp target/wasm32-wasi/release/auction_registry.wasm .. cp target/wasm32-wasi/release/auction.wasm .. \ No newline at end of file diff --git a/test-components/auction.wasm b/test-components/auction.wasm index 9be00a485..c716cfabb 100755 Binary files a/test-components/auction.wasm and b/test-components/auction.wasm differ diff --git a/test-components/auction_registry.wasm b/test-components/auction_registry.wasm new file mode 100644 index 000000000..fab443ece Binary files /dev/null and b/test-components/auction_registry.wasm differ diff --git a/test-components/auction_registry_composed.wasm b/test-components/auction_registry_composed.wasm index b3d4d50b5..fb32c57be 100644 Binary files a/test-components/auction_registry_composed.wasm and b/test-components/auction_registry_composed.wasm differ diff --git a/test-components/caller-ts.wasm b/test-components/caller-ts.wasm new file mode 100644 index 000000000..7a0db4219 Binary files /dev/null and b/test-components/caller-ts.wasm differ diff --git a/test-components/caller.wasm b/test-components/caller.wasm new file mode 100644 index 000000000..57794b168 Binary files /dev/null and b/test-components/caller.wasm differ diff --git a/test-components/caller_composed.wasm b/test-components/caller_composed.wasm index ed5f61f9f..4b8a77822 100644 Binary files a/test-components/caller_composed.wasm and b/test-components/caller_composed.wasm differ diff --git a/test-components/counters.wasm b/test-components/counters.wasm index d58151ac5..2a5fdb5ba 100755 Binary files a/test-components/counters.wasm and b/test-components/counters.wasm differ diff --git a/test-components/ephemeral.wasm b/test-components/ephemeral.wasm index 469169168..8321baf71 100644 Binary files a/test-components/ephemeral.wasm and b/test-components/ephemeral.wasm differ diff --git a/test-components/rpc/Cargo.lock b/test-components/rpc/Cargo.lock index d862db634..4c9841747 100644 --- a/test-components/rpc/Cargo.lock +++ b/test-components/rpc/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "aho-corasick" @@ -159,11 +159,31 @@ dependencies = [ "wasi", ] +[[package]] +name = "git-version" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad568aa3db0fcbc81f2f116137f263d7304f512a1209b35b85150d3ef88ad19" +dependencies = [ + "git-version-macro", +] + +[[package]] +name = "git-version-macro" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53010ccb100b96a67bc32c0175f0ed1426b31b655d562898e57325f81c023ac0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "golem-rust" -version = "1.1.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ef0670496530779ccc42076031e1ebce93aee07129b1c410e31bee577be85c" +checksum = "c967eb388fb81f9b9f4df5d5b6634de803f21cd410c1bf687202794a4fbc0267" dependencies = [ "golem-rust-macro", "serde", @@ -174,9 +194,9 @@ dependencies = [ [[package]] name = "golem-rust-macro" -version = "1.1.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4eb977d485e1d5d0d2fc7db022689b740615639867f932702f7d176f786a8e0" +checksum = "1bb87f831cfe4371427c63f5f4cabcc3bae1b66974c8fbcf22be9274fee3a7d1" dependencies = [ "heck", "proc-macro2", @@ -189,7 +209,9 @@ name = "golem-wasm-rpc" version = "0.0.0" dependencies = [ "cargo_metadata", + "git-version", "prost-build", + "uuid", "wit-bindgen-rt", ] diff --git a/test-components/rpc/build-debug.sh b/test-components/rpc/build-debug.sh index a1d8feb59..3348f1596 100755 --- a/test-components/rpc/build-debug.sh +++ b/test-components/rpc/build-debug.sh @@ -14,5 +14,6 @@ cp target/wasm32-wasi/debug/ephemeral_stub.wasm target/wasm32-wasi/debug/rpc:eph wasm-tools compose -v target/wasm32-wasi/debug/caller_composed1.wasm -o target/wasm32-wasi/debug/caller_composed.wasm cp target/wasm32-wasi/debug/caller_composed.wasm .. +cp target/wasm32-wasi/debug/caller.wasm .. cp target/wasm32-wasi/debug/counters.wasm .. cp target/wasm32-wasi/debug/ephemeral.wasm .. diff --git a/test-components/rpc/build.sh b/test-components/rpc/build.sh index 5cb4ff2f5..9b77ef5f3 100755 --- a/test-components/rpc/build.sh +++ b/test-components/rpc/build.sh @@ -14,5 +14,6 @@ cp target/wasm32-wasi/release/ephemeral_stub.wasm target/wasm32-wasi/release/rpc wasm-tools compose -v target/wasm32-wasi/release/caller_composed1.wasm -o target/wasm32-wasi/release/caller_composed.wasm cp target/wasm32-wasi/release/caller_composed.wasm .. +cp target/wasm32-wasi/release/caller.wasm .. cp target/wasm32-wasi/release/counters.wasm .. cp target/wasm32-wasi/release/ephemeral.wasm .. diff --git a/wasm-ast/src/analysis/mod.rs b/wasm-ast/src/analysis/mod.rs index 66e58fb93..18095ef1f 100644 --- a/wasm-ast/src/analysis/mod.rs +++ b/wasm-ast/src/analysis/mod.rs @@ -617,9 +617,11 @@ impl AnalysisContext { Ok((Mrc::new(ComponentSection::Type(component_type.clone())), self.clone())) } InstanceTypeDeclaration::Alias(alias) => { - let component_idx = self.component_stack.last().unwrap().component_idx.unwrap(); + let component_idx = self.component_stack.last().expect("Component stack is empty").component_idx.unwrap_or_default(); let new_ctx = self.push_component(self.get_component(), component_idx); // Emulating an inner scope by duplicating the current component on the stack (TODO: refactor this) + // Note: because we not in an inner component, but an inner instance declaration and the current analysis stack + // does not have this concept. new_ctx.follow_redirects(Mrc::new(ComponentSection::Alias(alias.clone()))) } InstanceTypeDeclaration::Export { .. } => { diff --git a/wasm-ast/tests/exports.rs b/wasm-ast/tests/exports.rs index 899c8071c..733c5c2dc 100644 --- a/wasm-ast/tests/exports.rs +++ b/wasm-ast/tests/exports.rs @@ -700,3 +700,77 @@ fn exports_caller_composed_component() { pretty_assertions::assert_eq!(metadata, expected); } + +#[test] +fn exports_caller_component() { + // NOTE: Same as caller_composed.wasm but not composed with the generated stub + let source_bytes = include_bytes!("../wasm/caller.wasm"); + let component: Component = Component::from_bytes(source_bytes).unwrap(); + + let state = AnalysisContext::new(component); + let metadata = state.get_top_level_exports().unwrap(); + + let expected = vec![ + AnalysedExport::Function(AnalysedFunction { + name: "test1".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(tuple(vec![str(), u64()])), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test2".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: u64(), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test3".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: u64(), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test4".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: tuple(vec![list(str()), list(tuple(vec![str(), str()]))]), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test5".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(u64()), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "bug-wasm-rpc-i32".to_string(), + parameters: vec![AnalysedFunctionParameter { + name: "in".to_string(), + typ: variant(vec![unit_case("leaf")]), + }], + results: vec![AnalysedFunctionResult { + name: None, + typ: variant(vec![unit_case("leaf")]), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "ephemeral-test1".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(tuple(vec![str(), str()])), + }], + }) + ]; + + pretty_assertions::assert_eq!(metadata, expected); +} diff --git a/wasm-rpc/src/wasmtime.rs b/wasm-rpc/src/wasmtime.rs index 6b68e2f84..23b7f21fd 100644 --- a/wasm-rpc/src/wasmtime.rs +++ b/wasm-rpc/src/wasmtime.rs @@ -22,6 +22,7 @@ use golem_wasm_ast::analysis::analysed_type::{ use golem_wasm_ast::analysis::{AnalysedType, TypeResult}; use wasmtime::component::{types, ResourceAny, Type, Val}; +#[derive(Debug)] pub enum EncodingError { ParamTypeMismatch { details: String }, ValueMismatch { details: String }, @@ -464,7 +465,7 @@ async fn decode_param_impl( } } -/// Converts a wasmtime Val to a Golem protobuf Val +/// Converts a wasmtime Val to a wasm-rpc Value #[async_recursion] pub async fn encode_output( value: &Val,