diff --git a/.gitmodules b/.gitmodules index 15387fbb9..0a1016135 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,3 @@ [submodule "stdlib"] path = stdlib url = https://github.com/freenet/freenet-stdlib - diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index d34c573ab..6e1ed9074 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -485,14 +485,16 @@ pub(crate) mod test { } } val if (35..80).contains(&val) => { + let new_state = UpdateData::State(State::from(self.random_byte_vec())); if let Some(contract) = self.choose(&state.existing_contracts) { - let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec())); + // TODO: It will be used when the delta updates are available + // let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec())); if !for_this_peer { continue; } let request = ContractRequest::Update { key: contract.key().clone(), - data: delta, + data: new_state, }; if state.owns_contracts.contains(&contract.key()) { return Some(request.into()); diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index 1b4ea3e39..0f1d55f01 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -111,6 +111,36 @@ where error })?; } + ContractHandlerEvent::UpdateQuery { + key, + state, + related_contracts, + } => { + let update_result = contract_handler + .executor() + .upsert_contract_state( + key.clone(), + Either::Left(state.clone()), + related_contracts, + None, + ) + .instrument(tracing::info_span!("upsert_contract_state", %key)) + .await; + + contract_handler + .channel() + .send_to_sender( + id, + ContractHandlerEvent::UpdateResponse { + new_value: update_result.map_err(Into::into), + }, + ) + .await + .map_err(|error| { + tracing::debug!(%error, "shutting down contract handler"); + error + })?; + } _ => unreachable!(), } } diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 190fd9b8d..105ec5ff6 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -397,16 +397,17 @@ struct UpdateContract { #[async_trait::async_trait] impl ComposeNetworkMessage for UpdateContract { - fn initiate_op(self, op_manager: &OpManager) -> operations::update::UpdateOp { + fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp { let UpdateContract { key, new_state } = self; - operations::update::start_op(key, new_state, op_manager.ring.max_hops_to_live) + let related_contracts = RelatedContracts::default(); + operations::update::start_op(key, new_state, related_contracts) } async fn resume_op( op: operations::update::UpdateOp, op_manager: &OpManager, ) -> Result<(), OpError> { - operations::update::request_update(op_manager, op, None).await + operations::update::request_update(op_manager, op).await } } diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index 8ba725286..15e72e87d 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -102,14 +102,15 @@ impl ContractExecutor for Executor { .map_err(ExecutorError::other)?; return Ok(incoming_state); } - // (Either::Left(_), None) => { - // return Err(ExecutorError::request(RequestError::from( - // StdContractError::Get { - // key: key.clone(), - // cause: "Missing contract or parameters".into(), - // }, - // ))); - // } + (Either::Left(incoming_state), None) => { + // update case + + self.state_store + .update(&key, incoming_state.clone()) + .await + .map_err(ExecutorError::other)?; + return Ok(incoming_state); + } (update, contract) => unreachable!("{update:?}, {contract:?}"), } } diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index 1f6d91a41..a0143ff31 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -358,6 +358,16 @@ pub(crate) enum ContractHandlerEvent { key: ContractKey, response: Result, }, + /// Updates a supposedly existing contract in this node + UpdateQuery { + key: ContractKey, + state: WrappedState, + related_contracts: RelatedContracts<'static>, + }, + /// The response to an update query + UpdateResponse { + new_value: Result, + }, } impl std::fmt::Display for ContractHandlerEvent { @@ -399,6 +409,17 @@ impl std::fmt::Display for ContractHandlerEvent { write!(f, "get query failed {{ {key} }}",) } }, + ContractHandlerEvent::UpdateQuery { key, .. } => { + write!(f, "update query {{ {key} }}") + } + ContractHandlerEvent::UpdateResponse { new_value } => match new_value { + Ok(v) => { + write!(f, "update query response {{ {v} }}",) + } + Err(e) => { + write!(f, "update query failed {{ {e} }}",) + } + }, } } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 4e79254ac..196a600eb 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -21,7 +21,7 @@ use std::{ use either::Either; use freenet_stdlib::{ client_api::{ClientRequest, ContractRequest, ErrorKind}, - prelude::ContractKey, + prelude::{ContractKey, RelatedContracts, WrappedState}, }; use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId as Libp2pPeerId}; @@ -40,7 +40,7 @@ use crate::{ message::{NetMessage, NodeEvent, Transaction, TransactionType}, operations::{ connect::{self, ConnectOp}, - get, put, subscribe, OpEnum, OpError, OpOutcome, + get, put, subscribe, update, OpEnum, OpError, OpOutcome, }, ring::{Location, PeerKeyLocation}, router::{RouteEvent, RouteOutcome}, @@ -396,15 +396,33 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc { + ContractRequest::Update { key, data } => { // FIXME: perform updates tracing::debug!( this_peer = %op_manager.ring.peer_key, "Received update from user event", ); + let state = match data { + freenet_stdlib::prelude::UpdateData::State(s) => s, + _ => { + unreachable!(); + } + }; + + let wrapped_state = WrappedState::from(state.into_bytes()); + + let related_contracts = RelatedContracts::default(); + + let op = update::start_op(key, wrapped_state, related_contracts); + + let _ = op_manager + .ch_outbound + .waiting_for_transaction_result(op.id, client_id) + .await; + + if let Err(err) = update::request_update(&op_manager, op).await { + tracing::error!("request update error {}", err) + } } ContractRequest::Get { key, @@ -648,6 +666,22 @@ async fn process_message( ) .await; } + NetMessage::Update(op) => { + let op_result = + handle_op_request::(&op_manager, &mut conn_manager, op) + .await; + handle_op_not_available!(op_result); + break report_result( + tx, + op_result, + &op_manager, + executor_callback, + cli_req, + &mut *event_listener, + ) + .await; + } + NetMessage::Unsubscribed { key, .. } => { subscribe(op_manager, key.clone(), None).await; break; diff --git a/crates/core/src/node/testing_impl/in_memory.rs b/crates/core/src/node/testing_impl/in_memory.rs index df192bc89..2b81e8130 100644 --- a/crates/core/src/node/testing_impl/in_memory.rs +++ b/crates/core/src/node/testing_impl/in_memory.rs @@ -57,6 +57,7 @@ impl Builder { contract::contract_handling(contract_handler) .instrument(tracing::info_span!(parent: parent_span.clone(), "contract_handling")), ); + let mut config = super::RunnerConfig { peer_key: self.peer_key, gateways, diff --git a/crates/core/src/node/testing_impl/network.rs b/crates/core/src/node/testing_impl/network.rs index fc7104217..b44266432 100644 --- a/crates/core/src/node/testing_impl/network.rs +++ b/crates/core/src/node/testing_impl/network.rs @@ -73,7 +73,7 @@ impl NetworkPeer { #[cfg(feature = "trace-ot")] { use crate::tracing::{CombinedRegister, OTEventRegister}; - crate::tracing::CombinedRegister::new([ + CombinedRegister::new([ Box::new(EventRegister::new( crate::config::Config::conf().event_log(), )), diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 66bb9ee37..273aac4f7 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -73,6 +73,7 @@ where sender = s; op.process_message(network_bridge, op_manager, msg).await }; + handle_op_result(op_manager, network_bridge, result, tx, sender).await } @@ -90,6 +91,7 @@ where match result { Err(OpError::StatePushed) => { // do nothing and continue, the operation will just continue later on + tracing::debug!("entered in state pushed to continue with op"); return Ok(None); } Err(err) => { @@ -134,6 +136,7 @@ where }) => { op_manager.completed(tx_id); // finished the operation at this node, informing back + if let Some(target) = msg.target().cloned() { network_bridge.send(&target.peer, msg).await?; } diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 675888cd1..567eb080f 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -300,6 +300,7 @@ impl Operation for PutOp { }; let broadcast_to = op_manager.get_broadcast_targets(&key, &sender.peer); + match try_to_broadcast( *id, last_hop, diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index f5e3f0dac..ce9648794 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1,17 +1,26 @@ +use freenet_stdlib::client_api::{ErrorKind, HostResponse}; +use std::time::Instant; // TODO: complete update logic in the network - use freenet_stdlib::prelude::*; use futures::future::BoxFuture; +use futures::FutureExt; -use super::{OpError, OpOutcome, Operation}; +use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; +use crate::contract::ContractHandlerEvent; +use crate::message::{InnerMessage, NetMessage, Transaction}; +use crate::ring::{Location, PeerKeyLocation, RingError}; use crate::{ - client_events::{ClientId, HostResult}, - node::{NetworkBridge, OpManager}, + client_events::HostResult, + node::{NetworkBridge, OpManager, PeerId}, }; pub(crate) use self::messages::UpdateMsg; -pub(crate) struct UpdateOp {} +pub(crate) struct UpdateOp { + pub id: Transaction, + pub(crate) state: Option, + stats: Option, +} impl UpdateOp { pub fn outcome(&self) -> OpOutcome { @@ -19,23 +28,82 @@ impl UpdateOp { } pub fn finalized(&self) -> bool { - todo!() + self.stats + .as_ref() + .map(|s| matches!(s.step, RecordingStats::Completed)) + .unwrap_or(false) + || matches!(self.state, Some(UpdateState::Finished { .. })) } - pub fn record_transfer(&mut self) {} + pub(super) fn record_transfer(&mut self) { + if let Some(stats) = self.stats.as_mut() { + match stats.step { + RecordingStats::Uninitialized => { + stats.transfer_time = Some((Instant::now(), None)); + stats.step = RecordingStats::InitUpdate; + } + RecordingStats::InitUpdate => { + if let Some((_, e)) = stats.transfer_time.as_mut() { + *e = Some(Instant::now()); + } + stats.step = RecordingStats::Completed; + } + RecordingStats::Completed => {} + } + } + } pub(super) fn to_host_result(&self) -> HostResult { - todo!() + if let Some(UpdateState::Finished { key, summary }) = &self.state { + Ok(HostResponse::ContractResponse( + freenet_stdlib::client_api::ContractResponse::UpdateResponse { + key: key.clone(), + summary: summary.clone(), + }, + )) + } else { + Err(ErrorKind::OperationError { + cause: "update didn't finish successfully".into(), + } + .into()) + } } } +struct UpdateStats { + // contract_location: Location, + // payload_size: usize, + // /// (start, end) + // first_response_time: Option<(Instant, Option)>, + /// (start, end) + transfer_time: Option<(Instant, Option)>, + target: Option, + step: RecordingStats, +} + +/// While timing, at what particular step we are now. +#[derive(Clone, Copy, Default)] +enum RecordingStats { + #[default] + Uninitialized, + InitUpdate, + Completed, +} + pub(crate) struct UpdateResult {} impl TryFrom for UpdateResult { type Error = OpError; - fn try_from(_value: UpdateOp) -> Result { - todo!() + fn try_from(op: UpdateOp) -> Result { + if let Some(true) = op + .stats + .map(|s| matches!(s.step, RecordingStats::Completed)) + { + Ok(UpdateResult {}) + } else { + Err(OpError::UnexpectedOpState) + } } } @@ -44,76 +112,709 @@ impl Operation for UpdateOp { type Result = UpdateResult; fn load_or_init<'a>( - _op_manager: &'a crate::node::OpManager, - _msg: &'a Self::Message, + op_manager: &'a crate::node::OpManager, + msg: &'a Self::Message, ) -> BoxFuture<'a, Result, OpError>> { - todo!() + async move { + let mut sender: Option = None; + if let Some(peer_key_loc) = msg.sender().cloned() { + sender = Some(peer_key_loc.peer); + }; + let tx = *msg.id(); + match op_manager.pop(msg.id()) { + Ok(Some(OpEnum::Update(update_op))) => { + Ok(OpInitialization { + op: update_op, + sender, + }) + // was an existing operation, other peer messaged back + } + Ok(Some(op)) => { + let _ = op_manager.push(tx, op).await; + Err(OpError::OpNotPresent(tx)) + } + Ok(None) => { + // new request to get a value for a contract, initialize the machine + tracing::debug!(tx = %tx, sender = ?sender, "initializing new op"); + Ok(OpInitialization { + op: Self { + state: Some(UpdateState::ReceivedRequest), + id: tx, + stats: None, // don't care about stats in target peers + }, + sender, + }) + } + Err(err) => Err(err.into()), + } + } + .boxed() } fn id(&self) -> &crate::message::Transaction { - todo!() + &self.id } fn process_message<'a, NB: NetworkBridge>( self, - _conn_manager: &'a mut NB, - _op_manager: &'a crate::node::OpManager, - _input: &Self::Message, + conn_manager: &'a mut NB, + op_manager: &'a crate::node::OpManager, + input: &'a Self::Message, // _client_id: Option, ) -> std::pin::Pin< Box> + Send + 'a>, > { - todo!() + Box::pin(async move { + let return_msg; + let new_state; + let stats = self.stats; + + match input { + UpdateMsg::RequestUpdate { + id, + key, + target, + related_contracts, + value, + } => { + let sender = op_manager.ring.own_location(); + + tracing::debug!( + "Requesting update for contract {} from {} to {}", + key, + sender.peer, + target.peer + ); + + return_msg = Some(UpdateMsg::SeekNode { + id: *id, + sender, + target: *target, + value: value.clone(), + key: key.clone(), + related_contracts: related_contracts.clone(), + }); + + // no changes to state yet, still in AwaitResponse state + new_state = self.state; + } + UpdateMsg::SeekNode { + id, + value, + key, + related_contracts, + target, + sender, + } => { + let is_subscribed_contract = op_manager.ring.is_seeding_contract(key); + + tracing::debug!( + tx = %id, + %key, + target = %target.peer, + "Updating contract at target peer", + ); + + let broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); + + if is_subscribed_contract { + tracing::debug!("Peer is subscribed to contract. About to update it"); + update_contract( + op_manager, + key.clone(), + value.clone(), + related_contracts.clone(), + ) + .await?; + tracing::debug!( + tx = %id, + "Successfully updated a value for contract {} @ {:?} - update", + key, + target.location + ); + } else { + tracing::debug!("contract not found in this peer. Should throw an error"); + return Err(OpError::RingError(RingError::NoCachingPeers(key.clone()))); + } + + match try_to_broadcast( + *id, + true, + op_manager, + self.state, + (broadcast_to, *sender), + key.clone(), + value.clone(), + false, + ) + .await + { + Ok((state, msg)) => { + new_state = state; + return_msg = msg; + } + Err(err) => return Err(err), + } + } + UpdateMsg::BroadcastTo { + id, + key, + new_value, + sender, + } => { + if let Some(UpdateState::AwaitingResponse { .. }) = self.state { + tracing::debug!("Trying to broadcast to a peer that was the initiator of the op because it received the client request, or is in the middle of a seek node process"); + return Err(OpError::StatePushed); + }; + + let target = op_manager.ring.own_location(); + + tracing::debug!("Attempting contract value update - BroadcastTo - update"); + let new_value = update_contract( + op_manager, + key.clone(), + new_value.clone(), + RelatedContracts::default(), + ) + .await?; + tracing::debug!("Contract successfully updated - BroadcastTo - update"); + + let broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); + + tracing::debug!( + "Successfully updated a value for contract {} @ {:?} - BroadcastTo - update", + key, + target.location + ); + + match try_to_broadcast( + *id, + false, + op_manager, + self.state, + (broadcast_to, *sender), + key.clone(), + new_value, + true, + ) + .await + { + Ok((state, msg)) => { + new_state = state; + return_msg = msg; + } + Err(err) => return Err(err), + } + } + UpdateMsg::Broadcasting { + id, + broadcast_to, + broadcasted_to, + key, + new_value, + upstream, + } => { + let sender = op_manager.ring.own_location(); + let mut broadcasted_to = *broadcasted_to; + + let mut broadcasting = Vec::with_capacity(broadcast_to.len()); + + for peer in broadcast_to.iter() { + let msg = UpdateMsg::BroadcastTo { + id: *id, + key: key.clone(), + new_value: new_value.clone(), + sender, + }; + let f = conn_manager.send(&peer.peer, msg.into()); + broadcasting.push(f); + } + let error_futures = futures::future::join_all(broadcasting) + .await + .into_iter() + .enumerate() + .filter_map(|(p, err)| { + if let Err(err) = err { + Some((p, err)) + } else { + None + } + }); + + let mut incorrect_results = 0; + for (peer_num, err) in error_futures { + // remove the failed peers in reverse order + let peer = broadcast_to.get(peer_num).unwrap(); + tracing::warn!( + "failed broadcasting update change to {} with error {}; dropping connection", + peer.peer, + err + ); + // TODO: review this, maybe we should just dropping this subscription + conn_manager.drop_connection(&peer.peer).await?; + incorrect_results += 1; + } + + broadcasted_to += broadcast_to.len() - incorrect_results; + tracing::debug!( + "Successfully broadcasted update contract {key} to {broadcasted_to} peers - Broadcasting" + ); + + let raw_state = State::from(new_value.clone()); + + let summary = StateSummary::from(raw_state.into_bytes()); + + // Subscriber nodes have been notified of the change, the operation is complete + return_msg = Some(UpdateMsg::SuccessfulUpdate { + id: *id, + target: *upstream, + summary, + }); + + new_state = None; + } + UpdateMsg::SuccessfulUpdate { id, summary, .. } => { + match self.state { + Some(UpdateState::AwaitingResponse { key, upstream }) => { + tracing::debug!( + tx = %id, + %key, + this_peer = %op_manager.ring.peer_key, + "Peer completed contract value update - SuccessfulUpdate", + ); + + new_state = Some(UpdateState::Finished { + key, + summary: summary.clone(), + }); + if let Some(upstream) = upstream { + return_msg = Some(UpdateMsg::SuccessfulUpdate { + id: *id, + target: upstream, + summary: summary.clone(), + }); + } else { + // this means op finalized + return_msg = None; + } + } + _ => { + tracing::error!( + state = ?self.state, + "invalid transition in UpdateMsg::SuccessfulUpdate -> match self.state" + ); + + return Err(OpError::invalid_transition(self.id)); + } + }; + } + _ => return Err(OpError::UnexpectedOpState), + } + + build_op_result(self.id, new_state, return_msg, stats) + }) } } +#[allow(clippy::too_many_arguments)] +async fn try_to_broadcast( + id: Transaction, + last_hop: bool, + op_manager: &OpManager, + state: Option, + (broadcast_to, upstream): (Vec, PeerKeyLocation), + key: ContractKey, + new_value: WrappedState, + is_from_a_broadcasted_to_peer: bool, +) -> Result<(Option, Option), OpError> { + let new_state; + let return_msg; + + match state { + Some(UpdateState::ReceivedRequest | UpdateState::BroadcastOngoing { .. }) => { + if broadcast_to.is_empty() && !last_hop { + // broadcast complete + tracing::debug!( + "Empty broadcast list while updating value for contract {} - try_to_broadcast", + key + ); + + return_msg = None; + + if is_from_a_broadcasted_to_peer { + new_state = None; + return Ok((new_state, return_msg)); + } + + // means the whole tx finished so can return early + new_state = Some(UpdateState::AwaitingResponse { + key, + upstream: Some(upstream), + }); + } else if !broadcast_to.is_empty() { + tracing::debug!( + "Callback to start broadcasting to other nodes. List size {}", + broadcast_to.len() + ); + new_state = Some(UpdateState::BroadcastOngoing); + + return_msg = Some(UpdateMsg::Broadcasting { + id, + new_value, + broadcasted_to: 0, + broadcast_to, + key, + upstream, + }); + + let op = UpdateOp { + id, + state: new_state, + stats: None, + }; + op_manager + .notify_op_change(NetMessage::from(return_msg.unwrap()), OpEnum::Update(op)) + .await?; + return Err(OpError::StatePushed); + } else { + let raw_state = State::from(new_value); + + let summary = StateSummary::from(raw_state.into_bytes()); + + new_state = None; + return_msg = Some(UpdateMsg::SuccessfulUpdate { + id, + target: upstream, + summary, + }); + } + } + _ => return Err(OpError::invalid_transition(id)), + }; + + Ok((new_state, return_msg)) +} + +impl OpManager { + pub(crate) fn get_broadcast_targets_update( + &self, + key: &ContractKey, + sender: &PeerId, + ) -> Vec { + let subscribers = self + .ring + .subscribers_of(key) + .map(|subs| { + subs.value() + .iter() + .filter(|pk| &pk.peer != sender) + .copied() + .collect::>() + }) + .unwrap_or_default(); + + subscribers + } +} + +fn build_op_result( + id: Transaction, + state: Option, + return_msg: Option, + stats: Option, +) -> Result { + let mut state_is_none = false; + if state.as_ref().is_none() { + state_is_none = true; + } + + let output_op = Some(UpdateOp { id, state, stats }); + + let op_enum_update = output_op.map(OpEnum::Update); + + Ok(OperationResult { + return_msg: return_msg.map(NetMessage::from), + state: { + if state_is_none { + None + } else { + op_enum_update + } + }, + }) +} + +async fn update_contract( + op_manager: &OpManager, + key: ContractKey, + state: WrappedState, + related_contracts: RelatedContracts<'static>, +) -> Result { + match op_manager + .notify_contract_handler(ContractHandlerEvent::UpdateQuery { + key, + state, + related_contracts, + }) + .await + { + Ok(ContractHandlerEvent::UpdateResponse { + new_value: Ok(new_val), + }) => Ok(new_val), + Ok(ContractHandlerEvent::UpdateResponse { + new_value: Err(_err), + }) => { + // return Err(OpError::from(ContractError::StorageError(err))); + todo!("not a valid value update, notify back to requester") + } + Err(err) => Err(err.into()), + Ok(_) => Err(OpError::UnexpectedOpState), + } +} + +/// This will be called from the node when processing an open request // todo: new_state should be a delta when possible! -pub(crate) fn start_op(_key: ContractKey, _new_state: WrappedState, _htl: usize) -> UpdateOp { - todo!() +pub(crate) fn start_op( + key: ContractKey, + new_state: WrappedState, + related_contracts: RelatedContracts<'static>, +) -> UpdateOp { + let contract_location = Location::from(&key); + tracing::debug!(%contract_location, %key, "Requesting update"); + let id = Transaction::new::(); + // let payload_size = contract.data().len(); + + let state = Some(UpdateState::PrepareRequest { + key, + related_contracts, + value: new_state, + }); + + UpdateOp { + id, + state, + stats: Some(UpdateStats { + // contract_location, + // payload_size, + target: None, + // first_response_time: None, + transfer_time: None, + step: Default::default(), + }), + } } +/// Entry point from node to operations logic pub(crate) async fn request_update( - _op_manager: &OpManager, - _update_op: UpdateOp, - _client_id: Option, + op_manager: &OpManager, + mut update_op: UpdateOp, ) -> Result<(), OpError> { - todo!() + let key = if let Some(UpdateState::PrepareRequest { key, .. }) = &update_op.state { + key + } else { + return Err(OpError::UnexpectedOpState); + }; + + let sender = op_manager.ring.own_location(); + + // the initial request must provide: + // - a peer as close as possible to the contract location + // - and the value to update + let target = if let Some(location) = op_manager.ring.subscribers_of(key) { + location + .clone() + .pop() + .ok_or(OpError::RingError(RingError::NoLocation))? + } else { + let closest = op_manager + .ring + .closest_potentially_caching(key, [sender.peer].as_slice()) + .into_iter() + .next() + .ok_or_else(|| RingError::EmptyRing)?; + + op_manager + .ring + .add_subscriber(key, sender) + .map_err(|_| RingError::NoCachingPeers(key.clone()))?; + + closest + }; + + let id = update_op.id; + if let Some(stats) = &mut update_op.stats { + stats.target = Some(target); + } + + match update_op.state { + Some(UpdateState::PrepareRequest { + key, + value, + related_contracts, + }) => { + let new_state = Some(UpdateState::AwaitingResponse { + key: key.clone(), + upstream: None, + }); + let msg = UpdateMsg::RequestUpdate { + id, + key, + related_contracts, + target, + value, + }; + + let op = UpdateOp { + state: new_state, + id, + stats: update_op.stats, + }; + + op_manager + .notify_op_change(NetMessage::from(msg), OpEnum::Update(op)) + .await?; + } + _ => return Err(OpError::invalid_transition(update_op.id)), + }; + + Ok(()) } mod messages { use std::fmt::Display; + use freenet_stdlib::prelude::{ContractKey, RelatedContracts, StateSummary, WrappedState}; use serde::{Deserialize, Serialize}; use crate::{ message::{InnerMessage, Transaction}, - ring::PeerKeyLocation, + ring::{Location, PeerKeyLocation}, }; #[derive(Debug, Serialize, Deserialize)] - pub(crate) enum UpdateMsg {} + pub(crate) enum UpdateMsg { + RequestUpdate { + id: Transaction, + key: ContractKey, + target: PeerKeyLocation, + #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")] + related_contracts: RelatedContracts<'static>, + value: WrappedState, + }, + /// Value successfully inserted/updated. + SuccessfulUpdate { + id: Transaction, + target: PeerKeyLocation, + #[serde(deserialize_with = "StateSummary::deser_state_summary")] + summary: StateSummary<'static>, + }, + AwaitUpdate { + id: Transaction, + }, + SeekNode { + id: Transaction, + sender: PeerKeyLocation, + target: PeerKeyLocation, + value: WrappedState, + key: ContractKey, + #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")] + related_contracts: RelatedContracts<'static>, + }, + /// Internal node instruction that a change (either a first time insert or an update). + Broadcasting { + id: Transaction, + broadcasted_to: usize, + broadcast_to: Vec, + key: ContractKey, + new_value: WrappedState, + //contract: ContractContainer, + upstream: PeerKeyLocation, + }, + /// Broadcasting a change to a peer, which then will relay the changes to other peers. + BroadcastTo { + id: Transaction, + sender: PeerKeyLocation, + key: ContractKey, + new_value: WrappedState, + }, + } impl InnerMessage for UpdateMsg { fn id(&self) -> &Transaction { - todo!() + match self { + UpdateMsg::RequestUpdate { id, .. } => id, + UpdateMsg::SuccessfulUpdate { id, .. } => id, + UpdateMsg::AwaitUpdate { id, .. } => id, + UpdateMsg::SeekNode { id, .. } => id, + UpdateMsg::Broadcasting { id, .. } => id, + UpdateMsg::BroadcastTo { id, .. } => id, + } } fn target(&self) -> Option<&PeerKeyLocation> { - todo!() + match self { + UpdateMsg::RequestUpdate { target, .. } => Some(target), + UpdateMsg::SuccessfulUpdate { target, .. } => Some(target), + UpdateMsg::SeekNode { target, .. } => Some(target), + _ => None, + } } fn terminal(&self) -> bool { - todo!() + use UpdateMsg::*; + matches!(self, SuccessfulUpdate { .. } | SeekNode { .. }) } fn requested_location(&self) -> Option { - todo!() + match self { + UpdateMsg::RequestUpdate { key, .. } => Some(Location::from(key.id())), + UpdateMsg::SeekNode { key, .. } => Some(Location::from(key.id())), + UpdateMsg::Broadcasting { key, .. } => Some(Location::from(key.id())), + UpdateMsg::BroadcastTo { key, .. } => Some(Location::from(key.id())), + _ => None, + } + } + } + + impl UpdateMsg { + pub fn sender(&self) -> Option<&PeerKeyLocation> { + match self { + Self::SeekNode { sender, .. } => Some(sender), + Self::BroadcastTo { sender, .. } => Some(sender), + _ => None, + } } } impl Display for UpdateMsg { - fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - todo!() + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + UpdateMsg::RequestUpdate { id, .. } => write!(f, "RequestUpdate(id: {id})"), + UpdateMsg::SuccessfulUpdate { id, .. } => write!(f, "SuccessfulUpdate(id: {id})"), + UpdateMsg::AwaitUpdate { id } => write!(f, "AwaitUpdate(id: {id})"), + UpdateMsg::SeekNode { id, .. } => write!(f, "SeekNode(id: {id})"), + UpdateMsg::Broadcasting { id, .. } => write!(f, "Broadcasting(id: {id})"), + UpdateMsg::BroadcastTo { id, .. } => write!(f, "BroadcastTo(id: {id})"), + } } } } + +#[derive(Debug)] +pub enum UpdateState { + ReceivedRequest, + AwaitingResponse { + key: ContractKey, + upstream: Option, + }, + Finished { + key: ContractKey, + summary: StateSummary<'static>, + }, + PrepareRequest { + key: ContractKey, + related_contracts: RelatedContracts<'static>, + value: WrappedState, + }, + BroadcastOngoing, +} diff --git a/stdlib b/stdlib index 0aff12d6b..f28e67163 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit 0aff12d6b0c5dd9421c46f07e47e29b2492f2b7c +Subproject commit f28e6716364b4e1c9ae8837344286393a2da4c82