diff --git a/golem-rib/src/function_name.rs b/golem-rib/src/function_name.rs index 41f1b09c4..3cf8efa0c 100644 --- a/golem-rib/src/function_name.rs +++ b/golem-rib/src/function_name.rs @@ -652,6 +652,28 @@ impl ParsedFunctionName { function, }) } + + pub fn is_constructor(&self) -> Option<&str> { + match &self.function { + ParsedFunctionReference::RawResourceConstructor { resource, .. } + | ParsedFunctionReference::IndexedResourceConstructor { resource, .. } => { + Some(resource) + } + _ => None, + } + } + + pub fn is_method(&self) -> Option<&str> { + match &self.function { + ParsedFunctionReference::RawResourceMethod { resource, .. } + | ParsedFunctionReference::IndexedResourceMethod { resource, .. } + | ParsedFunctionReference::RawResourceStaticMethod { resource, .. } + | ParsedFunctionReference::IndexedResourceStaticMethod { resource, .. } => { + Some(resource) + } + _ => None, + } + } } #[cfg(feature = "protobuf")] diff --git a/golem-worker-executor-base/src/durable_host/dynamic_linking.rs b/golem-worker-executor-base/src/durable_host/dynamic_linking.rs new file mode 100644 index 000000000..d47ce3958 --- /dev/null +++ b/golem-worker-executor-base/src/durable_host/dynamic_linking.rs @@ -0,0 +1,460 @@ +use crate::durable_host::wasm_rpc::{UrnExtensions, WasmRpcEntryPayload}; +use crate::durable_host::DurableWorkerCtx; +use crate::services::rpc::RpcDemand; +use crate::workerctx::{DynamicLinking, WorkerCtx}; +use anyhow::anyhow; +use async_trait::async_trait; +use golem_common::model::OwnedWorkerId; +use golem_wasm_rpc::wasmtime::{decode_param, encode_output}; +use golem_wasm_rpc::{HostWasmRpc, Uri, Value, WasmRpcEntry, WitValue}; +use rib::{ParsedFunctionName, ParsedFunctionReference, ParsedFunctionSite}; +use tracing::debug; +use wasmtime::component::types::ComponentItem; +use wasmtime::component::{Component, Linker, Resource, ResourceType, Type, Val}; +use wasmtime::{AsContextMut, Engine, StoreContextMut}; +use wasmtime_wasi::WasiView; + +// TODO: support multiple different dynamic linkers + +#[async_trait] +impl DynamicLinking for DurableWorkerCtx { + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()> { + let mut root = linker.root(); + + for (name, item) in component.component_type().imports(&engine) { + debug!("Import {name}: {item:?}"); + match item { + ComponentItem::ComponentFunc(_) => { + debug!("MUST LINK COMPONENT FUNC {name}"); + } + ComponentItem::CoreFunc(_) => { + debug!("MUST LINK CORE FUNC {name}"); + } + ComponentItem::Module(_) => { + debug!("MUST LINK MODULE {name}"); + } + ComponentItem::Component(_) => { + debug!("MUST LINK COMPONENT {name}"); + } + ComponentItem::ComponentInstance(ref inst) => { + if name == "auction:auction-stub/stub-auction" + || name == "auction:auction/api" + || name == "rpc:counters-stub/stub-counters" + || name == "rpc:counters/api" + || name == "rpc:ephemeral-stub/stub-ephemeral" + { + debug!("NAME == {name}"); + let mut instance = root.instance(name)?; + + for (ename, eitem) in inst.exports(&engine) { + let name = name.to_owned(); + let ename = ename.to_owned(); + debug!("Instance {name} export {ename}: {eitem:?}"); + + match eitem { + ComponentItem::ComponentFunc(fun) => { + let name2 = name.clone(); + let ename2 = ename.clone(); + instance.func_new_async( + // TODO: instrument async closure + &ename.clone(), + move |store, params, results| { + let name = name2.clone(); + let ename = ename2.clone(); + let param_types: Vec = fun.params().collect(); + let result_types: Vec = fun.results().collect(); + Box::new(async move { + Ctx::dynamic_function_call( + store, + &name, + &ename, + params, + ¶m_types, + results, + &result_types, + ) + .await?; + // TODO: failures here must be somehow handled + Ok(()) + }) + }, + )?; + debug!("LINKED {name} export {ename}"); + } + ComponentItem::CoreFunc(_) => {} + ComponentItem::Module(_) => {} + ComponentItem::Component(component) => { + debug!("MUST LINK COMPONENT {ename} {component:?}"); + } + ComponentItem::ComponentInstance(instance) => { + debug!("MUST LINK COMPONENT INSTANCE {ename} {instance:?}"); + } + ComponentItem::Type(_) => {} + ComponentItem::Resource(resource) => { + if ename != "pollable" { + // TODO: ?? this should be 'if it is not already linked' but not way to check that + debug!("LINKING RESOURCE {ename} {resource:?}"); + instance.resource( + &ename, + ResourceType::host::(), + Ctx::drop_linked_resource, + )?; + } + } + } + } + } else { + debug!("NAME NOT MATCHING: {name}"); + } + } + ComponentItem::Type(_) => {} + ComponentItem::Resource(_) => {} + } + } + + Ok(()) + } + + async fn dynamic_function_call( + mut store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type], + ) -> anyhow::Result<()> { + let mut store = store.as_context_mut(); + debug!( + "Instance {interface_name} export {function_name} called XXX {} params {} results", + params.len(), + results.len() + ); + + // TODO: add an enum with the call types (interface stub constructor, resource stub constructor, etc) + // TODO: and detect which one it is based on metadata + type info + + if (interface_name == "auction:auction-stub/stub-auction" + && function_name == "[constructor]api") + || (interface_name == "rpc:counters-stub/stub-counters" + && function_name == "[constructor]api") + { + // Simple stub interface constructor + + let target_worker_urn = params[0].clone(); + debug!("CREATING AUCTION STUB TARGETING WORKER {target_worker_urn:?}"); + // Record([("value", String("urn:worker:2a174805-bdd5-49e1-b1e8-124208123b4a/auction-5f0a94f1-1d14-4b65-8e6c-3a8fa3c24ea9"))]) + + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn).await?; + + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Interface { + demand, + remote_worker_id, + }), + })? + }; + results[0] = Val::Resource(handle.try_into_resource_any(store)?); + } else if (interface_name == "auction:auction-stub/stub-auction" + && function_name == "[constructor]running-auction") + || (interface_name == "rpc:counters-stub/stub-counters" + && function_name == "[constructor]counter") + { + // Resource stub constructor + + // First parameter is the target uri + // Rest of the parameters must be sent to the remote constructor + + let target_worker_urn = params[0].clone(); + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn.clone()).await?; + + // First creating a resource for invoking the constructor (to avoid having to make a special case) + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Interface { + demand, + remote_worker_id, + }), + })? + }; + let temp_handle = handle.rep(); + + let constructor_result = Self::remote_invoke( + &interface_name, + &function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + + // TODO: extract and clean up + let (resource_uri, resource_id) = if let Value::Tuple(values) = constructor_result { + if values.len() == 1 { + if let Value::Handle { uri, resource_id } = values.into_iter().next().unwrap() { + (Uri { value: uri }, resource_id) + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + } + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + } + } else { + return Err(anyhow!( + "Invalid constructor result: single handle expected" + )); + }; + + let (remote_worker_id, demand) = + Self::create_rpc_target(&mut store, target_worker_urn).await?; + + let handle = { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + + let temp_handle: Resource = Resource::new_own(temp_handle); + table.delete(temp_handle)?; // Removing the temporary handle + + table.push(WasmRpcEntry { + payload: Box::new(WasmRpcEntryPayload::Resource { + demand, + remote_worker_id, + resource_uri, + resource_id, + }), + })? + }; + results[0] = Val::Resource(handle.try_into_resource_any(store)?); + } else if function_name.starts_with("[method]") { + // Simple stub interface method + debug!( + "{function_name} handle={:?}, rest={:?}", + params[0], + params.iter().skip(1).collect::>() + ); + + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + { + let mut wasi = store.data_mut().as_wasi_view(); + let entry = wasi.table().get(&handle)?; + let payload = entry.payload.downcast_ref::().unwrap(); + debug!("CALLING {function_name} ON {}", payload.remote_worker_id()); + } + + let result = Self::remote_invoke( + &interface_name, + &function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + Self::value_result_to_wasmtime_vals(result, results, result_types, &mut store).await?; + } + + Ok(()) + } + + fn drop_linked_resource(mut store: StoreContextMut<'_, Ctx>, rep: u32) -> anyhow::Result<()> { + let mut wasi = store.data_mut().as_wasi_view(); + let table = wasi.table(); + let entry: &WasmRpcEntry = table.get_any_mut(rep).unwrap().downcast_ref().unwrap(); // TODO: error handling + let payload = entry.payload.downcast_ref::().unwrap(); + debug!("DROPPING RESOURCE {payload:?}"); + if let WasmRpcEntryPayload::Resource { .. } = payload { + // TODO: remote drop + } + Ok(()) + } +} + +// TODO: these helpers probably should not be directly living in DurableWorkerCtx +impl DurableWorkerCtx { + async fn remote_invoke( + interface_name: &&str, + function_name: &&str, + params: &[Val], + param_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + handle: Resource, + ) -> anyhow::Result { + let stub_function_name = + ParsedFunctionName::parse(&format!("{interface_name}.{{{function_name}}}")) + .map_err(|err| anyhow!(err))?; // TODO: proper error + debug!("STUB FUNCTION NAME: {stub_function_name:?}"); + let target_function_name = ParsedFunctionName { + site: if interface_name.starts_with("auction") { + ParsedFunctionSite::PackagedInterface { + // TODO: this must come from component metadata linking information + namespace: "auction".to_string(), + package: "auction".to_string(), + interface: "api".to_string(), + version: None, + } + } else { + ParsedFunctionSite::PackagedInterface { + namespace: "rpc".to_string(), + package: "counters".to_string(), + interface: "api".to_string(), + version: None, + } + }, + function: if let Some(resource) = stub_function_name.is_constructor() { + ParsedFunctionReference::RawResourceConstructor { + resource: resource.to_string(), + } + } else { + match &stub_function_name.function { + ParsedFunctionReference::RawResourceMethod { resource, method } + if resource == "counter" => + { + ParsedFunctionReference::RawResourceMethod { + resource: resource.to_string(), + method: method + .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants + .unwrap() + .to_string(), + } + } + _ => ParsedFunctionReference::Function { + function: stub_function_name + .function + .resource_method_name() + .unwrap() // TODO: proper error + .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants + .unwrap() + .to_string(), + }, + } + }, + }; + + let mut wit_value_params = Vec::new(); + for (param, typ) in params.iter().zip(param_types).skip(1) { + let value: Value = encode_output(param, typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + let wit_value: WitValue = value.into(); + wit_value_params.push(wit_value); + } + + debug!( + "CALLING {function_name} as {target_function_name} with parameters {wit_value_params:?}", + ); + + // "auction:auction/api.{initialize}", + let wit_value_result = store + .data_mut() + .invoke_and_await(handle, target_function_name.to_string(), wit_value_params) + .await??; + + debug!("CALLING {function_name} RESULTED IN {:?}", wit_value_result); + + let value_result: Value = wit_value_result.into(); + Ok(value_result) + } + + async fn value_result_to_wasmtime_vals( + value_result: Value, + results: &mut [Val], + result_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + ) -> anyhow::Result<()> { + match value_result { + Value::Tuple(values) | Value::Record(values) => { + for (idx, (value, typ)) in values.iter().zip(result_types).enumerate() { + let result = decode_param(&value, &typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + results[idx] = result.val; + // TODO: do we have to do something with result.resources_to_drop here? + } + } + _ => { + return Err(anyhow!( + "Unexpected result value {value_result:?}, expected tuple or record" + )); + } + } + + Ok(()) + } +} + +// TODO: these helpers probably should not be directly living in DurableWorkerCtx +impl DurableWorkerCtx { + async fn create_rpc_target( + store: &mut StoreContextMut<'_, Ctx>, + target_worker_urn: Val, + ) -> anyhow::Result<(OwnedWorkerId, Box)> { + let worker_urn = match target_worker_urn { + Val::Record(ref record) => { + let mut target = None; + for (key, val) in record.iter() { + if key == "value" { + match val { + Val::String(s) => { + target = Some(s.clone()); + } + _ => {} + } + } + } + target + } + _ => None, + }; + + let (remote_worker_id, demand) = if let Some(location) = worker_urn { + let uri = Uri { + value: location.clone(), + }; + match uri.parse_as_golem_urn() { + Some((remote_worker_id, None)) => { + let remote_worker_id = store + .data_mut() + .generate_unique_local_worker_id(remote_worker_id) + .await?; + + let remote_worker_id = OwnedWorkerId::new( + &store.data().owned_worker_id().account_id, + &remote_worker_id, + ); + let demand = store.data().rpc().create_demand(&remote_worker_id).await; + (remote_worker_id, demand) + } + _ => { + return Err(anyhow!( + "Invalid URI: {}. Must be urn:worker:component-id/worker-name", + location + )) + } + } + } else { + return Err(anyhow!("Missing or invalid worker URN parameter")); // TODO: more details; + }; + Ok((remote_worker_id, demand)) + } +} diff --git a/golem-worker-executor-base/src/durable_host/mod.rs b/golem-worker-executor-base/src/durable_host/mod.rs index b42fdd74b..dee490a98 100644 --- a/golem-worker-executor-base/src/durable_host/mod.rs +++ b/golem-worker-executor-base/src/durable_host/mod.rs @@ -18,6 +18,7 @@ use crate::durable_host::http::serialized::SerializableHttpRequest; use crate::durable_host::io::{ManagedStdErr, ManagedStdIn, ManagedStdOut}; use crate::durable_host::replay_state::ReplayState; +use crate::durable_host::serialized::SerializableError; use crate::durable_host::sync_helper::{SyncHelper, SyncHelperPermit}; use crate::durable_host::wasm_rpc::UrnExtensions; use crate::error::GolemError; @@ -65,7 +66,6 @@ use golem_common::model::oplog::{ }; use golem_common::model::plugin::{PluginOwner, PluginScope}; use golem_common::model::regions::{DeletedRegions, OplogRegion}; -use golem_common::model::RetryConfig; use golem_common::model::{exports, PluginInstallationId}; use golem_common::model::{ AccountId, ComponentFilePath, ComponentFilePermissions, ComponentFileSystemNode, @@ -74,6 +74,7 @@ use golem_common::model::{ ScheduledAction, SuccessfulUpdateRecord, Timestamp, WorkerEvent, WorkerFilter, WorkerId, WorkerMetadata, WorkerResourceDescription, WorkerStatus, WorkerStatusRecord, }; +use golem_common::model::{RetryConfig, TargetWorkerId}; use golem_common::retries::get_delay; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::wasmtime::ResourceStore; @@ -116,6 +117,7 @@ mod sockets; pub mod wasm_rpc; mod durability; +mod dynamic_linking; mod replay_state; mod sync_helper; @@ -288,6 +290,10 @@ impl DurableWorkerCtx { &self.owned_worker_id.worker_id } + pub fn owned_worker_id(&self) -> &OwnedWorkerId { + &self.owned_worker_id + } + pub fn component_metadata(&self) -> &ComponentMetadata { &self.state.component_metadata } @@ -476,6 +482,32 @@ impl DurableWorkerCtx { } } } + + pub async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result { + match remote_worker_id.clone().try_into_worker_id() { + Some(worker_id) => Ok(worker_id), + None => { + let worker_id = Durability::::wrap( + self, + WrappedFunctionType::ReadLocal, + "golem::rpc::wasm-rpc::generate_unique_local_worker_id", + (), + |ctx| { + Box::pin(async move { + ctx.rpc() + .generate_unique_local_worker_id(remote_worker_id) + .await + }) + }, + ) + .await?; + Ok(worker_id) + } + } + } } impl> DurableWorkerCtx { diff --git a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs index 6853c11c7..748103e55 100644 --- a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs +++ b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs @@ -36,13 +36,14 @@ use golem_common::model::{ }; use golem_common::uri::oss::urn::{WorkerFunctionUrn, WorkerOrFunctionUrn}; use golem_wasm_rpc::golem::rpc::types::{ - FutureInvokeResult, HostFutureInvokeResult, Pollable, Uri, + FutureInvokeResult, HostFutureInvokeResult, Pollable, Uri, WasmRpc, }; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; use golem_wasm_rpc::{ - FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, ValueAndType, WasmRpcEntry, WitValue, + FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, Value, ValueAndType, WasmRpcEntry, WitValue, }; use std::any::Any; +use std::fmt::{Debug, Formatter}; use std::str::FromStr; use std::sync::Arc; use tracing::{error, warn}; @@ -60,14 +61,15 @@ impl HostWasmRpc for DurableWorkerCtx { match location.parse_as_golem_urn() { Some((remote_worker_id, None)) => { - let remote_worker_id = - generate_unique_local_worker_id(self, remote_worker_id).await?; + let remote_worker_id = self + .generate_unique_local_worker_id(remote_worker_id) + .await?; let remote_worker_id = OwnedWorkerId::new(&self.owned_worker_id.account_id, &remote_worker_id); let demand = self.rpc().create_demand(&remote_worker_id).await; let entry = self.table().push(WasmRpcEntry { - payload: Box::new(WasmRpcEntryPayload { + payload: Box::new(WasmRpcEntryPayload::Interface { demand, remote_worker_id, }), @@ -85,7 +87,7 @@ impl HostWasmRpc for DurableWorkerCtx { &mut self, self_: Resource, function_name: String, - function_params: Vec, + mut function_params: Vec, ) -> anyhow::Result> { record_host_function_call("golem::rpc::wasm-rpc", "invoke-and-await"); let args = self.get_arguments().await?; @@ -95,7 +97,26 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&self_)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); + + // TODO: do this in other variants too + match payload { + WasmRpcEntryPayload::Resource { + resource_uri, + resource_id, + .. + } => { + function_params.insert( + 0, + Value::Handle { + uri: resource_uri.value.to_string(), + resource_id: *resource_id, + } + .into(), + ); + } + _ => {} + } let current_idempotency_key = self .get_current_idempotency_key() @@ -226,7 +247,7 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&self_)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); let current_idempotency_key = self .get_current_idempotency_key() @@ -317,7 +338,7 @@ impl HostWasmRpc for DurableWorkerCtx { let entry = self.table().get(&this)?; let payload = entry.payload.downcast_ref::().unwrap(); - let remote_worker_id = payload.remote_worker_id.clone(); + let remote_worker_id = payload.remote_worker_id().clone(); let current_idempotency_key = self .get_current_idempotency_key() @@ -748,32 +769,6 @@ impl HostFutureInvokeResult for DurableWorkerCtx { #[async_trait] impl golem_wasm_rpc::Host for DurableWorkerCtx {} -async fn generate_unique_local_worker_id( - ctx: &mut DurableWorkerCtx, - remote_worker_id: TargetWorkerId, -) -> Result { - match remote_worker_id.clone().try_into_worker_id() { - Some(worker_id) => Ok(worker_id), - None => { - let worker_id = Durability::::wrap( - ctx, - WrappedFunctionType::ReadLocal, - "golem::rpc::wasm-rpc::generate_unique_local_worker_id", - (), - |ctx| { - Box::pin(async move { - ctx.rpc() - .generate_unique_local_worker_id(remote_worker_id) - .await - }) - }, - ) - .await?; - Ok(worker_id) - } - } -} - /// Tries to get a `ValueAndType` representation for the given `WitValue` parameters by querying the latest component metadata for the /// target component. /// If the query fails, or the expected function name is not in its metadata or the number of parameters does not match, then it returns an @@ -805,10 +800,63 @@ async fn try_get_typed_parameters( Vec::new() } -pub struct WasmRpcEntryPayload { - #[allow(dead_code)] - demand: Box, - remote_worker_id: OwnedWorkerId, +pub enum WasmRpcEntryPayload { + Interface { + #[allow(dead_code)] + demand: Box, + remote_worker_id: OwnedWorkerId, + }, + Resource { + #[allow(dead_code)] + demand: Box, + remote_worker_id: OwnedWorkerId, + resource_uri: Uri, + resource_id: u64, + }, +} + +impl Debug for WasmRpcEntryPayload { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Interface { + remote_worker_id, .. + } => f + .debug_struct("Interface") + .field("remote_worker_id", remote_worker_id) + .finish(), + Self::Resource { + remote_worker_id, + resource_uri, + resource_id, + .. + } => f + .debug_struct("Resource") + .field("remote_worker_id", remote_worker_id) + .field("resource_uri", resource_uri) + .field("resource_id", resource_id) + .finish(), + } + } +} + +impl WasmRpcEntryPayload { + pub fn remote_worker_id(&self) -> &OwnedWorkerId { + match self { + Self::Interface { + remote_worker_id, .. + } => remote_worker_id, + Self::Resource { + remote_worker_id, .. + } => remote_worker_id, + } + } + + pub fn demand(&self) -> &Box { + match self { + Self::Interface { demand, .. } => demand, + Self::Resource { demand, .. } => demand, + } + } } pub trait UrnExtensions { diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index a7de8b183..a8480dc97 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -20,6 +20,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use crate::durable_host::recover_stderr_logs; +use crate::durable_host::wasm_rpc::{UrnExtensions, WasmRpcEntryPayload}; use crate::error::{GolemError, WorkerOutOfMemory}; use crate::function_result_interpreter::interpret_function_results; use crate::invocation::{find_first_available_function, invoke_worker, InvokeResult}; @@ -30,6 +31,7 @@ use crate::model::{ use crate::services::component::ComponentMetadata; use crate::services::events::Event; use crate::services::oplog::{CommitLevel, Oplog, OplogOps}; +use crate::services::rpc::RpcDemand; use crate::services::worker_event::{WorkerEventService, WorkerEventServiceDefault}; use crate::services::{ All, HasActiveWorkers, HasAll, HasBlobStoreService, HasComponentService, HasConfig, HasEvents, @@ -57,15 +59,17 @@ use golem_common::model::{ }; use golem_common::retries::get_delay; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::Value; +use golem_wasm_rpc::{Uri, Value, WasmRpcEntry}; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{Mutex, MutexGuard, OwnedSemaphorePermit}; use tokio::task::JoinHandle; use tracing::{debug, error, info, span, warn, Instrument, Level}; -use wasmtime::component::Instance; +use wasmtime::component::types::ComponentItem; +use wasmtime::component::{Instance, Resource, ResourceType, Val}; use wasmtime::{AsContext, Store, UpdateDeadline}; +use wasmtime_wasi::WasiView; /// Represents worker that may be running or suspended. /// @@ -473,6 +477,9 @@ impl Worker { } } + /// Invokes the worker and awaits for a result. + /// + /// Successful result is a `TypeAnnotatedValue` encoding either a tuple or a record. pub async fn invoke_and_await( &self, idempotency_key: IdempotencyKey, @@ -1350,7 +1357,8 @@ impl RunningWorker { ) .await?; - let mut store = Store::new(&parent.engine(), context); + let engine = parent.engine(); + let mut store = Store::new(&engine, context); store.set_epoch_deadline(parent.config().limits.epoch_ticks); let worker_id_clone = worker_metadata.worker_id.clone(); store.epoch_deadline_callback(move |mut store| { @@ -1371,7 +1379,15 @@ impl RunningWorker { store.limiter_async(|ctx| ctx.resource_limiter()); - let instance_pre = parent.linker().instantiate_pre(&component).map_err(|e| { + // TODO MOVE SOMEWHERE ELSE + let mut linker = (*parent.linker()).clone(); // fresh linker + store.data_mut().link(&engine, &mut linker, &component)?; + + // TODO: check parent.linker() is not affected + + // TODO ^^^ + + let instance_pre = linker.instantiate_pre(&component).map_err(|e| { GolemError::worker_creation_failed( parent.owned_worker_id.worker_id(), format!( @@ -1551,13 +1567,13 @@ impl RunningWorker { store, &instance, ) - .await; + .await; match result { Ok(InvokeResult::Succeeded { - output, - consumed_fuel, - }) => { + output, + consumed_fuel, + }) => { let component_metadata = store.as_context().data().component_metadata(); @@ -1577,9 +1593,9 @@ impl RunningWorker { output, function_results, ) - .map_err(|e| GolemError::ValueMismatch { - details: e.join(", "), - }); + .map_err(|e| GolemError::ValueMismatch { + details: e.join(", "), + }); match result { Ok(result) => { @@ -1685,8 +1701,8 @@ impl RunningWorker { } } } - .instrument(span) - .await; + .instrument(span) + .await; if do_break { break; } @@ -1713,7 +1729,7 @@ impl RunningWorker { vec![ "golem:api/save-snapshot@1.1.0.{save}".to_string(), "golem:api/save-snapshot@0.2.0.{save}".to_string(), - ] + ], ) { store.data_mut().begin_call_snapshotting_function(); @@ -1950,9 +1966,9 @@ impl InvocationResult { OplogEntry::Error { error, .. } => { let stderr = recover_stderr_logs(services, owned_worker_id, oplog_idx).await; Err(FailedInvocationResult { trap_type: TrapType::Error(error), stderr }) - }, - OplogEntry::Interrupted { .. } => Err(FailedInvocationResult { trap_type: TrapType::Interrupt(InterruptKind::Interrupt), stderr: "".to_string()}), - OplogEntry::Exited { .. } => Err(FailedInvocationResult { trap_type: TrapType::Exit, stderr: "".to_string()}), + } + OplogEntry::Interrupted { .. } => Err(FailedInvocationResult { trap_type: TrapType::Interrupt(InterruptKind::Interrupt), stderr: "".to_string() }), + OplogEntry::Exited { .. } => Err(FailedInvocationResult { trap_type: TrapType::Exit, stderr: "".to_string() }), _ => panic!("Unexpected oplog entry pointed by invocation result at index {oplog_idx} for {owned_worker_id:?}") }; diff --git a/golem-worker-executor-base/src/workerctx.rs b/golem-worker-executor-base/src/workerctx.rs index a46ac4cf1..c666e2ddd 100644 --- a/golem-worker-executor-base/src/workerctx.rs +++ b/golem-worker-executor-base/src/workerctx.rs @@ -15,12 +15,6 @@ use std::collections::HashSet; use std::sync::{Arc, RwLock, Weak}; -use async_trait::async_trait; -use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::wasmtime::ResourceStore; -use golem_wasm_rpc::Value; -use wasmtime::{AsContextMut, ResourceLimiterAsync}; - use crate::error::GolemError; use crate::model::{ CurrentResourceLimits, ExecutionStatus, InterruptKind, LastError, ListDirectoryResult, @@ -44,13 +38,22 @@ use crate::services::{ worker_enumeration, HasAll, HasConfig, HasOplog, HasOplogService, HasWorker, }; use crate::worker::{RetryDecision, Worker}; +use async_trait::async_trait; use golem_common::model::component::ComponentOwner; use golem_common::model::oplog::WorkerResourceId; use golem_common::model::plugin::PluginScope; use golem_common::model::{ AccountId, ComponentFilePath, ComponentVersion, IdempotencyKey, OwnedWorkerId, - PluginInstallationId, WorkerId, WorkerMetadata, WorkerStatus, WorkerStatusRecord, + PluginInstallationId, TargetWorkerId, WorkerId, WorkerMetadata, WorkerStatus, + WorkerStatusRecord, }; +use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; +use golem_wasm_rpc::wasmtime::ResourceStore; +use golem_wasm_rpc::Value; +use wasmtime::component::{Component, Linker, Type, Val}; +use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync, StoreContextMut}; +use wasmtime_wasi::WasiView; +use wasmtime_wasi_http::WasiHttpView; /// WorkerCtx is the primary customization and extension point of worker executor. It is the context /// associated with each running worker, and it is responsible for initializing the WASM linker as @@ -66,6 +69,7 @@ pub trait WorkerCtx: + IndexedResourceStore + UpdateManagement + FileSystemReading + + DynamicLinking + Send + Sync + Sized @@ -132,6 +136,9 @@ pub trait WorkerCtx: >, ) -> Result; + fn as_wasi_view(&mut self) -> impl WasiView; + fn as_wasi_http_view(&mut self) -> impl WasiHttpView; + /// Get the public part of the worker context fn get_public_state(&self) -> &Self::PublicState; @@ -144,6 +151,9 @@ pub trait WorkerCtx: /// Get the worker ID associated with this worker context fn worker_id(&self) -> &WorkerId; + /// Get the owned worker ID associated with this worker context + fn owned_worker_id(&self) -> &OwnedWorkerId; + fn component_metadata(&self) -> &ComponentMetadata; /// The WASI exit API can use a special error to exit from the WASM execution. As this depends @@ -157,6 +167,12 @@ pub trait WorkerCtx: /// Gets an interface to the worker-proxy which can direct calls to other worker executors /// in the cluster fn worker_proxy(&self) -> Arc; + + // TODO: where do this belong + async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result; } /// The fuel management interface of a worker context is responsible for borrowing and returning @@ -400,3 +416,25 @@ pub trait FileSystemReading { ) -> Result; async fn read_file(&self, path: &ComponentFilePath) -> Result; } + +#[async_trait] +pub trait DynamicLinking { + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()>; + + async fn dynamic_function_call( + store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type] + ) -> anyhow::Result<()>; + + fn drop_linked_resource(store: StoreContextMut<'_, Ctx>, rep: u32) -> anyhow::Result<()>; +} diff --git a/golem-worker-executor-base/tests/common/mod.rs b/golem-worker-executor-base/tests/common/mod.rs index 0d5d83f1d..f15965677 100644 --- a/golem-worker-executor-base/tests/common/mod.rs +++ b/golem-worker-executor-base/tests/common/mod.rs @@ -5,7 +5,9 @@ use std::collections::HashSet; use golem_service_base::service::initial_component_files::InitialComponentFilesService; use golem_service_base::storage::blob::BlobStorage; use golem_wasm_rpc::wasmtime::ResourceStore; -use golem_wasm_rpc::{Uri, Value}; +use golem_wasm_rpc::{ + FutureInvokeResultEntry, HostWasmRpc, RpcError, Uri, Value, WasmRpcEntry, WitValue, +}; use golem_worker_executor_base::services::file_loader::FileLoader; use prometheus::Registry; @@ -18,8 +20,8 @@ use std::sync::{Arc, RwLock, Weak}; use golem_common::model::{ AccountId, ComponentFilePath, ComponentId, ComponentVersion, IdempotencyKey, OwnedWorkerId, - PluginInstallationId, ScanCursor, WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus, - WorkerStatusRecord, + PluginInstallationId, ScanCursor, TargetWorkerId, WorkerFilter, WorkerId, WorkerMetadata, + WorkerStatus, WorkerStatusRecord, }; use golem_service_base::config::{BlobStorageConfig, LocalFileSystemBlobStorageConfig}; use golem_worker_executor_base::error::GolemError; @@ -51,8 +53,8 @@ use golem_worker_executor_base::services::worker_event::WorkerEventService; use golem_worker_executor_base::services::{plugins, All, HasAll, HasConfig, HasOplogService}; use golem_worker_executor_base::wasi_host::create_linker; use golem_worker_executor_base::workerctx::{ - ExternalOperations, FileSystemReading, FuelManagement, IndexedResourceStore, InvocationHooks, - InvocationManagement, StatusManagement, UpdateManagement, WorkerCtx, + DynamicLinking, ExternalOperations, FileSystemReading, FuelManagement, IndexedResourceStore, + InvocationHooks, InvocationManagement, StatusManagement, UpdateManagement, WorkerCtx, }; use golem_worker_executor_base::Bootstrap; @@ -79,6 +81,7 @@ use golem_test_framework::components::shard_manager::ShardManager; use golem_test_framework::components::worker_executor_cluster::WorkerExecutorCluster; use golem_test_framework::config::TestDependencies; use golem_test_framework::dsl::to_worker_metadata; +use golem_wasm_rpc::golem::rpc::types::{FutureInvokeResult, WasmRpc}; use golem_worker_executor_base::preview2::golem; use golem_worker_executor_base::preview2::golem::api1_1_0; use golem_worker_executor_base::services::events::Events; @@ -94,8 +97,10 @@ use golem_worker_executor_base::services::worker_proxy::WorkerProxy; use golem_worker_executor_base::worker::{RetryDecision, Worker}; use tonic::transport::Channel; use tracing::{debug, info}; -use wasmtime::component::{Instance, Linker, ResourceAny}; -use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync}; +use wasmtime::component::{Component, Instance, Linker, Resource, ResourceAny, Type, Val}; +use wasmtime::{AsContextMut, Engine, ResourceLimiterAsync, StoreContextMut}; +use wasmtime_wasi::WasiView; +use wasmtime_wasi_http::WasiHttpView; pub struct TestWorkerExecutor { _join_set: Option>>, @@ -683,6 +688,14 @@ impl WorkerCtx for TestWorkerCtx { Ok(Self { durable_ctx }) } + fn as_wasi_view(&mut self) -> impl WasiView { + self.durable_ctx.as_wasi_view() + } + + fn as_wasi_http_view(&mut self) -> impl WasiHttpView { + self.durable_ctx.as_wasi_http_view() + } + fn get_public_state(&self) -> &Self::PublicState { &self.durable_ctx.public_state } @@ -695,6 +708,10 @@ impl WorkerCtx for TestWorkerCtx { self.durable_ctx.worker_id() } + fn owned_worker_id(&self) -> &OwnedWorkerId { + self.durable_ctx.owned_worker_id() + } + fn component_metadata(&self) -> &ComponentMetadata { self.durable_ctx.component_metadata() } @@ -710,6 +727,15 @@ impl WorkerCtx for TestWorkerCtx { fn worker_proxy(&self) -> Arc { self.durable_ctx.worker_proxy() } + + async fn generate_unique_local_worker_id( + &mut self, + remote_worker_id: TargetWorkerId, + ) -> Result { + self.durable_ctx + .generate_unique_local_worker_id(remote_worker_id) + .await + } } #[async_trait] @@ -766,6 +792,87 @@ impl FileSystemReading for TestWorkerCtx { } } +#[async_trait] +impl HostWasmRpc for TestWorkerCtx { + async fn new(&mut self, location: Uri) -> anyhow::Result> { + self.durable_ctx.new(location).await + } + + async fn invoke_and_await( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .invoke_and_await(self_, function_name, function_params) + .await + } + + async fn invoke( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .invoke(self_, function_name, function_params) + .await + } + + async fn async_invoke_and_await( + &mut self, + self_: Resource, + function_name: String, + function_params: Vec, + ) -> anyhow::Result> { + self.durable_ctx + .async_invoke_and_await(self_, function_name, function_params) + .await + } + + fn drop(&mut self, rep: Resource) -> anyhow::Result<()> { + self.durable_ctx.drop(rep) + } +} + +#[async_trait] +impl DynamicLinking for TestWorkerCtx { + fn link( + &mut self, + engine: &Engine, + linker: &mut Linker, + component: &Component, + ) -> anyhow::Result<()> { + self.durable_ctx.link(engine, linker, component) + } + + async fn dynamic_function_call( + store: impl AsContextMut + Send, + interface_name: &str, + function_name: &str, + params: &[Val], + param_types: &[Type], + results: &mut [Val], + result_types: &[Type], + ) -> anyhow::Result<()> { + DurableWorkerCtx::::dynamic_function_call( + store, + interface_name, + function_name, + params, + param_types, + results, + result_types + ) + .await + } + + fn drop_linked_resource(store: StoreContextMut<'_, TestWorkerCtx>, rep: u32) -> anyhow::Result<()> { + DurableWorkerCtx::::drop_linked_resource(store, rep) + } +} + #[async_trait] impl Bootstrap for ServerBootstrap { fn create_active_workers( diff --git a/wasm-ast/src/analysis/mod.rs b/wasm-ast/src/analysis/mod.rs index 66e58fb93..18095ef1f 100644 --- a/wasm-ast/src/analysis/mod.rs +++ b/wasm-ast/src/analysis/mod.rs @@ -617,9 +617,11 @@ impl AnalysisContext { Ok((Mrc::new(ComponentSection::Type(component_type.clone())), self.clone())) } InstanceTypeDeclaration::Alias(alias) => { - let component_idx = self.component_stack.last().unwrap().component_idx.unwrap(); + let component_idx = self.component_stack.last().expect("Component stack is empty").component_idx.unwrap_or_default(); let new_ctx = self.push_component(self.get_component(), component_idx); // Emulating an inner scope by duplicating the current component on the stack (TODO: refactor this) + // Note: because we not in an inner component, but an inner instance declaration and the current analysis stack + // does not have this concept. new_ctx.follow_redirects(Mrc::new(ComponentSection::Alias(alias.clone()))) } InstanceTypeDeclaration::Export { .. } => { diff --git a/wasm-ast/tests/exports.rs b/wasm-ast/tests/exports.rs index 899c8071c..733c5c2dc 100644 --- a/wasm-ast/tests/exports.rs +++ b/wasm-ast/tests/exports.rs @@ -700,3 +700,77 @@ fn exports_caller_composed_component() { pretty_assertions::assert_eq!(metadata, expected); } + +#[test] +fn exports_caller_component() { + // NOTE: Same as caller_composed.wasm but not composed with the generated stub + let source_bytes = include_bytes!("../wasm/caller.wasm"); + let component: Component = Component::from_bytes(source_bytes).unwrap(); + + let state = AnalysisContext::new(component); + let metadata = state.get_top_level_exports().unwrap(); + + let expected = vec![ + AnalysedExport::Function(AnalysedFunction { + name: "test1".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(tuple(vec![str(), u64()])), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test2".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: u64(), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test3".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: u64(), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test4".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: tuple(vec![list(str()), list(tuple(vec![str(), str()]))]), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "test5".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(u64()), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "bug-wasm-rpc-i32".to_string(), + parameters: vec![AnalysedFunctionParameter { + name: "in".to_string(), + typ: variant(vec![unit_case("leaf")]), + }], + results: vec![AnalysedFunctionResult { + name: None, + typ: variant(vec![unit_case("leaf")]), + }], + }), + AnalysedExport::Function(AnalysedFunction { + name: "ephemeral-test1".to_string(), + parameters: vec![], + results: vec![AnalysedFunctionResult { + name: None, + typ: list(tuple(vec![str(), str()])), + }], + }) + ]; + + pretty_assertions::assert_eq!(metadata, expected); +} diff --git a/wasm-rpc/src/wasmtime.rs b/wasm-rpc/src/wasmtime.rs index 6b68e2f84..23b7f21fd 100644 --- a/wasm-rpc/src/wasmtime.rs +++ b/wasm-rpc/src/wasmtime.rs @@ -22,6 +22,7 @@ use golem_wasm_ast::analysis::analysed_type::{ use golem_wasm_ast::analysis::{AnalysedType, TypeResult}; use wasmtime::component::{types, ResourceAny, Type, Val}; +#[derive(Debug)] pub enum EncodingError { ParamTypeMismatch { details: String }, ValueMismatch { details: String }, @@ -464,7 +465,7 @@ async fn decode_param_impl( } } -/// Converts a wasmtime Val to a Golem protobuf Val +/// Converts a wasmtime Val to a wasm-rpc Value #[async_recursion] pub async fn encode_output( value: &Val,