#!/usr/bin/env python3
"""Collect throughput samples from the delay-policy benchmark output.

Reads the client's stdout under ``<OD>/policy/delay/rpc_bench_tput_32b/`` and
prints one CSV row per sample in the form ``<rate in krps>,<solution>,<label>``.

Usage: collect.py [OUTPUT_DIR]   (OUTPUT_DIR defaults to /tmp/mrpc-eval)
"""
from typing import List
import glob
import sys

# Root directory of the benchmark output; the first CLI argument overrides it.
OD = "/tmp/mrpc-eval"
if len(sys.argv) >= 2:
    OD = sys.argv[1]


def convert_msg_size(s: str) -> int:
    """Convert a size string such as ``'32b'``, ``'4kb'``, ``'2mb'`` or ``'1gb'`` to bytes.

    Raises:
        ValueError: if ``s`` has no recognised suffix or a non-integer prefix.
    """
    # The two-letter suffixes must be tested before the bare 'b' suffix,
    # because 'kb', 'mb' and 'gb' all end in 'b' as well.
    if s.endswith('gb'):
        return int(s[:-2]) * 1024 * 1024 * 1024
    if s.endswith('mb'):
        return int(s[:-2]) * 1024 * 1024
    if s.endswith('kb'):
        return int(s[:-2]) * 1024
    if s.endswith('b'):
        return int(s[:-1])

    raise ValueError(f"unknown input: {s}")


def get_rate(path: str) -> List[float]:
    """Extract the request rates reported in a benchmark client's stdout.

    A line is treated as a rate report when its third-from-last
    space-separated token is ``'rps,'``; the token just before it is the rate.
    Blank lines and lines with fewer than four tokens are skipped.

    Returns:
        The parsed rates in file order, with the first sample dropped.
    """
    rates = []
    with open(path, 'r') as fin:
        for line in fin:
            words = line.strip().split(' ')
            # Guard the negative indices: a blank or short line would otherwise
            # raise IndexError and abort the whole collection.
            if len(words) >= 4 and words[-3] == 'rps,':
                rates.append(float(words[-4]))
    return rates[1:]


def load_result(sol_before, sol_after, f: str):
    """Print CSV rows for one window of samples before and one after the policy is attached.

    Samples 5..25 from the start are labelled with ``sol_before`` and samples
    25..5 from the end with ``sol_after``; rates are printed in thousands.
    """
    # print(f)
    rates = get_rate(f)
    before = rates[5:25]
    after = rates[-25:-5]
    for r in before:
        print(f'{round(r/1000,2)},{sol_before},w/o Fault')
    for r in after:
        print(f'{round(r/1000,2)},{sol_after},w/ Fault')


for f in glob.glob(OD+"/policy/delay/rpc_bench_tput_32b/rpc_bench_client_danyang-04.stdout"):
    load_result('mRPC', 'Native mRPC', f)
#!/usr/bin/env bash
# Launch the delay-policy throughput benchmark and, while it is running,
# attach the DelayEngine addon to the mRPC service.
#
# Usage: start_traffic.sh [OUTPUT_DIR]   (OUTPUT_DIR defaults to /tmp/mrpc-eval)

# On exit or signal, kill the whole process group so the background launcher
# does not outlive this script.
trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM SIGHUP EXIT

OD=/tmp/mrpc-eval
if [[ $# -ge 1 ]]; then
    OD=$1
fi

# Run from the script's own directory so the relative ./*.toml paths resolve.
# Quoted so that paths containing spaces keep working.
WORKDIR=$(dirname "$(realpath "$0")")
cd "$WORKDIR" || exit 1

# concurrency = 128
cargo rr --bin launcher -- --output-dir "${OD}" --timeout=120 --benchmark ./rpc_bench_tput_32b.toml --configfile ./config.toml &

# Let the benchmark run for a while before the addon is attached.
sleep 30

LIST_OUTPUT="${OD}"/policy/list.json
cargo rr --bin list -- --dump "${LIST_OUTPUT}" # Need to specify PHOENIX_PREFIX
cat "${LIST_OUTPUT}"
ARG_PID=$(jq '.[] | select(.service == "Mrpc") | .pid' "${LIST_OUTPUT}")
ARG_SID=$(jq '.[] | select(.service == "Mrpc") | .sid' "${LIST_OUTPUT}")
echo $ARG_SID

# Without a pid/sid addonctl would be invoked with missing argument values;
# fail here with a clear message instead.
if [[ -z "${ARG_PID}" || -z "${ARG_SID}" ]]; then
    echo "no running Mrpc service found in ${LIST_OUTPUT}" >&2
    exit 1
fi

sleep 1

cargo run --bin addonctl -- --config ./attach.toml --pid ${ARG_PID} --sid ${ARG_SID} # Need to specify PHOENIX_PREFIX

# Block until the background launcher finishes.
wait
"phoenix-api/policy/fault" } phoenix-api-policy-fault2 = { path = "phoenix-api/policy/fault2" } +phoenix-api-policy-delay = { path = "phoenix-api/policy/delay" } mrpc-build = { path = "mrpc-build" } mrpc-derive = { path = "mrpc-derive" } diff --git a/experimental/mrpc/load-mrpc-plugins.toml b/experimental/mrpc/load-mrpc-plugins.toml index 78d6be7d..d7e68f54 100644 --- a/experimental/mrpc/load-mrpc-plugins.toml +++ b/experimental/mrpc/load-mrpc-plugins.toml @@ -90,3 +90,11 @@ name = "Fault2" lib_path = "plugins/libphoenix_fault2.rlib" config_string = ''' ''' + +[[addons]] +name = "Delay" +lib_path = "plugins/libphoenix_delay.rlib" +config_string = ''' +delay_probability = 0.2 +delay_ms = 100 +''' diff --git a/experimental/mrpc/phoenix-api/policy/delay/Cargo.toml b/experimental/mrpc/phoenix-api/policy/delay/Cargo.toml new file mode 100644 index 00000000..dedb9488 --- /dev/null +++ b/experimental/mrpc/phoenix-api/policy/delay/Cargo.toml @@ -0,0 +1,14 @@ + +[package] +name = "phoenix-api-policy-delay" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +phoenix-api.workspace = true + +serde.workspace = true +itertools.workspace = true +rand.workspace = true diff --git a/experimental/mrpc/phoenix-api/policy/delay/src/control_plane.rs b/experimental/mrpc/phoenix-api/policy/delay/src/control_plane.rs new file mode 100644 index 00000000..5820428f --- /dev/null +++ b/experimental/mrpc/phoenix-api/policy/delay/src/control_plane.rs @@ -0,0 +1,14 @@ +use serde::{Deserialize, Serialize}; + +type IResult = Result; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Request { + NewConfig(f32, u64), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ResponseKind {} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Response(pub IResult); diff --git a/experimental/mrpc/phoenix-api/policy/delay/src/lib.rs 
b/experimental/mrpc/phoenix-api/policy/delay/src/lib.rs new file mode 100644 index 00000000..412c5a55 --- /dev/null +++ b/experimental/mrpc/phoenix-api/policy/delay/src/lib.rs @@ -0,0 +1 @@ +pub mod control_plane; diff --git a/experimental/mrpc/plugin/policy/delay/Cargo.toml b/experimental/mrpc/plugin/policy/delay/Cargo.toml new file mode 100644 index 00000000..dd57021e --- /dev/null +++ b/experimental/mrpc/plugin/policy/delay/Cargo.toml @@ -0,0 +1,28 @@ + +[package] +name = "phoenix-delay" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +phoenix_common.workspace = true +phoenix-api-policy-delay.workspace = true +mrpc-marshal.workspace = true +mrpc-derive.workspace = true +shm.workspace = true +phoenix-api = { workspace = true, features = ["mrpc"] } + +futures.workspace = true +minstant.workspace = true +thiserror.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +anyhow.workspace = true +nix.workspace = true +toml = { workspace = true, features = ["preserve_order"] } +bincode.workspace = true +chrono.workspace = true +itertools.workspace = true +rand.workspace = true diff --git a/experimental/mrpc/plugin/policy/delay/src/config.rs b/experimental/mrpc/plugin/policy/delay/src/config.rs new file mode 100644 index 00000000..87d08086 --- /dev/null +++ b/experimental/mrpc/plugin/policy/delay/src/config.rs @@ -0,0 +1,25 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct DelayConfig { + pub delay_probability: f32, + pub delay_ms: u64, +} + +impl Default for DelayConfig { + fn default() -> Self { + DelayConfig { + delay_probability: 0.2, + delay_ms: 100, + } + } +} + +impl DelayConfig { + /// Get config from toml file + pub fn new(config: Option<&str>) -> anyhow::Result { + let config = toml::from_str(config.unwrap_or(""))?; + 
Ok(config) + } +} diff --git a/experimental/mrpc/plugin/policy/delay/src/engine.rs b/experimental/mrpc/plugin/policy/delay/src/engine.rs new file mode 100644 index 00000000..652a8ad4 --- /dev/null +++ b/experimental/mrpc/plugin/policy/delay/src/engine.rs @@ -0,0 +1,212 @@ +use std::collections::VecDeque; +use std::os::unix::ucred::UCred; +use std::pin::Pin; + +use anyhow::{anyhow, Result}; +use futures::future::BoxFuture; +use minstant::Instant; + +use phoenix_api_policy_delay::control_plane; + +use phoenix_common::engine::datapath::message::{EngineTxMessage, RpcMessageTx}; +use phoenix_common::engine::datapath::node::DataPathNode; +use phoenix_common::engine::{future, Decompose, Engine, EngineResult, Indicator, Vertex}; +use phoenix_common::envelop::ResourceDowncast; +use phoenix_common::impl_vertex_for_engine; +use phoenix_common::module::Version; +use phoenix_common::storage::{ResourceCollection, SharedStorage}; + +use super::DatapathError; +use crate::config::DelayConfig; + +pub(crate) struct DelayRpcInfo { + pub(crate) msg: RpcMessageTx, + pub(crate) timestamp: Instant, +} + +pub(crate) struct DelayEngine { + pub(crate) node: DataPathNode, + pub(crate) indicator: Indicator, + pub(crate) config: DelayConfig, + // The probability of delaying an RPC. + pub(crate) delay_probability: f32, + // Delaying time (in ms). + pub(crate) delay_ms: u64, + // The queue to buffer delayed requests. 
+ pub(crate) queue: VecDeque, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Status { + Progress(usize), + Disconnected, +} + +use Status::Progress; + +impl Engine for DelayEngine { + fn activate<'a>(self: Pin<&'a mut Self>) -> BoxFuture<'a, EngineResult> { + Box::pin(async move { self.get_mut().mainloop().await }) + } + + fn description(self: Pin<&Self>) -> String { + "DelayEngine".to_owned() + } + + #[inline] + fn tracker(self: Pin<&mut Self>) -> &mut Indicator { + &mut self.get_mut().indicator + } + + fn handle_request(&mut self, request: Vec, _cred: UCred) -> Result<()> { + let request: control_plane::Request = bincode::deserialize(&request[..])?; + + match request { + control_plane::Request::NewConfig(delay_probability, delay_ms) => { + self.config = DelayConfig { + delay_probability, + delay_ms, + }; + } + } + Ok(()) + } +} + +impl_vertex_for_engine!(DelayEngine, node); + +impl Decompose for DelayEngine { + fn flush(&mut self) -> Result { + let mut work = 0; + while !self.tx_inputs()[0].is_empty() { + if let Progress(n) = self.check_input_queue()? { + work += n; + } + } + while !self.queue.is_empty() { + let DelayRpcInfo { msg, .. 
} = self.queue.pop_front().unwrap(); + self.tx_outputs()[0].send(EngineTxMessage::RpcMessage(msg))?; + work += 1; + } + Ok(work) + } + + fn decompose( + self: Box, + _shared: &mut SharedStorage, + _global: &mut ResourceCollection, + ) -> (ResourceCollection, DataPathNode) { + let engine = *self; + let mut collections = ResourceCollection::with_capacity(4); + collections.insert("config".to_string(), Box::new(engine.config)); + collections.insert( + "delay_probability".to_string(), + Box::new(engine.delay_probability), + ); + collections.insert("delay_ms".to_string(), Box::new(engine.delay_ms)); + collections.insert("queue".to_string(), Box::new(engine.queue)); + (collections, engine.node) + } +} + +impl DelayEngine { + pub(crate) fn restore( + mut local: ResourceCollection, + node: DataPathNode, + _prev_version: Version, + ) -> Result { + let config = *local + .remove("config") + .unwrap() + .downcast::() + .map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?; + let delay_probability = *local + .remove("delay_probability") + .unwrap() + .downcast::() + .map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?; + let delay_ms = *local + .remove("delay_ms") + .unwrap() + .downcast::() + .map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?; + let queue = *local + .remove("queue") + .unwrap() + .downcast::>() + .map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?; + + let engine = DelayEngine { + node, + indicator: Default::default(), + config, + delay_probability, + delay_ms, + queue, + }; + Ok(engine) + } +} + +impl DelayEngine { + async fn mainloop(&mut self) -> EngineResult { + loop { + let mut work = 0; + loop { + match self.check_input_queue()? 
{ + Progress(0) => break, + Progress(n) => work += n, + Status::Disconnected => return Ok(()), + } + } + self.check_delay_buffer()?; + self.indicator.set_nwork(work); + future::yield_now().await; + } + } +} + +impl DelayEngine { + fn check_delay_buffer(&mut self) -> Result<(), DatapathError> { + while !self.queue.is_empty() { + let oldest_msg = self.queue.pop_front().unwrap(); + if oldest_msg.timestamp.elapsed().as_millis() as u64 > self.delay_ms { + self.tx_outputs()[0].send(EngineTxMessage::RpcMessage(oldest_msg.msg))?; + } else { + self.queue.push_front(oldest_msg); + break; + } + } + Ok(()) + } + + fn check_input_queue(&mut self) -> Result { + use phoenix_common::engine::datapath::TryRecvError; + + match self.tx_inputs()[0].try_recv() { + Ok(msg) => { + match msg { + EngineTxMessage::RpcMessage(msg) => { + if rand::random::() < self.delay_probability { + let delay_msg = DelayRpcInfo { + msg: msg, + timestamp: Instant::now(), + }; + self.queue.push_back(delay_msg); + } else { + self.tx_outputs()[0].send(EngineTxMessage::RpcMessage(msg))?; + } + } + m => self.tx_outputs()[0].send(m)?, + } + return Ok(Progress(1)); + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => { + return Ok(Status::Disconnected); + } + } + + Ok(Progress(0)) + } +} diff --git a/experimental/mrpc/plugin/policy/delay/src/lib.rs b/experimental/mrpc/plugin/policy/delay/src/lib.rs new file mode 100644 index 00000000..901438ea --- /dev/null +++ b/experimental/mrpc/plugin/policy/delay/src/lib.rs @@ -0,0 +1,33 @@ +#![feature(peer_credentials_unix_socket)] +#![feature(ptr_internals)] +#![feature(strict_provenance)] +use thiserror::Error; + +pub use phoenix_common::{InitFnResult, PhoenixAddon}; + +pub mod config; +pub(crate) mod engine; +pub mod module; + +#[derive(Error, Debug)] +pub(crate) enum DatapathError { + #[error("Internal queue send error")] + InternalQueueSend, +} + +use phoenix_common::engine::datapath::SendError; +impl From> for DatapathError { + fn from(_other: 
SendError) -> Self { + DatapathError::InternalQueueSend + } +} + +use crate::config::DelayConfig; +use crate::module::DelayAddon; + +#[no_mangle] +pub fn init_addon(config_string: Option<&str>) -> InitFnResult> { + let config = DelayConfig::new(config_string)?; + let addon = DelayAddon::new(config); + Ok(Box::new(addon)) +} diff --git a/experimental/mrpc/plugin/policy/delay/src/module.rs b/experimental/mrpc/plugin/policy/delay/src/module.rs new file mode 100644 index 00000000..ec08477a --- /dev/null +++ b/experimental/mrpc/plugin/policy/delay/src/module.rs @@ -0,0 +1,104 @@ +use std::collections::VecDeque; + +use anyhow::{bail, Result}; +use nix::unistd::Pid; + +use phoenix_common::addon::{PhoenixAddon, Version}; +use phoenix_common::engine::datapath::DataPathNode; +use phoenix_common::engine::{Engine, EngineType}; +use phoenix_common::storage::ResourceCollection; + +use super::engine::DelayEngine; +use crate::config::DelayConfig; + +pub(crate) struct DelayEngineBuilder { + node: DataPathNode, + config: DelayConfig, +} + +impl DelayEngineBuilder { + fn new(node: DataPathNode, config: DelayConfig) -> Self { + DelayEngineBuilder { node, config } + } + // TODO! 
LogFile + fn build(self) -> Result { + Ok(DelayEngine { + node: self.node, + indicator: Default::default(), + config: self.config, + delay_probability: self.config.delay_probability as _, + delay_ms: self.config.delay_ms as _, + queue: VecDeque::new(), + }) + } +} + +pub struct DelayAddon { + config: DelayConfig, +} + +impl DelayAddon { + pub const DELAY_ENGINE: EngineType = EngineType("DelayEngine"); + pub const ENGINES: &'static [EngineType] = &[DelayAddon::DELAY_ENGINE]; +} + +impl DelayAddon { + pub fn new(config: DelayConfig) -> Self { + DelayAddon { config } + } +} + +impl PhoenixAddon for DelayAddon { + fn check_compatibility(&self, _prev: Option<&Version>) -> bool { + true + } + + fn decompose(self: Box) -> ResourceCollection { + let addon = *self; + let mut collections = ResourceCollection::new(); + collections.insert("config".to_string(), Box::new(addon.config)); + collections + } + + #[inline] + fn migrate(&mut self, _prev_addon: Box) {} + + fn engines(&self) -> &[EngineType] { + DelayAddon::ENGINES + } + + fn update_config(&mut self, config: &str) -> Result<()> { + self.config = toml::from_str(config)?; + Ok(()) + } + + fn create_engine( + &mut self, + ty: EngineType, + _pid: Pid, + node: DataPathNode, + ) -> Result> { + if ty != DelayAddon::DELAY_ENGINE { + bail!("invalid engine type {:?}", ty) + } + + let builder = DelayEngineBuilder::new(node, self.config); + let engine = builder.build()?; + Ok(Box::new(engine)) + } + + fn restore_engine( + &mut self, + ty: EngineType, + local: ResourceCollection, + node: DataPathNode, + prev_version: Version, + ) -> Result> { + if ty != DelayAddon::DELAY_ENGINE { + bail!("invalid engine type {:?}", ty) + } + + let engine = DelayEngine::restore(local, node, prev_version)?; + Ok(Box::new(engine)) + } +} diff --git a/experimental/mrpc/plugin/policy/logging/src/config.rs b/experimental/mrpc/plugin/policy/logging/src/config.rs index f6270fe7..4be66f50 100644 --- a/experimental/mrpc/plugin/policy/logging/src/config.rs +++ 
b/experimental/mrpc/plugin/policy/logging/src/config.rs @@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize}; #[serde(deny_unknown_fields)] pub struct LoggingConfig {} - impl LoggingConfig { /// Get config from toml file pub fn new(config: Option<&str>) -> anyhow::Result { @@ -21,7 +20,7 @@ impl LoggingConfig { } /// Create a log file in `/tmp/phoenix/log` -/// This function will be called every time +/// This function will be called every time /// a logging engine is started or restored pub fn create_log_file() -> std::fs::File { std::fs::create_dir_all("/tmp/phoenix/log").expect("mkdir failed"); diff --git a/experimental/mrpc/plugin/policy/null/src/config.rs b/experimental/mrpc/plugin/policy/null/src/config.rs index f7e7a8db..5a93ab2f 100644 --- a/experimental/mrpc/plugin/policy/null/src/config.rs +++ b/experimental/mrpc/plugin/policy/null/src/config.rs @@ -1,7 +1,6 @@ use serde::{Deserialize, Serialize}; - -/// TODO Add your own config(parameter) here +/// TODO Add your own config(parameter) here #[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct NullConfig {}