Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work on dynamic linking for stubless RPC, WIP #1199

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions golem-rib/src/function_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,28 @@ impl ParsedFunctionName {
function,
})
}

pub fn is_constructor(&self) -> Option<&str> {
match &self.function {
ParsedFunctionReference::RawResourceConstructor { resource, .. }
| ParsedFunctionReference::IndexedResourceConstructor { resource, .. } => {
Some(resource)
}
_ => None,
}
}

pub fn is_method(&self) -> Option<&str> {
match &self.function {
ParsedFunctionReference::RawResourceMethod { resource, .. }
| ParsedFunctionReference::IndexedResourceMethod { resource, .. }
| ParsedFunctionReference::RawResourceStaticMethod { resource, .. }
| ParsedFunctionReference::IndexedResourceStaticMethod { resource, .. } => {
Some(resource)
}
_ => None,
}
}
}

#[cfg(feature = "protobuf")]
Expand Down
824 changes: 824 additions & 0 deletions golem-worker-executor-base/src/durable_host/dynamic_linking.rs

Large diffs are not rendered by default.

34 changes: 33 additions & 1 deletion golem-worker-executor-base/src/durable_host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::durable_host::http::serialized::SerializableHttpRequest;
use crate::durable_host::io::{ManagedStdErr, ManagedStdIn, ManagedStdOut};
use crate::durable_host::replay_state::ReplayState;
use crate::durable_host::serialized::SerializableError;
use crate::durable_host::wasm_rpc::UrnExtensions;
use crate::error::GolemError;
use crate::function_result_interpreter::interpret_function_results;
Expand Down Expand Up @@ -64,7 +65,6 @@ use golem_common::model::oplog::{
};
use golem_common::model::plugin::{PluginOwner, PluginScope};
use golem_common::model::regions::{DeletedRegions, OplogRegion};
use golem_common::model::RetryConfig;
use golem_common::model::{exports, PluginInstallationId};
use golem_common::model::{
AccountId, ComponentFilePath, ComponentFilePermissions, ComponentFileSystemNode,
Expand All @@ -73,6 +73,7 @@ use golem_common::model::{
ScheduledAction, SuccessfulUpdateRecord, Timestamp, WorkerEvent, WorkerFilter, WorkerId,
WorkerMetadata, WorkerResourceDescription, WorkerStatus, WorkerStatusRecord,
};
use golem_common::model::{RetryConfig, TargetWorkerId};
use golem_common::retries::get_delay;
use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
use golem_wasm_rpc::wasmtime::ResourceStore;
Expand Down Expand Up @@ -116,6 +117,7 @@ mod sockets;
pub mod wasm_rpc;

mod durability;
mod dynamic_linking;
mod replay_state;

/// Partial implementation of the WorkerCtx interfaces for adding durable execution to workers.
Expand Down Expand Up @@ -287,6 +289,10 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
&self.owned_worker_id.worker_id
}

pub fn owned_worker_id(&self) -> &OwnedWorkerId {
&self.owned_worker_id
}

pub fn component_metadata(&self) -> &ComponentMetadata {
&self.state.component_metadata
}
Expand Down Expand Up @@ -466,6 +472,32 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
}
}
}

pub async fn generate_unique_local_worker_id(
&mut self,
remote_worker_id: TargetWorkerId,
) -> Result<WorkerId, GolemError> {
match remote_worker_id.clone().try_into_worker_id() {
Some(worker_id) => Ok(worker_id),
None => {
let worker_id = Durability::<Ctx, (), WorkerId, SerializableError>::wrap(
self,
WrappedFunctionType::ReadLocal,
"golem::rpc::wasm-rpc::generate_unique_local_worker_id",
(),
|ctx| {
Box::pin(async move {
ctx.rpc()
.generate_unique_local_worker_id(remote_worker_id)
.await
})
},
)
.await?;
Ok(worker_id)
}
}
}
}

impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> DurableWorkerCtx<Ctx> {
Expand Down
166 changes: 126 additions & 40 deletions golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ use golem_wasm_rpc::golem::rpc::types::{
};
use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
use golem_wasm_rpc::{
FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, ValueAndType, WasmRpcEntry, WitValue,
FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, Value, ValueAndType, WasmRpcEntry, WitValue,
};
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, warn};
Expand All @@ -59,14 +60,15 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {

match location.parse_as_golem_urn() {
Some((remote_worker_id, None)) => {
let remote_worker_id =
generate_unique_local_worker_id(self, remote_worker_id).await?;
let remote_worker_id = self
.generate_unique_local_worker_id(remote_worker_id)
.await?;

let remote_worker_id =
OwnedWorkerId::new(&self.owned_worker_id.account_id, &remote_worker_id);
let demand = self.rpc().create_demand(&remote_worker_id).await;
let entry = self.table().push(WasmRpcEntry {
payload: Box::new(WasmRpcEntryPayload {
payload: Box::new(WasmRpcEntryPayload::Interface {
demand,
remote_worker_id,
}),
Expand All @@ -84,15 +86,34 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<WasmRpcEntry>,
function_name: String,
function_params: Vec<WitValue>,
mut function_params: Vec<WitValue>,
) -> anyhow::Result<Result<WitValue, golem_wasm_rpc::RpcError>> {
record_host_function_call("golem::rpc::wasm-rpc", "invoke-and-await");
let args = self.get_arguments().await?;
let env = self.get_environment().await?;

let entry = self.table().get(&self_)?;
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();
let remote_worker_id = payload.remote_worker_id().clone();

// TODO: remove redundancy
match payload {
WasmRpcEntryPayload::Resource {
resource_uri,
resource_id,
..
} => {
function_params.insert(
0,
Value::Handle {
uri: resource_uri.value.to_string(),
resource_id: *resource_id,
}
.into(),
);
}
_ => {}
}

let current_idempotency_key = self
.get_current_idempotency_key()
Expand Down Expand Up @@ -213,15 +234,34 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<WasmRpcEntry>,
function_name: String,
function_params: Vec<WitValue>,
mut function_params: Vec<WitValue>,
) -> anyhow::Result<Result<(), golem_wasm_rpc::RpcError>> {
record_host_function_call("golem::rpc::wasm-rpc", "invoke");
let args = self.get_arguments().await?;
let env = self.get_environment().await?;

let entry = self.table().get(&self_)?;
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();
let remote_worker_id = payload.remote_worker_id().clone();

// TODO: remove redundancy
match payload {
WasmRpcEntryPayload::Resource {
resource_uri,
resource_id,
..
} => {
function_params.insert(
0,
Value::Handle {
uri: resource_uri.value.to_string(),
resource_id: *resource_id,
}
.into(),
);
}
_ => {}
}

let current_idempotency_key = self
.get_current_idempotency_key()
Expand Down Expand Up @@ -298,7 +338,7 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {
&mut self,
this: Resource<WasmRpcEntry>,
function_name: String,
function_params: Vec<WitValue>,
mut function_params: Vec<WitValue>,
) -> anyhow::Result<Resource<FutureInvokeResult>> {
record_host_function_call("golem::rpc::wasm-rpc", "async-invoke-and-await");
let args = self.get_arguments().await?;
Expand All @@ -311,7 +351,26 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {

let entry = self.table().get(&this)?;
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();
let remote_worker_id = payload.remote_worker_id().clone();

// TODO: remove redundancy
match payload {
WasmRpcEntryPayload::Resource {
resource_uri,
resource_id,
..
} => {
function_params.insert(
0,
Value::Handle {
uri: resource_uri.value.to_string(),
resource_id: *resource_id,
}
.into(),
);
}
_ => {}
}

let current_idempotency_key = self
.get_current_idempotency_key()
Expand Down Expand Up @@ -740,32 +799,6 @@ impl<Ctx: WorkerCtx> HostFutureInvokeResult for DurableWorkerCtx<Ctx> {
#[async_trait]
impl<Ctx: WorkerCtx> golem_wasm_rpc::Host for DurableWorkerCtx<Ctx> {}

async fn generate_unique_local_worker_id<Ctx: WorkerCtx>(
ctx: &mut DurableWorkerCtx<Ctx>,
remote_worker_id: TargetWorkerId,
) -> Result<WorkerId, GolemError> {
match remote_worker_id.clone().try_into_worker_id() {
Some(worker_id) => Ok(worker_id),
None => {
let worker_id = Durability::<Ctx, (), WorkerId, SerializableError>::wrap(
ctx,
WrappedFunctionType::ReadLocal,
"golem::rpc::wasm-rpc::generate_unique_local_worker_id",
(),
|ctx| {
Box::pin(async move {
ctx.rpc()
.generate_unique_local_worker_id(remote_worker_id)
.await
})
},
)
.await?;
Ok(worker_id)
}
}
}

/// Tries to get a `ValueAndType` representation for the given `WitValue` parameters by querying the latest component metadata for the
/// target component.
/// If the query fails, or the expected function name is not in its metadata or the number of parameters does not match, then it returns an
Expand Down Expand Up @@ -797,10 +830,63 @@ async fn try_get_typed_parameters(
Vec::new()
}

pub struct WasmRpcEntryPayload {
#[allow(dead_code)]
demand: Box<dyn RpcDemand>,
remote_worker_id: OwnedWorkerId,
pub enum WasmRpcEntryPayload {
Interface {
#[allow(dead_code)]
demand: Box<dyn RpcDemand>,
remote_worker_id: OwnedWorkerId,
},
Resource {
#[allow(dead_code)]
demand: Box<dyn RpcDemand>,
remote_worker_id: OwnedWorkerId,
resource_uri: Uri,
resource_id: u64,
},
}

impl Debug for WasmRpcEntryPayload {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Interface {
remote_worker_id, ..
} => f
.debug_struct("Interface")
.field("remote_worker_id", remote_worker_id)
.finish(),
Self::Resource {
remote_worker_id,
resource_uri,
resource_id,
..
} => f
.debug_struct("Resource")
.field("remote_worker_id", remote_worker_id)
.field("resource_uri", resource_uri)
.field("resource_id", resource_id)
.finish(),
}
}
}

impl WasmRpcEntryPayload {
pub fn remote_worker_id(&self) -> &OwnedWorkerId {
match self {
Self::Interface {
remote_worker_id, ..
} => remote_worker_id,
Self::Resource {
remote_worker_id, ..
} => remote_worker_id,
}
}

pub fn demand(&self) -> &Box<dyn RpcDemand> {
match self {
Self::Interface { demand, .. } => demand,
Self::Resource { demand, .. } => demand,
}
}
}

pub trait UrnExtensions {
Expand Down
Loading
Loading