Skip to content

Commit

Permalink
fix issues in experimental/mrpc
Browse files Browse the repository at this point in the history
  • Loading branch information
crazyboycjr committed May 10, 2024
1 parent d0e018d commit 62c094b
Show file tree
Hide file tree
Showing 16 changed files with 93 additions and 49 deletions.
92 changes: 68 additions & 24 deletions experimental/mrpc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion experimental/mrpc/plugin/load_balancer/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Engine for LoadBalancerEngine {
fn handle_request(
&mut self,
request: Vec<u8>,
_cred: std::os::unix::ucred::UCred,
_cred: std::os::unix::net::UCred,
) -> Result<()> {
let request: control_plane::Request = bincode::deserialize(&request[..])?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! This engine can only be placed at the sender side for now.
use std::os::unix::ucred::UCred;
use std::os::unix::net::UCred;
use std::pin::Pin;
use std::ptr::Unique;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! This engine can only be placed at the sender side for now.
use std::num::NonZeroU32;
use std::os::unix::ucred::UCred;
use std::os::unix::net::UCred;
use std::pin::Pin;
use std::ptr::Unique;

Expand Down
2 changes: 1 addition & 1 deletion experimental/mrpc/plugin/policy/hotel-acl/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! This engine can only be placed at the sender side for now.
use std::num::NonZeroU32;
use std::os::unix::ucred::UCred;
use std::os::unix::net::UCred;
use std::pin::Pin;
use std::ptr::Unique;

Expand Down
2 changes: 1 addition & 1 deletion experimental/mrpc/plugin/policy/logging/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Result};
use futures::future::BoxFuture;
use std::io::Write;
use std::os::unix::ucred::UCred;
use std::os::unix::net::UCred;
use std::pin::Pin;

use phoenix_api_policy_logging::control_plane;
Expand Down
2 changes: 1 addition & 1 deletion experimental/mrpc/plugin/policy/null/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::os::unix::ucred::UCred;
use std::os::unix::net::UCred;
use std::pin::Pin;

use anyhow::{anyhow, Result};
Expand Down
4 changes: 2 additions & 2 deletions experimental/mrpc/plugin/policy/qos/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cell::RefCell;
use std::cmp::Reverse;
use std::collections::binary_heap::BinaryHeap;
use std::os::unix::ucred::UCred;
use std::os::unix::net::UCred;
use std::pin::Pin;
use std::time::Duration;

Expand Down Expand Up @@ -103,7 +103,7 @@ impl Engine for QosEngine {
BUFFER.with_borrow_mut(|buf| {
let mut messages = buf.drain().collect::<Vec<_>>();
let client_pid = self.client_pid;
let drained = messages.drain_filter(|msg| msg.0.source == client_pid);
let drained = messages.extract_if(|msg| msg.0.source == client_pid);
for msg in drained {
self.tx_outputs()[0].send(msg.0.message)?;
}
Expand Down
2 changes: 1 addition & 1 deletion experimental/mrpc/plugin/policy/qos/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![feature(peer_credentials_unix_socket)]
#![feature(local_key_cell_methods)]
#![feature(drain_filter)]
#![feature(extract_if)]

use thiserror::Error;

Expand Down
2 changes: 1 addition & 1 deletion experimental/mrpc/plugin/policy/ratelimit/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::VecDeque;
use std::os::unix::ucred::UCred;
use std::os::unix::net::UCred;
use std::pin::Pin;

use anyhow::{anyhow, Result};
Expand Down
10 changes: 5 additions & 5 deletions experimental/mrpc/plugin/rpc_adapter/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl Engine for RpcAdapterEngine {
fn handle_request(
&mut self,
request: Vec<u8>,
_cred: std::os::unix::ucred::UCred,
_cred: std::os::unix::net::UCred,
) -> Result<()> {
let request: control_plane::Request = bincode::deserialize(&request[..])?;

Expand Down Expand Up @@ -464,7 +464,7 @@ impl RpcAdapterEngine {
.push_back(ReqContext { call_id, sg_len: 1 });
}

let off = meta_buf_ptr.0.as_ptr().expose_addr();
let off = meta_buf_ptr.0.as_ptr().addr();
let meta_buf = unsafe { meta_buf_ptr.0.as_mut() };

// TODO(cjr): impl Serialize for SgList
Expand Down Expand Up @@ -538,7 +538,7 @@ impl RpcAdapterEngine {
let ctx = self.rpc_ctx.insert(RpcId::new(cmid.as_handle(), call_id));

let meta_sge = SgE {
ptr: (meta_ref as *const MessageMeta).expose_addr(),
ptr: (meta_ref as *const MessageMeta).addr(),
len: mem::size_of::<MessageMeta>(),
};

Expand Down Expand Up @@ -665,7 +665,7 @@ impl RpcAdapterEngine {
let (_prefix, lens, _suffix): (_, &[u32], _) = unsafe { meta_buf.lens_buffer().align_to() };
debug_assert!(_prefix.is_empty() && _suffix.is_empty());

let value_buf_base = meta_buf.value_buffer().as_ptr().expose_addr();
let value_buf_base = meta_buf.value_buffer().as_ptr().addr();
let mut value_offset = 0;

#[allow(clippy::needless_range_loop)]
Expand Down Expand Up @@ -1047,7 +1047,7 @@ impl RpcAdapterEngine {
cmd::Command::NewMappedAddrs(conn_handle, app_vaddrs) => {
for (mr_handle, app_vaddr) in app_vaddrs.iter() {
let region = self.state.resource().recv_buffer_pool.find(mr_handle)?;
let mr_local_addr = region.as_ptr().expose_addr();
let mr_local_addr = region.as_ptr().addr();
let mr_remote_mapped = mrpc_marshal::ShmRecvMr {
ptr: *app_vaddr,
len: region.len(),
Expand Down
2 changes: 1 addition & 1 deletion experimental/mrpc/plugin/rpc_adapter/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl AsHandle for RecvBuffer {
impl RecvBuffer {
#[inline]
pub(crate) fn addr(&self) -> usize {
self.storage.as_ptr().expose_addr() + self.offset
self.storage.as_ptr().addr() + self.offset
}

#[inline]
Expand Down
Loading

0 comments on commit 62c094b

Please sign in to comment.