Skip to content

Commit

Permalink
Work on dynamic linking for stubless RPC, WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Dec 18, 2024
1 parent a9d8d76 commit 61152a2
Show file tree
Hide file tree
Showing 10 changed files with 873 additions and 73 deletions.
22 changes: 22 additions & 0 deletions golem-rib/src/function_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,28 @@ impl ParsedFunctionName {
function,
})
}

pub fn is_constructor(&self) -> Option<&str> {
match &self.function {
ParsedFunctionReference::RawResourceConstructor { resource, .. }
| ParsedFunctionReference::IndexedResourceConstructor { resource, .. } => {
Some(resource)
}
_ => None,
}
}

pub fn is_method(&self) -> Option<&str> {
match &self.function {
ParsedFunctionReference::RawResourceMethod { resource, .. }
| ParsedFunctionReference::IndexedResourceMethod { resource, .. }
| ParsedFunctionReference::RawResourceStaticMethod { resource, .. }
| ParsedFunctionReference::IndexedResourceStaticMethod { resource, .. } => {
Some(resource)
}
_ => None,
}
}
}

#[cfg(feature = "protobuf")]
Expand Down
460 changes: 460 additions & 0 deletions golem-worker-executor-base/src/durable_host/dynamic_linking.rs

Large diffs are not rendered by default.

34 changes: 33 additions & 1 deletion golem-worker-executor-base/src/durable_host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::durable_host::http::serialized::SerializableHttpRequest;
use crate::durable_host::io::{ManagedStdErr, ManagedStdIn, ManagedStdOut};
use crate::durable_host::replay_state::ReplayState;
use crate::durable_host::serialized::SerializableError;
use crate::durable_host::sync_helper::{SyncHelper, SyncHelperPermit};
use crate::durable_host::wasm_rpc::UrnExtensions;
use crate::error::GolemError;
Expand Down Expand Up @@ -65,7 +66,6 @@ use golem_common::model::oplog::{
};
use golem_common::model::plugin::{PluginOwner, PluginScope};
use golem_common::model::regions::{DeletedRegions, OplogRegion};
use golem_common::model::RetryConfig;
use golem_common::model::{exports, PluginInstallationId};
use golem_common::model::{
AccountId, ComponentFilePath, ComponentFilePermissions, ComponentFileSystemNode,
Expand All @@ -74,6 +74,7 @@ use golem_common::model::{
ScheduledAction, SuccessfulUpdateRecord, Timestamp, WorkerEvent, WorkerFilter, WorkerId,
WorkerMetadata, WorkerResourceDescription, WorkerStatus, WorkerStatusRecord,
};
use golem_common::model::{RetryConfig, TargetWorkerId};
use golem_common::retries::get_delay;
use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
use golem_wasm_rpc::wasmtime::ResourceStore;
Expand Down Expand Up @@ -116,6 +117,7 @@ mod sockets;
pub mod wasm_rpc;

mod durability;
mod dynamic_linking;
mod replay_state;
mod sync_helper;

Expand Down Expand Up @@ -288,6 +290,10 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
&self.owned_worker_id.worker_id
}

pub fn owned_worker_id(&self) -> &OwnedWorkerId {
&self.owned_worker_id
}

pub fn component_metadata(&self) -> &ComponentMetadata {
&self.state.component_metadata
}
Expand Down Expand Up @@ -476,6 +482,32 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
}
}
}

pub async fn generate_unique_local_worker_id(
&mut self,
remote_worker_id: TargetWorkerId,
) -> Result<WorkerId, GolemError> {
match remote_worker_id.clone().try_into_worker_id() {
Some(worker_id) => Ok(worker_id),
None => {
let worker_id = Durability::<Ctx, (), WorkerId, SerializableError>::wrap(
self,
WrappedFunctionType::ReadLocal,
"golem::rpc::wasm-rpc::generate_unique_local_worker_id",
(),
|ctx| {
Box::pin(async move {
ctx.rpc()
.generate_unique_local_worker_id(remote_worker_id)
.await
})
},
)
.await?;
Ok(worker_id)
}
}
}
}

impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> DurableWorkerCtx<Ctx> {
Expand Down
126 changes: 87 additions & 39 deletions golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ use golem_common::model::{
};
use golem_common::uri::oss::urn::{WorkerFunctionUrn, WorkerOrFunctionUrn};
use golem_wasm_rpc::golem::rpc::types::{
FutureInvokeResult, HostFutureInvokeResult, Pollable, Uri,
FutureInvokeResult, HostFutureInvokeResult, Pollable, Uri, WasmRpc,
};
use golem_wasm_rpc::protobuf::type_annotated_value::TypeAnnotatedValue;
use golem_wasm_rpc::{
FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, ValueAndType, WasmRpcEntry, WitValue,
FutureInvokeResultEntry, HostWasmRpc, SubscribeAny, Value, ValueAndType, WasmRpcEntry, WitValue,
};
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, warn};
Expand All @@ -60,14 +61,15 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {

match location.parse_as_golem_urn() {
Some((remote_worker_id, None)) => {
let remote_worker_id =
generate_unique_local_worker_id(self, remote_worker_id).await?;
let remote_worker_id = self
.generate_unique_local_worker_id(remote_worker_id)
.await?;

let remote_worker_id =
OwnedWorkerId::new(&self.owned_worker_id.account_id, &remote_worker_id);
let demand = self.rpc().create_demand(&remote_worker_id).await;
let entry = self.table().push(WasmRpcEntry {
payload: Box::new(WasmRpcEntryPayload {
payload: Box::new(WasmRpcEntryPayload::Interface {
demand,
remote_worker_id,
}),
Expand All @@ -85,7 +87,7 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {
&mut self,
self_: Resource<WasmRpcEntry>,
function_name: String,
function_params: Vec<WitValue>,
mut function_params: Vec<WitValue>,
) -> anyhow::Result<Result<WitValue, golem_wasm_rpc::RpcError>> {
record_host_function_call("golem::rpc::wasm-rpc", "invoke-and-await");
let args = self.get_arguments().await?;
Expand All @@ -95,7 +97,26 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {

let entry = self.table().get(&self_)?;
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();
let remote_worker_id = payload.remote_worker_id().clone();

// TODO: do this in other variants too
match payload {
WasmRpcEntryPayload::Resource {
resource_uri,
resource_id,
..
} => {
function_params.insert(
0,
Value::Handle {
uri: resource_uri.value.to_string(),
resource_id: *resource_id,
}
.into(),
);
}
_ => {}
}

let current_idempotency_key = self
.get_current_idempotency_key()
Expand Down Expand Up @@ -226,7 +247,7 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {

let entry = self.table().get(&self_)?;
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();
let remote_worker_id = payload.remote_worker_id().clone();

let current_idempotency_key = self
.get_current_idempotency_key()
Expand Down Expand Up @@ -317,7 +338,7 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {

let entry = self.table().get(&this)?;
let payload = entry.payload.downcast_ref::<WasmRpcEntryPayload>().unwrap();
let remote_worker_id = payload.remote_worker_id.clone();
let remote_worker_id = payload.remote_worker_id().clone();

let current_idempotency_key = self
.get_current_idempotency_key()
Expand Down Expand Up @@ -748,32 +769,6 @@ impl<Ctx: WorkerCtx> HostFutureInvokeResult for DurableWorkerCtx<Ctx> {
#[async_trait]
impl<Ctx: WorkerCtx> golem_wasm_rpc::Host for DurableWorkerCtx<Ctx> {}

async fn generate_unique_local_worker_id<Ctx: WorkerCtx>(
ctx: &mut DurableWorkerCtx<Ctx>,
remote_worker_id: TargetWorkerId,
) -> Result<WorkerId, GolemError> {
match remote_worker_id.clone().try_into_worker_id() {
Some(worker_id) => Ok(worker_id),
None => {
let worker_id = Durability::<Ctx, (), WorkerId, SerializableError>::wrap(
ctx,
WrappedFunctionType::ReadLocal,
"golem::rpc::wasm-rpc::generate_unique_local_worker_id",
(),
|ctx| {
Box::pin(async move {
ctx.rpc()
.generate_unique_local_worker_id(remote_worker_id)
.await
})
},
)
.await?;
Ok(worker_id)
}
}
}

/// Tries to get a `ValueAndType` representation for the given `WitValue` parameters by querying the latest component metadata for the
/// target component.
/// If the query fails, or the expected function name is not in its metadata or the number of parameters does not match, then it returns an
Expand Down Expand Up @@ -805,10 +800,63 @@ async fn try_get_typed_parameters(
Vec::new()
}

pub struct WasmRpcEntryPayload {
#[allow(dead_code)]
demand: Box<dyn RpcDemand>,
remote_worker_id: OwnedWorkerId,
pub enum WasmRpcEntryPayload {
Interface {
#[allow(dead_code)]
demand: Box<dyn RpcDemand>,
remote_worker_id: OwnedWorkerId,
},
Resource {
#[allow(dead_code)]
demand: Box<dyn RpcDemand>,
remote_worker_id: OwnedWorkerId,
resource_uri: Uri,
resource_id: u64,
},
}

impl Debug for WasmRpcEntryPayload {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Interface {
remote_worker_id, ..
} => f
.debug_struct("Interface")
.field("remote_worker_id", remote_worker_id)
.finish(),
Self::Resource {
remote_worker_id,
resource_uri,
resource_id,
..
} => f
.debug_struct("Resource")
.field("remote_worker_id", remote_worker_id)
.field("resource_uri", resource_uri)
.field("resource_id", resource_id)
.finish(),
}
}
}

impl WasmRpcEntryPayload {
pub fn remote_worker_id(&self) -> &OwnedWorkerId {
match self {
Self::Interface {
remote_worker_id, ..
} => remote_worker_id,
Self::Resource {
remote_worker_id, ..
} => remote_worker_id,
}
}

pub fn demand(&self) -> &Box<dyn RpcDemand> {
match self {
Self::Interface { demand, .. } => demand,
Self::Resource { demand, .. } => demand,
}
}
}

pub trait UrnExtensions {
Expand Down
Loading

0 comments on commit 61152a2

Please sign in to comment.