Skip to content

Commit

Permalink
add server side impls
Browse files Browse the repository at this point in the history
  • Loading branch information
livingshade committed Dec 2, 2023
1 parent e0370c1 commit b5b8c2f
Show file tree
Hide file tree
Showing 21 changed files with 425 additions and 277 deletions.
116 changes: 116 additions & 0 deletions experimental/mrpc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions experimental/mrpc/plugin/policy/fault-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@

[package]
name = "phoenix-FaultServer-server"
name = "phoenix-fault-server"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
phoenix_common.workspace = true
phoenix-api-policy-FaultServer.workspace = true
phoenix-api-policy-fault-server.workspace = true
mrpc-marshal.workspace = true
mrpc-derive.workspace = true
shm.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion experimental/mrpc/plugin/policy/fault-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chrono::prelude::*;
use itertools::iproduct;
use rand::Rng;

#[derive(Debug, Clone, Copy, DeFaultServer, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct FaultServerConfig {}

Expand Down
132 changes: 71 additions & 61 deletions experimental/mrpc/plugin/policy/fault-server/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use anyhow::{anyhow, Result};
use futures::future::BoxFuture;
use phoenix_api::rpc::{RpcId, TransportStatus};
use phoenix_api::rpc::{RpcId, StatusCode, TransportStatus};
use std::fmt;
use std::fs::File;
use std::io::Write;
use std::num::NonZeroU32;
use std::os::unix::ucred::UCred;
use std::pin::Pin;
use std::ptr::Unique;

use phoenix_api_policy_FaultServer::control_plane;

use phoenix_api_policy_fault_server::control_plane;
use phoenix_common::engine::datapath::message::{
EngineRxMessage, EngineTxMessage, RpcMessageGeneral,
};
use phoenix_common::engine::datapath::meta_pool::MetaBufferPool;

use phoenix_common::engine::datapath::node::DataPathNode;
use phoenix_common::engine::{future, Decompose, Engine, EngineResult, Indicator, Vertex};
Expand All @@ -21,7 +22,7 @@ use phoenix_common::impl_vertex_for_engine;
use phoenix_common::log;
use phoenix_common::module::Version;

use phoenix_common::engine::datapath::RpcMessageTx;
use phoenix_common::engine::datapath::{RpcMessageRx, RpcMessageTx};
use phoenix_common::storage::{ResourceCollection, SharedStorage};

use super::DatapathError;
Expand All @@ -44,6 +45,7 @@ pub(crate) struct FaultServerEngine {
pub(crate) node: DataPathNode,
pub(crate) indicator: Indicator,
pub(crate) config: FaultServerConfig,
pub(crate) meta_buf_pool: MetaBufferPool,
pub(crate) var_probability: f32,
}

Expand Down Expand Up @@ -102,6 +104,7 @@ impl Decompose for FaultServerEngine {
let engine = *self;
let mut collections = ResourceCollection::with_capacity(4);
collections.insert("config".to_string(), Box::new(engine.config));
collections.insert("meta_buf_pool".to_string(), Box::new(engine.meta_buf_pool));
(collections, engine.node)
}
}
Expand All @@ -117,12 +120,18 @@ impl FaultServerEngine {
.unwrap()
.downcast::<FaultServerConfig>()
.map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?;
let meta_buf_pool = *local
.remove("meta_buf_pool")
.unwrap()
.downcast::<MetaBufferPool>()
.map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?;
let var_probability = 0.01;

let engine = FaultServerEngine {
node,
indicator: DeFaultServer::deFaultServer(),
indicator: Default::default(),
config,
meta_buf_pool,
var_probability,
};
Ok(engine)
Expand Down Expand Up @@ -153,78 +162,79 @@ fn materialize_nocopy(msg: &RpcMessageTx) -> &hello::HelloRequest {
return req;
}

/// Copy the RPC request to a private heap and returns the request.
#[inline]
fn materialize_rx(msg: &RpcMessageRx) -> Box<hello::HelloRequest> {
let req_ptr = Unique::new(msg.addr_backend as *mut hello::HelloRequest).unwrap();
let req = unsafe { req_ptr.as_ref() };
// returns a private_req
Box::new(req.clone())
}

impl FaultServerEngine {
fn check_input_queue(&mut self) -> Result<Status, DatapathError> {
use phoenix_common::engine::datapath::TryRecvError;

match self.tx_inputs()[0].try_recv() {
Ok(msg) => {
match msg {
EngineTxMessage::RpcMessage(msg) => {
let meta_ref = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() };
let mut input = Vec::new();
input.push(msg);
let output: Vec<_> = input
.iter()
.map(|msg| {
let rpc_message = materialize_nocopy(&msg);
let conn_id = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() }.conn_id;
let call_id = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() }.call_id;
let rpc_id = RpcId::new(conn_id, call_id);
if rand::random::<f32>() < self.var_probability {
let error = EngineRxMessage::Ack(
rpc_id,
TransportStatus::Error(unsafe {
NonZeroU32::new_unchecked(403)
}),
);
RpcMessageGeneral::RxMessage(error)
} else {
let raw_ptr: *const hello::HelloRequest = rpc_message;
let new_msg = RpcMessageTx {
meta_buf_ptr: msg.meta_buf_ptr.clone(),
addr_backend: raw_ptr.addr(),
};
RpcMessageGeneral::TxMessage(EngineTxMessage::RpcMessage(
new_msg,
))
}
})
.collect();

for msg in output {
match msg {
RpcMessageGeneral::TxMessage(msg) => {
self.tx_outputs()[0].send(msg)?;
}
RpcMessageGeneral::RxMessage(msg) => {
self.rx_outputs()[0].send(msg)?;
}
_ => {}
}
}
}
m => self.tx_outputs()[0].send(m)?,
}
self.tx_outputs()[0].send(msg)?;
return Ok(Progress(1));
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => {
return Ok(Status::Disconnected);
}
Err(TryRecvError::Disconnected) => return Ok(Status::Disconnected),
}

match self.rx_inputs()[0].try_recv() {
Ok(msg) => {
match msg {
Ok(m) => {
match m {
EngineRxMessage::Ack(rpc_id, status) => {
// todo
self.rx_outputs()[0].send(EngineRxMessage::Ack(rpc_id, status))?;
if let Ok(()) = self.meta_buf_pool.release(rpc_id) {
// log::info!(
// "Access denied ack received, rpc_id: {:?} metabuf released",
// rpc_id
// );
} else {
// log::info!("release failed!: {:?}", rpc_id);
self.rx_outputs()[0].send(m)?;
}
}
EngineRxMessage::RpcMessage(msg) => {
self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?;
let private_req = materialize_rx(&msg);
if rand::random::<f32>() < self.var_probability {
// We need to copy meta, add it to meta_buf_pool, and send it as the tx msg
// Is there better way to do this and avoid unsafe?
let mut meta = unsafe { msg.meta.as_ref().clone() };
meta.status_code = StatusCode::AccessDenied;
let mut meta_ptr = self
.meta_buf_pool
.obtain(RpcId(meta.conn_id, meta.call_id))
.expect("meta_buf_pool is full");
unsafe {
meta_ptr.as_meta_ptr().write(meta);
meta_ptr.0.as_mut().num_sge = 0;
meta_ptr.0.as_mut().value_len = 0;
}
let rpc_msg = RpcMessageTx {
meta_buf_ptr: meta_ptr,
addr_backend: 0,
};
let new_msg = EngineTxMessage::RpcMessage(rpc_msg);
self.tx_outputs()[0]
.send(new_msg)
.expect("send new message error");
let msg_call_ids =
[meta.call_id, meta.call_id, meta.call_id, meta.call_id];
self.tx_outputs()[0].send(EngineTxMessage::ReclaimRecvBuf(
meta.conn_id,
msg_call_ids,
))?;
} else {
self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?;
}
}
EngineRxMessage::RecvError(_, _) => {
self.rx_outputs()[0].send(m)?;
}
m => self.rx_outputs()[0].send(m)?,
}
return Ok(Progress(1));
}
Expand Down
Loading

0 comments on commit b5b8c2f

Please sign in to comment.