diff --git a/golem-rib/src/function_name.rs b/golem-rib/src/function_name.rs index 0c8dc7eb4..0567fc0b1 100644 --- a/golem-rib/src/function_name.rs +++ b/golem-rib/src/function_name.rs @@ -652,6 +652,28 @@ impl ParsedFunctionName { function, }) } + + pub fn is_constructor(&self) -> Option<&str> { + match &self.function { + ParsedFunctionReference::RawResourceConstructor { resource, .. } + | ParsedFunctionReference::IndexedResourceConstructor { resource, .. } => { + Some(resource) + } + _ => None, + } + } + + pub fn is_method(&self) -> Option<&str> { + match &self.function { + ParsedFunctionReference::RawResourceMethod { resource, .. } + | ParsedFunctionReference::IndexedResourceMethod { resource, .. } + | ParsedFunctionReference::RawResourceStaticMethod { resource, .. } + | ParsedFunctionReference::IndexedResourceStaticMethod { resource, .. } => { + Some(resource) + } + _ => None, + } + } } #[cfg(feature = "protobuf")] diff --git a/golem-worker-executor-base/src/durable_host/dynamic_linking.rs b/golem-worker-executor-base/src/durable_host/dynamic_linking.rs new file mode 100644 index 000000000..27ff1d454 --- /dev/null +++ b/golem-worker-executor-base/src/durable_host/dynamic_linking.rs @@ -0,0 +1,824 @@ +use crate::durable_host::wasm_rpc::{UrnExtensions, WasmRpcEntryPayload}; +use crate::durable_host::DurableWorkerCtx; +use crate::services::rpc::RpcDemand; +use crate::workerctx::{DynamicLinking, WorkerCtx}; +use anyhow::anyhow; +use async_trait::async_trait; +use golem_common::model::OwnedWorkerId; +use golem_wasm_rpc::golem::rpc::types::{FutureInvokeResult, HostFutureInvokeResult}; +use golem_wasm_rpc::wasmtime::{decode_param, encode_output}; +use golem_wasm_rpc::{HostWasmRpc, Uri, Value, WasmRpcEntry, WitValue}; +use rib::{ParsedFunctionName, ParsedFunctionReference, ParsedFunctionSite}; +use tracing::debug; +use wasmtime::component::types::ComponentItem; +use wasmtime::component::{Component, Linker, Resource, ResourceType, Type, Val}; +use wasmtime::{AsContextMut, Engine, StoreContextMut}; +use wasmtime_wasi::WasiView; +// TODO: support multiple different dynamic linkers + +#[async_trait] +impl DynamicLinking + for DurableWorkerCtx +{ + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()> { + let mut root = linker.root(); + + for (name, item) in component.component_type().imports(&engine) { + debug!("Import {name}: {item:?}"); + match item { + ComponentItem::ComponentFunc(_) => { + debug!("MUST LINK COMPONENT FUNC {name}"); + } + ComponentItem::CoreFunc(_) => { + debug!("MUST LINK CORE FUNC {name}"); + } + ComponentItem::Module(_) => { + debug!("MUST LINK MODULE {name}"); + } + ComponentItem::Component(_) => { + debug!("MUST LINK COMPONENT {name}"); + } + ComponentItem::ComponentInstance(ref inst) => { + if name == "auction:auction-stub/stub-auction" + || name == "auction:auction/api" + || name == "rpc:counters-stub/stub-counters" + || name == "rpc:counters/api" + || name == "rpc:ephemeral-stub/stub-ephemeral" + { + debug!("NAME == {name}"); + let mut instance = root.instance(name)?; + + for (ename, eitem) in inst.exports(&engine) { + let name = name.to_owned(); + let ename = ename.to_owned(); + debug!("Instance {name} export {ename}: {eitem:?}"); + + match eitem { + ComponentItem::ComponentFunc(fun) => { + let name2 = name.clone(); + let ename2 = ename.clone(); + // TODO: need to do the "rpc call type" detection here and if it's not an rpc call then not calling `func_new_async` (for example for 'subscribe' and 'get' on FutureInvokeResult wrappers) + instance.func_new_async( + // TODO: instrument async closure + &ename.clone(), + move |store, params, results| { + let name = name2.clone(); + let ename = ename2.clone(); + let param_types: Vec = fun.params().collect(); + let result_types: Vec = fun.results().collect(); + Box::new(async move { + Ctx::dynamic_function_call( + store, + &name, + &ename, + params, + ¶m_types, + results, + &result_types, + ) + .await?; + // TODO: failures here must be somehow handled + Ok(()) + }) + }, + )?; + debug!("LINKED {name} export {ename}"); + } + ComponentItem::CoreFunc(_) => {} + ComponentItem::Module(_) => {} + ComponentItem::Component(component) => { + debug!("MUST LINK COMPONENT {ename} {component:?}"); + } + ComponentItem::ComponentInstance(instance) => { + debug!("MUST LINK COMPONENT INSTANCE {ename} {instance:?}"); + } + ComponentItem::Type(_) => {} + ComponentItem::Resource(resource) => { + // TODO: need to detect future result resources and register them as FutureInvokeResult + if ename == "future-counter-get-value-result" { + debug!("LINKING FUTURE INVOKE RESULT {ename}"); + instance.resource( + &ename, + ResourceType::host::(), + |_store, _rep| Ok(()), + )?; + } else if ename != "pollable" { + // TODO: ?? this should be 'if it is not already linked' but not way to check that + debug!("LINKING RESOURCE {ename} {resource:?}"); + instance.resource_async( + &ename, + ResourceType::host::(), + |store, rep| { + Box::new(async move { + Ctx::drop_linked_resource(store, rep).await + }) + }, + )?; + } + } + } + } + } else { + debug!("NAME NOT MATCHING: {name}"); + } + } + ComponentItem::Type(_) => {} + ComponentItem::Resource(_) => {} + } + } + + Ok(()) + } + + async fn dynamic_function_call( + mut store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type], + ) -> anyhow::Result<()> { + let mut store = store.as_context_mut(); + debug!( + "Instance {interface_name} export {function_name} called XXX {} params {} results", + params.len(), + results.len() + ); + + // TODO: this has to be moved to be calculated in the linking phase + let call_type = determine_call_type(interface_name, function_name, result_types)?; + + match call_type { + Some(DynamicRpcCall::GlobalStubConstructor) => { + // Simple stub interface constructor + + let target_worker_urn = params[0].clone(); + debug!("CREATING AUCTION STUB TARGETING WORKER {target_worker_urn:?}"); + + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn).await?; + + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Interface { + demand, + remote_worker_id, + }), + })? + }; + results[0] = Val::Resource(handle.try_into_resource_any(store)?); + } + Some(DynamicRpcCall::ResourceStubConstructor { + stub_constructor_name, + target_constructor_name, + }) => { + // Resource stub constructor + + // First parameter is the target uri + // Rest of the parameters must be sent to the remote constructor + + let target_worker_urn = params[0].clone(); + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn.clone()).await?; + + // First creating a resource for invoking the constructor (to avoid having to make a special case) + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Interface { + demand, + remote_worker_id, + }), + })? + }; + let temp_handle = handle.rep(); + + let constructor_result = Self::remote_invoke_and_wait( + stub_constructor_name, + target_constructor_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + + // TODO: extract and clean up + let (resource_uri, resource_id) = if let Value::Tuple(values) = constructor_result { + if values.len() == 1 { + if let Value::Handle { uri, resource_id } = + values.into_iter().next().unwrap() + { + (Uri { value: uri }, resource_id) + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + } + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + } + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + }; + + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn).await?; + + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + + let temp_handle: Resource = Resource::new_own(temp_handle); + table.delete(temp_handle)?; // Removing the temporary handle + + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Resource { + demand, + remote_worker_id, + resource_uri, + resource_id, + }), + })? + }; + results[0] = Val::Resource(handle.try_into_resource_any(store)?); + } + Some(DynamicRpcCall::BlockingFunctionCall { + stub_function_name, + target_function_name, + }) => { + // Simple stub interface method + debug!( + "{function_name} handle={:?}, rest={:?}", + params[0], + params.iter().skip(1).collect::>() + ); + + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + { + let mut wasi = store.data_mut().as_wasi_view(); + let entry = wasi.table().get(&handle)?; + let payload = entry.payload.downcast_ref::().unwrap(); + debug!("CALLING {function_name} ON {}", payload.remote_worker_id()); + } + + let result = Self::remote_invoke_and_wait( + stub_function_name, + target_function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + Self::value_result_to_wasmtime_vals(result, results, result_types, &mut store) + .await?; + } + Some(DynamicRpcCall::FireAndForgetFunctionCall { + stub_function_name, + target_function_name, + }) => { + // Async stub interface method + debug!( + "FNF {function_name} handle={:?}, rest={:?}", + params[0], + params.iter().skip(1).collect::>() + ); + + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + { + let mut wasi = store.data_mut().as_wasi_view(); + let entry = wasi.table().get(&handle)?; + let payload = entry.payload.downcast_ref::().unwrap(); + debug!("CALLING {function_name} ON {}", payload.remote_worker_id()); + } + + Self::remote_invoke( + stub_function_name, + target_function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + } + Some(DynamicRpcCall::AsyncFunctionCall { + stub_function_name, + target_function_name, + }) => { + // Async stub interface method + debug!( + "ASYNC {function_name} handle={:?}, rest={:?}", + params[0], + params.iter().skip(1).collect::>() + ); + + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + { + let mut wasi = store.data_mut().as_wasi_view(); + let entry = wasi.table().get(&handle)?; + let payload = entry.payload.downcast_ref::().unwrap(); + debug!("CALLING {function_name} ON {}", payload.remote_worker_id()); + } + + let result = Self::remote_async_invoke_and_await( + stub_function_name, + target_function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + + Self::value_result_to_wasmtime_vals(result, results, result_types, &mut store) + .await?; + } + Some(DynamicRpcCall::FutureInvokeResultSubscribe) => { + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + let pollable = store.data_mut().subscribe(handle).await?; + let pollable_any = pollable.try_into_resource_any(&mut store)?; + let resource_id = store.data_mut().add(pollable_any).await; + + let value_result = Value::Tuple(vec![Value::Handle { + uri: store.data().self_uri().value, + resource_id, + }]); + Self::value_result_to_wasmtime_vals( + value_result, + results, + result_types, + &mut store, + ) + .await?; + } + Some(DynamicRpcCall::FutureInvokeResultGet) => { + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + let result = HostFutureInvokeResult::get(store.data_mut(), handle).await?; + + // NOTE: we are currently failing on RpcError instead of passing it to the caller, as the generated stub interface requires + let value_result = Value::Tuple(vec![match result { + None => Value::Option(None), + Some(Ok(value)) => { + let value: Value = value.into(); + match value { + Value::Tuple(items) if items.len() == 1 => { + Value::Option(Some(Box::new(items.into_iter().next().unwrap()))) + } + _ => Err(anyhow!("Invalid future invoke result value"))?, // TODO: better error + } + } + Some(Err(err)) => Err(anyhow!("RPC invocation failed with {err:?}"))?, // TODO: more information into the error + }]); + + Self::value_result_to_wasmtime_vals( + value_result, + results, + result_types, + &mut store, + ) + .await?; + } + _ => todo!(), + } + + Ok(()) + } + + async fn drop_linked_resource( + mut store: StoreContextMut<'_, Ctx>, + rep: u32, + ) -> anyhow::Result<()> { + let must_drop = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + let entry: &WasmRpcEntry = table.get_any_mut(rep)?.downcast_ref().unwrap(); // TODO: error handling + let payload = entry.payload.downcast_ref::().unwrap(); + + debug!("DROPPING RESOURCE {payload:?}"); + + matches!(payload, WasmRpcEntryPayload::Resource { .. }) + }; + if must_drop { + let resource: Resource = Resource::new_own(rep); + + let function_name = "rpc:counters/api.{counter.drop}".to_string(); // TODO: we need to pass the resource name here from the linker + let _ = store + .data_mut() + .invoke_and_await(resource, function_name, vec![]) + .await?; + } + Ok(()) + } +} + +// TODO: these helpers probably should not be directly living in DurableWorkerCtx +impl DurableWorkerCtx { + // TODO: stub_function_name can probably be removed + async fn remote_invoke_and_wait( + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + params: &[Val], + param_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + handle: Resource, + ) -> anyhow::Result { + let mut wit_value_params = Vec::new(); + for (param, typ) in params.iter().zip(param_types).skip(1) { + let value: Value = encode_output(param, typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + let wit_value: WitValue = value.into(); + wit_value_params.push(wit_value); + } + + debug!( + "CALLING {stub_function_name} as {target_function_name} with parameters {wit_value_params:?}", + ); + + let wit_value_result = store + .data_mut() + .invoke_and_await(handle, target_function_name.to_string(), wit_value_params) + .await??; + + debug!( + "CALLING {stub_function_name} RESULTED IN {:?}", + wit_value_result + ); + + let value_result: Value = wit_value_result.into(); + Ok(value_result) + } + + // TODO: stub_function_name can probably be removed + async fn remote_async_invoke_and_await( + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + params: &[Val], + param_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + handle: Resource, + ) -> anyhow::Result { + let mut wit_value_params = Vec::new(); + for (param, typ) in params.iter().zip(param_types).skip(1) { + let value: Value = encode_output(param, typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + let wit_value: WitValue = value.into(); + wit_value_params.push(wit_value); + } + + debug!( + "CALLING {stub_function_name} as {target_function_name} with parameters {wit_value_params:?}", + ); + + let invoke_result_resource = store + .data_mut() + .async_invoke_and_await(handle, target_function_name.to_string(), wit_value_params) + .await?; + + let invoke_result_resource_any = + invoke_result_resource.try_into_resource_any(&mut *store)?; + let resource_id = store.data_mut().add(invoke_result_resource_any).await; + + let value_result: Value = Value::Tuple(vec![Value::Handle { + uri: store.data().self_uri().value, + resource_id, + }]); + Ok(value_result) + } + + // TODO: stub_function_name can probably be removed + async fn remote_invoke( + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + params: &[Val], + param_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + handle: Resource, + ) -> anyhow::Result<()> { + let mut wit_value_params = Vec::new(); + for (param, typ) in params.iter().zip(param_types).skip(1) { + let value: Value = encode_output(param, typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + let wit_value: WitValue = value.into(); + wit_value_params.push(wit_value); + } + + debug!( + "CALLING {stub_function_name} as {target_function_name} with parameters {wit_value_params:?}", + ); + + store + .data_mut() + .invoke(handle, target_function_name.to_string(), wit_value_params) + .await??; + + Ok(()) + } + + async fn value_result_to_wasmtime_vals( + value_result: Value, + results: &mut [Val], + result_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + ) -> anyhow::Result<()> { + match value_result { + Value::Tuple(values) | Value::Record(values) => { + for (idx, (value, typ)) in values.iter().zip(result_types).enumerate() { + let result = decode_param(&value, &typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + results[idx] = result.val; + + debug!("RESOURCES TO DROP {:?}", result.resources_to_drop); + // TODO: do we have to do something with result.resources_to_drop here? + } + } + _ => { + return Err(anyhow!( + "Unexpected result value {value_result:?}, expected tuple or record" + )); + } + } + + Ok(()) + } +} + +// TODO: these helpers probably should not be directly living in DurableWorkerCtx +impl DurableWorkerCtx { + async fn create_rpc_target( + store: &mut StoreContextMut<'_, Ctx>, + target_worker_urn: Val, + ) -> anyhow::Result<(OwnedWorkerId, Box)> { + let worker_urn = match target_worker_urn { + Val::Record(ref record) => { + let mut target = None; + for (key, val) in record.iter() { + if key == "value" { + match val { + Val::String(s) => { + target = Some(s.clone()); + } + _ => {} + } + } + } + target + } + _ => None, + }; + + let (remote_worker_id, demand) = if let Some(location) = worker_urn { + let uri = Uri { + value: location.clone(), + }; + match uri.parse_as_golem_urn() { + Some((remote_worker_id, None)) => { + let remote_worker_id = store + .data_mut() + .generate_unique_local_worker_id(remote_worker_id) + .await?; + + let remote_worker_id = OwnedWorkerId::new( + &store.data().owned_worker_id().account_id, + &remote_worker_id, + ); + let demand = store.data().rpc().create_demand(&remote_worker_id).await; + (remote_worker_id, demand) + } + _ => { + return Err(anyhow!( + "Invalid URI: {}. Must be urn:worker:component-id/worker-name", + location + )) + } + } + } else { + return Err(anyhow!("Missing or invalid worker URN parameter")); // TODO: more details; + }; + Ok((remote_worker_id, demand)) + } +} + +enum DynamicRpcCall { + GlobalStubConstructor, + ResourceStubConstructor { + stub_constructor_name: ParsedFunctionName, + target_constructor_name: ParsedFunctionName, + }, + BlockingFunctionCall { + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + }, + FireAndForgetFunctionCall { + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + }, + AsyncFunctionCall { + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + }, + FutureInvokeResultSubscribe, + FutureInvokeResultGet, +} + +// TODO: this needs to be implementd based on component metadata and no hardcoded values +fn determine_call_type( + interface_name: &str, + function_name: &str, + result_types: &[Type], +) -> anyhow::Result> { + if (interface_name == "auction:auction-stub/stub-auction" + && function_name == "[constructor]api") + || (interface_name == "rpc:counters-stub/stub-counters" + && function_name == "[constructor]api") + { + Ok(Some(DynamicRpcCall::GlobalStubConstructor)) + } else if (interface_name == "auction:auction-stub/stub-auction" + && function_name == "[constructor]running-auction") + || (interface_name == "rpc:counters-stub/stub-counters" + && function_name == "[constructor]counter") + { + let stub_constructor_name = + ParsedFunctionName::parse(&format!("{interface_name}.{{{function_name}}}")) + .map_err(|err| anyhow!(err))?; // TODO: proper error + + let target_constructor_name = ParsedFunctionName { + site: if interface_name.starts_with("auction") { + ParsedFunctionSite::PackagedInterface { + // TODO: this must come from component metadata linking information + namespace: "auction".to_string(), + package: "auction".to_string(), + interface: "api".to_string(), + version: None, + } + } else { + ParsedFunctionSite::PackagedInterface { + namespace: "rpc".to_string(), + package: "counters".to_string(), + interface: "api".to_string(), + version: None, + } + }, + function: ParsedFunctionReference::RawResourceConstructor { + resource: stub_constructor_name + .function() + .resource_name() + .unwrap() + .to_string(), // TODO this has to come from a check earlier + }, + }; + + Ok(Some(DynamicRpcCall::ResourceStubConstructor { + stub_constructor_name, + target_constructor_name, + })) + } else if function_name.starts_with("[method]") { + let stub_function_name = + ParsedFunctionName::parse(&format!("{interface_name}.{{{function_name}}}")) + .map_err(|err| anyhow!(err))?; // TODO: proper error + debug!("STUB FUNCTION NAME: {stub_function_name:?}"); + + if stub_function_name.function.resource_name() + == Some(&"future-counter-get-value-result".to_string()) + { + if stub_function_name.function.resource_method_name() == Some("subscribe".to_string()) { + Ok(Some(DynamicRpcCall::FutureInvokeResultSubscribe)) + } else if stub_function_name.function.resource_method_name() == Some("get".to_string()) + { + Ok(Some(DynamicRpcCall::FutureInvokeResultGet)) + } else { + Ok(None) + } + } else { + let (blocking, target_function) = match &stub_function_name.function { + ParsedFunctionReference::RawResourceMethod { resource, method } + if resource == "counter" => + // TODO: this needs to be detected based on the matching constructor + { + if method.starts_with("blocking-") { + ( + true, + ParsedFunctionReference::RawResourceMethod { + resource: resource.to_string(), + method: method + .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants + .unwrap() + .to_string(), + }, + ) + } else { + ( + false, + ParsedFunctionReference::RawResourceMethod { + resource: resource.to_string(), + method: method.to_string(), + }, + ) + } + } + _ => { + let method = stub_function_name.function.resource_method_name().unwrap(); // TODO: proper error + + if method.starts_with("blocking-") { + ( + true, + ParsedFunctionReference::Function { + function: method + .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants + .unwrap() + .to_string(), + }, + ) + } else { + ( + false, + ParsedFunctionReference::Function { + function: method.to_string(), + }, + ) + } + } + }; + + let target_function_name = ParsedFunctionName { + site: if interface_name.starts_with("auction") { + ParsedFunctionSite::PackagedInterface { + // TODO: this must come from component metadata linking information + namespace: "auction".to_string(), + package: "auction".to_string(), + interface: "api".to_string(), + version: None, + } + } else { + ParsedFunctionSite::PackagedInterface { + namespace: "rpc".to_string(), + package: "counters".to_string(), + interface: "api".to_string(), + version: None, + } + }, + function: target_function, + }; + + if blocking { + Ok(Some(DynamicRpcCall::BlockingFunctionCall { + stub_function_name, + target_function_name, + })) + } else { + debug!("ASYNC FUNCTION RESULT TYPES: {result_types:?}"); + if result_types.len() > 0 { + Ok(Some(DynamicRpcCall::AsyncFunctionCall { + stub_function_name, + target_function_name, + })) + } else { + Ok(Some(DynamicRpcCall::FireAndForgetFunctionCall { + stub_function_name, + target_function_name, + })) + } + } + } + } else { + Ok(None) + } +} diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index 5bf184961..628ebb94e 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -18,6 +18,7 @@ use crate::durable_host::http::serialized::SerializableHttpRequest; use crate::durable_host::io::{ManagedStdErr, ManagedStdIn, ManagedStdOut}; use crate::durable_host::replay_state::ReplayState; +use crate::durable_host::serialized::SerializableError; use crate::durable_host::wasm_rpc::UrnExtensions; use crate::error::GolemError; use crate::function_result_interpreter::interpret_function_results; @@ -64,7 +65,6 @@ use golem_common::model::oplog::{ }; use golem_common::model::plugin::{PluginOwner, PluginScope}; use golem_common::model::regions::{DeletedRegions, OplogRegion}; -use golem_common::model::RetryConfig; use golem_common::model::{exports, PluginInstallationId}; use golem_common::model::{ AccountId, ComponentFilePath, ComponentFilePermissions, ComponentFileSystemNode, @@ -73,6 +73,7 @@ use golem_common::model::{ ScheduledAction, SuccessfulUpdateRecord, Timestamp, WorkerEvent, WorkerFilter, WorkerId, WorkerMetadata, WorkerResourceDescription, WorkerStatus, WorkerStatusRecord, }; +use golem_common::model::{RetryConfig, TargetWorkerId}; use golem_common::retries::get_delay; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::wasmtime::ResourceStore; @@ -116,6 +117,7 @@ mod sockets; pub mod wasm_rpc; mod durability; +mod dynamic_linking; mod replay_state; /// Partial implementation of the WorkerCtx interfaces for adding durable execution to workers. @@ -287,6 +289,10 @@ impl DurableWorkerCtx { &self.owned_worker_id.worker_id } + pub fn owned_worker_id(&self) -> &OwnedWorkerId { + &self.owned_worker_id + } + pub fn component_metadata(&self) -> &ComponentMetadata { &self.state.component_metadata } @@ -466,6 +472,32 @@ impl DurableWorkerCtx { } } } + + pub async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result { + match remote_worker_id.clone().try_into_worker_id() { + Some(worker_id) => Ok(worker_id), + None => { + let worker_id = Durability::::wrap( + self, + WrappedFunctionType::ReadLocal, + "golem::rpc::wasm-rpc::generate_unique_local_worker_id", + (), + |ctx| { + Box::pin(async move { + ctx.rpc() + .generate_unique_local_worker_id(remote_worker_id) + .await + }) + }, + ) + .await?; + Ok(worker_id) + } + } + } } impl> DurableWorkerCtx { diff --git a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs index 967f71e9a..06a45d8d9 100644 --- a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs +++ b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs @@ -40,9 +40,10 @@ use golem_wasm_rpc::golem::rpc::types::{ }; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::{ - FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, ValueAndType, WasmRpcEntry, WitValue, + FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, Value, ValueAndType, WasmRpcEntry, WitValue, }; use std::any::Any; +use std::fmt::{Debug, Formatter}; use std::str::FromStr; use std::sync::Arc; use tracing::{error, warn}; @@ -59,14 +60,15 @@ impl HostWasmRpc for DurableWorkerCtx { match location.parse_as_golem_urn() { Some((remote_worker_id, None)) => { - let remote_worker_id = - generate_unique_local_worker_id(self, remote_worker_id).await?; + let remote_worker_id = self + .generate_unique_local_worker_id(remote_worker_id) + .await?; let remote_worker_id = OwnedWorkerId::new(&self.owned_worker_id.account_id, &remote_worker_id); let demand = self.rpc().create_demand(&remote_worker_id).await; let entry = self.table().push(WasmRpcEntry { - payload: Box::new(WasmRpcEntryPayload { + payload: Box::new(WasmRpcEntryPayload::Interface { demand, remote_worker_id, }), @@ -84,7 +86,7 @@ impl HostWasmRpc for DurableWorkerCtx { &mut self, self_: Resource, function_name: String, - function_params: Vec, + mut function_params: Vec, ) -> anyhow::Result> { record_host_function_call("golem::rpc::wasm-rpc", "invoke-and-await"); let args = self.get_arguments().await?; @@ -92,7 +94,26 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&self_)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); + + // TODO: remove redundancy + match payload { + WasmRpcEntryPayload::Resource { + resource_uri, + resource_id, + .. + } => { + function_params.insert( + 0, + Value::Handle { + uri: resource_uri.value.to_string(), + resource_id: *resource_id, + } + .into(), + ); + } + _ => {} + } let current_idempotency_key = self .get_current_idempotency_key() @@ -213,7 +234,7 @@ impl HostWasmRpc for DurableWorkerCtx { &mut self, self_: Resource, function_name: String, - function_params: Vec, + mut function_params: Vec, ) -> anyhow::Result> { record_host_function_call("golem::rpc::wasm-rpc", "invoke"); let args = self.get_arguments().await?; @@ -221,7 +242,26 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&self_)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); + + // TODO: remove redundancy + match payload { + WasmRpcEntryPayload::Resource { + resource_uri, + resource_id, + .. + } => { + function_params.insert( + 0, + Value::Handle { + uri: resource_uri.value.to_string(), + resource_id: *resource_id, + } + .into(), + ); + } + _ => {} + } let current_idempotency_key = self .get_current_idempotency_key() @@ -298,7 +338,7 @@ impl HostWasmRpc for DurableWorkerCtx { &mut self, this: Resource, function_name: String, - function_params: Vec, + mut function_params: Vec, ) -> anyhow::Result> { record_host_function_call("golem::rpc::wasm-rpc", "async-invoke-and-await"); let args = self.get_arguments().await?; @@ -311,7 +351,26 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&this)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); + + // TODO: remove redundancy + match payload { + WasmRpcEntryPayload::Resource { + resource_uri, + resource_id, + .. + } => { + function_params.insert( + 0, + Value::Handle { + uri: resource_uri.value.to_string(), + resource_id: *resource_id, + } + .into(), + ); + } + _ => {} + } let current_idempotency_key = self .get_current_idempotency_key() @@ -740,32 +799,6 @@ impl HostFutureInvokeResult for DurableWorkerCtx { #[async_trait] impl golem_wasm_rpc::Host for DurableWorkerCtx {} -async fn generate_unique_local_worker_id( - ctx: &mut DurableWorkerCtx, - remote_worker_id: TargetWorkerId, -) -> Result { - match remote_worker_id.clone().try_into_worker_id() { - Some(worker_id) => Ok(worker_id), - None => { - let worker_id = Durability::::wrap( - ctx, - WrappedFunctionType::ReadLocal, - "golem::rpc::wasm-rpc::generate_unique_local_worker_id", - (), - |ctx| { - Box::pin(async move { - ctx.rpc() - .generate_unique_local_worker_id(remote_worker_id) - .await - }) - }, - ) - .await?; - Ok(worker_id) - } - } -} - /// Tries to get a `ValueAndType` representation for the given `WitValue` parameters by querying the latest component metadata for the /// target component. /// If the query fails, or the expected function name is not in its metadata or the number of parameters does not match, then it returns an @@ -797,10 +830,63 @@ async fn try_get_typed_parameters( Vec::new() } -pub struct WasmRpcEntryPayload { - #[allow(dead_code)] - demand: Box, - remote_worker_id: OwnedWorkerId, +pub enum WasmRpcEntryPayload { + Interface { + #[allow(dead_code)] + demand: Box, + remote_worker_id: OwnedWorkerId, + }, + Resource { + #[allow(dead_code)] + demand: Box, + remote_worker_id: OwnedWorkerId, + resource_uri: Uri, + resource_id: u64, + }, +} + +impl Debug for WasmRpcEntryPayload { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Interface { + remote_worker_id, .. + } => f + .debug_struct("Interface") + .field("remote_worker_id", remote_worker_id) + .finish(), + Self::Resource { + remote_worker_id, + resource_uri, + resource_id, + .. + } => f + .debug_struct("Resource") + .field("remote_worker_id", remote_worker_id) + .field("resource_uri", resource_uri) + .field("resource_id", resource_id) + .finish(), + } + } +} + +impl WasmRpcEntryPayload { + pub fn remote_worker_id(&self) -> &OwnedWorkerId { + match self { + Self::Interface { + remote_worker_id, .. + } => remote_worker_id, + Self::Resource { + remote_worker_id, .. + } => remote_worker_id, + } + } + + pub fn demand(&self) -> &Box { + match self { + Self::Interface { demand, .. } => demand, + Self::Resource { demand, .. } => demand, + } + } } pub trait UrnExtensions { diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index a7de8b183..25146e7c1 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -473,6 +473,9 @@ impl Worker { } } + /// Invokes the worker and awaits for a result. + /// + /// Successful result is a `TypeAnnotatedValue` encoding either a tuple or a record. pub async fn invoke_and_await( &self, idempotency_key: IdempotencyKey, @@ -1350,7 +1353,8 @@ impl RunningWorker { ) .await?; - let mut store = Store::new(&parent.engine(), context); + let engine = parent.engine(); + let mut store = Store::new(&engine, context); store.set_epoch_deadline(parent.config().limits.epoch_ticks); let worker_id_clone = worker_metadata.worker_id.clone(); store.epoch_deadline_callback(move |mut store| { @@ -1371,7 +1375,15 @@ impl RunningWorker { store.limiter_async(|ctx| ctx.resource_limiter()); - let instance_pre = parent.linker().instantiate_pre(&component).map_err(|e| { + // TODO MOVE SOMEWHERE ELSE + let mut linker = (*parent.linker()).clone(); // fresh linker + store.data_mut().link(&engine, &mut linker, &component)?; + + // TODO: check parent.linker() is not affected + + // TODO ^^^ + + let instance_pre = linker.instantiate_pre(&component).map_err(|e| { GolemError::worker_creation_failed( parent.owned_worker_id.worker_id(), format!( @@ -1551,13 +1563,13 @@ impl RunningWorker { store, &instance, ) - .await; + .await; match result { Ok(InvokeResult::Succeeded { - output, - consumed_fuel, - }) => { + output, + consumed_fuel, + }) => { let component_metadata = store.as_context().data().component_metadata(); @@ -1577,9 +1589,9 @@ impl RunningWorker { output, function_results, ) - .map_err(|e| GolemError::ValueMismatch { - details: e.join(", "), - }); + .map_err(|e| GolemError::ValueMismatch { + details: e.join(", "), + }); match result { Ok(result) => { @@ -1685,8 +1697,8 @@ impl RunningWorker { } } } - .instrument(span) - .await; + .instrument(span) + .await; if do_break { break; } @@ -1713,7 +1725,7 @@ impl RunningWorker { vec![ "golem:api/save-snapshot@1.1.0.{save}".to_string(), "golem:api/save-snapshot@0.2.0.{save}".to_string(), - ] + ], ) { store.data_mut().begin_call_snapshotting_function(); @@ -1950,9 +1962,9 @@ impl InvocationResult { OplogEntry::Error { error, .. } => { let stderr = recover_stderr_logs(services, owned_worker_id, oplog_idx).await; Err(FailedInvocationResult { trap_type: TrapType::Error(error), stderr }) - }, - OplogEntry::Interrupted { .. } => Err(FailedInvocationResult { trap_type: TrapType::Interrupt(InterruptKind::Interrupt), stderr: "".to_string()}), - OplogEntry::Exited { .. } => Err(FailedInvocationResult { trap_type: TrapType::Exit, stderr: "".to_string()}), + } + OplogEntry::Interrupted { .. } => Err(FailedInvocationResult { trap_type: TrapType::Interrupt(InterruptKind::Interrupt), stderr: "".to_string() }), + OplogEntry::Exited { .. } => Err(FailedInvocationResult { trap_type: TrapType::Exit, stderr: "".to_string() }), _ => panic!("Unexpected oplog entry pointed by invocation result at index {oplog_idx} for {owned_worker_id:?}") }; diff --git a/golem-worker-executor-base/src/workerctx.rs b/golem-worker-executor-base/src/workerctx.rs index a46ac4cf1..0a98e5ccb 100644 --- a/golem-worker-executor-base/src/workerctx.rs +++ b/golem-worker-executor-base/src/workerctx.rs @@ -15,12 +15,6 @@ use std::collections::HashSet; use std::sync::{Arc, RwLock, Weak}; -use async_trait::async_trait; -use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::wasmtime::ResourceStore; -use golem_wasm_rpc::Value; -use wasmtime::{AsContextMut, ResourceLimiterAsync}; - use crate::error::GolemError; use crate::model::{ CurrentResourceLimits, ExecutionStatus, InterruptKind, LastError, ListDirectoryResult, @@ -44,13 +38,22 @@ use crate::services::{ worker_enumeration, HasAll, HasConfig, HasOplog, HasOplogService, HasWorker, }; use crate::worker::{RetryDecision, Worker}; +use async_trait::async_trait; use golem_common::model::component::ComponentOwner; use golem_common::model::oplog::WorkerResourceId; use golem_common::model::plugin::PluginScope; use golem_common::model::{ AccountId, ComponentFilePath, ComponentVersion, IdempotencyKey, OwnedWorkerId, - PluginInstallationId, WorkerId, WorkerMetadata, WorkerStatus, WorkerStatusRecord, + PluginInstallationId, TargetWorkerId, WorkerId, WorkerMetadata, WorkerStatus, + WorkerStatusRecord, }; +use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; +use golem_wasm_rpc::wasmtime::ResourceStore; +use golem_wasm_rpc::Value; +use wasmtime::component::{Component, Linker, Type, Val}; +use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync, StoreContextMut}; +use wasmtime_wasi::WasiView; +use wasmtime_wasi_http::WasiHttpView; /// WorkerCtx is the primary customization and extension point of worker executor. It is the context /// associated with each running worker, and it is responsible for initializing the WASM linker as @@ -66,6 +69,7 @@ pub trait WorkerCtx: + IndexedResourceStore + UpdateManagement + FileSystemReading + + DynamicLinking + Send + Sync + Sized @@ -132,6 +136,9 @@ pub trait WorkerCtx: >, ) -> Result; + fn as_wasi_view(&mut self) -> impl WasiView; + fn as_wasi_http_view(&mut self) -> impl WasiHttpView; + /// Get the public part of the worker context fn get_public_state(&self) -> &Self::PublicState; @@ -144,6 +151,9 @@ pub trait WorkerCtx: /// Get the worker ID associated with this worker context fn worker_id(&self) -> &WorkerId; + /// Get the owned worker ID associated with this worker context + fn owned_worker_id(&self) -> &OwnedWorkerId; + fn component_metadata(&self) -> &ComponentMetadata; /// The WASI exit API can use a special error to exit from the WASM execution. As this depends @@ -157,6 +167,12 @@ pub trait WorkerCtx: /// Gets an interface to the worker-proxy which can direct calls to other worker executors /// in the cluster fn worker_proxy(&self) -> Arc; + + // TODO: where do this belong + async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result; } /// The fuel management interface of a worker context is responsible for borrowing and returning @@ -400,3 +416,25 @@ pub trait FileSystemReading { ) -> Result; async fn read_file(&self, path: &ComponentFilePath) -> Result; } + +#[async_trait] +pub trait DynamicLinking { + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()>; + + async fn dynamic_function_call( + store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type], + ) -> anyhow::Result<()>; + + async fn drop_linked_resource(store: StoreContextMut<'_, Ctx>, rep: u32) -> anyhow::Result<()>; +} diff --git a/golem-worker-executor-base/tests/common/mod.rs b/golem-worker-executor-base/tests/common/mod.rs index 76d82fb3b..d62c5357b 100644 --- a/golem-worker-executor-base/tests/common/mod.rs +++ b/golem-worker-executor-base/tests/common/mod.rs @@ -5,7 +5,7 @@ use std::collections::HashSet; use golem_service_base::service::initial_component_files::InitialComponentFilesService; use golem_service_base::storage::blob::BlobStorage; use golem_wasm_rpc::wasmtime::ResourceStore; -use golem_wasm_rpc::{Uri, Value}; +use golem_wasm_rpc::{HostWasmRpc, RpcError, Uri, Value, WitValue}; use golem_worker_executor_base::services::file_loader::FileLoader; use prometheus::Registry; @@ -18,8 +18,8 @@ use std::sync::{Arc, RwLock, Weak}; use golem_common::model::{ AccountId, ComponentFilePath, ComponentId, ComponentVersion, IdempotencyKey, OwnedWorkerId, - PluginInstallationId, ScanCursor, WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus, - WorkerStatusRecord, + PluginInstallationId, ScanCursor, TargetWorkerId, WorkerFilter, WorkerId, WorkerMetadata, + WorkerStatus, WorkerStatusRecord, }; use golem_service_base::config::{BlobStorageConfig, LocalFileSystemBlobStorageConfig}; use golem_worker_executor_base::error::GolemError; @@ -51,8 +51,8 @@ use golem_worker_executor_base::services::worker_event::WorkerEventService; use golem_worker_executor_base::services::{plugins, All, HasAll, HasConfig, HasOplogService}; use golem_worker_executor_base::wasi_host::create_linker; use golem_worker_executor_base::workerctx::{ - ExternalOperations, FileSystemReading, FuelManagement, IndexedResourceStore, InvocationHooks, - InvocationManagement, StatusManagement, UpdateManagement, WorkerCtx, + DynamicLinking, ExternalOperations, FileSystemReading, FuelManagement, IndexedResourceStore, + InvocationHooks, InvocationManagement, StatusManagement, UpdateManagement, WorkerCtx, }; use golem_worker_executor_base::Bootstrap; @@ -79,6 +79,8 @@ use golem_test_framework::components::shard_manager::ShardManager; use golem_test_framework::components::worker_executor_cluster::WorkerExecutorCluster; use golem_test_framework::config::TestDependencies; use golem_test_framework::dsl::to_worker_metadata; +use golem_wasm_rpc::golem::rpc::types::{FutureInvokeResult, WasmRpc}; +use golem_wasm_rpc::golem::rpc::types::{HostFutureInvokeResult, Pollable}; use golem_worker_executor_base::preview2::golem; use golem_worker_executor_base::preview2::golem::api1_1_0; use golem_worker_executor_base::services::events::Events; @@ -94,8 +96,10 @@ use golem_worker_executor_base::services::worker_proxy::WorkerProxy; use golem_worker_executor_base::worker::{RetryDecision, Worker}; use tonic::transport::Channel; use tracing::{debug, info}; -use wasmtime::component::{Instance, Linker, ResourceAny}; -use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync}; +use wasmtime::component::{Component, Instance, Linker, Resource, ResourceAny, Type, Val}; +use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync, StoreContextMut}; +use wasmtime_wasi::WasiView; +use wasmtime_wasi_http::WasiHttpView; pub struct TestWorkerExecutor { _join_set: Option>>, @@ -581,7 +585,7 @@ impl ResourceStore for TestWorkerCtx { } async fn get(&mut self, resource_id: u64) -> Option { - self.durable_ctx.get(resource_id).await + ResourceStore::get(&mut self.durable_ctx, resource_id).await } async fn borrow(&self, resource_id: u64) -> Option { @@ -683,6 +687,14 @@ impl WorkerCtx for TestWorkerCtx { Ok(Self { durable_ctx }) } + fn as_wasi_view(&mut self) -> impl WasiView { + self.durable_ctx.as_wasi_view() + } + + fn as_wasi_http_view(&mut self) -> impl WasiHttpView { + self.durable_ctx.as_wasi_http_view() + } + fn get_public_state(&self) -> &Self::PublicState { &self.durable_ctx.public_state } @@ -695,6 +707,10 @@ impl WorkerCtx for TestWorkerCtx { self.durable_ctx.worker_id() } + fn owned_worker_id(&self) -> &OwnedWorkerId { + self.durable_ctx.owned_worker_id() + } + fn component_metadata(&self) -> &ComponentMetadata { self.durable_ctx.component_metadata() } @@ -710,6 +726,15 @@ impl WorkerCtx for TestWorkerCtx { fn worker_proxy(&self) -> Arc { self.durable_ctx.worker_proxy() } + + async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result { + self.durable_ctx + .generate_unique_local_worker_id(remote_worker_id) + .await + } } #[async_trait] @@ -766,6 +791,111 @@ impl FileSystemReading for TestWorkerCtx { } } +#[async_trait] +impl HostWasmRpc for TestWorkerCtx { + async fn new(&mut self, location: Uri) -> anyhow::Result> { + self.durable_ctx.new(location).await + } + + async fn invoke_and_await( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .invoke_and_await(self_, function_name, function_params) + .await + } + + async fn invoke( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .invoke(self_, function_name, function_params) + .await + } + + async fn async_invoke_and_await( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .async_invoke_and_await(self_, function_name, function_params) + .await + } + + async fn drop(&mut self, rep: Resource) -> anyhow::Result<()> { + HostWasmRpc::drop(&mut self.durable_ctx, rep).await + } +} + +#[async_trait] +impl HostFutureInvokeResult for TestWorkerCtx { + async fn subscribe( + &mut self, + self_: Resource, + ) -> anyhow::Result> { + HostFutureInvokeResult::subscribe(&mut self.durable_ctx, self_).await + } + + async fn get( + &mut self, + self_: Resource, + ) -> anyhow::Result>> { + HostFutureInvokeResult::get(&mut self.durable_ctx, self_).await + } + + async fn drop(&mut self, rep: Resource) -> anyhow::Result<()> { + HostFutureInvokeResult::drop(&mut self.durable_ctx, rep).await + } +} + +#[async_trait] +impl DynamicLinking for TestWorkerCtx { + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()> { + self.durable_ctx.link(engine, linker, component) + } + + async fn dynamic_function_call( + store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type], + ) -> anyhow::Result<()> { + DurableWorkerCtx::::dynamic_function_call( + store, + interface_name, + function_name, + params, + param_types, + results, + result_types, + ) + .await + } + + async fn drop_linked_resource( + store: StoreContextMut<'_, TestWorkerCtx>, + rep: u32, + ) -> anyhow::Result<()> { + DurableWorkerCtx::::drop_linked_resource(store, rep).await + } +} + #[async_trait] impl Bootstrap for ServerBootstrap { fn create_active_workers( diff --git a/wasm-ast/src/analysis/mod.rs b/wasm-ast/src/analysis/mod.rs index 66e58fb93..18095ef1f 100644 --- a/wasm-ast/src/analysis/mod.rs +++ b/wasm-ast/src/analysis/mod.rs @@ -617,9 +617,11 @@ impl AnalysisContext { Ok((Mrc::new(ComponentSection::Type(component_type.clone())), self.clone())) } InstanceTypeDeclaration::Alias(alias) => { - let component_idx = self.component_stack.last().unwrap().component_idx.unwrap(); + let component_idx = self.component_stack.last().expect("Component stack is empty").component_idx.unwrap_or_default(); let new_ctx = self.push_component(self.get_component(), component_idx); // Emulating an inner scope by duplicating the current component on the stack (TODO: refactor this) + // Note: because we not in an inner component, but an inner instance declaration and the current analysis stack + // does not have this concept. new_ctx.follow_redirects(Mrc::new(ComponentSection::Alias(alias.clone()))) } InstanceTypeDeclaration::Export { .. } => { diff --git a/wasm-ast/tests/exports.rs b/wasm-ast/tests/exports.rs index 899c8071c..138bbbdee 100644 --- a/wasm-ast/tests/exports.rs +++ b/wasm-ast/tests/exports.rs @@ -700,3 +700,77 @@ fn exports_caller_composed_component() { pretty_assertions::assert_eq!(metadata, expected); } + +#[test] +fn exports_caller_component() { + // NOTE: Same as caller_composed.wasm but not composed with the generated stub + let source_bytes = include_bytes!("../wasm/caller.wasm"); + let component: Component = Component::from_bytes(source_bytes).unwrap(); + + let state = AnalysisContext::new(component); + let metadata = state.get_top_level_exports().unwrap(); + + let expected = vec![ + AnalysedExport::Function(AnalysedFunction { + name: "test1".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(tuple(vec![str(), u64()])), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test2".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: u64(), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test3".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: u64(), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test4".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: tuple(vec![list(str()), list(tuple(vec![str(), str()]))]), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test5".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(u64()), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "bug-wasm-rpc-i32".to_string(), + parameters: vec![AnalysedFunctionParameter { + name: "in".to_string(), + typ: variant(vec![unit_case("leaf")]), + }], + results: vec![AnalysedFunctionResult { + name: None, + typ: variant(vec![unit_case("leaf")]), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "ephemeral-test1".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(tuple(vec![str(), str()])), + }], + }), + ]; + + pretty_assertions::assert_eq!(metadata, expected); +} diff --git a/wasm-ast/wasm/caller.wasm b/wasm-ast/wasm/caller.wasm new file mode 100644 index 000000000..57794b168 Binary files /dev/null and b/wasm-ast/wasm/caller.wasm differ diff --git a/wasm-rpc/src/value_and_type.rs b/wasm-rpc/src/value_and_type.rs index 9c4365494..3ed7fef68 100644 --- a/wasm-rpc/src/value_and_type.rs +++ b/wasm-rpc/src/value_and_type.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::Value; -use golem_wasm_ast::analysis::analysed_type::{list, option, result, result_err, result_ok, tuple}; +use crate::{RpcError, Value}; +use golem_wasm_ast::analysis::analysed_type::{ + list, option, result, result_err, result_ok, tuple, variant, +}; use golem_wasm_ast::analysis::{analysed_type, AnalysedType}; use std::collections::HashMap; use std::time::{Duration, Instant}; @@ -532,3 +534,38 @@ impl IntoValue for Duration { analysed_type::u64() } } + +#[cfg(feature = "host-bindings")] +impl IntoValue for crate::RpcError { + fn into_value(self) -> Value { + match self { + RpcError::ProtocolError(value) => Value::Variant { + case_idx: 0, + case_value: Some(Box::new(Value::String(value))), + }, + RpcError::Denied(value) => Value::Variant { + case_idx: 1, + case_value: Some(Box::new(Value::String(value))), + }, + RpcError::NotFound(value) => Value::Variant { + case_idx: 2, + case_value: Some(Box::new(Value::String(value))), + }, + RpcError::RemoteInternalError(value) => Value::Variant { + case_idx: 3, + case_value: Some(Box::new(Value::String(value))), + }, + } + } + + fn get_type() -> AnalysedType { + use analysed_type::case; + + variant(vec![ + case("protocol-error", analysed_type::str()), + case("denied", analysed_type::str()), + case("not-found", analysed_type::str()), + case("remote-internal-error", analysed_type::str()), + ]) + } +} diff --git a/wasm-rpc/src/wasmtime.rs b/wasm-rpc/src/wasmtime.rs index 6b68e2f84..23b7f21fd 100644 --- a/wasm-rpc/src/wasmtime.rs +++ b/wasm-rpc/src/wasmtime.rs @@ -22,6 +22,7 @@ use golem_wasm_ast::analysis::analysed_type::{ use golem_wasm_ast::analysis::{AnalysedType, TypeResult}; use wasmtime::component::{types, ResourceAny, Type, Val}; +#[derive(Debug)] pub enum EncodingError { ParamTypeMismatch { details: String }, ValueMismatch { details: String }, @@ -464,7 +465,7 @@ async fn decode_param_impl( } } -/// Converts a wasmtime Val to a Golem protobuf Val +/// Converts a wasmtime Val to a wasm-rpc Value #[async_recursion] pub async fn encode_output( value: &Val,