Skip to content

Commit

Permalink
Add a Delay Engine (#237)
Browse files Browse the repository at this point in the history
* feat(engine): delay

Delay each RPC for 100ms with probability 0.2

* refactor(engine): add a buffer in delay engine

* feat: configurable delay_probability & delay_ms

---------

Co-authored-by: starling <[email protected]>
  • Loading branch information
Kristoff-starling and starling authored Jul 14, 2023
1 parent b924a77 commit c1073f1
Show file tree
Hide file tree
Showing 19 changed files with 594 additions and 4 deletions.
12 changes: 12 additions & 0 deletions eval/policy/delay/attach.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
addon_engine = "DelayEngine"
tx_channels_replacements = [
["MrpcEngine", "DelayEngine", 0, 0],
["DelayEngine", "TcpRpcAdapterEngine", 0, 0],
]
rx_channels_replacements = []
group = ["MrpcEngine", "TcpRpcAdapterEngine"]
op = "attach"
config_string = '''
delay_probability = 0.2
delay_ms = 100
'''
47 changes: 47 additions & 0 deletions eval/policy/delay/collect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env python3
from typing import List
import glob
import sys

OD = "/tmp/mrpc-eval"
if len(sys.argv) >= 2:
OD = sys.argv[1]


def convert_msg_size(s: str) -> int:
if s.endswith('gb'):
return int(s[:-2]) * 1024 * 1024 * 1024
if s.endswith('mb'):
return int(s[:-2]) * 1024 * 1024
if s.endswith('kb'):
return int(s[:-2]) * 1024
if s.endswith('b'):
return int(s[:-1])

raise ValueError(f"unknown input: {s}")


def get_rate(path: str) -> List[float]:
rates = []
with open(path, 'r') as fin:
for line in fin:
words = line.strip().split(' ')
if words[-3] == 'rps,':
rate = float(words[-4])
rates.append(rate)
return rates[1:]


def load_result(sol_before, sol_after, f: str):
# print(f)
rates = get_rate(f)
before = rates[5:25]
after = rates[-25:-5]
for r in before:
print(f'{round(r/1000,2)},{sol_before},w/o Fault')
for r in after:
print(f'{round(r/1000,2)},{sol_after},w/ Fault')


for f in glob.glob(OD+"/policy/delay/rpc_bench_tput_32b/rpc_bench_client_danyang-04.stdout"):
load_result('mRPC', 'Native mRPC', f)
9 changes: 9 additions & 0 deletions eval/policy/delay/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
workdir = "~/nfs/Developing/livingshade/phoenix/experimental/mrpc"

[env]
RUST_BACKTRACE = "1"
RUST_LOG_STYLE = "never"
CARGO_TERM_COLOR = "never"
PHOENIX_LOG = "info"
PROTOC = "/usr/bin/protoc"
PHOENIX_PREFIX = "/tmp/phoenix"
4 changes: 4 additions & 0 deletions eval/policy/delay/detach.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
addon_engine = "DelayEngine"
tx_channels_replacements = [["MrpcEngine", "TcpRpcAdapterEngine", 0, 0]]
rx_channels_replacements = [["TcpRpcAdapterEngine", "MrpcEngine", 0, 0]]
op = "detach"
15 changes: 15 additions & 0 deletions eval/policy/delay/rpc_bench_tput_32b.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name = "policy/delay/rpc_bench_tput_32b"
description = "Run rpc_bench benchmark"
group = "delay"
timeout_secs = 70

[[worker]]
host = "danyang-06"
bin = "rpc_bench_server"
args = "--port 5002 -l info --transport tcp"

[[worker]]
host = "danyang-04"
bin = "rpc_bench_client"
args = "--transport tcp -c rdma0.danyang-06 --concurrency 128 --req-size 32 --duration 65 -i 1 --port 5002 -l error"
dependencies = [0]
29 changes: 29 additions & 0 deletions eval/policy/delay/start_traffic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/usr/bin/env bash

trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM SIGHUP EXIT

OD=/tmp/mrpc-eval
if [[ $# -ge 1 ]]; then
OD=$1
fi

WORKDIR=$(dirname $(realpath $0))
cd $WORKDIR

# concurrency = 128
cargo rr --bin launcher -- --output-dir ${OD} --timeout=120 --benchmark ./rpc_bench_tput_32b.toml --configfile ./config.toml &

sleep 30

LIST_OUTPUT="${OD}"/policy/list.json
cargo rr --bin list -- --dump "${LIST_OUTPUT}" # Need to specifiy PHOENIX_PREFIX
cat "${LIST_OUTPUT}"
ARG_PID=$(cat "${LIST_OUTPUT}" | jq '.[] | select(.service == "Mrpc") | .pid')
ARG_SID=$(cat "${LIST_OUTPUT}" | jq '.[] | select(.service == "Mrpc") | .sid')
echo $ARG_SID

sleep 1

cargo run --bin addonctl -- --config ./attach.toml --pid ${ARG_PID} --sid ${ARG_SID} # Need to specifiy PHOENIX_PREFIX

wait
34 changes: 34 additions & 0 deletions experimental/mrpc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions experimental/mrpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ members = [
"phoenix-api/policy/nofile-logging",
"phoenix-api/policy/fault",
"phoenix-api/policy/fault2",
"phoenix-api/policy/delay",
# the pheonix plugins
"plugin/mrpc",
"plugin/rpc_adapter",
Expand All @@ -74,6 +75,7 @@ members = [
"plugin/policy/nofile-logging",
"plugin/policy/fault",
"plugin/policy/fault2",
"plugin/policy/delay",
# examples
"examples/rpc_echo",
"examples/rpc_bench",
Expand Down Expand Up @@ -101,6 +103,7 @@ phoenix-api-policy-hello-acl = { path = "phoenix-api/policy/hello-acl" }
phoenix-api-policy-nofile-logging = { path = "phoenix-api/policy/nofile-logging" }
phoenix-api-policy-fault = { path = "phoenix-api/policy/fault" }
phoenix-api-policy-fault2 = { path = "phoenix-api/policy/fault2" }
phoenix-api-policy-delay = { path = "phoenix-api/policy/delay" }

mrpc-build = { path = "mrpc-build" }
mrpc-derive = { path = "mrpc-derive" }
Expand Down
8 changes: 8 additions & 0 deletions experimental/mrpc/load-mrpc-plugins.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,11 @@ name = "Fault2"
lib_path = "plugins/libphoenix_fault2.rlib"
config_string = '''
'''

[[addons]]
name = "Delay"
lib_path = "plugins/libphoenix_delay.rlib"
config_string = '''
delay_probability = 0.2
delay_ms = 100
'''
14 changes: 14 additions & 0 deletions experimental/mrpc/phoenix-api/policy/delay/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

[package]
name = "phoenix-api-policy-delay"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
phoenix-api.workspace = true

serde.workspace = true
itertools.workspace = true
rand.workspace = true
14 changes: 14 additions & 0 deletions experimental/mrpc/phoenix-api/policy/delay/src/control_plane.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use serde::{Deserialize, Serialize};

type IResult<T> = Result<T, phoenix_api::Error>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Request {
NewConfig(f32, u64),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponseKind {}

#[derive(Debug, Serialize, Deserialize)]
pub struct Response(pub IResult<ResponseKind>);
1 change: 1 addition & 0 deletions experimental/mrpc/phoenix-api/policy/delay/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod control_plane;
28 changes: 28 additions & 0 deletions experimental/mrpc/plugin/policy/delay/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

[package]
name = "phoenix-delay"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
phoenix_common.workspace = true
phoenix-api-policy-delay.workspace = true
mrpc-marshal.workspace = true
mrpc-derive.workspace = true
shm.workspace = true
phoenix-api = { workspace = true, features = ["mrpc"] }

futures.workspace = true
minstant.workspace = true
thiserror.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
anyhow.workspace = true
nix.workspace = true
toml = { workspace = true, features = ["preserve_order"] }
bincode.workspace = true
chrono.workspace = true
itertools.workspace = true
rand.workspace = true
25 changes: 25 additions & 0 deletions experimental/mrpc/plugin/policy/delay/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DelayConfig {
pub delay_probability: f32,
pub delay_ms: u64,
}

impl Default for DelayConfig {
fn default() -> Self {
DelayConfig {
delay_probability: 0.2,
delay_ms: 100,
}
}
}

impl DelayConfig {
/// Get config from toml file
pub fn new(config: Option<&str>) -> anyhow::Result<Self> {
let config = toml::from_str(config.unwrap_or(""))?;
Ok(config)
}
}
Loading

0 comments on commit c1073f1

Please sign in to comment.