Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Update Op #915

Merged
merged 22 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
[submodule "stdlib"]
path = stdlib
url = https://github.com/freenet/freenet-stdlib

6 changes: 4 additions & 2 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,14 +485,16 @@ pub(crate) mod test {
}
}
val if (35..80).contains(&val) => {
let new_state = UpdateData::State(State::from(self.random_byte_vec()));
if let Some(contract) = self.choose(&state.existing_contracts) {
let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
// TODO: It will be used when the delta updates are available
// let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
if !for_this_peer {
continue;
}
let request = ContractRequest::Update {
key: contract.key().clone(),
data: delta,
data: new_state,
};
if state.owns_contracts.contains(&contract.key()) {
return Some(request.into());
Expand Down
30 changes: 30 additions & 0 deletions crates/core/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,36 @@ where
error
})?;
}
ContractHandlerEvent::UpdateQuery {
key,
state,
related_contracts,
} => {
let update_result = contract_handler
.executor()
.upsert_contract_state(
key.clone(),
Either::Left(state.clone()),
related_contracts,
None,
)
.instrument(tracing::info_span!("upsert_contract_state", %key))
.await;

contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::UpdateResponse {
new_value: update_result.map_err(Into::into),
},
)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
}
_ => unreachable!(),
}
}
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,16 +397,17 @@ struct UpdateContract {

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
fn initiate_op(self, op_manager: &OpManager) -> operations::update::UpdateOp {
fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp {
let UpdateContract { key, new_state } = self;
operations::update::start_op(key, new_state, op_manager.ring.max_hops_to_live)
let related_contracts = RelatedContracts::default();
operations::update::start_op(key, new_state, related_contracts)
}

async fn resume_op(
op: operations::update::UpdateOp,
op_manager: &OpManager,
) -> Result<(), OpError> {
operations::update::request_update(op_manager, op, None).await
operations::update::request_update(op_manager, op).await
}
}

Expand Down
17 changes: 9 additions & 8 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,15 @@ impl ContractExecutor for Executor<MockRuntime> {
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
}
// (Either::Left(_), None) => {
// return Err(ExecutorError::request(RequestError::from(
// StdContractError::Get {
// key: key.clone(),
// cause: "Missing contract or parameters".into(),
// },
// )));
// }
(Either::Left(incoming_state), None) => {
// update case

self.state_store
.update(&key, incoming_state.clone())
.await
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
}
(update, contract) => unreachable!("{update:?}, {contract:?}"),
}
}
Expand Down
21 changes: 21 additions & 0 deletions crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,16 @@ pub(crate) enum ContractHandlerEvent {
key: ContractKey,
response: Result<StoreResponse, ExecutorError>,
},
/// Updates a supposedly existing contract in this node
UpdateQuery {
key: ContractKey,
state: WrappedState,
related_contracts: RelatedContracts<'static>,
},
/// The response to an update query
UpdateResponse {
new_value: Result<WrappedState, ExecutorError>,
},
}

impl std::fmt::Display for ContractHandlerEvent {
Expand Down Expand Up @@ -399,6 +409,17 @@ impl std::fmt::Display for ContractHandlerEvent {
write!(f, "get query failed {{ {key} }}",)
}
},
ContractHandlerEvent::UpdateQuery { key, .. } => {
write!(f, "update query {{ {key} }}")
}
ContractHandlerEvent::UpdateResponse { new_value } => match new_value {
Ok(v) => {
write!(f, "update query response {{ {v} }}",)
}
Err(e) => {
write!(f, "update query failed {{ {e} }}",)
}
},
}
}
}
Expand Down
46 changes: 40 additions & 6 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
use either::Either;
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, ErrorKind},
prelude::ContractKey,
prelude::{ContractKey, RelatedContracts, WrappedState},
};
use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId as Libp2pPeerId};

Expand All @@ -40,7 +40,7 @@ use crate::{
message::{NetMessage, NodeEvent, Transaction, TransactionType},
operations::{
connect::{self, ConnectOp},
get, put, subscribe, OpEnum, OpError, OpOutcome,
get, put, subscribe, update, OpEnum, OpError, OpOutcome,
},
ring::{Location, PeerKeyLocation},
router::{RouteEvent, RouteOutcome},
Expand Down Expand Up @@ -396,15 +396,33 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc<OpM
tracing::error!("{}", err);
}
}
ContractRequest::Update {
key: _key,
data: _delta,
} => {
ContractRequest::Update { key, data } => {
// FIXME: perform updates
tracing::debug!(
this_peer = %op_manager.ring.peer_key,
"Received update from user event",
);
let state = match data {
freenet_stdlib::prelude::UpdateData::State(s) => s,
_ => {
unreachable!();
}
};

let wrapped_state = WrappedState::from(state.into_bytes());

let related_contracts = RelatedContracts::default();

let op = update::start_op(key, wrapped_state, related_contracts);

let _ = op_manager
.ch_outbound
.waiting_for_transaction_result(op.id, client_id)
.await;

if let Err(err) = update::request_update(&op_manager, op).await {
tracing::error!("request update error {}", err)
}
}
ContractRequest::Get {
key,
Expand Down Expand Up @@ -648,6 +666,22 @@ async fn process_message<CB>(
)
.await;
}
NetMessage::Update(op) => {
let op_result =
handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
.await;
handle_op_not_available!(op_result);
break report_result(
tx,
op_result,
&op_manager,
executor_callback,
cli_req,
&mut *event_listener,
)
.await;
}

NetMessage::Unsubscribed { key, .. } => {
subscribe(op_manager, key.clone(), None).await;
break;
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/node/testing_impl/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl<ER> Builder<ER> {
contract::contract_handling(contract_handler)
.instrument(tracing::info_span!(parent: parent_span.clone(), "contract_handling")),
);

let mut config = super::RunnerConfig {
peer_key: self.peer_key,
gateways,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/node/testing_impl/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl NetworkPeer {
#[cfg(feature = "trace-ot")]
{
use crate::tracing::{CombinedRegister, OTEventRegister};
crate::tracing::CombinedRegister::new([
CombinedRegister::new([
Box::new(EventRegister::new(
crate::config::Config::conf().event_log(),
)),
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ where
sender = s;
op.process_message(network_bridge, op_manager, msg).await
};

handle_op_result(op_manager, network_bridge, result, tx, sender).await
}

Expand All @@ -90,6 +91,7 @@ where
match result {
Err(OpError::StatePushed) => {
// do nothing and continue, the operation will just continue later on
tracing::debug!("entered in state pushed to continue with op");
return Ok(None);
}
Err(err) => {
Expand Down Expand Up @@ -134,6 +136,7 @@ where
}) => {
op_manager.completed(tx_id);
// finished the operation at this node, informing back

if let Some(target) = msg.target().cloned() {
network_bridge.send(&target.peer, msg).await?;
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ impl Operation for PutOp {
};

let broadcast_to = op_manager.get_broadcast_targets(&key, &sender.peer);

match try_to_broadcast(
*id,
last_hop,
Expand Down
Loading
Loading