From b5b8c2f5b1f5d48becef54c58ee07f106f348ea1 Mon Sep 17 00:00:00 2001 From: banruo Date: Sat, 2 Dec 2023 18:02:49 +0000 Subject: [PATCH] add server side impls --- experimental/mrpc/Cargo.lock | 116 +++++++++++++ .../plugin/policy/fault-server/Cargo.toml | 4 +- .../plugin/policy/fault-server/src/config.rs | 2 +- .../plugin/policy/fault-server/src/engine.rs | 132 +++++++------- .../plugin/policy/fault-server/src/module.rs | 19 +- .../plugin/policy/logging-server/Cargo.toml | 4 +- .../policy/logging-server/src/config.rs | 6 +- .../policy/logging-server/src/engine.rs | 78 +++------ .../plugin/policy/logging-server/src/lib.rs | 2 +- .../policy/logging-server/src/module.rs | 8 +- .../plugin/policy/metrics-server/Cargo.toml | 4 +- .../policy/metrics-server/src/engine.rs | 32 ++-- .../policy/metrics-server/src/module.rs | 8 +- .../plugin/policy/mutation-server/Cargo.toml | 4 +- .../policy/mutation-server/src/engine.rs | 43 ++--- .../policy/mutation-server/src/module.rs | 8 +- .../policy/ratelimit-drop-server/Cargo.toml | 14 +- .../ratelimit-drop-server/src/config.rs | 24 +++ .../ratelimit-drop-server/src/engine.rs | 164 ++++++++++-------- .../policy/ratelimit-drop-server/src/lib.rs | 7 +- .../ratelimit-drop-server/src/module.rs | 23 ++- 21 files changed, 425 insertions(+), 277 deletions(-) diff --git a/experimental/mrpc/Cargo.lock b/experimental/mrpc/Cargo.lock index 34b364fd..f862caf6 100644 --- a/experimental/mrpc/Cargo.lock +++ b/experimental/mrpc/Cargo.lock @@ -1927,6 +1927,30 @@ dependencies = [ "toml", ] +[[package]] +name = "phoenix-fault-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "chrono", + "futures", + "itertools", + "minstant", + "mrpc-derive", + "mrpc-marshal", + "nix", + "phoenix-api", + "phoenix-api-policy-fault-server", + "phoenix_common", + "rand 0.8.5", + "serde", + "serde_json", + "shm", + "thiserror", + "toml", +] + [[package]] name = "phoenix-fault2" version = "0.1.0" @@ -2096,6 +2120,30 @@ dependencies = [ "toml", ] +[[package]] +name = "phoenix-logging-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "chrono", + "futures", + "itertools", + "minstant", + "mrpc-derive", + "mrpc-marshal", + "nix", + "phoenix-api", + "phoenix-api-policy-logging-server", + "phoenix_common", + "rand 0.8.5", + "serde", + "serde_json", + "shm", + "thiserror", + "toml", +] + [[package]] name = "phoenix-metrics" version = "0.1.0" @@ -2118,6 +2166,28 @@ dependencies = [ "toml", ] +[[package]] +name = "phoenix-metrics-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "fnv", + "futures", + "minstant", + "mrpc-derive", + "mrpc-marshal", + "nix", + "phoenix-api", + "phoenix-api-policy-metrics-server", + "phoenix_common", + "serde", + "serde_json", + "shm", + "thiserror", + "toml", +] + [[package]] name = "phoenix-mrpc" version = "0.1.0" @@ -2204,6 +2274,28 @@ dependencies = [ "toml", ] +[[package]] +name = "phoenix-mutation-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "fnv", + "futures", + "minstant", + "mrpc-derive", + "mrpc-marshal", + "nix", + "phoenix-api", + "phoenix-api-policy-mutation-server", + "phoenix_common", + "serde", + "serde_json", + "shm", + "thiserror", + "toml", +] + [[package]] name = "phoenix-nofile-logging" version = "0.1.0" @@ -2301,6 +2393,30 @@ dependencies = [ "toml", ] +[[package]] +name = "phoenix-ratelimit-drop-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "chrono", + "futures", + "itertools", + "minstant", + "mrpc-derive", + "mrpc-marshal", + "nix", + "phoenix-api", + "phoenix-api-policy-ratelimit-drop-server", + "phoenix_common", + "rand 0.8.5", + "serde", + "serde_json", + "shm", + "thiserror", + "toml", +] + [[package]] name = "phoenix-rpc-adapter" version = "0.1.0" diff --git a/experimental/mrpc/plugin/policy/fault-server/Cargo.toml b/experimental/mrpc/plugin/policy/fault-server/Cargo.toml index 1bf2d0ec..ad5454f2 100644 --- a/experimental/mrpc/plugin/policy/fault-server/Cargo.toml +++ b/experimental/mrpc/plugin/policy/fault-server/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "phoenix-FaultServer-server" +name = "phoenix-fault-server" version = "0.1.0" edition = "2021" @@ -8,7 +8,7 @@ edition = "2021" [dependencies] phoenix_common.workspace = true -phoenix-api-policy-FaultServer.workspace = true +phoenix-api-policy-fault-server.workspace = true mrpc-marshal.workspace = true mrpc-derive.workspace = true shm.workspace = true diff --git a/experimental/mrpc/plugin/policy/fault-server/src/config.rs b/experimental/mrpc/plugin/policy/fault-server/src/config.rs index 199a271b..fa8605e5 100644 --- a/experimental/mrpc/plugin/policy/fault-server/src/config.rs +++ b/experimental/mrpc/plugin/policy/fault-server/src/config.rs @@ -6,7 +6,7 @@ use chrono::prelude::*; use itertools::iproduct; use rand::Rng; -#[derive(Debug, Clone, Copy, DeFaultServer, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct FaultServerConfig {} diff --git a/experimental/mrpc/plugin/policy/fault-server/src/engine.rs b/experimental/mrpc/plugin/policy/fault-server/src/engine.rs index 8a939fb2..b55b016d 100644 --- a/experimental/mrpc/plugin/policy/fault-server/src/engine.rs +++ b/experimental/mrpc/plugin/policy/fault-server/src/engine.rs @@ -1,18 +1,19 @@ use anyhow::{anyhow, Result}; use futures::future::BoxFuture; -use phoenix_api::rpc::{RpcId, TransportStatus}; +use phoenix_api::rpc::{RpcId, StatusCode, TransportStatus}; use std::fmt; use std::fs::File; use std::io::Write; use std::num::NonZeroU32; use std::os::unix::ucred::UCred; use std::pin::Pin; +use std::ptr::Unique; -use phoenix_api_policy_FaultServer::control_plane; - +use phoenix_api_policy_fault_server::control_plane; use phoenix_common::engine::datapath::message::{ EngineRxMessage, EngineTxMessage, RpcMessageGeneral, }; +use phoenix_common::engine::datapath::meta_pool::MetaBufferPool; use phoenix_common::engine::datapath::node::DataPathNode; use phoenix_common::engine::{future, Decompose, Engine, EngineResult, Indicator, Vertex}; @@ -21,7 +22,7 @@ use phoenix_common::impl_vertex_for_engine; use phoenix_common::log; use phoenix_common::module::Version; -use phoenix_common::engine::datapath::RpcMessageTx; +use phoenix_common::engine::datapath::{RpcMessageRx, RpcMessageTx}; use phoenix_common::storage::{ResourceCollection, SharedStorage}; use super::DatapathError; @@ -44,6 +45,7 @@ pub(crate) struct FaultServerEngine { pub(crate) node: DataPathNode, pub(crate) indicator: Indicator, pub(crate) config: FaultServerConfig, + pub(crate) meta_buf_pool: MetaBufferPool, pub(crate) var_probability: f32, } @@ -102,6 +104,7 @@ impl Decompose for FaultServerEngine { let engine = *self; let mut collections = ResourceCollection::with_capacity(4); collections.insert("config".to_string(), Box::new(engine.config)); + collections.insert("meta_buf_pool".to_string(), Box::new(engine.meta_buf_pool)); (collections, engine.node) } } @@ -117,12 +120,18 @@ impl FaultServerEngine { .unwrap() .downcast::() .map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?; + let meta_buf_pool = *local + .remove("meta_buf_pool") + .unwrap() + .downcast::() + .map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?; let var_probability = 0.01; let engine = FaultServerEngine { node, - indicator: DeFaultServer::deFaultServer(), + indicator: Default::default(), config, + meta_buf_pool, var_probability, }; Ok(engine) @@ -153,78 +162,79 @@ fn materialize_nocopy(msg: &RpcMessageTx) -> &hello::HelloRequest { return req; } +/// Copy the RPC request to a private heap and returns the request. +#[inline] +fn materialize_rx(msg: &RpcMessageRx) -> Box { + let req_ptr = Unique::new(msg.addr_backend as *mut hello::HelloRequest).unwrap(); + let req = unsafe { req_ptr.as_ref() }; + // returns a private_req + Box::new(req.clone()) +} + impl FaultServerEngine { fn check_input_queue(&mut self) -> Result { use phoenix_common::engine::datapath::TryRecvError; match self.tx_inputs()[0].try_recv() { Ok(msg) => { - match msg { - EngineTxMessage::RpcMessage(msg) => { - let meta_ref = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() }; - let mut input = Vec::new(); - input.push(msg); - let output: Vec<_> = input - .iter() - .map(|msg| { - let rpc_message = materialize_nocopy(&msg); - let conn_id = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() }.conn_id; - let call_id = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() }.call_id; - let rpc_id = RpcId::new(conn_id, call_id); - if rand::random::() < self.var_probability { - let error = EngineRxMessage::Ack( - rpc_id, - TransportStatus::Error(unsafe { - NonZeroU32::new_unchecked(403) - }), - ); - RpcMessageGeneral::RxMessage(error) - } else { - let raw_ptr: *const hello::HelloRequest = rpc_message; - let new_msg = RpcMessageTx { - meta_buf_ptr: msg.meta_buf_ptr.clone(), - addr_backend: raw_ptr.addr(), - }; - RpcMessageGeneral::TxMessage(EngineTxMessage::RpcMessage( - new_msg, - )) - } - }) - .collect(); - - for msg in output { - match msg { - RpcMessageGeneral::TxMessage(msg) => { - self.tx_outputs()[0].send(msg)?; - } - RpcMessageGeneral::RxMessage(msg) => { - self.rx_outputs()[0].send(msg)?; - } - _ => {} - } - } - } - m => self.tx_outputs()[0].send(m)?, - } + self.tx_outputs()[0].send(msg)?; return Ok(Progress(1)); } Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => { - return Ok(Status::Disconnected); - } + Err(TryRecvError::Disconnected) => return Ok(Status::Disconnected), } match self.rx_inputs()[0].try_recv() { - Ok(msg) => { - match msg { + Ok(m) => { + match m { EngineRxMessage::Ack(rpc_id, status) => { - // todo - self.rx_outputs()[0].send(EngineRxMessage::Ack(rpc_id, status))?; + if let Ok(()) = self.meta_buf_pool.release(rpc_id) { + // log::info!( + // "Access denied ack received, rpc_id: {:?} metabuf released", + // rpc_id + // ); + } else { + // log::info!("release failed!: {:?}", rpc_id); + self.rx_outputs()[0].send(m)?; + } } EngineRxMessage::RpcMessage(msg) => { - self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?; + let private_req = materialize_rx(&msg); + if rand::random::() < self.var_probability { + // We need to copy meta, add it to meta_buf_pool, and send it as the tx msg + // Is there better way to do this and avoid unsafe? + let mut meta = unsafe { msg.meta.as_ref().clone() }; + meta.status_code = StatusCode::AccessDenied; + let mut meta_ptr = self + .meta_buf_pool + .obtain(RpcId(meta.conn_id, meta.call_id)) + .expect("meta_buf_pool is full"); + unsafe { + meta_ptr.as_meta_ptr().write(meta); + meta_ptr.0.as_mut().num_sge = 0; + meta_ptr.0.as_mut().value_len = 0; + } + let rpc_msg = RpcMessageTx { + meta_buf_ptr: meta_ptr, + addr_backend: 0, + }; + let new_msg = EngineTxMessage::RpcMessage(rpc_msg); + self.tx_outputs()[0] + .send(new_msg) + .expect("send new message error"); + let msg_call_ids = + [meta.call_id, meta.call_id, meta.call_id, meta.call_id]; + self.tx_outputs()[0].send(EngineTxMessage::ReclaimRecvBuf( + meta.conn_id, + msg_call_ids, + ))?; + } else { + self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?; + } + } + EngineRxMessage::RecvError(_, _) => { + self.rx_outputs()[0].send(m)?; } - m => self.rx_outputs()[0].send(m)?, } return Ok(Progress(1)); } diff --git a/experimental/mrpc/plugin/policy/fault-server/src/module.rs b/experimental/mrpc/plugin/policy/fault-server/src/module.rs index f986450f..a950423e 100644 --- a/experimental/mrpc/plugin/policy/fault-server/src/module.rs +++ b/experimental/mrpc/plugin/policy/fault-server/src/module.rs @@ -1,14 +1,14 @@ use anyhow::{bail, Result}; use nix::unistd::Pid; +use super::engine::FaultServerEngine; +use crate::config::{create_log_file, FaultServerConfig}; use phoenix_common::addon::{PhoenixAddon, Version}; +use phoenix_common::engine::datapath::meta_pool::MetaBufferPool; use phoenix_common::engine::datapath::DataPathNode; use phoenix_common::engine::{Engine, EngineType}; use phoenix_common::storage::ResourceCollection; -use super::engine::FaultServerEngine; -use crate::config::{create_log_file, FaultServerConfig}; - use chrono::prelude::*; use itertools::iproduct; use rand::Rng; @@ -25,11 +25,12 @@ impl FaultServerEngineBuilder { // TODO! LogFile fn build(self) -> Result { let var_probability = 0.01; - + const META_BUFFER_POOL_CAP: usize = 128; Ok(FaultServerEngine { node: self.node, - indicator: DeFaultServer::deFaultServer(), + indicator: Default::default(), config: self.config, + meta_buf_pool: MetaBufferPool::new(META_BUFFER_POOL_CAP), var_probability, }) } @@ -40,8 +41,8 @@ pub struct FaultServerAddon { } impl FaultServerAddon { - pub const FaultServer_ENGINE: EngineType = EngineType("FaultServerEngine"); - pub const ENGINES: &'static [EngineType] = &[FaultServerAddon::FaultServer_ENGINE]; + pub const FAULT_SERVER_ENGINE: EngineType = EngineType("FaultServerEngine"); + pub const ENGINES: &'static [EngineType] = &[FaultServerAddon::FAULT_SERVER_ENGINE]; } impl FaultServerAddon { @@ -80,7 +81,7 @@ impl PhoenixAddon for FaultServerAddon { _pid: Pid, node: DataPathNode, ) -> Result> { - if ty != FaultServerAddon::FaultServer_ENGINE { + if ty != FaultServerAddon::FAULT_SERVER_ENGINE { bail!("invalid engine type {:?}", ty) } @@ -96,7 +97,7 @@ impl PhoenixAddon for FaultServerAddon { node: DataPathNode, prev_version: Version, ) -> Result> { - if ty != FaultServerAddon::FaultServer_ENGINE { + if ty != FaultServerAddon::FAULT_SERVER_ENGINE { bail!("invalid engine type {:?}", ty) } diff --git a/experimental/mrpc/plugin/policy/logging-server/Cargo.toml b/experimental/mrpc/plugin/policy/logging-server/Cargo.toml index e058ccd2..d510ca4d 100644 --- a/experimental/mrpc/plugin/policy/logging-server/Cargo.toml +++ b/experimental/mrpc/plugin/policy/logging-server/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "phoenix-LoggingServer" +name = "phoenix-logging-server" version = "0.1.0" edition = "2021" @@ -7,7 +7,7 @@ edition = "2021" [dependencies] phoenix_common.workspace = true -phoenix-api-policy-LoggingServer.workspace = true +phoenix-api-policy-logging-server.workspace = true mrpc-marshal.workspace = true mrpc-derive.workspace = true shm.workspace = true diff --git a/experimental/mrpc/plugin/policy/logging-server/src/config.rs b/experimental/mrpc/plugin/policy/logging-server/src/config.rs index 5ba0472b..f6b8c2f9 100644 --- a/experimental/mrpc/plugin/policy/logging-server/src/config.rs +++ b/experimental/mrpc/plugin/policy/logging-server/src/config.rs @@ -6,7 +6,7 @@ use chrono::{Datelike, Timelike, Utc}; use phoenix_common::log; use serde::{Deserialize, Serialize}; -/// currently, LoggingServer engine does not need a config +/// currently, logging engine does not need a config #[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct LoggingServerConfig {} @@ -21,7 +21,7 @@ impl LoggingServerConfig { /// Create a log file in `/tmp/phoenix/log` /// This function will be called every time -/// a LoggingServer engine is started or restored +/// a logging engine is started or restored pub fn create_log_file() -> std::fs::File { std::fs::create_dir_all("/tmp/phoenix/log").expect("mkdir failed"); let now = Utc::now(); @@ -34,7 +34,7 @@ pub fn create_log_file() -> std::fs::File { now.minute(), now.second() ); - let file_name = format!("/tmp/phoenix/log/LoggingServer_engine_{}.log", date_string); + let file_name = format!("/tmp/phoenix/log/logging_engine_{}.log", date_string); log::info!("create log file {}", file_name); let log_file = std::fs::File::create(file_name).expect("create file failed"); log_file diff --git a/experimental/mrpc/plugin/policy/logging-server/src/engine.rs b/experimental/mrpc/plugin/policy/logging-server/src/engine.rs index 0b219a85..a1f4fbde 100644 --- a/experimental/mrpc/plugin/policy/logging-server/src/engine.rs +++ b/experimental/mrpc/plugin/policy/logging-server/src/engine.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result}; use chrono::Utc; use futures::future::BoxFuture; -use phoenix_api_policy_LoggingServer::control_plane; +use phoenix_api_policy_logging_server::control_plane; use phoenix_common::engine::datapath::RpcMessageTx; use std::io::Write; use std::os::unix::ucred::UCred; @@ -24,7 +24,7 @@ pub mod hello { include!("proto.rs"); } -/// The internal state of an LoggingServer engine, +/// The internal state of an logging engine, /// it contains some template fields like `node`, `indicator`, /// a config field, in that case `LoggingServerConfig` /// and other custome fields like `log_file @@ -157,81 +157,49 @@ fn materialize_nocopy(msg: &RpcMessageTx) -> &hello::HelloRequest { impl LoggingServerEngine { /// main logic about handling rx & tx input messages - /// note that a LoggingServer engine can be deployed in client-side or server-side + /// note that a logging engine can be deployed in client-side or server-side fn check_input_queue(&mut self) -> Result { use phoenix_common::engine::datapath::TryRecvError; - // tx logic - // For server it is `On-Response` logic, when sending response to network - // For client it is `On-Request` logic, when sending request to network match self.tx_inputs()[0].try_recv() { Ok(msg) => { - match msg { - // we care only log RPCs - // other types like ACK should not be logged since they are not - // ACKs between Client/Server, but communication between engines - // "Real" ACKs are logged in rx logic - EngineTxMessage::RpcMessage(msg) => { - // we get the metadata of RPC from the shared memory - let meta_ref = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() }; - let rpc_message = materialize_nocopy(&msg); + self.tx_outputs()[0].send(msg)?; + return Ok(Progress(1)); + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => return Ok(Status::Disconnected), + } + + match self.rx_inputs()[0].try_recv() { + Ok(m) => { + match m { + EngineRxMessage::Ack(rpc_id, _status) => { + self.rx_outputs()[0].send(m)?; + } + EngineRxMessage::RpcMessage(msg) => { + let meta_ref = unsafe { msg.meta.as_ref() }; // write the metadata into the file // since meta_ref implements Debug, we can use {:?} // rather than manully parse the metadata struct write!( self.log_file, - "{}{}{}{}{}\n", + "{}{}{}{}\n", Utc::now(), format!("{:?}", meta_ref.msg_type), format!("{:?}", meta_ref.conn_id), format!("{:?}", meta_ref.conn_id), - format!("{}", String::from_utf8_lossy(&rpc_message.name)), ) .unwrap(); - - // after LoggingServer, we forward the message to the next engine - self.tx_outputs()[0].send(EngineTxMessage::RpcMessage(msg))?; - } - // if received message is not RPC, we simple forward it - m => self.tx_outputs()[0].send(m)?, - } - return Ok(Progress(1)); - } - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => { - return Ok(Status::Disconnected); - } - } - - // tx logic - // For server it is `On-Request` logic, when recving request from network - // For client it is `On-Response` logic, when recving response from network - match self.rx_inputs()[0].try_recv() { - Ok(msg) => { - match msg { - // ACK means that - // If I am client: server received my request - // If I am server: client recevied my response - EngineRxMessage::Ack(rpc_id, status) => { - // log the info to the file - // forward the message - self.rx_outputs()[0].send(EngineRxMessage::Ack(rpc_id, status))?; - } - EngineRxMessage::RpcMessage(msg) => { - // forward the message - // again, this RpcMessage is not the application-level rpc - // so we don log them self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?; } - // forward other unknown msg - m => self.rx_outputs()[0].send(m)?, + EngineRxMessage::RecvError(_, _) => { + self.rx_outputs()[0].send(m)?; + } } return Ok(Progress(1)); } Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => { - return Ok(Status::Disconnected); - } + Err(TryRecvError::Disconnected) => return Ok(Status::Disconnected), } Ok(Progress(0)) diff --git a/experimental/mrpc/plugin/policy/logging-server/src/lib.rs b/experimental/mrpc/plugin/policy/logging-server/src/lib.rs index d036b86a..cfcbc595 100644 --- a/experimental/mrpc/plugin/policy/logging-server/src/lib.rs +++ b/experimental/mrpc/plugin/policy/logging-server/src/lib.rs @@ -1,5 +1,5 @@ //! template file for export - +#![feature(ptr_internals)] #![feature(peer_credentials_unix_socket)] use thiserror::Error; diff --git a/experimental/mrpc/plugin/policy/logging-server/src/module.rs b/experimental/mrpc/plugin/policy/logging-server/src/module.rs index 11fd6475..31fd42fb 100644 --- a/experimental/mrpc/plugin/policy/logging-server/src/module.rs +++ b/experimental/mrpc/plugin/policy/logging-server/src/module.rs @@ -39,8 +39,8 @@ pub struct LoggingServerAddon { } impl LoggingServerAddon { - pub const LoggingServer_ENGINE: EngineType = EngineType("LoggingServerEngine"); - pub const ENGINES: &'static [EngineType] = &[LoggingServerAddon::LoggingServer_ENGINE]; + pub const LOGGING_ENGINE: EngineType = EngineType("LoggingServerEngine"); + pub const ENGINES: &'static [EngineType] = &[LoggingServerAddon::LOGGING_ENGINE]; } impl LoggingServerAddon { @@ -79,7 +79,7 @@ impl PhoenixAddon for LoggingServerAddon { _pid: Pid, node: DataPathNode, ) -> Result> { - if ty != LoggingServerAddon::LoggingServer_ENGINE { + if ty != LoggingServerAddon::LOGGING_ENGINE { bail!("invalid engine type {:?}", ty) } @@ -95,7 +95,7 @@ impl PhoenixAddon for LoggingServerAddon { node: DataPathNode, prev_version: Version, ) -> Result> { - if ty != LoggingServerAddon::LoggingServer_ENGINE { + if ty != LoggingServerAddon::LOGGING_ENGINE { bail!("invalid engine type {:?}", ty) } diff --git a/experimental/mrpc/plugin/policy/metrics-server/Cargo.toml b/experimental/mrpc/plugin/policy/metrics-server/Cargo.toml index b5118932..560932f9 100644 --- a/experimental/mrpc/plugin/policy/metrics-server/Cargo.toml +++ b/experimental/mrpc/plugin/policy/metrics-server/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "phoenix-metrics" +name = "phoenix-metrics-server" version = "0.1.0" edition = "2021" @@ -7,7 +7,7 @@ edition = "2021" [dependencies] -phoenix-api-policy-metrics.workspace = true +phoenix-api-policy-metrics-server.workspace = true mrpc-marshal.workspace = true mrpc-derive.workspace = true diff --git a/experimental/mrpc/plugin/policy/metrics-server/src/engine.rs b/experimental/mrpc/plugin/policy/metrics-server/src/engine.rs index 138fef39..22f6ea1a 100644 --- a/experimental/mrpc/plugin/policy/metrics-server/src/engine.rs +++ b/experimental/mrpc/plugin/policy/metrics-server/src/engine.rs @@ -9,7 +9,7 @@ use fnv::FnvHashMap as HashMap; use futures::future::BoxFuture; use phoenix_api::rpc::{RpcId, StatusCode, TransportStatus}; -use phoenix_api_policy_MetricsServer::control_plane; +use phoenix_api_policy_metrics_server::control_plane; use phoenix_common::engine::datapath::message::{EngineRxMessage, EngineTxMessage, RpcMessageTx}; use phoenix_common::engine::datapath::node::DataPathNode; @@ -156,7 +156,20 @@ impl MetricsServerEngine { match self.tx_inputs()[0].try_recv() { Ok(msg) => { - self.tx_outputs()[0].send(msg)?; + match msg { + EngineTxMessage::RpcMessage(msg) => { + let meta = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() }; + if meta.status_code == StatusCode::Success { + self.num_succ += 1; + } else { + self.num_rej += 1; + } + self.tx_outputs()[0].send(EngineTxMessage::RpcMessage(msg))?; + } + m => { + self.tx_outputs()[0].send(m)?; + } + } return Ok(Progress(1)); } Err(TryRecvError::Empty) => {} @@ -166,20 +179,7 @@ impl MetricsServerEngine { // forward all rx msgs match self.rx_inputs()[0].try_recv() { Ok(m) => { - match m { - EngineRxMessage::RpcMessage(msg) => { - let meta = unsafe { &*msg.meta.as_ptr() }; - if meta.status_code == StatusCode::Success { - self.num_succ += 1; - } else { - self.num_rej += 1; - } - self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?; - } - m => { - self.rx_outputs()[0].send(m)?; - } - }; + self.rx_outputs()[0].send(m)?; return Ok(Progress(1)); } Err(TryRecvError::Empty) => {} diff --git a/experimental/mrpc/plugin/policy/metrics-server/src/module.rs b/experimental/mrpc/plugin/policy/metrics-server/src/module.rs index d1225bce..e20b9246 100644 --- a/experimental/mrpc/plugin/policy/metrics-server/src/module.rs +++ b/experimental/mrpc/plugin/policy/metrics-server/src/module.rs @@ -36,8 +36,8 @@ pub struct MetricsServerAddon { } impl MetricsServerAddon { - pub const MetricsServer_ENGINE: EngineType = EngineType("MetricsServerEngine"); - pub const ENGINES: &'static [EngineType] = &[MetricsServerAddon::MetricsServer_ENGINE]; + pub const METRICS_ENGINE: EngineType = EngineType("MetricsServerEngine"); + pub const ENGINES: &'static [EngineType] = &[MetricsServerAddon::METRICS_ENGINE]; } impl MetricsServerAddon { @@ -76,7 +76,7 @@ impl PhoenixAddon for MetricsServerAddon { _pid: Pid, node: DataPathNode, ) -> Result> { - if ty != MetricsServerAddon::MetricsServer_ENGINE { + if ty != MetricsServerAddon::METRICS_ENGINE { bail!("invalid engine type {:?}", ty) } @@ -92,7 +92,7 @@ impl PhoenixAddon for MetricsServerAddon { node: DataPathNode, prev_version: Version, ) -> Result> { - if ty != MetricsServerAddon::MetricsServer_ENGINE { + if ty != MetricsServerAddon::METRICS_ENGINE { bail!("invalid engine type {:?}", ty) } diff --git a/experimental/mrpc/plugin/policy/mutation-server/Cargo.toml b/experimental/mrpc/plugin/policy/mutation-server/Cargo.toml index da4953c0..a6552545 100644 --- a/experimental/mrpc/plugin/policy/mutation-server/Cargo.toml +++ b/experimental/mrpc/plugin/policy/mutation-server/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "phoenix-mutation" +name = "phoenix-mutation-server" version = "0.1.0" edition = "2021" @@ -7,7 +7,7 @@ edition = "2021" [dependencies] -phoenix-api-policy-mutation.workspace = true +phoenix-api-policy-mutation-server.workspace = true mrpc-marshal.workspace = true mrpc-derive.workspace = true diff --git a/experimental/mrpc/plugin/policy/mutation-server/src/engine.rs b/experimental/mrpc/plugin/policy/mutation-server/src/engine.rs index eadc2dbc..97c6bbdb 100644 --- a/experimental/mrpc/plugin/policy/mutation-server/src/engine.rs +++ b/experimental/mrpc/plugin/policy/mutation-server/src/engine.rs @@ -9,10 +9,11 @@ use fnv::FnvHashMap as HashMap; use futures::future::BoxFuture; use phoenix_api::rpc::{RpcId, TransportStatus}; -use phoenix_api_policy_MutationServer::control_plane; +use phoenix_api_policy_mutation_server::control_plane; use phoenix_common::engine::datapath::message::{EngineRxMessage, EngineTxMessage, RpcMessageTx}; use phoenix_common::engine::datapath::node::DataPathNode; +use phoenix_common::engine::datapath::RpcMessageRx; use phoenix_common::engine::{future, Decompose, Engine, EngineResult, Indicator, Vertex}; use phoenix_common::envelop::ResourceDowncast; use phoenix_common::impl_vertex_for_engine; @@ -147,13 +148,11 @@ impl MutationServerEngine { } } -/// Copy the RPC request to a private heap and returns the request. #[inline] -fn materialize(msg: &RpcMessageTx) -> Box { - let req_ptr = Unique::new(msg.addr_backend as *mut hello::HelloRequest).unwrap(); - let req = unsafe { req_ptr.as_ref() }; - // returns a private_req - Box::new(req.clone()) +fn materialize_nocopy(msg: &RpcMessageRx) -> &mut hello::HelloRequest { + let req_ptr = msg.addr_backend as *mut hello::HelloRequest; + let req = unsafe { req_ptr.as_mut().unwrap() }; + return req; } impl MutationServerEngine { @@ -162,28 +161,30 @@ impl MutationServerEngine { match self.tx_inputs()[0].try_recv() { Ok(msg) => { - match msg { - EngineTxMessage::RpcMessage(msg) => { - let mut req_ptr = - Unique::new(msg.addr_backend as *mut hello::HelloRequest).unwrap(); - let req = unsafe { req_ptr.as_mut() }; - for i in 0..req.name.len() { - req.name[i] = 'a' as u8; - } - self.tx_outputs()[0].send(EngineTxMessage::RpcMessage(msg))?; - } - m => self.tx_outputs()[0].send(m)?, - } + self.tx_outputs()[0].send(msg)?; return Ok(Progress(1)); } Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => return Ok(Status::Disconnected), } - // forward all rx msgs match self.rx_inputs()[0].try_recv() { Ok(m) => { - self.rx_outputs()[0].send(m)?; + match m { + EngineRxMessage::Ack(rpc_id, _status) => { + self.rx_outputs()[0].send(m)?; + } + EngineRxMessage::RpcMessage(msg) => { + let mut req = materialize_nocopy(&msg); + for i in 0..req.name.len() { + req.name[i] = 'a' as u8; + } + self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?; + } + EngineRxMessage::RecvError(_, _) => { + self.rx_outputs()[0].send(m)?; + } + } return Ok(Progress(1)); } Err(TryRecvError::Empty) => {} diff --git a/experimental/mrpc/plugin/policy/mutation-server/src/module.rs b/experimental/mrpc/plugin/policy/mutation-server/src/module.rs index d8683ad1..9f60e8aa 100644 --- a/experimental/mrpc/plugin/policy/mutation-server/src/module.rs +++ b/experimental/mrpc/plugin/policy/mutation-server/src/module.rs @@ -35,8 +35,8 @@ pub struct MutationServerAddon { } impl MutationServerAddon { - pub const MutationServer_ENGINE: EngineType = EngineType("MutationServerEngine"); - pub const ENGINES: &'static [EngineType] = &[MutationServerAddon::MutationServer_ENGINE]; + pub const MUTATION_ENGINE: EngineType = EngineType("MutationServerEngine"); + pub const ENGINES: &'static [EngineType] = &[MutationServerAddon::MUTATION_ENGINE]; } impl MutationServerAddon { @@ -75,7 +75,7 @@ impl PhoenixAddon for MutationServerAddon { _pid: Pid, node: DataPathNode, ) -> Result> { - if ty != MutationServerAddon::MutationServer_ENGINE { + if ty != MutationServerAddon::MUTATION_ENGINE { bail!("invalid engine type {:?}", ty) } @@ -91,7 +91,7 @@ impl PhoenixAddon for MutationServerAddon { node: DataPathNode, prev_version: Version, ) -> Result> { - if ty != MutationServerAddon::MutationServer_ENGINE { + if ty != MutationServerAddon::MUTATION_ENGINE { bail!("invalid engine type {:?}", ty) } diff --git a/experimental/mrpc/plugin/policy/ratelimit-drop-server/Cargo.toml b/experimental/mrpc/plugin/policy/ratelimit-drop-server/Cargo.toml index 4b592492..0f2a1eaa 100644 --- a/experimental/mrpc/plugin/policy/ratelimit-drop-server/Cargo.toml +++ b/experimental/mrpc/plugin/policy/ratelimit-drop-server/Cargo.toml @@ -1,5 +1,6 @@ + [package] -name = "phoenix-ratelimit-drop" +name = "phoenix-ratelimit-drop-server" version = "0.1.0" edition = "2021" @@ -7,7 +8,11 @@ edition = "2021" [dependencies] phoenix_common.workspace = true -phoenix-api-policy-ratelimit-drop.workspace = true +phoenix-api-policy-ratelimit-drop-server.workspace = true +mrpc-marshal.workspace = true +mrpc-derive.workspace = true +shm.workspace = true +phoenix-api = { workspace = true, features = ["mrpc"] } futures.workspace = true minstant.workspace = true @@ -18,9 +23,6 @@ anyhow.workspace = true nix.workspace = true toml = { workspace = true, features = ["preserve_order"] } bincode.workspace = true -mrpc-marshal.workspace = true -mrpc-derive.workspace = true -shm.workspace = true -phoenix-api = { workspace = true, features = ["mrpc"] } chrono.workspace = true itertools.workspace = true +rand.workspace = true diff --git a/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/config.rs b/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/config.rs index 2d4bdeb0..086114ea 100644 --- a/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/config.rs +++ b/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/config.rs @@ -1,7 +1,14 @@ +use chrono::{Datelike, Timelike, Utc}; +use phoenix_common::log; use serde::{Deserialize, Serialize}; +use chrono::prelude::*; +use itertools::iproduct; +use rand::Rng; + #[derive(Debug, Clone, Copy, Serialize, Deserialize)] #[serde(deny_unknown_fields)] + pub struct RateLimitDropServerConfig { pub requests_per_sec: u64, pub bucket_size: u64, @@ -22,3 +29,20 @@ impl RateLimitDropServerConfig { Ok(config) } } +pub fn create_log_file() -> std::fs::File { + std::fs::create_dir_all("/tmp/phoenix/log").expect("mkdir failed"); + let now = Utc::now(); + let date_string = format!( + "{}-{}-{}-{}-{}-{}", + now.year(), + now.month(), + now.day(), + now.hour(), + now.minute(), + now.second() + ); + let file_name = format!("/tmp/phoenix/log/logging_engine_{}.log", date_string); + ///log::info!("create log file {}", file_name); + let log_file = std::fs::File::create(file_name).expect("create file failed"); + log_file +} diff --git a/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/engine.rs b/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/engine.rs index b95b210d..cb5ecea9 100644 --- a/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/engine.rs +++ b/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/engine.rs @@ -1,24 +1,37 @@ -use std::collections::VecDeque; -use std::os::unix::ucred::UCred; -use std::pin::Pin; - -use super::DatapathError; -use crate::config::RateLimitDropServerConfig; use anyhow::{anyhow, Result}; use futures::future::BoxFuture; use minstant::Instant; -use phoenix_api::rpc::{RpcId, TransportStatus}; -use phoenix_api_policy_ratelimit_drop::control_plane; -use phoenix_common::engine::datapath::message::{EngineTxMessage, RpcMessageGeneral, RpcMessageTx}; +use phoenix_api::rpc::{RpcId, StatusCode, TransportStatus}; +use std::fmt; +use std::fs::File; +use std::io::Write; +use std::num::NonZeroU32; +use std::os::unix::ucred::UCred; +use std::pin::Pin; +use std::ptr::Unique; + +use phoenix_api_policy_ratelimit_drop_server::control_plane; +use phoenix_common::engine::datapath::message::{ + EngineRxMessage, EngineTxMessage, RpcMessageGeneral, +}; +use phoenix_common::engine::datapath::meta_pool::MetaBufferPool; + use phoenix_common::engine::datapath::node::DataPathNode; -use phoenix_common::engine::datapath::EngineRxMessage; use phoenix_common::engine::{future, Decompose, Engine, EngineResult, Indicator, Vertex}; use phoenix_common::envelop::ResourceDowncast; use phoenix_common::impl_vertex_for_engine; use phoenix_common::log; use phoenix_common::module::Version; + +use phoenix_common::engine::datapath::{RpcMessageRx, RpcMessageTx}; use phoenix_common::storage::{ResourceCollection, SharedStorage}; -use std::num::NonZeroU32; + +use super::DatapathError; +use crate::config::{create_log_file, RateLimitDropServerConfig}; + +use chrono::prelude::*; +use itertools::iproduct; +use rand::Rng; pub mod hello { include!("proto.rs"); @@ -39,6 +52,8 @@ pub(crate) struct RateLimitDropServerEngine { // pub(crate) filter: FnvHashSet, // Number of tokens to add for each seconds. pub(crate) config: RateLimitDropServerConfig, + pub(crate) meta_buf_pool: MetaBufferPool, + // The most recent timestamp we add the token to the bucket. pub(crate) last_ts: Instant, // The number of available tokens in the token bucket algorithm. @@ -87,12 +102,11 @@ impl_vertex_for_engine!(RateLimitDropServerEngine, node); impl Decompose for RateLimitDropServerEngine { fn flush(&mut self) -> Result { let mut work = 0; - while !self.tx_inputs()[0].is_empty() { + while !self.tx_inputs()[0].is_empty() || !self.rx_inputs()[0].is_empty() { if let Progress(n) = self.check_input_queue()? { work += n; } } - Ok(work) } @@ -102,9 +116,9 @@ impl Decompose for RateLimitDropServerEngine { _global: &mut ResourceCollection, ) -> (ResourceCollection, DataPathNode) { let engine = *self; - let mut collections = ResourceCollection::with_capacity(4); collections.insert("config".to_string(), Box::new(engine.config)); + collections.insert("meta_buf_pool".to_string(), Box::new(engine.meta_buf_pool)); collections.insert("last_ts".to_string(), Box::new(engine.last_ts)); collections.insert("num_tokens".to_string(), Box::new(engine.num_tokens)); (collections, engine.node) @@ -122,6 +136,11 @@ impl RateLimitDropServerEngine { .unwrap() .downcast::() .map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?; + let meta_buf_pool = *local + .remove("meta_buf_pool") + .unwrap() + .downcast::() + .map_err(|x| anyhow!("fail to downcast, type_name={:?}", x.type_name()))?; let last_ts: Instant = *local .remove("last_ts") .unwrap() @@ -139,6 +158,7 @@ impl RateLimitDropServerEngine { config, last_ts, num_tokens, + meta_buf_pool, }; Ok(engine) } @@ -148,7 +168,6 @@ impl RateLimitDropServerEngine { async fn mainloop(&mut self) -> EngineResult { loop { let mut work = 0; - // check input queue, ~100ns loop { match self.check_input_queue()? { Progress(0) => break, @@ -156,10 +175,7 @@ impl RateLimitDropServerEngine { Status::Disconnected => return Ok(()), } } - - // If there's pending receives, there will always be future work to do. self.indicator.set_nwork(work); - future::yield_now().await; } } @@ -168,7 +184,6 @@ impl RateLimitDropServerEngine { fn current_timestamp() -> Instant { Instant::now() } - #[inline] fn materialize_nocopy(msg: &RpcMessageTx) -> &hello::HelloRequest { let req_ptr = msg.addr_backend as *mut hello::HelloRequest; @@ -176,77 +191,78 @@ fn materialize_nocopy(msg: &RpcMessageTx) -> &hello::HelloRequest { return req; } +/// Copy the RPC request to a private heap and returns the request. +#[inline] +fn materialize_rx(msg: &RpcMessageRx) -> Box { + let req_ptr = Unique::new(msg.addr_backend as *mut hello::HelloRequest).unwrap(); + let req = unsafe { req_ptr.as_ref() }; + // returns a private_req + Box::new(req.clone()) +} + impl RateLimitDropServerEngine { fn check_input_queue(&mut self) -> Result { use phoenix_common::engine::datapath::TryRecvError; match self.tx_inputs()[0].try_recv() { Ok(msg) => { - match msg { - EngineTxMessage::RpcMessage(msg) => { - let meta_ref = unsafe { &*msg.meta_buf_ptr.as_meta_ptr() }; - let mut input = Vec::new(); - input.push(msg); - self.num_tokens = self.num_tokens - + (current_timestamp() - self.last_ts).as_secs_f64() - * self.config.requests_per_sec as f64; - self.last_ts = current_timestamp(); - log::debug!("num_tokens: {}", self.num_tokens); - let limit = std::cmp::min(input.len() as i64, self.num_tokens as i64); - self.num_tokens = self.num_tokens - limit as f64; - - let output = input.iter().enumerate().map(|(index, req)| { - let rpc_message = materialize_nocopy(&req); - let conn_id = unsafe { &*req.meta_buf_ptr.as_meta_ptr() }.conn_id; - let call_id = unsafe { &*req.meta_buf_ptr.as_meta_ptr() }.call_id; - let rpc_id = RpcId::new(conn_id, call_id); - if index < limit as usize { - let raw_ptr: *const hello::HelloRequest = rpc_message; - let new_msg = RpcMessageTx { - meta_buf_ptr: req.meta_buf_ptr.clone(), - addr_backend: req.addr_backend, - }; - RpcMessageGeneral::TxMessage(EngineTxMessage::RpcMessage(new_msg)) - } else { - let error = EngineRxMessage::Ack( - rpc_id, - TransportStatus::Error(unsafe { - NonZeroU32::new_unchecked(403) - }), - ); - RpcMessageGeneral::RxMessage(error) - } - }); - for msg in output { - match msg { - RpcMessageGeneral::TxMessage(msg) => { - self.tx_outputs()[0].send(msg)?; - } - RpcMessageGeneral::RxMessage(msg) => { - self.rx_outputs()[0].send(msg)?; - } - _ => {} - } - } - } - m => self.tx_outputs()[0].send(m)?, - } + self.tx_outputs()[0].send(msg)?; return Ok(Progress(1)); } Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => return Ok(Status::Disconnected), } + match self.rx_inputs()[0].try_recv() { - Ok(msg) => { - match msg { + Ok(m) => { + match m { EngineRxMessage::Ack(rpc_id, status) => { - // todo - self.rx_outputs()[0].send(EngineRxMessage::Ack(rpc_id, status))?; + if let Ok(()) = self.meta_buf_pool.release(rpc_id) { + } else { + self.rx_outputs()[0].send(m)?; + } } EngineRxMessage::RpcMessage(msg) => { - self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?; + self.num_tokens = self.num_tokens + + (current_timestamp() - self.last_ts).as_secs_f64() + * self.config.requests_per_sec as f64; + self.last_ts = current_timestamp(); + if self.num_tokens < 1.0 { + // We need to copy meta, add it to meta_buf_pool, and send it as the tx msg + // Is there better way to do this and avoid unsafe? + let mut meta = unsafe { msg.meta.as_ref().clone() }; + meta.status_code = StatusCode::AccessDenied; + let mut meta_ptr = self + .meta_buf_pool + .obtain(RpcId(meta.conn_id, meta.call_id)) + .expect("meta_buf_pool is full"); + unsafe { + meta_ptr.as_meta_ptr().write(meta); + meta_ptr.0.as_mut().num_sge = 0; + meta_ptr.0.as_mut().value_len = 0; + } + let rpc_msg = RpcMessageTx { + meta_buf_ptr: meta_ptr, + addr_backend: 0, + }; + let new_msg = EngineTxMessage::RpcMessage(rpc_msg); + self.tx_outputs()[0] + .send(new_msg) + .expect("send new message error"); + let msg_call_ids = + [meta.call_id, meta.call_id, meta.call_id, meta.call_id]; + self.tx_outputs()[0].send(EngineTxMessage::ReclaimRecvBuf( + meta.conn_id, + msg_call_ids, + ))?; + } else { + self.num_tokens = self.num_tokens - 1.0; + self.rx_outputs()[0].send(EngineRxMessage::RpcMessage(msg))?; + } + } + EngineRxMessage::RecvError(_, _) => { + self.rx_outputs()[0].send(m)?; } - m => self.rx_outputs()[0].send(m)?, } return Ok(Progress(1)); } diff --git a/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/lib.rs b/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/lib.rs index b77ef8bd..15fd5311 100644 --- a/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/lib.rs +++ b/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/lib.rs @@ -1,7 +1,12 @@ #![feature(peer_credentials_unix_socket)] - +#![feature(ptr_internals)] +#![feature(strict_provenance)] use thiserror::Error; +use chrono::prelude::*; +use itertools::iproduct; +use rand::Rng; + pub use phoenix_common::{InitFnResult, PhoenixAddon}; pub mod config; diff --git a/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/module.rs b/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/module.rs index baa16ebb..244bdff9 100644 --- a/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/module.rs +++ b/experimental/mrpc/plugin/policy/ratelimit-drop-server/src/module.rs @@ -1,16 +1,18 @@ -use std::collections::VecDeque; - use anyhow::{bail, Result}; use minstant::Instant; use nix::unistd::Pid; +use super::engine::RateLimitDropServerEngine; +use crate::config::{create_log_file, RateLimitDropServerConfig}; use phoenix_common::addon::{PhoenixAddon, Version}; +use phoenix_common::engine::datapath::meta_pool::{MetaBufferPool, META_BUFFER_SIZE}; use phoenix_common::engine::datapath::DataPathNode; use phoenix_common::engine::{Engine, EngineType}; use phoenix_common::storage::ResourceCollection; -use super::engine::RateLimitDropServerEngine; -use crate::config::RateLimitDropServerConfig; +use chrono::prelude::*; +use itertools::iproduct; +use rand::Rng; pub(crate) struct RateLimitDropServerEngineBuilder { node: DataPathNode, @@ -21,12 +23,14 @@ impl RateLimitDropServerEngineBuilder { fn new(node: DataPathNode, config: RateLimitDropServerConfig) -> Self { RateLimitDropServerEngineBuilder { node, config } } - + // TODO! LogFile fn build(self) -> Result { + let META_BUFFER_POOL_CAP = 200; Ok(RateLimitDropServerEngine { node: self.node, indicator: Default::default(), config: self.config, + meta_buf_pool: MetaBufferPool::new(META_BUFFER_POOL_CAP), last_ts: Instant::now(), num_tokens: self.config.bucket_size as _, }) @@ -38,8 +42,9 @@ pub struct RateLimitDropServerAddon { } impl RateLimitDropServerAddon { - pub const RATE_LIMIT_DROP_ENGINE: EngineType = EngineType("RateLimitDropServerEngine"); - pub const ENGINES: &'static [EngineType] = &[RateLimitDropServerAddon::RATE_LIMIT_DROP_ENGINE]; + pub const RATELIMIT_DROP_SERVER_ENGINE: EngineType = EngineType("RateLimitDropServerEngine"); + pub const ENGINES: &'static [EngineType] = + &[RateLimitDropServerAddon::RATELIMIT_DROP_SERVER_ENGINE]; } impl RateLimitDropServerAddon { @@ -78,7 +83,7 @@ impl PhoenixAddon for RateLimitDropServerAddon { _pid: Pid, node: DataPathNode, ) -> Result> { - if ty != RateLimitDropServerAddon::RATE_LIMIT_DROP_ENGINE { + if ty != RateLimitDropServerAddon::RATELIMIT_DROP_SERVER_ENGINE { bail!("invalid engine type {:?}", ty) } @@ -94,7 +99,7 @@ impl PhoenixAddon for RateLimitDropServerAddon { node: DataPathNode, prev_version: Version, ) -> Result> { - if ty != RateLimitDropServerAddon::RATE_LIMIT_DROP_ENGINE { + if ty != RateLimitDropServerAddon::RATELIMIT_DROP_SERVER_ENGINE { bail!("invalid engine type {:?}", ty) }