diff --git a/implementations/rust/ockam/ockam_api/tests/portals.rs b/implementations/rust/ockam/ockam_api/tests/portals.rs index d1b0ab9d1e6..b4c4df9bc63 100644 --- a/implementations/rust/ockam/ockam_api/tests/portals.rs +++ b/implementations/rust/ockam/ockam_api/tests/portals.rs @@ -228,7 +228,7 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { // ┌────────┐ ┌───────────┐ ┌────────┐ // │ Node └─────► TCP └────────► Node │ // │ 1 ◄─────┐Passthrough◄────────┐ 2 │ - // └────┬───┘ │ 64KB/s │ └────▲───┘ + // └────┬───┘ │ 128KB/s │ └────▲───┘ // │ └───────────┘ │ // │ ┌───────────┐ │ // │ Portal │ TCP │ Outlet │ @@ -270,8 +270,8 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { let passthrough_server_handle = start_passthrough_server( &second_node_listen_address.to_string(), - Disruption::LimitBandwidth(64 * 1024), - Disruption::LimitBandwidth(64 * 1024), + Disruption::LimitBandwidth(128 * 1024), + Disruption::LimitBandwidth(128 * 1024), ) .await; diff --git a/implementations/rust/ockam/ockam_core/src/buffer_pool.rs b/implementations/rust/ockam/ockam_core/src/buffer_pool.rs new file mode 100644 index 00000000000..07d6968a993 --- /dev/null +++ b/implementations/rust/ockam/ockam_core/src/buffer_pool.rs @@ -0,0 +1,104 @@ +#[cfg(feature = "std")] +use once_cell::sync::Lazy; + +#[cfg(feature = "std")] +/// The global instance of [`BufferPool`]. +/// The goal of this pool is to reduce memory fragmentation by keep reusing the same buffers. +pub static GLOBAL_BUFFER_POOL: Lazy = Lazy::new(std::BufferPool::new); + +#[cfg(not(feature = "std"))] +/// The global instance of [`BufferPool`]. +/// The goal of this pool is to reduce memory fragmentation by keep reusing the same buffers. +pub static GLOBAL_BUFFER_POOL: no_std::BufferPool = no_std::BufferPool {}; + +#[cfg(feature = "std")] +mod std { + use crate::compat::sync::Mutex; + use crate::compat::vec::Vec; + + const MIN_BUFFER_SIZE: usize = 96 * 1024; + const MAX_BUFFER_SIZE: usize = 192 * 1024; + const MAX_BUFFERS: usize = 32; + + /// A buffer pool for reusing buffers at least big as [`MIN_BUFFER_SIZE`]. + pub struct BufferPool { + buffers: Mutex, + } + + struct Inner { + buffers: Vec>, + } + + impl BufferPool { + pub(super) fn new() -> Self { + // pre-allocate a big buffer to help reduce memory fragmentation + _ = Vec::::with_capacity(1024 * 1024 * 32); + Self { + buffers: Mutex::new(Inner { + buffers: Vec::new(), + }), + } + } + + /// When the size is big enough, it'll return a buffer from the pool, + /// otherwise it'll return a new buffer. + pub fn try_size(&self, size: usize) -> Vec { + if (MIN_BUFFER_SIZE..=MAX_BUFFER_SIZE).contains(&size) { + self.take() + } else { + Vec::with_capacity(size) + } + } + + /// Take a buffer from the pool. + pub fn take(&self) -> Vec { + let mut buffers = self.buffers.lock().unwrap(); + if let Some(mut buffer) = buffers.buffers.pop() { + buffer.clear(); + buffer + } else { + Vec::with_capacity(MIN_BUFFER_SIZE) + } + } + + /// Release a buffer back to the pool, the buffer will only be reused if + /// it's capacity is within [`MIN_BUFFER_SIZE`] and [`MAX_BUFFER_SIZE`]. + pub fn release(&self, buffer: Vec) { + if buffer.capacity() >= MIN_BUFFER_SIZE && buffer.capacity() <= MAX_BUFFER_SIZE { + let mut buffers = self.buffers.lock().unwrap(); + if buffers.buffers.len() < MAX_BUFFERS { + buffers.buffers.push(buffer); + // we can assume the smaller allocations are the newer ones, + // by returning the smaller ones first, we should be able + // to avoid "refreshing" the buffer pool too often + buffers.buffers.sort_by_key(|b| -(b.capacity() as i64)); + } + } + } + } +} + +#[cfg(not(feature = "std"))] +mod no_std { + use crate::compat::vec::Vec; + + /// A buffer pool for reusing buffers at least big as [`MIN_BUFFER_SIZE`]. + pub struct BufferPool; + + impl BufferPool { + /// When the size is big enough, it'll return a buffer from the pool, + /// otherwise it'll return a new buffer. + pub fn try_size(&self, size: usize) -> Vec { + Vec::with_capacity(size) + } + + /// Take a buffer from the pool. + pub fn take(&self) -> Vec { + Vec::new() + } + + /// Release a buffer back to the pool, the buffer will only be reused if + /// it's capacity is within [`MIN_BUFFER_SIZE`] and [`MAX_BUFFER_SIZE`]. + pub fn release(&self, _buffer: Vec) {} + } +} diff --git a/implementations/rust/ockam/ockam_core/src/cbor.rs b/implementations/rust/ockam/ockam_core/src/cbor.rs index 81a11c1b899..98605f7f371 100644 --- a/implementations/rust/ockam/ockam_core/src/cbor.rs +++ b/implementations/rust/ockam/ockam_core/src/cbor.rs @@ -6,7 +6,7 @@ mod cow_str; pub(crate) mod schema; use crate::compat::vec::Vec; -use crate::Result; +use crate::{Result, GLOBAL_BUFFER_POOL}; use minicbor::{CborLen, Encode}; /// Encode a type implementing [`Encode`] and return the encoded byte vector. @@ -18,7 +18,7 @@ where T: Encode<()> + CborLen<()>, { let expected_len = minicbor::len(&x); - let mut output = Vec::with_capacity(expected_len); + let mut output = GLOBAL_BUFFER_POOL.try_size(expected_len); minicbor::encode(x, &mut output)?; Ok(output) } diff --git a/implementations/rust/ockam/ockam_core/src/lib.rs b/implementations/rust/ockam/ockam_core/src/lib.rs index a72fec734a6..e94b05c4bab 100644 --- a/implementations/rust/ockam/ockam_core/src/lib.rs +++ b/implementations/rust/ockam/ockam_core/src/lib.rs @@ -78,6 +78,7 @@ pub mod hex_encoding; pub mod env; pub mod bare; +mod buffer_pool; mod cbor; mod error; mod identity; @@ -88,6 +89,7 @@ mod uint; mod worker; pub use access_control::*; +pub use buffer_pool::*; pub use cbor::*; pub use error::*; pub use identity::*; diff --git a/implementations/rust/ockam/ockam_core/src/routing/route.rs b/implementations/rust/ockam/ockam_core/src/routing/route.rs index acfacecd80d..a4b8a90ce0d 100644 --- a/implementations/rust/ockam/ockam_core/src/routing/route.rs +++ b/implementations/rust/ockam/ockam_core/src/routing/route.rs @@ -176,7 +176,7 @@ impl Route { /// let route: Route = route!["1#alice", "bob"]; /// /// // "0#bob" - /// let final_hop: Address = route.recipient()?; + /// let final_hop: &Address = route.recipient()?; /// /// // ["1#alice", "0#bob"] /// route diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs index 5036efceda5..e815e1c8edf 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs @@ -1,7 +1,7 @@ use core::sync::atomic::Ordering; use ockam_core::compat::sync::Arc; use ockam_core::compat::vec::Vec; -use ockam_core::{route, Any, Result, Route, Routed, SecureChannelLocalInfo}; +use ockam_core::{route, Any, Result, Route, Routed, SecureChannelLocalInfo, GLOBAL_BUFFER_POOL}; use ockam_core::{Decodable, LocalMessage}; use ockam_node::Context; @@ -219,6 +219,8 @@ impl DecryptorHandler { SecureChannelMessage::Close => self.handle_close(ctx).await?, }; + GLOBAL_BUFFER_POOL.release(decrypted_payload); + Ok(()) } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs index 54a5c644fee..c9c256b8660 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs @@ -9,6 +9,7 @@ use ockam_core::compat::vec::Vec; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{ async_trait, route, CowBytes, Decodable, Error, LocalMessage, NeutralMessage, Route, + GLOBAL_BUFFER_POOL, }; use ockam_core::{Any, Result, Routed, Worker}; use ockam_node::Context; @@ -99,9 +100,9 @@ impl EncryptorWorker { msg: SecureChannelPaddedMessage<'_>, ) -> Result> { let payload = ockam_core::cbor_encode_preallocate(&msg)?; - let mut destination = Vec::with_capacity(SIZE_OF_ENCRYPT_OVERHEAD + payload.len()); + let mut destination = GLOBAL_BUFFER_POOL.try_size(SIZE_OF_ENCRYPT_OVERHEAD + payload.len()); - match self.encryptor.encrypt(&mut destination, &payload).await { + let result = match self.encryptor.encrypt(&mut destination, &payload).await { Ok(()) => Ok(destination), // If encryption failed, that means we have some internal error, // and we may be in an invalid state, it's better to stop the Worker @@ -111,7 +112,16 @@ impl EncryptorWorker { ctx.stop_worker(address).await?; Err(err) } + }; + + GLOBAL_BUFFER_POOL.release(payload); + if let SecureChannelMessage::Payload(plaintext) = msg.message { + if !plaintext.payload.is_borrowed() { + GLOBAL_BUFFER_POOL.release(plaintext.payload.into_owned()); + } } + + result } #[instrument(skip_all)] diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs index 6c2ba0fb5b0..3d25c52641d 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs @@ -1,6 +1,6 @@ use ockam_core::bare::{read_slice, write_slice}; use ockam_core::errcode::{Kind, Origin}; -use ockam_core::{Encodable, Encoded, Message, NeutralMessage}; +use ockam_core::{Encodable, Encoded, Message, NeutralMessage, GLOBAL_BUFFER_POOL}; use serde::{Deserialize, Serialize}; /// A command message type for a Portal @@ -84,7 +84,7 @@ impl PortalMessage<'_> { let capacity = 1 + payload.len() + if counter.is_some() { 3 } else { 1 } + { ockam_core::bare::size_of_variable_length(payload.len() as u64) }; - let mut vec = Vec::with_capacity(capacity); + let mut vec = GLOBAL_BUFFER_POOL.try_size(capacity); vec.push(3); write_slice(&mut vec, payload); // TODO: re-enable once orchestrator accepts packet counter @@ -108,7 +108,7 @@ pub enum PortalInternalMessage { } /// Maximum allowed size for a payload -pub const MAX_PAYLOAD_SIZE: usize = 48 * 1024; +pub const MAX_PAYLOAD_SIZE: usize = 96 * 1024; #[cfg(test)] mod test { diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs index 6210a023038..781d7bbcd3a 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs @@ -20,7 +20,7 @@ use tracing::{error, instrument, warn}; /// [`TcpPortalWorker::start_receiver`](crate::TcpPortalWorker::start_receiver) pub(crate) struct TcpPortalRecvProcessor { registry: TcpRegistry, - buf: Vec, + incoming_buffer: Vec, read_half: R, addresses: Addresses, onward_route: Route, @@ -37,7 +37,7 @@ impl TcpPortalRecvProcessor { ) -> Self { Self { registry, - buf: Vec::with_capacity(MAX_PAYLOAD_SIZE), + incoming_buffer: Vec::with_capacity(MAX_PAYLOAD_SIZE), read_half, addresses, onward_route, @@ -67,9 +67,9 @@ impl Processor for TcpPortalRecvPr #[instrument(skip_all, name = "TcpPortalRecvProcessor::process")] async fn process(&mut self, ctx: &mut Context) -> Result { - self.buf.clear(); + self.incoming_buffer.clear(); - let _len = match self.read_half.read_buf(&mut self.buf).await { + let _len = match self.read_half.read_buf(&mut self.incoming_buffer).await { Ok(len) => len, Err(err) => { error!("Tcp Portal connection read failed with error: {}", err); @@ -82,7 +82,7 @@ impl Processor for TcpPortalRecvPr OpenTelemetryContext::inject(&cx) }); - if self.buf.is_empty() { + if self.incoming_buffer.is_empty() { // Notify Sender that connection was closed ctx.set_tracing_context(tracing_context.clone()); if let Err(err) = ctx @@ -113,7 +113,7 @@ impl Processor for TcpPortalRecvPr } // Loop just in case buf was extended (should not happen though) - for chunk in self.buf.chunks(MAX_PAYLOAD_SIZE) { + for chunk in self.incoming_buffer.chunks(MAX_PAYLOAD_SIZE) { let msg = LocalMessage::new() .with_tracing_context(tracing_context.clone()) .with_onward_route(self.onward_route.clone()) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs index ef4c9f5049a..147bb3b28a8 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs @@ -7,6 +7,7 @@ use ockam_core::compat::{boxed::Box, sync::Arc}; use ockam_core::{ async_trait, AllowOnwardAddress, AllowSourceAddress, Decodable, DenyAll, IncomingAccessControl, LocalInfoIdentifier, Mailbox, Mailboxes, OutgoingAccessControl, SecureChannelLocalInfo, + GLOBAL_BUFFER_POOL, }; use ockam_core::{Any, Result, Route, Routed, Worker}; use ockam_node::{Context, ProcessorBuilder, WorkerBuilder}; @@ -509,11 +510,11 @@ impl Worker for TcpPortalWorker { // Send to Tcp stream match msg { PortalMessage::Payload(payload, packet_counter) => { - self.handle_payload(ctx, payload, packet_counter).await + self.handle_payload(ctx, payload, packet_counter).await?; } PortalMessage::Disconnect => { self.start_disconnection(ctx, DisconnectionReason::Remote) - .await + .await?; } PortalMessage::Ping | PortalMessage::Pong => { return Err(TransportError::Protocol)?; @@ -524,8 +525,11 @@ impl Worker for TcpPortalWorker { if msg != PortalInternalMessage::Disconnect { return Err(TransportError::Protocol)?; }; - self.handle_disconnect(ctx).await + self.handle_disconnect(ctx).await?; } + + GLOBAL_BUFFER_POOL.release(payload); + Ok(()) } State::SendPing { .. } | State::SendPong { .. } => { return Err(TransportError::PortalInvalidState)?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs index fdd3abf809e..1d27050a742 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/privileged_portal/workers/remote_worker.rs @@ -175,8 +175,9 @@ impl Worker for RemoteWorker { let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message()) .map(|l| l.their_identifier()) .ok(); - let return_route = msg.return_route(); - let payload = msg.into_payload(); + let msg = msg.into_local_message(); + let return_route = msg.return_route; + let payload = msg.payload; // TODO: Add borrowing let msg: OckamPortalPacket = minicbor::decode(&payload) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport_message.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport_message.rs index 10ab6f94963..44da8b9a1f4 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport_message.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport_message.rs @@ -3,7 +3,7 @@ use minicbor::{CborLen, Decode, Encode}; use ockam_core::compat::string::String; #[cfg(feature = "std")] use ockam_core::OpenTelemetryContext; -use ockam_core::{CowBytes, LocalMessage, Route}; +use ockam_core::{CowBytes, LocalMessage, Route, GLOBAL_BUFFER_POOL}; /// TCP transport message type. #[derive(Debug, Clone, Eq, PartialEq, Encode, Decode, CborLen)] @@ -57,10 +57,19 @@ impl From> for LocalMessage { #[cfg(feature = "std")] let local_message = local_message.with_tracing_context(value.tracing_context()); + let payload = if !value.payload.is_borrowed() { + value.payload.into_owned() + } else { + let mut payload = GLOBAL_BUFFER_POOL.try_size(value.payload.len()); + payload.resize(value.payload.len(), 0); + payload.copy_from_slice(&value.payload); + payload + }; + local_message .with_onward_route(value.onward_route) .with_return_route(value.return_route) - .with_payload(value.payload.into_owned()) + .with_payload(payload) } } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs index 16de8f86fc7..5bda2e63644 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs @@ -28,6 +28,7 @@ use tracing::{info, instrument, trace}; /// the node message system. pub(crate) struct TcpRecvProcessor { registry: TcpRegistry, + incoming_buffer: Vec, read_half: OwnedReadHalf, socket_address: SocketAddr, addresses: Addresses, @@ -47,6 +48,7 @@ impl TcpRecvProcessor { ) -> Self { Self { registry, + incoming_buffer: Vec::new(), read_half, socket_address, addresses, @@ -210,10 +212,12 @@ impl Processor for TcpRecvProcessor { trace!("Received message header for {} bytes", len); // Allocate a buffer of that size - let mut buf = vec![0; len_usize]; + self.incoming_buffer.clear(); + self.incoming_buffer.reserve(len_usize); + self.incoming_buffer.resize(len_usize, 0); // Then read into the buffer - match self.read_half.read_exact(&mut buf).await { + match self.read_half.read_exact(&mut self.incoming_buffer).await { Ok(_) => {} Err(e) => { self.notify_sender_stream_dropped(ctx, e).await?; @@ -222,7 +226,7 @@ impl Processor for TcpRecvProcessor { } // Deserialize the message now - let transport_message: TcpTransportMessage = match minicbor::decode(&buf) { + let transport_message: TcpTransportMessage = match minicbor::decode(&self.incoming_buffer) { Ok(msg) => msg, Err(e) => { self.notify_sender_stream_dropped(ctx, e).await?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs index 9f5cbb0e9a1..79b303f9fa8 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs @@ -4,7 +4,7 @@ use ockam_core::flow_control::FlowControlId; use ockam_core::{ async_trait, compat::{net::SocketAddr, sync::Arc}, - AllowAll, AllowSourceAddress, DenyAll, LocalMessage, + AllowAll, AllowSourceAddress, DenyAll, LocalMessage, GLOBAL_BUFFER_POOL, }; use ockam_core::{Any, Decodable, Mailbox, Mailboxes, Message, Result, Routed, Worker}; use ockam_node::{Context, WorkerBuilder}; @@ -138,6 +138,10 @@ impl TcpSendWorker { minicbor::encode(&transport_message, &mut self.buffer) .map_err(|_| TransportError::Encoding)?; + if !transport_message.payload.is_borrowed() { + GLOBAL_BUFFER_POOL.release(transport_message.payload.into_owned()); + } + // Should not ever happen... if self.buffer.len() < LENGTH_VALUE_SIZE { return Err(TransportError::Encoding)?; @@ -244,7 +248,6 @@ impl Worker for TcpSendWorker { if let Err(err) = self.serialize_message(local_message) { // Close the stream self.stop(ctx).await?; - return Err(err); }; diff --git a/tools/profile/portal_baseline b/tools/profile/portal.baseline similarity index 100% rename from tools/profile/portal_baseline rename to tools/profile/portal.baseline diff --git a/tools/profile/portal.memory_profile b/tools/profile/portal.memory_profile index 9e006c973a2..33b909d72d4 100755 --- a/tools/profile/portal.memory_profile +++ b/tools/profile/portal.memory_profile @@ -27,6 +27,7 @@ fi "${OCKAM}" node delete portal -y >/dev/null 2>&1 || true export OCKAM_LOG_LEVEL=info +export OCKAM_OPENTELEMETRY_EXPORT=0 if [ "$(uname)" == "Darwin" ]; then rm -rf /tmp/ockam.trace/ diff --git a/tools/profile/portal_two_nodes.baseline b/tools/profile/portal_two_nodes.baseline new file mode 100755 index 00000000000..699fe75d603 --- /dev/null +++ b/tools/profile/portal_two_nodes.baseline @@ -0,0 +1,35 @@ +#!/bin/bash + +if ! [ -x "$(command -v iperf3)" ]; then + echo 'Error: iperf3 is not installed.' >&2 + exit 1 +fi + +set -e + +if [ -z "${OCKAM}" ]; then + RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command -F ockam_vault/aws-lc + OCKAM=target/profiling/ockam +fi + +"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true +export OCKAM_LOG_LEVEL=info +export OCKAM_OPENTELEMETRY_EXPORT=0 + +"${OCKAM}" node create inlet -f & +"${OCKAM}" node create outlet -f & + +sleep 1 +"${OCKAM}" tcp-outlet create --to 5500 --at outlet +"${OCKAM}" tcp-inlet create --from 8200 --to /node/outlet/secure/api/service/outlet --at inlet + +iperf3 --server --port 5500 --one-off & +iperf3_server_pid=$! + +sleep 0.3 # wait for server to start +iperf3 --zerocopy --client 127.0.0.1 --port 8200 --time 60 + +kill ${iperf3_server_pid} +"${OCKAM}" node delete inlet -y +"${OCKAM}" node delete outlet -y +