diff --git a/golem-worker-executor-base/src/durable_host/dynamic_linking.rs b/golem-worker-executor-base/src/durable_host/dynamic_linking.rs index ccc48ee77..27ff1d454 100644 --- a/golem-worker-executor-base/src/durable_host/dynamic_linking.rs +++ b/golem-worker-executor-base/src/durable_host/dynamic_linking.rs @@ -5,6 +5,7 @@ use crate::workerctx::{DynamicLinking, WorkerCtx}; use anyhow::anyhow; use async_trait::async_trait; use golem_common::model::OwnedWorkerId; +use golem_wasm_rpc::golem::rpc::types::{FutureInvokeResult, HostFutureInvokeResult}; use golem_wasm_rpc::wasmtime::{decode_param, encode_output}; use golem_wasm_rpc::{HostWasmRpc, Uri, Value, WasmRpcEntry, WitValue}; use rib::{ParsedFunctionName, ParsedFunctionReference, ParsedFunctionSite}; @@ -13,11 +14,12 @@ use wasmtime::component::types::ComponentItem; use wasmtime::component::{Component, Linker, Resource, ResourceType, Type, Val}; use wasmtime::{AsContextMut, Engine, StoreContextMut}; use wasmtime_wasi::WasiView; - // TODO: support multiple different dynamic linkers #[async_trait] -impl DynamicLinking for DurableWorkerCtx { +impl DynamicLinking + for DurableWorkerCtx +{ fn link( &mut self, engine: &Engine, @@ -60,6 +62,7 @@ impl DynamicLinking for DurableWorkerCtx ComponentItem::ComponentFunc(fun) => { let name2 = name.clone(); let ename2 = ename.clone(); + // TODO: need to do the "rpc call type" detection here and if it's not an rpc call then not calling `func_new_async` (for example for 'subscribe' and 'get' on FutureInvokeResult wrappers) instance.func_new_async( // TODO: instrument async closure &ename.clone(), @@ -96,7 +99,15 @@ impl DynamicLinking for DurableWorkerCtx } ComponentItem::Type(_) => {} ComponentItem::Resource(resource) => { - if ename != "pollable" { + // TODO: need to detect future result resources and register them as FutureInvokeResult + if ename == "future-counter-get-value-result" { + debug!("LINKING FUTURE INVOKE RESULT {ename}"); + instance.resource( + &ename, + ResourceType::host::(), + |_store, _rep| Ok(()), + )?; + } else if ename != "pollable" { // TODO: ?? this should be 'if it is not already linked' but not way to check that debug!("LINKING RESOURCE {ename} {resource:?}"); instance.resource_async( @@ -141,7 +152,7 @@ impl DynamicLinking for DurableWorkerCtx ); // TODO: this has to be moved to be calculated in the linking phase - let call_type = determine_call_type(interface_name, function_name)?; + let call_type = determine_call_type(interface_name, function_name, result_types)?; match call_type { Some(DynamicRpcCall::GlobalStubConstructor) => { @@ -280,10 +291,43 @@ impl DynamicLinking for DurableWorkerCtx Self::value_result_to_wasmtime_vals(result, results, result_types, &mut store) .await?; } + Some(DynamicRpcCall::FireAndForgetFunctionCall { + stub_function_name, + target_function_name, + }) => { + // Async stub interface method + debug!( + "FNF {function_name} handle={:?}, rest={:?}", + params[0], + params.iter().skip(1).collect::>() + ); + + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + { + let mut wasi = store.data_mut().as_wasi_view(); + let entry = wasi.table().get(&handle)?; + let payload = entry.payload.downcast_ref::().unwrap(); + debug!("CALLING {function_name} ON {}", payload.remote_worker_id()); + } + + Self::remote_invoke( + stub_function_name, + target_function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + } Some(DynamicRpcCall::AsyncFunctionCall { - stub_function_name, - target_function_name, - }) => { + stub_function_name, + target_function_name, + }) => { // Async stub interface method debug!( "ASYNC {function_name} handle={:?}, rest={:?}", @@ -303,17 +347,71 @@ impl DynamicLinking for DurableWorkerCtx debug!("CALLING {function_name} ON {}", payload.remote_worker_id()); } - // let result = Self::remote_invoke( - // stub_function_name, - // target_function_name, - // params, - // param_types, - // &mut store, - // handle, - // ) - // .await?; - // Self::value_result_to_wasmtime_vals(result, results, result_types, &mut store) - // .await?; + let result = Self::remote_async_invoke_and_await( + stub_function_name, + target_function_name, + params, + param_types, + &mut store, + handle, + ) + .await?; + + Self::value_result_to_wasmtime_vals(result, results, result_types, &mut store) + .await?; + } + Some(DynamicRpcCall::FutureInvokeResultSubscribe) => { + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + let pollable = store.data_mut().subscribe(handle).await?; + let pollable_any = pollable.try_into_resource_any(&mut store)?; + let resource_id = store.data_mut().add(pollable_any).await; + + let value_result = Value::Tuple(vec![Value::Handle { + uri: store.data().self_uri().value, + resource_id, + }]); + Self::value_result_to_wasmtime_vals( + value_result, + results, + result_types, + &mut store, + ) + .await?; + } + Some(DynamicRpcCall::FutureInvokeResultGet) => { + let handle = match params[0] { + Val::Resource(handle) => handle, + _ => return Err(anyhow!("Invalid handle parameter")), + }; + let handle: Resource = handle.try_into_resource(&mut store)?; + let result = HostFutureInvokeResult::get(store.data_mut(), handle).await?; + + // NOTE: we are currently failing on RpcError instead of passing it to the caller, as the generated stub interface requires + let value_result = Value::Tuple(vec![match result { + None => Value::Option(None), + Some(Ok(value)) => { + let value: Value = value.into(); + match value { + Value::Tuple(items) if items.len() == 1 => { + Value::Option(Some(Box::new(items.into_iter().next().unwrap()))) + } + _ => Err(anyhow!("Invalid future invoke result value"))?, // TODO: better error + } + } + Some(Err(err)) => Err(anyhow!("RPC invocation failed with {err:?}"))?, // TODO: more information into the error + }]); + + Self::value_result_to_wasmtime_vals( + value_result, + results, + result_types, + &mut store, + ) + .await?; } _ => todo!(), } @@ -372,7 +470,6 @@ impl DurableWorkerCtx { "CALLING {stub_function_name} as {target_function_name} with parameters {wit_value_params:?}", ); - // "auction:auction/api.{initialize}", let wit_value_result = store .data_mut() .invoke_and_await(handle, target_function_name.to_string(), wit_value_params) @@ -387,6 +484,74 @@ impl DurableWorkerCtx { Ok(value_result) } + // TODO: stub_function_name can probably be removed + async fn remote_async_invoke_and_await( + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + params: &[Val], + param_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + handle: Resource, + ) -> anyhow::Result { + let mut wit_value_params = Vec::new(); + for (param, typ) in params.iter().zip(param_types).skip(1) { + let value: Value = encode_output(param, typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + let wit_value: WitValue = value.into(); + wit_value_params.push(wit_value); + } + + debug!( + "CALLING {stub_function_name} as {target_function_name} with parameters {wit_value_params:?}", + ); + + let invoke_result_resource = store + .data_mut() + .async_invoke_and_await(handle, target_function_name.to_string(), wit_value_params) + .await?; + + let invoke_result_resource_any = + invoke_result_resource.try_into_resource_any(&mut *store)?; + let resource_id = store.data_mut().add(invoke_result_resource_any).await; + + let value_result: Value = Value::Tuple(vec![Value::Handle { + uri: store.data().self_uri().value, + resource_id, + }]); + Ok(value_result) + } + + // TODO: stub_function_name can probably be removed + async fn remote_invoke( + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + params: &[Val], + param_types: &[Type], + store: &mut StoreContextMut<'_, Ctx>, + handle: Resource, + ) -> anyhow::Result<()> { + let mut wit_value_params = Vec::new(); + for (param, typ) in params.iter().zip(param_types).skip(1) { + let value: Value = encode_output(param, typ, store.data_mut()) + .await + .map_err(|err| anyhow!(format!("{err:?}")))?; // TODO: proper error + let wit_value: WitValue = value.into(); + wit_value_params.push(wit_value); + } + + debug!( + "CALLING {stub_function_name} as {target_function_name} with parameters {wit_value_params:?}", + ); + + store + .data_mut() + .invoke(handle, target_function_name.to_string(), wit_value_params) + .await??; + + Ok(()) + } + async fn value_result_to_wasmtime_vals( value_result: Value, results: &mut [Val], @@ -482,16 +647,23 @@ enum DynamicRpcCall { stub_function_name: ParsedFunctionName, target_function_name: ParsedFunctionName, }, + FireAndForgetFunctionCall { + stub_function_name: ParsedFunctionName, + target_function_name: ParsedFunctionName, + }, AsyncFunctionCall { stub_function_name: ParsedFunctionName, target_function_name: ParsedFunctionName, }, + FutureInvokeResultSubscribe, + FutureInvokeResultGet, } // TODO: this needs to be implementd based on component metadata and no hardcoded values fn determine_call_type( interface_name: &str, function_name: &str, + result_types: &[Type], ) -> anyhow::Result> { if (interface_name == "auction:auction-stub/stub-auction" && function_name == "[constructor]api") @@ -544,86 +716,107 @@ fn determine_call_type( .map_err(|err| anyhow!(err))?; // TODO: proper error debug!("STUB FUNCTION NAME: {stub_function_name:?}"); - let (blocking, target_function) = match &stub_function_name.function { - ParsedFunctionReference::RawResourceMethod { resource, method } - if resource == "counter" => - // TODO: this needs to be detected based on the matching constructor + if stub_function_name.function.resource_name() + == Some(&"future-counter-get-value-result".to_string()) + { + if stub_function_name.function.resource_method_name() == Some("subscribe".to_string()) { + Ok(Some(DynamicRpcCall::FutureInvokeResultSubscribe)) + } else if stub_function_name.function.resource_method_name() == Some("get".to_string()) { - if method.starts_with("blocking-") { - ( - true, - ParsedFunctionReference::RawResourceMethod { - resource: resource.to_string(), - method: method - .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants - .unwrap() - .to_string(), - }, - ) - } else { - ( - false, - ParsedFunctionReference::RawResourceMethod { - resource: resource.to_string(), - method: method.to_string(), - }, - ) - } + Ok(Some(DynamicRpcCall::FutureInvokeResultGet)) + } else { + Ok(None) } - _ => { - let method = stub_function_name.function.resource_method_name().unwrap(); // TODO: proper error - - if method.starts_with("blocking-") { - ( - true, - ParsedFunctionReference::Function { - function: method - .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants - .unwrap() - .to_string(), - }, - ) - } else { - ( - false, - ParsedFunctionReference::Function { - function: method.to_string(), - }, - ) + } else { + let (blocking, target_function) = match &stub_function_name.function { + ParsedFunctionReference::RawResourceMethod { resource, method } + if resource == "counter" => + // TODO: this needs to be detected based on the matching constructor + { + if method.starts_with("blocking-") { + ( + true, + ParsedFunctionReference::RawResourceMethod { + resource: resource.to_string(), + method: method + .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants + .unwrap() + .to_string(), + }, + ) + } else { + ( + false, + ParsedFunctionReference::RawResourceMethod { + resource: resource.to_string(), + method: method.to_string(), + }, + ) + } } - } - }; - - let target_function_name = ParsedFunctionName { - site: if interface_name.starts_with("auction") { - ParsedFunctionSite::PackagedInterface { - // TODO: this must come from component metadata linking information - namespace: "auction".to_string(), - package: "auction".to_string(), - interface: "api".to_string(), - version: None, + _ => { + let method = stub_function_name.function.resource_method_name().unwrap(); // TODO: proper error + + if method.starts_with("blocking-") { + ( + true, + ParsedFunctionReference::Function { + function: method + .strip_prefix("blocking-") // TODO: we also have to support the non-blocking variants + .unwrap() + .to_string(), + }, + ) + } else { + ( + false, + ParsedFunctionReference::Function { + function: method.to_string(), + }, + ) + } } + }; + + let target_function_name = ParsedFunctionName { + site: if interface_name.starts_with("auction") { + ParsedFunctionSite::PackagedInterface { + // TODO: this must come from component metadata linking information + namespace: "auction".to_string(), + package: "auction".to_string(), + interface: "api".to_string(), + version: None, + } + } else { + ParsedFunctionSite::PackagedInterface { + namespace: "rpc".to_string(), + package: "counters".to_string(), + interface: "api".to_string(), + version: None, + } + }, + function: target_function, + }; + + if blocking { + Ok(Some(DynamicRpcCall::BlockingFunctionCall { + stub_function_name, + target_function_name, + })) } else { - ParsedFunctionSite::PackagedInterface { - namespace: "rpc".to_string(), - package: "counters".to_string(), - interface: "api".to_string(), - version: None, + debug!("ASYNC FUNCTION RESULT TYPES: {result_types:?}"); + if result_types.len() > 0 { + Ok(Some(DynamicRpcCall::AsyncFunctionCall { + stub_function_name, + target_function_name, + })) + } else { + Ok(Some(DynamicRpcCall::FireAndForgetFunctionCall { + stub_function_name, + target_function_name, + })) } - }, - function: target_function, - }; - - if blocking { - Ok(Some(DynamicRpcCall::BlockingFunctionCall { - stub_function_name, - target_function_name, - })) - } else { - Ok(Some(DynamicRpcCall::AsyncFunctionCall { - stub_function_name, - target_function_name, - })) + } } } else { Ok(None) diff --git a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs index 3d8fc41d6..06a45d8d9 100644 --- a/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs +++ b/golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs @@ -96,7 +96,7 @@ impl HostWasmRpc for DurableWorkerCtx { let payload = entry.payload.downcast_ref::().unwrap(); let remote_worker_id = payload.remote_worker_id().clone(); - // TODO: do this in other variants too + // TODO: remove redundancy match payload { WasmRpcEntryPayload::Resource { resource_uri, @@ -234,7 +234,7 @@ impl HostWasmRpc for DurableWorkerCtx { &mut self, self_: Resource, function_name: String, - function_params: Vec, + mut function_params: Vec, ) -> anyhow::Result> { record_host_function_call("golem::rpc::wasm-rpc", "invoke"); let args = self.get_arguments().await?; @@ -244,6 +244,25 @@ impl HostWasmRpc for DurableWorkerCtx { let payload = entry.payload.downcast_ref::().unwrap(); let remote_worker_id = payload.remote_worker_id().clone(); + // TODO: remove redundancy + match payload { + WasmRpcEntryPayload::Resource { + resource_uri, + resource_id, + .. + } => { + function_params.insert( + 0, + Value::Handle { + uri: resource_uri.value.to_string(), + resource_id: *resource_id, + } + .into(), + ); + } + _ => {} + } + let current_idempotency_key = self .get_current_idempotency_key() .await @@ -319,7 +338,7 @@ impl HostWasmRpc for DurableWorkerCtx { &mut self, this: Resource, function_name: String, - function_params: Vec, + mut function_params: Vec, ) -> anyhow::Result> { record_host_function_call("golem::rpc::wasm-rpc", "async-invoke-and-await"); let args = self.get_arguments().await?; @@ -334,6 +353,25 @@ impl HostWasmRpc for DurableWorkerCtx { let payload = entry.payload.downcast_ref::().unwrap(); let remote_worker_id = payload.remote_worker_id().clone(); + // TODO: remove redundancy + match payload { + WasmRpcEntryPayload::Resource { + resource_uri, + resource_id, + .. + } => { + function_params.insert( + 0, + Value::Handle { + uri: resource_uri.value.to_string(), + resource_id: *resource_id, + } + .into(), + ); + } + _ => {} + } + let current_idempotency_key = self .get_current_idempotency_key() .await diff --git a/golem-worker-executor-base/src/worker.rs b/golem-worker-executor-base/src/worker.rs index a8480dc97..25146e7c1 100644 --- a/golem-worker-executor-base/src/worker.rs +++ b/golem-worker-executor-base/src/worker.rs @@ -20,7 +20,6 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use crate::durable_host::recover_stderr_logs; -use crate::durable_host::wasm_rpc::{UrnExtensions, WasmRpcEntryPayload}; use crate::error::{GolemError, WorkerOutOfMemory}; use crate::function_result_interpreter::interpret_function_results; use crate::invocation::{find_first_available_function, invoke_worker, InvokeResult}; @@ -31,7 +30,6 @@ use crate::model::{ use crate::services::component::ComponentMetadata; use crate::services::events::Event; use crate::services::oplog::{CommitLevel, Oplog, OplogOps}; -use crate::services::rpc::RpcDemand; use crate::services::worker_event::{WorkerEventService, WorkerEventServiceDefault}; use crate::services::{ All, HasActiveWorkers, HasAll, HasBlobStoreService, HasComponentService, HasConfig, HasEvents, @@ -59,17 +57,15 @@ use golem_common::model::{ }; use golem_common::retries::get_delay; use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue; -use golem_wasm_rpc::{Uri, Value, WasmRpcEntry}; +use golem_wasm_rpc::Value; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{Mutex, MutexGuard, OwnedSemaphorePermit}; use tokio::task::JoinHandle; use tracing::{debug, error, info, span, warn, Instrument, Level}; -use wasmtime::component::types::ComponentItem; -use wasmtime::component::{Instance, Resource, ResourceType, Val}; +use wasmtime::component::Instance; use wasmtime::{AsContext, Store, UpdateDeadline}; -use wasmtime_wasi::WasiView; /// Represents worker that may be running or suspended. /// diff --git a/golem-worker-executor-base/tests/common/mod.rs b/golem-worker-executor-base/tests/common/mod.rs index 16e5a7fdd..d62c5357b 100644 --- a/golem-worker-executor-base/tests/common/mod.rs +++ b/golem-worker-executor-base/tests/common/mod.rs @@ -5,9 +5,7 @@ use std::collections::HashSet; use golem_service_base::service::initial_component_files::InitialComponentFilesService; use golem_service_base::storage::blob::BlobStorage; use golem_wasm_rpc::wasmtime::ResourceStore; -use golem_wasm_rpc::{ - FutureInvokeResultEntry, HostWasmRpc, RpcError, Uri, Value, WasmRpcEntry, WitValue, -}; +use golem_wasm_rpc::{HostWasmRpc, RpcError, Uri, Value, WitValue}; use golem_worker_executor_base::services::file_loader::FileLoader; use prometheus::Registry; @@ -82,6 +80,7 @@ use golem_test_framework::components::worker_executor_cluster::WorkerExecutorClu use golem_test_framework::config::TestDependencies; use golem_test_framework::dsl::to_worker_metadata; use golem_wasm_rpc::golem::rpc::types::{FutureInvokeResult, WasmRpc}; +use golem_wasm_rpc::golem::rpc::types::{HostFutureInvokeResult, Pollable}; use golem_worker_executor_base::preview2::golem; use golem_worker_executor_base::preview2::golem::api1_1_0; use golem_worker_executor_base::services::events::Events; @@ -586,7 +585,7 @@ impl ResourceStore for TestWorkerCtx { } async fn get(&mut self, resource_id: u64) -> Option { - self.durable_ctx.get(resource_id).await + ResourceStore::get(&mut self.durable_ctx, resource_id).await } async fn borrow(&self, resource_id: u64) -> Option { @@ -832,7 +831,28 @@ impl HostWasmRpc for TestWorkerCtx { } async fn drop(&mut self, rep: Resource) -> anyhow::Result<()> { - self.durable_ctx.drop(rep).await + HostWasmRpc::drop(&mut self.durable_ctx, rep).await + } +} + +#[async_trait] +impl HostFutureInvokeResult for TestWorkerCtx { + async fn subscribe( + &mut self, + self_: Resource, + ) -> anyhow::Result> { + HostFutureInvokeResult::subscribe(&mut self.durable_ctx, self_).await + } + + async fn get( + &mut self, + self_: Resource, + ) -> anyhow::Result>> { + HostFutureInvokeResult::get(&mut self.durable_ctx, self_).await + } + + async fn drop(&mut self, rep: Resource) -> anyhow::Result<()> { + HostFutureInvokeResult::drop(&mut self.durable_ctx, rep).await } } diff --git a/wasm-rpc/src/value_and_type.rs b/wasm-rpc/src/value_and_type.rs index 9c4365494..3ed7fef68 100644 --- a/wasm-rpc/src/value_and_type.rs +++ b/wasm-rpc/src/value_and_type.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::Value; -use golem_wasm_ast::analysis::analysed_type::{list, option, result, result_err, result_ok, tuple}; +use crate::{RpcError, Value}; +use golem_wasm_ast::analysis::analysed_type::{ + list, option, result, result_err, result_ok, tuple, variant, +}; use golem_wasm_ast::analysis::{analysed_type, AnalysedType}; use std::collections::HashMap; use std::time::{Duration, Instant}; @@ -532,3 +534,38 @@ impl IntoValue for Duration { analysed_type::u64() } } + +#[cfg(feature = "host-bindings")] +impl IntoValue for crate::RpcError { + fn into_value(self) -> Value { + match self { + RpcError::ProtocolError(value) => Value::Variant { + case_idx: 0, + case_value: Some(Box::new(Value::String(value))), + }, + RpcError::Denied(value) => Value::Variant { + case_idx: 1, + case_value: Some(Box::new(Value::String(value))), + }, + RpcError::NotFound(value) => Value::Variant { + case_idx: 2, + case_value: Some(Box::new(Value::String(value))), + }, + RpcError::RemoteInternalError(value) => Value::Variant { + case_idx: 3, + case_value: Some(Box::new(Value::String(value))), + }, + } + } + + fn get_type() -> AnalysedType { + use analysed_type::case; + + variant(vec![ + case("protocol-error", analysed_type::str()), + case("denied", analysed_type::str()), + case("not-found", analysed_type::str()), + case("remote-internal-error", analysed_type::str()), + ]) + } +}