Skip to content

Commit

Permalink
Refactoring the Durability construct, WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo committed Dec 1, 2024
1 parent e017f30 commit 435d750
Show file tree
Hide file tree
Showing 10 changed files with 603 additions and 423 deletions.
309 changes: 178 additions & 131 deletions golem-worker-executor-base/src/durable_host/blobstore/container.rs

Large diffs are not rendered by default.

66 changes: 37 additions & 29 deletions golem-worker-executor-base/src/durable_host/cli/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,59 @@
use async_trait::async_trait;

use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::durable_host::{Durability2, DurableWorkerCtx};
use crate::workerctx::WorkerCtx;
use golem_common::model::oplog::WrappedFunctionType;
use wasmtime_wasi::bindings::cli::environment::Host;

#[async_trait]
impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
async fn get_environment(&mut self) -> anyhow::Result<Vec<(String, String)>> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("cli::environment", "get_environment");
Durability::<Ctx, (), Vec<(String, String)>, SerializableError>::wrap(
let durability = Durability2::<Ctx, Vec<(String, String)>, SerializableError>::new(
self,
WrappedFunctionType::ReadLocal,
"golem_environment::get_environment",
(),
|ctx| Box::pin(async { Host::get_environment(&mut ctx.as_wasi_view()).await }),
)
.await
"golem environment",
"get_environment",
WrappedFunctionType::ReadLocal
).await?;

if durability.is_live() {
let result = Host::get_environment(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn get_arguments(&mut self) -> anyhow::Result<Vec<String>> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("cli::environment", "get_arguments");
Durability::<Ctx, (), Vec<String>, SerializableError>::wrap(
let durability = Durability2::<Ctx, Vec<String>, SerializableError>::new(
self,
WrappedFunctionType::ReadLocal,
"golem_environment::get_arguments",
(),
|ctx| Box::pin(async { Host::get_arguments(&mut ctx.as_wasi_view()).await }),
)
.await
"golem environment",
"get_arguments",
WrappedFunctionType::ReadLocal
).await?;

if durability.is_live() {
let result = Host::get_arguments(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn initial_cwd(&mut self) -> anyhow::Result<Option<String>> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("cli::environment", "initial_cwd");
Durability::<Ctx, (), Option<String>, SerializableError>::wrap(
let durability = Durability2::<Ctx, Option<String>, SerializableError>::new(
self,
WrappedFunctionType::ReadLocal,
"golem_environment::get_arguments", // NOTE: for backward compatibility with Golem 1.0
(),
|ctx| Box::pin(async { Host::initial_cwd(&mut ctx.as_wasi_view()).await }),
)
.await
"golem environment",
"get_arguments", // TODO: fix in 2.0 - for backward compatibility with Golem 1.0
WrappedFunctionType::ReadLocal
).await?;

if durability.is_live() {
let result = Host::initial_cwd(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_trait::async_trait;
use wasmtime::component::Resource;

use crate::durable_host::serialized::SerializableError;
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::durable_host::{Durability2, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::services::oplog::CommitLevel;
use crate::workerctx::WorkerCtx;
Expand All @@ -26,29 +26,37 @@ use wasmtime_wasi::bindings::clocks::monotonic_clock::{Duration, Host, Instant,
#[async_trait]
impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
async fn now(&mut self) -> anyhow::Result<Instant> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::monotonic_clock", "now");
Durability::<Ctx, (), Instant, SerializableError>::wrap(
let durability = Durability2::<Ctx, Instant, SerializableError>::new(
self,
"monotonic_clock",
"now",
WrappedFunctionType::ReadLocal,
"monotonic_clock::now",
(),
|ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::now(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn resolution(&mut self) -> anyhow::Result<Instant> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::monotonic_clock", "resolution");
Durability::<Ctx, (), Instant, SerializableError>::wrap(
let durability = Durability2::<Ctx, Instant, SerializableError>::new(
self,
"monotonic_clock",
"resolution",
WrappedFunctionType::ReadLocal,
"monotonic_clock::resolution",
(),
|ctx| Box::pin(async { Host::resolution(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::resolution(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self)
}
}

async fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result<Resource<Pollable>> {
Expand All @@ -58,16 +66,23 @@ impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
}

async fn subscribe_duration(&mut self, when: Duration) -> anyhow::Result<Resource<Pollable>> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::monotonic_clock", "subscribe_duration");
let now = Durability::<Ctx, (), Instant, SerializableError>::wrap(
let durability = Durability2::<Ctx, Instant, SerializableError>::new(
self,
"monotonic_clock",
"now", // TODO: fix in 2.0 - should be 'subscribe_duration' but have to keep for backward compatibility with Golem 1.0
WrappedFunctionType::ReadLocal,
"monotonic_clock::now", // should be 'subscribe_duration' but have to keep for backward compatibility with Golem 1.0
(),
|ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
)
.await?;

let now = {
if durability.is_live() {
let result = Host::now(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}?;

self.state.oplog.commit(CommitLevel::DurableOnly).await;
let when = now.saturating_add(when);
Host::subscribe_instant(&mut self.as_wasi_view(), when).await
Expand Down
39 changes: 23 additions & 16 deletions golem-worker-executor-base/src/durable_host/clocks/wall_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,45 @@
use async_trait::async_trait;

use crate::durable_host::serialized::{SerializableDateTime, SerializableError};
use crate::durable_host::{Durability, DurableWorkerCtx};
use crate::metrics::wasm::record_host_function_call;
use crate::durable_host::{Durability2, DurableWorkerCtx};
use crate::workerctx::WorkerCtx;
use golem_common::model::oplog::WrappedFunctionType;
use wasmtime_wasi::bindings::clocks::wall_clock::{Datetime, Host};

#[async_trait]
impl<Ctx: WorkerCtx> Host for DurableWorkerCtx<Ctx> {
async fn now(&mut self) -> anyhow::Result<Datetime> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::wall_clock", "now");
Durability::<Ctx, (), SerializableDateTime, SerializableError>::wrap(
let durability = Durability2::<Ctx, SerializableDateTime, SerializableError>::new(
self,
"wall_clock",
"now",
WrappedFunctionType::ReadLocal,
"wall_clock::now",
(),
|ctx| Box::pin(async { Host::now(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::now(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}

async fn resolution(&mut self) -> anyhow::Result<Datetime> {
let _permit = self.begin_async_host_function().await?;
record_host_function_call("clocks::wall_clock", "resolution");
Durability::<Ctx, (), SerializableDateTime, SerializableError>::wrap(
let durability = Durability2::<Ctx, SerializableDateTime, SerializableError>::new(
self,
"wall_clock",
"resolution",
WrappedFunctionType::ReadLocal,
"wall_clock::resolution",
(),
|ctx| Box::pin(async { Host::resolution(&mut ctx.as_wasi_view()).await }),
)
.await
.await?;

if durability.is_live() {
let result = Host::resolution(&mut self.as_wasi_view()).await;
durability.persist(self, (), result).await
} else {
durability.replay(self).await
}
}
}

Expand Down
115 changes: 115 additions & 0 deletions golem-worker-executor-base/src/durable_host/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::durable_host::sync_helper::SyncHelperPermit;
use crate::durable_host::DurableWorkerCtx;
use crate::error::GolemError;
use crate::metrics::wasm::record_host_function_call;
use crate::model::PersistenceLevel;
use crate::services::oplog::{CommitLevel, Oplog, OplogOps};
use crate::workerctx::WorkerCtx;
Expand All @@ -22,10 +24,123 @@ use bincode::{Decode, Encode};
use golem_common::model::oplog::{OplogEntry, OplogIndex, WrappedFunctionType};
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use tracing::error;

// TODO: is_live and replay can be merged
// TODO: is SErr always SerializableError? - sometimes SerializableStreamError
pub struct Durability2<Ctx, SOk, SErr> {
package: &'static str,
function: &'static str,
function_type: WrappedFunctionType,
_permit: SyncHelperPermit,
begin_index: OplogIndex,
is_live: bool,
persistence_level: PersistenceLevel,
_ctx: PhantomData<Ctx>,
_sok: PhantomData<SOk>,
_serr: PhantomData<SErr>,
}

impl<Ctx: WorkerCtx, SOk, SErr> Durability2<Ctx, SOk, SErr> {
pub async fn new(
ctx: &mut DurableWorkerCtx<Ctx>,
package: &'static str,
function: &'static str,
function_type: WrappedFunctionType,
) -> Result<Self, GolemError>
{
let permit = ctx.begin_async_host_function().await?;
record_host_function_call(package, function);

let begin_index = ctx.state.begin_function(&function_type).await?;

Ok(Self {
package,
function,
function_type,
_permit: permit,
begin_index,
is_live: ctx.state.is_live(),
persistence_level: ctx.state.persistence_level.clone(),
_ctx: PhantomData,
_sok: PhantomData,
_serr: PhantomData,
})
}

pub fn is_live(&self) -> bool {
self.is_live || self.persistence_level == PersistenceLevel::PersistNothing
}

pub async fn persist<SIn, Ok, Err>(
&self,
ctx: &mut DurableWorkerCtx<Ctx>,
input: SIn,
result: Result<Ok, Err>,
) -> Result<Ok, Err>
where
Ok: Clone,
Err: From<SErr> + From<GolemError> + Send + Sync,
SIn: Debug + Encode + Send + Sync,
SErr: Debug + Encode + for<'a> From<&'a Err> + From<GolemError> + Send + Sync,
SOk: Debug + Encode + From<Ok> + Encode + Send + Sync,
{
let serializable_result: Result<SOk, SErr> = result
.as_ref()
.map(|result| result.clone().into())
.map_err(|err| err.into());

let function_name = self.function_name();
ctx.write_to_oplog::<SIn, SOk, Err, SErr>(
&self.function_type,
&function_name,
self.begin_index,
&input,
&serializable_result,
)
.await?;

result
}

pub async fn replay<Ok, Err>(&self, ctx: &mut DurableWorkerCtx<Ctx>) -> Result<Ok, Err>
where
Ok: From<SOk>,
Err: From<SErr> + From<GolemError>,
SErr: Debug + Encode + Decode + From<GolemError> + Send + Sync,
SOk: Debug + Encode + Decode + Send + Sync,
{
let (_, oplog_entry) = crate::get_oplog_entry!(
ctx.state.replay_state,
OplogEntry::ImportedFunctionInvoked,
OplogEntry::ImportedFunctionInvokedV1
)?;

let function_name = self.function_name();
DurableWorkerCtx::<Ctx>::validate_oplog_entry(&oplog_entry, &function_name)?;
let response: Result<SOk, SErr> =
DurableWorkerCtx::<Ctx>::default_load(ctx.state.oplog.clone(), &oplog_entry).await;

ctx.state
.end_function(&self.function_type, self.begin_index)
.await?;

response.map(|sok| sok.into()).map_err(|serr| serr.into())
}

fn function_name(&self) -> String {
if self.package.is_empty() {
// For backward compatibility - some of the recorded function names were not following the pattern
self.function.to_string()
} else {
format!("{}::{}", self.package, self.function)
}
}
}

#[async_trait]
pub trait Durability<Ctx: WorkerCtx, SerializableInput, SerializableSuccess, SerializableErr> {
/// A version of `wrap` allowing conversion between the success value and the serialized value within the mutable worker context.
Expand Down
Loading

0 comments on commit 435d750

Please sign in to comment.