From 072237a588c720ac727e2da9f9430f56dee2a6e9 Mon Sep 17 00:00:00 2001 From: Oleksandr Deundiak Date: Thu, 12 Sep 2024 18:55:26 +0200 Subject: [PATCH] feat(rust): implementation of reliable `TCP` portals --- Cargo.lock | 257 +++++++++++- ...bute-based-authentication-control-plane.rs | 3 +- .../examples/01-inlet-outlet.rs | 2 +- .../examples/02-outlet.rs | 2 +- .../examples/03-outlet.rs | 2 +- .../examples/04-outlet.rs | 2 +- .../src/nodes/service/tcp_outlets.rs | 2 +- .../rust/ockam/ockam_transport_tcp/Cargo.toml | 14 +- .../rust/ockam/ockam_transport_tcp/build.rs | 7 + .../src/ebpf_portal/common.rs | 138 +++++++ .../src/ebpf_portal/ebpf_support.rs | 368 ++++++++++++++++++ .../src/ebpf_portal/mod.rs | 17 + .../src/ebpf_portal/outlet_listener_worker.rs | 153 ++++++++ .../src/ebpf_portal/portal_processor.rs | 105 +++++ .../src/ebpf_portal/portal_worker.rs | 120 ++++++ .../src/ebpf_portal/portals.rs | 166 ++++++++ .../src/ebpf_portal/processor.rs | 253 ++++++++++++ .../src/ebpf_portal/registry/inlet.rs | 99 +++++ .../src/ebpf_portal/registry/mod.rs | 5 + .../src/ebpf_portal/registry/outlet.rs | 96 +++++ .../src/ebpf_portal/transport.rs | 32 ++ .../rust/ockam/ockam_transport_tcp/src/lib.rs | 4 + .../src/portal/addresses.rs | 44 ++- .../src/portal/inlet_listener.rs | 27 +- .../ockam_transport_tcp/src/portal/mod.rs | 2 +- .../ockam_transport_tcp/src/portal/options.rs | 14 +- .../src/portal/outlet_listener.rs | 2 + .../src/transport/lifecycle.rs | 5 +- .../ockam_transport_tcp/src/transport/mod.rs | 15 + .../src/transport/portals.rs | 123 +++--- .../ockam_transport_tcp/tests/interceptor.rs | 8 +- .../ockam/ockam_transport_tcp/tests/portal.rs | 32 +- 32 files changed, 2006 insertions(+), 113 deletions(-) create mode 100644 implementations/rust/ockam/ockam_transport_tcp/build.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/ebpf_support.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/mod.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/outlet_listener_worker.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_processor.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_worker.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/processor.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/mod.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs create mode 100644 implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/transport.rs diff --git a/Cargo.lock b/Cargo.lock index c143b7c84cd..3038921d6ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,6 +256,12 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "async-channel" version = "1.9.0" @@ -961,6 +967,61 @@ dependencies = [ "tower-service", ] +[[package]] +name = "aya" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90eea657cc8028447cbda5068f4e10c4fadba0131624f4f7dd1a9c46ffc8d81f" +dependencies = [ + "assert_matches", + "aya-obj", + "bitflags 2.6.0", + "bytes 1.7.1", + "lazy_static", + "libc", + "log", + "object 0.32.2", + "thiserror", + "tokio", +] + +[[package]] +name = "aya-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f11a92f305b983e9f53433457dede617a4ad0aa22e4702220092f39e844c1a2" +dependencies = [ + "aya", + "aya-log-common", + "bytes 1.7.1", + "log", + "thiserror", + "tokio", +] + +[[package]] +name = "aya-log-common" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d38a351ee2d5dc24e04cac6184b1b39408642d9a8b585892c99146f8dd4edb" +dependencies = [ + "num_enum", +] + +[[package]] +name = "aya-obj" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c02024a307161cf3d1f052161958fd13b1a33e3e038083e58082c0700fdab85" +dependencies = [ + "bytes 1.7.1", + "core-error", + "hashbrown 0.14.5", + "log", + "object 0.32.2", + "thiserror", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -972,7 +1033,7 @@ dependencies = [ "cfg-if", "libc", "miniz_oxide 0.7.4", - "object", + "object 0.36.4", "rustc-demangle", "serde", ] @@ -1684,6 +1745,15 @@ dependencies = [ "x11-clipboard", ] +[[package]] +name = "core-error" +version = "0.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efcdb2972eb64230b4c50646d8498ff73f5128d196a90c7236eec4cbe8619b8f" +dependencies = [ + "version_check", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -2421,6 +2491,16 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.8.4" @@ -2431,6 +2511,19 @@ dependencies = [ "regex", ] +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -3294,6 +3387,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.30" @@ -3541,6 +3640,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "ipnetwork" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" +dependencies = [ + "serde", +] + [[package]] name = "is-docker" version = "0.2.0" @@ -3963,6 +4071,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memoffset" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" +dependencies = [ + "autocfg", +] + [[package]] name = "miette" version = "7.2.0" @@ -4266,7 +4383,7 @@ dependencies = [ "bitflags 1.3.2", "cfg-if", "libc", - "memoffset", + "memoffset 0.6.5", ] [[package]] @@ -4291,8 +4408,15 @@ dependencies = [ "cfg-if", "cfg_aliases 0.2.1", "libc", + "memoffset 0.9.1", ] +[[package]] +name = "no-std-net" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65" + [[package]] name = "nom" version = "7.1.3" @@ -4393,6 +4517,26 @@ dependencies = [ "libm", ] +[[package]] +name = "num_enum" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "num_threads" version = "0.1.7" @@ -4536,6 +4680,15 @@ dependencies = [ "objc", ] +[[package]] +name = "object" +version = "0.32.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" +dependencies = [ + "memchr", +] + [[package]] name = "object" version = "0.36.4" @@ -4969,14 +5122,21 @@ dependencies = [ name = "ockam_transport_tcp" version = "0.123.0" dependencies = [ + "aya", + "aya-log", "cfg-if", + "cfg_aliases 0.2.1", + "env_logger 0.11.5", + "libc", "log", "minicbor", + "nix 0.29.0", "ockam_core", "ockam_macros", "ockam_node", "ockam_transport_core", "opentelemetry", + "pnet", "rand", "rustls 0.23.12", "rustls-native-certs 0.7.3", @@ -5477,6 +5637,97 @@ dependencies = [ "time", ] +[[package]] +name = "pnet" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "682396b533413cc2e009fbb48aadf93619a149d3e57defba19ff50ce0201bd0d" +dependencies = [ + "ipnetwork", + "pnet_base", + "pnet_datalink", + "pnet_packet", + "pnet_sys", + "pnet_transport", +] + +[[package]] +name = "pnet_base" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc190d4067df16af3aba49b3b74c469e611cad6314676eaf1157f31aa0fb2f7" +dependencies = [ + "no-std-net", +] + +[[package]] +name = "pnet_datalink" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e79e70ec0be163102a332e1d2d5586d362ad76b01cec86f830241f2b6452a7b7" +dependencies = [ + "ipnetwork", + "libc", + "pnet_base", + "pnet_sys", + "winapi", +] + +[[package]] +name = "pnet_macros" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13325ac86ee1a80a480b0bc8e3d30c25d133616112bb16e86f712dcf8a71c863" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.77", +] + +[[package]] +name = "pnet_macros_support" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed67a952585d509dd0003049b1fc56b982ac665c8299b124b90ea2bdb3134ab" +dependencies = [ + "pnet_base", +] + +[[package]] +name = "pnet_packet" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c96ebadfab635fcc23036ba30a7d33a80c39e8461b8bd7dc7bb186acb96560f" +dependencies = [ + "glob", + "pnet_base", + "pnet_macros", + "pnet_macros_support", +] + +[[package]] +name = "pnet_sys" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d4643d3d4db6b08741050c2f3afa9a892c4244c085a72fcda93c9c2c9a00f4b" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "pnet_transport" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f604d98bc2a6591cf719b58d3203fd882bdd6bf1db696c4ac97978e9f4776bf" +dependencies = [ + "libc", + "pnet_base", + "pnet_packet", + "pnet_sys", +] + [[package]] name = "png" version = "0.17.13" @@ -5692,7 +5943,7 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6" dependencies = [ - "env_logger", + "env_logger 0.8.4", "log", "rand", ] diff --git a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs index ee15de6137a..bddca089016 100644 --- a/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs +++ b/examples/rust/get_started/examples/11-attribute-based-authentication-control-plane.rs @@ -8,6 +8,7 @@ use ockam::identity::{ }; use ockam::remote::RemoteRelayOptions; use ockam::tcp::{TcpOutletOptions, TcpTransportExtension}; +use ockam::transport::HostnamePort; use ockam::{node, Context, Result}; use ockam_api::authenticator::enrollment_tokens::TokenAcceptor; use ockam_api::authenticator::one_time_code::OneTimeCode; @@ -107,7 +108,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime // 4. create a tcp outlet with the above policy tcp.create_outlet( "outlet", - "127.0.0.1:5000", + HostnamePort::new("127.0.0.1", 5000), TcpOutletOptions::new() .with_incoming_access_control_impl(incoming_access_control) .with_outgoing_access_control_impl(outgoing_access_control), diff --git a/examples/rust/tcp_inlet_and_outlet/examples/01-inlet-outlet.rs b/examples/rust/tcp_inlet_and_outlet/examples/01-inlet-outlet.rs index 27d45436828..4c6379f68a1 100644 --- a/examples/rust/tcp_inlet_and_outlet/examples/01-inlet-outlet.rs +++ b/examples/rust/tcp_inlet_and_outlet/examples/01-inlet-outlet.rs @@ -24,7 +24,7 @@ async fn main(ctx: Context) -> Result<()> { // a previous message from the Inlet. let outlet_target = std::env::args().nth(2).expect("no outlet target given"); - tcp.create_outlet("outlet", outlet_target, TcpOutletOptions::new()) + tcp.create_outlet("outlet", outlet_target.try_into()?, TcpOutletOptions::new()) .await?; // Expect first command line argument to be the TCP address on which to start an Inlet diff --git a/examples/rust/tcp_inlet_and_outlet/examples/02-outlet.rs b/examples/rust/tcp_inlet_and_outlet/examples/02-outlet.rs index 338fbfeff83..d3ad2ba8631 100644 --- a/examples/rust/tcp_inlet_and_outlet/examples/02-outlet.rs +++ b/examples/rust/tcp_inlet_and_outlet/examples/02-outlet.rs @@ -28,7 +28,7 @@ async fn main(ctx: Context) -> Result<()> { let outlet_target = std::env::args().nth(1).expect("no outlet target given"); tcp.create_outlet( "outlet", - outlet_target, + outlet_target.try_into()?, TcpOutletOptions::new().as_consumer(&tcp_listener_options.spawner_flow_control_id()), ) .await?; diff --git a/examples/rust/tcp_inlet_and_outlet/examples/03-outlet.rs b/examples/rust/tcp_inlet_and_outlet/examples/03-outlet.rs index 6d45b49a607..cca1c5226d3 100644 --- a/examples/rust/tcp_inlet_and_outlet/examples/03-outlet.rs +++ b/examples/rust/tcp_inlet_and_outlet/examples/03-outlet.rs @@ -41,7 +41,7 @@ async fn main(ctx: Context) -> Result<()> { let outlet_target = std::env::args().nth(1).expect("no outlet target given"); tcp.create_outlet( "outlet", - outlet_target, + outlet_target.try_into()?, TcpOutletOptions::new().as_consumer(&secure_channel_flow_control_id), ) .await?; diff --git a/examples/rust/tcp_inlet_and_outlet/examples/04-outlet.rs b/examples/rust/tcp_inlet_and_outlet/examples/04-outlet.rs index 2a9858c9c75..d8a3cf53a64 100644 --- a/examples/rust/tcp_inlet_and_outlet/examples/04-outlet.rs +++ b/examples/rust/tcp_inlet_and_outlet/examples/04-outlet.rs @@ -38,7 +38,7 @@ async fn main(ctx: Context) -> Result<()> { let outlet_target = std::env::args().nth(1).expect("no outlet target given"); tcp.create_outlet( "outlet", - outlet_target, + outlet_target.try_into()?, TcpOutletOptions::new().as_consumer(&secure_channel_flow_control_id), ) .await?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs index 46784c98acb..facbab7e8e2 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs @@ -161,7 +161,7 @@ impl NodeManager { let res = self .tcp_transport - .create_tcp_outlet(worker_addr.clone(), to.clone(), options) + .create_outlet(worker_addr.clone(), to.clone(), options) .await; Ok(match res { diff --git a/implementations/rust/ockam/ockam_transport_tcp/Cargo.toml b/implementations/rust/ockam/ockam_transport_tcp/Cargo.toml index 61da119fee8..5d2d200b79c 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/Cargo.toml +++ b/implementations/rust/ockam/ockam_transport_tcp/Cargo.toml @@ -23,12 +23,16 @@ TCP Transport for the Ockam Routing Protocol. """ [features] -default = ["std", "ring"] +default = ["std", "ring", "ebpf"] std = ["ockam_macros/std", "ockam_transport_core/std", "opentelemetry"] no_std = ["ockam_macros/no_std", "ockam_transport_core/no_std"] alloc = [] aws-lc = ["tokio-rustls/aws-lc-rs"] ring = ["tokio-rustls/ring"] +ebpf = ["pnet", "aya", "aya-log", "env_logger", "libc", "nix"] + +[build-dependencies] +cfg_aliases = "0.2.1" [dependencies] cfg-if = "1.0.0" @@ -48,3 +52,11 @@ socket2 = { version = "0.5.6", features = ["all"] } tokio = { version = "1.39", features = ["rt-multi-thread", "sync", "net", "macros", "time", "io-util"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "tls12"] } tracing = { version = "0.1", default-features = false } + +[target.'cfg( target_os = "linux" )'.dependencies] +pnet = { version = "0.35.0", optional = true } +aya = { version = "0.12", optional = true } +aya-log = { version = "0.2", optional = true } +env_logger = { version = "0.11", optional = true } +libc = { version = "0.2", optional = true } +nix = { version = "0.29", features = ["net"], optional = true } diff --git a/implementations/rust/ockam/ockam_transport_tcp/build.rs b/implementations/rust/ockam/ockam_transport_tcp/build.rs new file mode 100644 index 00000000000..f53df3ae53e --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/build.rs @@ -0,0 +1,7 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + cfg_aliases! { + ebpf_alias: { all(target_os = "linux", feature = "ebpf") } + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs new file mode 100644 index 00000000000..906df07a07c --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/common.rs @@ -0,0 +1,138 @@ +use minicbor::{Decode, Encode}; +use ockam_core::CowBytes; +use pnet::packet::tcp::TcpPacket; +use pnet::packet::Packet; +use std::net::Ipv4Addr; + +#[allow(missing_docs)] +#[derive(Encode, Decode)] +#[rustfmt::skip] +pub struct OckamPortalPacket<'a> { + #[n(0)] pub sequence: u32, + #[n(1)] pub acknowledgement: u32, + #[n(2)] pub data_offset: u8, + #[n(3)] pub reserved: u8, + #[n(4)] pub flags: u8, + #[n(5)] pub window: u16, + #[n(6)] pub urgent_ptr: u16, + #[n(7)] pub options: Vec, + #[b(8)] pub payload: CowBytes<'a>, +} + +#[allow(missing_docs)] +#[derive(Encode, Decode)] +#[rustfmt::skip] +pub struct TcpOption { + #[n(0)] pub kind: u8, + #[n(1)] pub length: Vec, + #[n(2)] pub data: Vec, +} + +impl From for pnet::packet::tcp::TcpOption { + fn from(value: TcpOption) -> Self { + Self { + number: pnet::packet::tcp::TcpOptionNumber(value.kind), + length: value.length, + data: value.data, + } + } +} + +impl OckamPortalPacket<'_> { + /// Clone data to make an owned version of an instance. + pub fn into_owned(self) -> OckamPortalPacket<'static> { + OckamPortalPacket { + sequence: self.sequence, + acknowledgement: self.acknowledgement, + data_offset: self.data_offset, + reserved: self.reserved, + flags: self.flags, + window: self.window, + urgent_ptr: self.urgent_ptr, + options: self.options, + payload: self.payload.to_owned(), + } + } +} + +impl From for OckamPortalPacket<'_> { + fn from(value: RawSocketMessage) -> Self { + Self { + sequence: value.sequence, + acknowledgement: value.acknowledgement, + data_offset: value.data_offset, + reserved: value.reserved, + flags: value.flags, + window: value.window, + urgent_ptr: value.urgent_ptr, + options: value.options.into_iter().map(Into::into).collect(), + payload: value.payload.into(), + } + } +} + +#[allow(missing_docs)] +pub struct RawSocketMessage { + pub source_ip: Ipv4Addr, + + pub source: u16, + pub destination: u16, + pub sequence: u32, + pub acknowledgement: u32, + pub data_offset: u8, + pub reserved: u8, + pub flags: u8, + pub window: u16, + pub checksum: u16, + pub urgent_ptr: u16, + pub options: Vec, + pub payload: Vec, +} + +impl From for RawTcpOption { + fn from(value: pnet::packet::tcp::TcpOption) -> Self { + Self { + kind: value.number.0, + length: value.length, + data: value.data, + } + } +} + +impl From for TcpOption { + fn from(value: RawTcpOption) -> Self { + Self { + kind: value.kind, + length: value.length, + data: value.data, + } + } +} + +#[allow(missing_docs)] +pub struct RawTcpOption { + pub kind: u8, + pub length: Vec, + pub data: Vec, +} + +#[allow(missing_docs)] +impl RawSocketMessage { + pub fn from_packet(packet: TcpPacket<'_>, source_ip: Ipv4Addr) -> Self { + Self { + source_ip, + source: packet.get_source(), + destination: packet.get_destination(), + sequence: packet.get_sequence(), + acknowledgement: packet.get_acknowledgement(), + data_offset: packet.get_data_offset(), + reserved: packet.get_reserved(), + flags: packet.get_flags(), + window: packet.get_window(), + checksum: packet.get_checksum(), + urgent_ptr: packet.get_urgent_ptr(), + options: packet.get_options().into_iter().map(Into::into).collect(), + payload: packet.payload().to_vec(), + } + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/ebpf_support.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/ebpf_support.rs new file mode 100644 index 00000000000..b2f50228c21 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/ebpf_support.rs @@ -0,0 +1,368 @@ +#![allow(unsafe_code)] + +use crate::ebpf_portal::{InletRegistry, OutletRegistry, RawSocketProcessor}; +use aya::maps::{MapData, MapError}; +use aya::programs::tc::SchedClassifierLink; +use aya::programs::{tc, Link, ProgramError, SchedClassifier, TcAttachType}; +use aya::{Bpf, BpfError}; +use aya_log::BpfLogger; +use core::fmt::{Debug, Formatter}; +use ockam_core::compat::collections::HashMap; +use ockam_core::compat::sync::RwLock; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::{Address, Error, Result}; +use ockam_node::compat::asynchronous::Mutex as AsyncMutex; +use ockam_node::Context; +use pnet::transport::TransportSender; +use rand::random; +use std::sync::{Arc, Mutex}; +use tracing::{debug, error, info, warn}; + +/// Network interface name +pub type Iface = String; + +/// IP Protocol +pub type Proto = u8; + +/// Port +pub type Port = u16; + +/// eBPF support for [`TcpTransport`] +#[derive(Clone)] +pub struct TcpTransportEbpfSupport { + pub(crate) ip_proto: u8, + + pub(crate) inlet_registry: InletRegistry, + pub(crate) outlet_registry: OutletRegistry, + + links: Arc>>, + + socket_write_handle: Arc>>>>, + + bpf: Arc>>, +} + +struct IfaceLink { + ingress: SchedClassifierLink, + egress: SchedClassifierLink, +} + +struct OckamBpf { + bpf: Bpf, + + inlet_port_map: aya::maps::HashMap, + outlet_port_map: aya::maps::HashMap, +} + +impl Default for TcpTransportEbpfSupport { + fn default() -> Self { + let rnd: u16 = random(); + + // Random in range [146, 252] + let ip_proto = (146 + rnd % 107) as u8; + + Self { + ip_proto, + inlet_registry: Default::default(), + outlet_registry: Default::default(), + links: Default::default(), + socket_write_handle: Default::default(), + bpf: Default::default(), + } + } +} + +impl Debug for TcpTransportEbpfSupport { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + write!(f, "TcpTransportEbpfSupport") + } +} + +impl TcpTransportEbpfSupport { + /// Start [`RawSocketProcessor`]. Should be done once. + pub(crate) async fn start_raw_socket_processor_if_needed( + &self, + ctx: &Context, + ) -> Result>> { + info!("Starting RawSocket"); + + let mut socket_write_handle_lock = self.socket_write_handle.lock().await; + if let Some(socket_write_handle_lock) = socket_write_handle_lock.as_ref() { + return Ok(socket_write_handle_lock.clone()); + } + + let processor = RawSocketProcessor::create( + self.ip_proto, + self.inlet_registry.clone(), + self.outlet_registry.clone(), + ) + .await?; + + let address = Address::random_tagged("RawSocketProcessor"); + + let socket_write_handle = processor.socket_write_handle(); + + *socket_write_handle_lock = Some(socket_write_handle.clone()); + + ctx.start_processor(address, processor).await?; + + info!("Started RawSocket"); + + Ok(socket_write_handle) + } + + /// Start [`RawSocketProcessor`]. Should be done once. + pub(crate) fn attach_ebpf_if_needed(&self, iface: Iface) -> Result<()> { + self.init_ebpf()?; + + self.attach_ebpf(iface)?; + + Ok(()) + } + + /// Init eBPF system + pub fn init_ebpf(&self) -> Result<()> { + // FIXME: eBPF I doubt we can reuse that instance for different interfaces. + let mut bpf_lock = self.bpf.lock().unwrap(); + if bpf_lock.is_some() { + debug!("Skipping eBPF initialization"); + return Ok(()); + } + + info!("Initializing eBPF"); + + if let Some(err) = env_logger::try_init().err() { + error!("Error initializing env_logger: {}", err); + }; + + // Bump the memlock rlimit. This is needed for older kernels that don't use the + // new memcg based accounting, see https://lwn.net/Articles/837122/ + let rlim = libc::rlimit { + rlim_cur: libc::RLIM_INFINITY, + rlim_max: libc::RLIM_INFINITY, + }; + let ret = unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &rlim) }; + if ret != 0 { + warn!("remove limit on locked memory failed, ret is: {}", ret); + } + + // This will include your eBPF object file as raw bytes at compile-time and load it at + // runtime. This approach is recommended for most real-world use cases. If you would + // like to specify the eBPF program at runtime rather than at compile-time, you can + // reach for `Bpf::load_file` instead. + + let ebpf_binary = aya::include_bytes_aligned!("../../../ockam_ebpf/ockam_ebpf"); + let mut bpf = Bpf::load(ebpf_binary).map_err(map_bpf_error)?; + // eBPF can be read from the filesystem in the runtime for development purposes + // let ebpf_binary = std::fs::read(PATH).unwrap(); + // let mut bpf = Bpf::load(&ebpf_binary).map_err(map_bpf_error)?; + + if let Err(e) = BpfLogger::init(&mut bpf) { + // This can happen if you remove all log statements from your eBPF program. + warn!("failed to initialize eBPF logger for ingress: {}", e); + } + + let inlet_port_map = + aya::maps::HashMap::<_, Port, Proto>::try_from(bpf.take_map("INLET_PORT_MAP").unwrap()) + .map_err(map_map_error)?; + let outlet_port_map = aya::maps::HashMap::<_, Port, Proto>::try_from( + bpf.take_map("OUTLET_PORT_MAP").unwrap(), + ) + .map_err(map_map_error)?; + + let bpf = OckamBpf { + bpf, + inlet_port_map, + outlet_port_map, + }; + + *bpf_lock = Some(bpf); + + info!("Initialized eBPF"); + + Ok(()) + } + + /// Attach eBPF to both ingress and egress of the given interface + pub fn attach_ebpf(&self, iface: String) -> Result<()> { + // error adding clsact to the interface if it is already added is harmless + // the full cleanup can be done with 'sudo tc qdisc del dev eth0 clsact'. + let _ = tc::qdisc_add_clsact(&iface); + + let mut links = self.links.lock().unwrap(); + + if links.contains_key(&iface) { + return Ok(()); + } + let skip_load = !links.is_empty(); + + let mut bpf_lock = self.bpf.lock().unwrap(); + let bpf = bpf_lock.as_mut().unwrap(); + + // TODO: eBPF Avoid loading multiple times + let ingress_link = self.attach_ebpf_ingress(iface.clone(), bpf, skip_load)?; + let egress_link = self.attach_ebpf_egress(iface.clone(), bpf, skip_load)?; + + links.insert( + iface.clone(), + IfaceLink { + ingress: ingress_link, + egress: egress_link, + }, + ); + + Ok(()) + } + + fn attach_ebpf_ingress( + &self, + iface: String, + bpf: &mut OckamBpf, + skip_load: bool, + ) -> Result { + info!("Attaching eBPF ingress to {}", iface); + + let program_ingress: &mut SchedClassifier = bpf + .bpf + .program_mut("ockam_ingress") + .unwrap() + .try_into() + .map_err(map_program_error)?; + if !skip_load { + program_ingress.load().map_err(map_program_error)?; + } + let link_id = program_ingress + .attach(&iface, TcAttachType::Ingress) + .map_err(map_program_error)?; + let link_id = program_ingress + .take_link(link_id) + .map_err(map_program_error)?; + + info!("eBPF ingress attached to {}", iface); + + Ok(link_id) + } + + fn attach_ebpf_egress( + &self, + iface: String, + bpf: &mut OckamBpf, + skip_load: bool, + ) -> Result { + info!("Attaching eBPF egress to {}", iface); + + let program_egress: &mut SchedClassifier = bpf + .bpf + .program_mut("ockam_egress") + .unwrap() + .try_into() + .map_err(map_program_error)?; + if !skip_load { + program_egress.load().map_err(map_program_error)?; + } + let link_id = program_egress + .attach(&iface, TcAttachType::Egress) + .map_err(map_program_error)?; + let link_id = program_egress + .take_link(link_id) + .map_err(map_program_error)?; + + info!("eBPF egress attached to {}", iface); + + Ok(link_id) + } + + /// Detach the eBPF. + pub fn detach_ebpfs(&self) { + for (_iface, link) in self.links.lock().unwrap().drain() { + _ = link.ingress.detach(); + _ = link.egress.detach(); + } + } + + /// Add inlet port + pub fn add_inlet_port(&self, port: u16) -> Result<()> { + let mut bpf = self.bpf.lock().unwrap(); + + bpf.as_mut() + .unwrap() + .inlet_port_map + .insert(port, self.ip_proto, 0) + .unwrap(); + + Ok(()) + } + + /// Remove inlet port + pub fn remove_inlet_port(&self, port: u16) -> Result<()> { + let mut bpf = self.bpf.lock().unwrap(); + + bpf.as_mut().unwrap().inlet_port_map.remove(&port).unwrap(); + + Ok(()) + } + + /// Add outlet port + pub fn add_outlet_port(&self, port: u16) -> Result<()> { + let mut bpf = self.bpf.lock().unwrap(); + + bpf.as_mut() + .unwrap() + .outlet_port_map + .insert(port, self.ip_proto, 0) + .unwrap(); + + Ok(()) + } + + /// Remove outlet port + pub fn remove_outlet_port(&self, port: u16) -> Result<()> { + let mut bpf = self.bpf.lock().unwrap(); + + bpf.as_mut().unwrap().outlet_port_map.remove(&port).unwrap(); + + Ok(()) + } +} + +fn map_bpf_error(bpf_error: BpfError) -> Error { + Error::new(Origin::Core, Kind::Io, bpf_error) +} + +fn map_program_error(program_error: ProgramError) -> Error { + Error::new(Origin::Core, Kind::Io, program_error) +} + +fn map_map_error(map_error: MapError) -> Error { + Error::new(Origin::Core, Kind::Io, map_error) +} + +#[cfg(test)] +// requires root to run +mod tests { + use crate::ebpf_portal::TcpTransportEbpfSupport; + use ockam_core::Result; + use ockam_node::Context; + + #[ignore] + #[ockam_macros::test] + async fn test_init(_ctx: &mut Context) -> Result<()> { + let ebpf_support = TcpTransportEbpfSupport::default(); + + ebpf_support.init_ebpf()?; + + Ok(()) + } + + #[ignore] + #[ockam_macros::test] + async fn test_attach(_ctx: &mut Context) -> Result<()> { + let ebpf_support = TcpTransportEbpfSupport::default(); + + ebpf_support.init_ebpf()?; + + ebpf_support.attach_ebpf("lo".to_string())?; + + Ok(()) + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/mod.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/mod.rs new file mode 100644 index 00000000000..a9beb984d6e --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/mod.rs @@ -0,0 +1,17 @@ +mod common; +mod ebpf_support; +mod outlet_listener_worker; +mod portal_processor; +mod portal_worker; +mod portals; +mod processor; +mod registry; +mod transport; + +pub use common::*; +pub use ebpf_support::*; +pub use outlet_listener_worker::*; +pub(crate) use portal_processor::*; +pub use portal_worker::*; +pub use processor::*; +pub use registry::*; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/outlet_listener_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/outlet_listener_worker.rs new file mode 100644 index 00000000000..c94cf300200 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/outlet_listener_worker.rs @@ -0,0 +1,153 @@ +use crate::ebpf_portal::{ + OckamPortalPacket, OutletMappingValue, OutletRegistry, PortalProcessor, PortalWorker, + TcpTransportEbpfSupport, +}; +use crate::portal::addresses::{Addresses, PortalType}; +use crate::TcpOutletOptions; +use ockam_core::{async_trait, Address, AllowAll, Any, DenyAll, Result, Route, Routed, Worker}; +use ockam_node::Context; +use pnet::transport::TransportSender; +use std::net::Ipv4Addr; +use std::sync::{Arc, RwLock}; +use tokio::net::TcpListener; +use tracing::{debug, info, warn}; + +/// Worker listens for new incoming connections. +pub struct OutletListenerWorker { + options: TcpOutletOptions, + + socket_write_handle: Arc>, + outlet_registry: OutletRegistry, + + dst_ip: Ipv4Addr, + dst_port: u16, + + ebpf_support: TcpTransportEbpfSupport, +} + +impl OutletListenerWorker { + /// Constructor. + pub fn new( + options: TcpOutletOptions, + socket_write_handle: Arc>, + outlet_registry: OutletRegistry, + dst_ip: Ipv4Addr, + dst_port: u16, + ebpf_support: TcpTransportEbpfSupport, + ) -> Self { + Self { + options, + socket_write_handle, + outlet_registry, + dst_ip, + dst_port, + ebpf_support, + } + } + + async fn new_outlet_connection( + &self, + ctx: &Context, + src_addr: Address, + msg: OckamPortalPacket<'_>, + return_route: Route, + ) -> Result<()> { + // TODO: Remove connection eventually? + + // debug!("New TCP connection"); + info!("New TCP connection"); + + let addresses = Addresses::generate(PortalType::EbpfOutlet); + + self.options + .setup_flow_control_for_outlet(ctx.flow_controls(), &addresses, &src_addr); + + let (sender, receiver) = tokio::sync::mpsc::channel(128); + + // FIXME: eBPF Address? + let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let assigned_port = tcp_listener.local_addr().unwrap().port(); + + let mapping = OutletMappingValue { + inlet_worker_address: return_route.recipient()?, + _addresses: addresses.clone(), + sender, + assigned_port, + }; + + let processor = PortalProcessor::new_outlet( + receiver, + addresses.clone(), + return_route, + tcp_listener, + assigned_port, + self.ebpf_support.clone(), + ); + let worker = PortalWorker::new( + None, + self.socket_write_handle.clone(), + assigned_port, + self.dst_ip, + self.dst_port, + Some(msg.into_owned()), + ); + + ctx.start_processor_with_access_control( + addresses.receiver_remote, + processor, + DenyAll, + AllowAll, + ) + .await?; + ctx.start_worker_with_access_control( + addresses.sender_remote, + worker, + AllowAll, // FIXME eBPF + DenyAll, + ) + .await?; + + self.outlet_registry.add_mapping(mapping.clone()); + + Ok(()) + } +} + +#[async_trait] +impl Worker for OutletListenerWorker { + type Message = Any; + type Context = Context; + + async fn handle_message( + &mut self, + ctx: &mut Self::Context, + msg: Routed, + ) -> Result<()> { + let return_route = msg.return_route(); + let src_addr = msg.src_addr(); + let inlet_worker_address = return_route.recipient()?; + let payload = msg.into_payload(); + + let msg: OckamPortalPacket = minicbor::decode(&payload)?; + + if msg.flags != 2 { + warn!("Outlet Listener Worker received a non SYN packet"); + return Ok(()); + } + + if self + .outlet_registry + .get_mapping2(&inlet_worker_address) + .is_some() + { + // FIXME: eBPF Should still send it + debug!("Received another SYN for an already created connection"); + return Ok(()); + } + + self.new_outlet_connection(ctx, src_addr, msg, return_route) + .await?; + + Ok(()) + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_processor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_processor.rs new file mode 100644 index 00000000000..69ede154893 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_processor.rs @@ -0,0 +1,105 @@ +use crate::ebpf_portal::{OckamPortalPacket, Port, RawSocketMessage, TcpTransportEbpfSupport}; +use crate::portal::addresses::Addresses; +use ockam_core::{async_trait, route, LocalMessage, Processor, Result, Route}; +use ockam_node::Context; +use std::sync::{Arc, RwLock}; +use tokio::net::TcpListener; +use tokio::sync::mpsc::Receiver; +use tracing::info; + +/// Processor responsible for receiving TCP packets for a certain connection. +// TODO: eBPF Can be a worker instead? +pub(crate) struct PortalProcessor { + receiver: Receiver, + addresses: Addresses, + current_route: Arc>, + assigned_port_state: Option, +} + +struct AssignedPortState { + _tcp_listener: TcpListener, // Just hold it so that port is marked as taken + assigned_port: Port, + ebpf_support: TcpTransportEbpfSupport, +} + +impl PortalProcessor { + /// Constructor. + pub fn new_inlet( + receiver: Receiver, + addresses: Addresses, + current_route: Arc>, + ) -> Self { + Self { + receiver, + addresses, + current_route, + assigned_port_state: None, + } + } + + /// Constructor. + pub fn new_outlet( + receiver: Receiver, + addresses: Addresses, + current_route: Route, // Immutable + tcp_listener: TcpListener, + assigned_port: Port, + ebpf_support: TcpTransportEbpfSupport, + ) -> Self { + Self { + receiver, + addresses, + current_route: Arc::new(RwLock::new(current_route)), + assigned_port_state: Some(AssignedPortState { + _tcp_listener: tcp_listener, + assigned_port, + ebpf_support, + }), + } + } +} + +#[async_trait] +impl Processor for PortalProcessor { + type Context = Context; + + async fn initialize(&mut self, _context: &mut Self::Context) -> Result<()> { + if let Some(state) = &self.assigned_port_state { + state.ebpf_support.add_outlet_port(state.assigned_port)?; + } + + Ok(()) + } + + async fn shutdown(&mut self, _context: &mut Self::Context) -> Result<()> { + if let Some(state) = &self.assigned_port_state { + state.ebpf_support.remove_outlet_port(state.assigned_port)?; + } + + Ok(()) + } + + async fn process(&mut self, ctx: &mut Self::Context) -> Result { + let message = match self.receiver.recv().await { + Some(message) => message, + None => return Ok(false), + }; + + let packet = OckamPortalPacket::from(message); + + // debug!("Got packet, forwarding to the other side"); + info!("Got packet, forwarding to the other side"); + + let current_route = self.current_route.read().unwrap().clone(); + ctx.forward_from_address( + LocalMessage::new() + .with_onward_route(current_route) + .with_return_route(route![self.addresses.sender_remote.clone()]) + .with_payload(minicbor::to_vec(packet)?), + self.addresses.receiver_remote.clone(), + ) + .await?; + + Ok(true) + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_worker.rs new file mode 100644 index 00000000000..84b122b6ef5 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portal_worker.rs @@ -0,0 +1,120 @@ +use crate::ebpf_portal::OckamPortalPacket; +use ockam_core::{async_trait, Any, Result, Route, Routed, Worker}; +use ockam_node::Context; +use pnet::packet::tcp::MutableTcpPacket; +use pnet::transport::TransportSender; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::{Arc, RwLock}; +use tracing::info; + +/// Worker responsible for writing data to the Socket that is received from the other side of the +/// TCP connection. +pub struct PortalWorker { + current_route: Option>>, + // FIXME: eBPF I doubt there should be a mutable usage + socket_write_handle: Arc>, + src_port: u16, + dst_ip: Ipv4Addr, + dst_port: u16, + + first_message: Option>, +} + +impl PortalWorker { + /// Constructor. + pub fn new( + current_route: Option>>, + socket_write_handle: Arc>, + src_port: u16, + dst_ip: Ipv4Addr, + dst_port: u16, + first_message: Option>, + ) -> Self { + Self { + current_route, + socket_write_handle, + src_port, + dst_ip, + dst_port, + first_message, + } + } + + async fn handle(&self, msg: OckamPortalPacket<'_>) -> Result<()> { + let buff_len = (msg.data_offset as usize) * 4 + msg.payload.len(); + + let buff = vec![0u8; buff_len]; + let mut packet = MutableTcpPacket::owned(buff).unwrap(); + + packet.set_sequence(msg.sequence); + packet.set_acknowledgement(msg.acknowledgement); + packet.set_data_offset(msg.data_offset); + packet.set_reserved(msg.reserved); + packet.set_flags(msg.flags); + packet.set_window(msg.window); + packet.set_urgent_ptr(msg.urgent_ptr); + packet.set_options( + msg.options + .into_iter() + .map(Into::into) + .collect::>() + .as_slice(), + ); + packet.set_payload(&msg.payload); + + packet.set_source(self.src_port); + packet.set_destination(self.dst_port); + + let check = pnet::packet::tcp::ipv4_checksum( + &packet.to_immutable(), + // checksum is adjusted inside the eBPF in respect to the correct src IP addr + &Ipv4Addr::new(0, 0, 0, 0), + &self.dst_ip, + ); + + packet.set_checksum(check); + + let packet = packet.to_immutable(); + + self.socket_write_handle + .write() + .unwrap() + .send_to(packet, IpAddr::V4(self.dst_ip)) + .unwrap(); + + Ok(()) + } +} + +#[async_trait] +impl Worker for PortalWorker { + type Message = Any; + type Context = Context; + + async fn initialize(&mut self, _context: &mut Self::Context) -> Result<()> { + if let Some(msg) = self.first_message.take() { + self.handle(msg).await?; + } + + Ok(()) + } + + async fn handle_message( + &mut self, + _ctx: &mut Self::Context, + msg: Routed, + ) -> Result<()> { + // debug!("Got message, forwarding to the socket"); + info!("Got message, forwarding to the socket"); + + if let Some(current_route) = &self.current_route { + *current_route.write().unwrap() = msg.return_route(); + } + + let payload = msg.into_payload(); + + let msg: OckamPortalPacket = minicbor::decode(&payload)?; + + self.handle(msg).await + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs new file mode 100644 index 00000000000..87f9dafe76a --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/portals.rs @@ -0,0 +1,166 @@ +use crate::ebpf_portal::OutletListenerWorker; +use crate::portal::InletSharedState; +use crate::{TcpInlet, TcpInletOptions, TcpOutletOptions, TcpTransport}; +use core::fmt::Debug; +use ockam_core::{Address, DenyAll, Result, Route}; +use ockam_node::compat::asynchronous::resolve_peer; +use ockam_node::WorkerBuilder; +use ockam_transport_core::HostnamePort; +use std::net::{IpAddr, SocketAddrV4}; +use std::sync::{Arc, RwLock}; +use tokio::net::TcpListener; +use tracing::instrument; + +impl TcpTransport { + /// Create a Raw Inlet + #[instrument(skip(self), fields(outlet_route=?outlet_route.clone()))] + pub async fn create_raw_inlet( + &self, + bind_addr: impl Into + Clone + Debug, + outlet_route: impl Into + Clone + Debug, + options: TcpInletOptions, + ) -> Result { + // TODO: eBPF Find correlation between bind_addr and iface? + let bind_addr = bind_addr.into(); + let tcp_listener = TcpListener::bind(bind_addr.clone()).await.unwrap(); // FIXME eBPF + + let local_address = tcp_listener.local_addr().unwrap(); // FIXME eBPF + let ip = match local_address.ip() { + IpAddr::V4(ip) => ip, + IpAddr::V6(_) => { + panic!() // FIXME eBPF + } + }; + let port = local_address.port(); + + let ifaddrs = nix::ifaddrs::getifaddrs().unwrap(); + for ifaddr in ifaddrs { + let addr = match ifaddr.address { + Some(addr) => addr, + None => continue, + }; + + let addr = match addr.as_sockaddr_in() { + Some(addr) => *addr, + None => continue, + }; + + let addr = SocketAddrV4::from(addr); + + if &ip == addr.ip() || ip.is_unspecified() { + // TODO: eBPF Should we instead attach to all interfaces & run a periodic task + // to identify network interfaces change? + self.attach_ebpf_if_needed(ifaddr.interface_name)?; + } + } + + let _write_handle = self.start_raw_socket_processor_if_needed().await?; + + let inlet_shared_state = Arc::new(RwLock::new(InletSharedState { + route: outlet_route.into(), + is_paused: false, + })); + + self.ebpf_support.inlet_registry.create_inlet( + options, + local_address.port(), + tcp_listener, + inlet_shared_state.clone(), + ); + + self.ebpf_support.add_inlet_port(port)?; + + Ok(TcpInlet::new_ebpf(local_address, inlet_shared_state)) + } + + /// Stop the Raw Inlet + #[instrument(skip(self), fields(port=port))] + pub async fn stop_raw_inlet(&self, port: u16) -> Result<()> { + self.ebpf_support.inlet_registry.delete_inlet(port); + + Ok(()) + } + + /// Create a Raw Outlet + #[instrument(skip(self), fields(address = ? address.clone().into(), peer=peer.clone().to_string()))] + pub async fn create_raw_outlet( + &self, + address: impl Into
+ Clone + Debug, + peer: HostnamePort, + options: TcpOutletOptions, + ) -> Result<()> { + // Resolve peer address as a host name and port + tracing::Span::current().record("peer", peer.to_string()); + + let address = address.into(); + + // TODO: eBPF May be good to run resolution every time there is incoming connection, but that + // would require also updating the self.ebpf_support.outlet_registry + let destination = resolve_peer(peer.to_string()).await?; + + let dst_ip = match destination.ip() { + IpAddr::V4(ip) => ip, + IpAddr::V6(_) => { + // FIXME eBPF + panic!() + } + }; + let dst_port = destination.port(); + + // TODO: eBPF Figure out which ifaces might be used and only attach to them + // TODO: eBPF Should we indeed attach to all interfaces & run a periodic task + // to identify network interfaces change? + let ifaddrs = nix::ifaddrs::getifaddrs().unwrap(); + for ifaddr in ifaddrs { + let addr = match ifaddr.address { + Some(addr) => addr, + None => continue, + }; + + if addr.as_sockaddr_in().is_none() { + continue; + }; + + self.attach_ebpf_if_needed(ifaddr.interface_name)?; + } + + let write_handle = self.start_raw_socket_processor_if_needed().await?; + + let access_control = options.incoming_access_control.clone(); + + options.setup_flow_control_for_outlet_listener(self.ctx().flow_controls(), &address); + + let outlet_listener_worker = OutletListenerWorker::new( + options, + write_handle, + self.ebpf_support.outlet_registry.clone(), + dst_ip, + dst_port, + self.ebpf_support.clone(), + ); + + WorkerBuilder::new(outlet_listener_worker) + .with_address(address) + .with_incoming_access_control_arc(access_control) + .with_outgoing_access_control(DenyAll) + .start(self.ctx()) + .await?; + + self.ebpf_support + .outlet_registry + .add_outlet(dst_ip, dst_port); + + Ok(()) + } + + /// Stop the Raw Inlet + #[instrument(skip(self), fields(address = % addr.clone().into()))] + pub async fn stop_raw_outlet(&self, addr: impl Into
+ Clone + Debug) -> Result<()> { + self.ctx().stop_worker(addr).await?; + + // TODO: eBPF Remove from the registry + // self.ebpf_support.outlet_registry + + Ok(()) + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/processor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/processor.rs new file mode 100644 index 00000000000..f94206097d3 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/processor.rs @@ -0,0 +1,253 @@ +use crate::ebpf_portal::{ + InletMappingValue, InletRegistry, OutletRegistry, PortalProcessor, PortalWorker, + RawSocketMessage, +}; +use crate::portal::addresses::{Addresses, PortalType}; +use crate::portal::InletSharedState; +use crate::TcpInletOptions; +use ockam_core::{async_trait, AllowAll, DenyAll, Processor, Result}; +use ockam_node::Context; +use pnet::packet::ip::IpNextHeaderProtocol; +use pnet::packet::Packet; +use pnet::transport; +use pnet::transport::{ + tcp_packet_iter, TransportChannelType, TransportProtocol, TransportReceiver, TransportSender, +}; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::{Arc, RwLock}; +use tracing::{info, warn}; + +/// Processor responsible for receiving all data with OCKAM_TCP_PORTAL_PROTOCOL on the machine +/// and redirect it to individual portal workers. +pub struct RawSocketProcessor { + socket_write_handle: Arc>, + socket_read_handle: Arc>, + + inlet_registry: InletRegistry, + outlet_registry: OutletRegistry, +} + +impl RawSocketProcessor { + pub(crate) async fn create( + ip_proto: u8, + inlet_registry: InletRegistry, + outlet_registry: OutletRegistry, + ) -> Result { + let (socket_write_handle, socket_read_handle) = transport::transport_channel( + 1024 * 1024, + TransportChannelType::Layer4(TransportProtocol::Ipv4(IpNextHeaderProtocol::new( + ip_proto, + ))), + ) + .unwrap(); + + let s = Self { + socket_write_handle: Arc::new(RwLock::new(socket_write_handle)), + socket_read_handle: Arc::new(RwLock::new(socket_read_handle)), + inlet_registry, + outlet_registry, + }; + + Ok(s) + } + + async fn new_inlet_connection( + ctx: &Context, + options: TcpInletOptions, + inlet_shared_state: Arc>, + socket_write_handle: Arc>, + registry: &InletRegistry, + src_ip: Ipv4Addr, + parsed_packet: &ParsedPacket, + ) -> Result> { + // TODO: eBPF Remove connection eventually + + let addresses = Addresses::generate(PortalType::EbpfInlet); + + let inlet_shared_state = inlet_shared_state.read().unwrap().clone(); + + if inlet_shared_state.is_paused { + // Just drop the stream + return Ok(None); + } + + options.setup_flow_control( + ctx.flow_controls(), + &addresses, + inlet_shared_state.route.next()?, + ); + + // TODO: Make sure the connection can't be spoofed by someone having access to that Outlet + + let (sender, receiver) = tokio::sync::mpsc::channel(128); + + let mapping = InletMappingValue { + client_ip: src_ip, + client_port: parsed_packet.source, + _addresses: addresses.clone(), + sender, + }; + + let outlet_route = Arc::new(RwLock::new(inlet_shared_state.route)); + let processor = + PortalProcessor::new_inlet(receiver, addresses.clone(), outlet_route.clone()); + let worker = PortalWorker::new( + Some(outlet_route), + socket_write_handle, + parsed_packet.destination, + src_ip, + parsed_packet.source, + None, + ); + + ctx.start_processor_with_access_control( + addresses.receiver_remote, + processor, + DenyAll, + AllowAll, // FIXME eBPF + ) + .await?; + ctx.start_worker_with_access_control( + addresses.sender_remote, + worker, + AllowAll, // FIXME eBPF + DenyAll, + ) + .await?; + + registry.add_mapping(mapping.clone()); + + Ok(Some(mapping)) + } + + /// Write handle to the socket + pub fn socket_write_handle(&self) -> Arc> { + self.socket_write_handle.clone() + } +} + +struct ParsedPacket { + message: RawSocketMessage, + + source_ip: Ipv4Addr, + flags: u8, + source: u16, + destination: u16, +} + +#[async_trait] +impl Processor for RawSocketProcessor { + type Context = Context; + + async fn process(&mut self, ctx: &mut Self::Context) -> Result { + let socket_read_handle = self.socket_read_handle.clone(); + let parsed_packet = tokio::task::spawn_blocking(move || { + let mut socket_read_handle = socket_read_handle.write().unwrap(); // FIXME + let mut iterator = tcp_packet_iter(&mut socket_read_handle); + // TODO: Should we check the checksum? + let (packet, source_ip) = iterator.next().unwrap(); // FIXME + + let source_ip = match source_ip { + IpAddr::V4(ip) => ip, + IpAddr::V6(_) => return None, + }; + + let source = packet.get_source(); + let destination = packet.get_destination(); + let flags = packet.get_flags(); + + info!( + "PACKET LEN: {}. Source: {}, Destination: {}", + packet.payload().len(), + source, + destination, + ); + + let message = RawSocketMessage::from_packet(packet, source_ip); + + let parsed_packet = ParsedPacket { + message, + source_ip, + flags, + source, + destination, + }; + + Some(parsed_packet) + }) + .await + .unwrap(); + + let parsed_packet = match parsed_packet { + Some(parsed_packet) => parsed_packet, + None => return Ok(false), + }; + + if let Some((inlet_shared_state, options)) = self + .inlet_registry + .get_inlets_info(parsed_packet.destination) + { + let mapping = match self + .inlet_registry + .get_mapping(parsed_packet.source_ip, parsed_packet.source) + { + Some(mapping) => { + // trace!("Existing connection from {}", packet.get_source()); + info!("Existing connection from {}", parsed_packet.source); + mapping + } + None => { + if parsed_packet.flags != 2 { + warn!( + "Unknown connection packet from {}. Skipping", + parsed_packet.source + ); + return Ok(true); + } + + // debug!("New connection from {}", packet.get_source()); + info!("New connection from {}", parsed_packet.source); + match Self::new_inlet_connection( + ctx, + options, + inlet_shared_state, + self.socket_write_handle.clone(), + &self.inlet_registry, + parsed_packet.source_ip, + &parsed_packet, + ) + .await? + { + Some(mapping) => mapping, + None => return Ok(true), + } + } + }; + + mapping.sender.send(parsed_packet.message).await.unwrap(); + + return Ok(true); + } + + let _outlet = match self + .outlet_registry + .get_outlet(parsed_packet.source_ip, parsed_packet.source) + { + Some(outlet) => outlet, + None => return Ok(true), + }; + + let mapping = match self.outlet_registry.get_mapping(parsed_packet.destination) { + Some(mapping) => { + // trace!("Existing connection to {}", packet.get_destination()); + info!("Existing connection to {}", parsed_packet.destination); + mapping + } + None => return Ok(true), + }; + + mapping.sender.send(parsed_packet.message).await.unwrap(); + + Ok(true) + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs new file mode 100644 index 00000000000..80a8532d7c5 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/inlet.rs @@ -0,0 +1,99 @@ +use crate::ebpf_portal::RawSocketMessage; +use crate::portal::addresses::Addresses; +use crate::portal::InletSharedState; +use crate::TcpInletOptions; +use std::collections::HashMap; +use std::net::Ipv4Addr; +use std::sync::{Arc, RwLock}; +use tokio::net::TcpListener; +use tokio::sync::mpsc::Sender; + +/// Inlet registry +#[derive(Default, Clone)] +pub(crate) struct InletRegistry { + inlets: Arc>>, + mapping: Arc>>, +} + +impl InletRegistry { + /// Add new mapping + pub fn add_mapping(&self, mapping: InletMappingValue) { + // FIXME: eBPF duplicates + self.mapping.write().unwrap().push(mapping) + } + + /// Get mapping + pub fn get_mapping(&self, client_ip: Ipv4Addr, client_port: u16) -> Option { + let mapping = self.mapping.read().unwrap(); + + mapping.iter().find_map(|x| { + if x.client_ip == client_ip && x.client_port == client_port { + Some(x.clone()) + } else { + None + } + }) + } + + /// Get inlets + pub fn get_inlets_info( + &self, + dst_port: u16, + ) -> Option<(Arc>, TcpInletOptions)> { + let inlets = self.inlets.read().unwrap(); + + let inlet = inlets.get(&dst_port)?; + + Some((inlet.inlet_shared_state.clone(), inlet.options.clone())) + } + + /// Create inlet + pub fn create_inlet( + &self, + options: TcpInletOptions, + port: u16, + tcp_listener: TcpListener, + inlet_shared_state: Arc>, + ) { + let mut inlets = self.inlets.write().unwrap(); + + inlets.insert( + port, + InletInfo { + options, + inlet_shared_state, + tcp_listener, + }, + ); + } + + /// Delete the inlet + pub fn delete_inlet(&self, port: u16) { + let mut inlets = self.inlets.write().unwrap(); + + inlets.remove(&port); + } +} + +/// Inlet info +pub struct InletInfo { + /// Route to the corresponding Outlet + pub inlet_shared_state: Arc>, + /// Options + pub options: TcpInletOptions, + /// Hold to mark the port as taken + pub tcp_listener: TcpListener, +} + +/// Inlet Mapping +#[derive(Clone)] +pub(crate) struct InletMappingValue { + /// Client IP + pub client_ip: Ipv4Addr, + /// Client port + pub client_port: u16, + /// Addresses + pub _addresses: Addresses, + /// Sender to a processor + pub sender: Sender, +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/mod.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/mod.rs new file mode 100644 index 00000000000..2a571e3e283 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/mod.rs @@ -0,0 +1,5 @@ +mod inlet; +mod outlet; + +pub use inlet::*; +pub use outlet::*; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs new file mode 100644 index 00000000000..2f92daeaea9 --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/registry/outlet.rs @@ -0,0 +1,96 @@ +use crate::ebpf_portal::RawSocketMessage; +use crate::portal::addresses::Addresses; +use ockam_core::Address; +use std::collections::HashMap; +use std::net::Ipv4Addr; +use std::sync::{Arc, RwLock}; +use tokio::sync::mpsc::Sender; + +/// Outlet registry +#[derive(Default, Clone)] +pub struct OutletRegistry { + targets_ports: Arc>>, + mapping: Arc>>, +} + +impl OutletRegistry { + /// Add mapping + pub(crate) fn add_mapping(&self, mapping: OutletMappingValue) { + // FIXME: eBPF duplicates + self.mapping.write().unwrap().push(mapping) + } + + /// Get mapping + pub(crate) fn get_mapping(&self, dst_port: u16) -> Option { + let mapping = self.mapping.read().unwrap(); + + mapping.iter().find_map(|x| { + if x.assigned_port == dst_port { + Some(x.clone()) + } else { + None + } + }) + } + + /// Get mapping + pub(crate) fn get_mapping2( + &self, + inlet_worker_address: &Address, + ) -> Option { + let mapping = self.mapping.read().unwrap(); + + mapping.iter().find_map(|x| { + if &x.inlet_worker_address == inlet_worker_address { + Some(x.clone()) + } else { + None + } + }) + } + + /// Get outlet + pub fn get_outlet(&self, src_ip: Ipv4Addr, src_port: u16) -> Option { + self.targets_ports + .read() + .unwrap() + .get(&OutletKey { + dst_ip: src_ip, + dst_port: src_port, + }) + .cloned() + } + + /// Add outlet + pub fn add_outlet(&self, dst_ip: Ipv4Addr, dst_port: u16) { + // TODO: eBPF Duplicates? + self.targets_ports + .write() + .unwrap() + .insert(OutletKey { dst_ip, dst_port }, OutletInfo {}); + } +} + +#[derive(Hash, PartialEq, Eq)] +struct OutletKey { + dst_ip: Ipv4Addr, + dst_port: u16, +} + +/// Outlet info +#[derive(Clone)] +pub struct OutletInfo {} + +/// Outlet mapping +#[derive(Clone)] +pub(crate) struct OutletMappingValue { + // TODO: eBPF Add identifier? + /// The other side's Inlet worker address + pub inlet_worker_address: Address, + /// Assigned port on our machine for a specific connection + pub assigned_port: u16, + /// Addresses + pub _addresses: Addresses, + /// Sender to the processor + pub sender: Sender, +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/transport.rs b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/transport.rs new file mode 100644 index 00000000000..8947451faad --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/ebpf_portal/transport.rs @@ -0,0 +1,32 @@ +use crate::ebpf_portal::Iface; +use crate::TcpTransport; +use ockam_core::Result; +use pnet::transport::TransportSender; +use std::sync::{Arc, RwLock}; + +impl TcpTransport { + /// Start [`RawSocketProcessor`]. Should be done once. + pub(crate) async fn start_raw_socket_processor_if_needed( + &self, + ) -> Result>> { + self.ebpf_support + .start_raw_socket_processor_if_needed(self.ctx()) + .await + } + + // TODO: eBPF Should we dispatch it to the sync thread? + pub(crate) fn attach_ebpf_if_needed(&self, iface: Iface) -> Result<()> { + self.ebpf_support.attach_ebpf_if_needed(iface) + } + + /// Detach the eBPFs. + pub fn detach_ebpfs(&self) { + self.ebpf_support.detach_ebpfs() + } +} + +impl Drop for TcpTransport { + fn drop(&mut self) { + self.detach_ebpfs() + } +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs b/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs index d141de544b4..f5d2bf19df7 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs @@ -38,6 +38,10 @@ pub use protocol_version::*; pub use registry::*; pub use transport::*; +#[cfg(ebpf_alias)] +/// eBPF backed TCP portals that works on TCP level rather than on top of TCP +pub mod ebpf_portal; + pub(crate) const CLUSTER_NAME: &str = "_internals.transport.tcp"; /// Transport type for TCP addresses diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/addresses.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/addresses.rs index 1a149d562f6..7dcad1755a3 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/addresses.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/addresses.rs @@ -2,16 +2,27 @@ use ockam_core::Address; /// Enumerate all portal types #[derive(Debug, Eq, PartialEq, Clone)] -pub(super) enum PortalType { +pub(crate) enum PortalType { Inlet, Outlet, + #[allow(unused)] + EbpfInlet, + #[allow(unused)] + EbpfOutlet, } impl PortalType { pub fn str(&self) -> &'static str { match self { - PortalType::Inlet => "inlet", - PortalType::Outlet => "outlet", + PortalType::Inlet | PortalType::EbpfInlet => "inlet", + PortalType::Outlet | PortalType::EbpfOutlet => "outlet", + } + } + + pub fn is_ebpf(&self) -> bool { + match self { + PortalType::Inlet | PortalType::Outlet => false, + PortalType::EbpfInlet | PortalType::EbpfOutlet => true, } } } @@ -29,19 +40,28 @@ pub(crate) struct Addresses { } impl Addresses { - pub(super) fn generate(portal_type: PortalType) -> Self { + pub(crate) fn generate(portal_type: PortalType) -> Self { let type_name = portal_type.str(); - let sender_internal = - Address::random_tagged(&format!("TcpPortalWorker.{}.sender_internal", type_name)); - let sender_remote = - Address::random_tagged(&format!("TcpPortalWorker.{}.sender_remote", type_name)); + let ebpf_str = if portal_type.is_ebpf() { + "ebpf" + } else { + "non_ebpf" + }; + let sender_internal = Address::random_tagged(&format!( + "TcpPortalWorker.{}.{}.sender_internal", + ebpf_str, type_name + )); + let sender_remote = Address::random_tagged(&format!( + "TcpPortalWorker.{}.{}.sender_remote", + ebpf_str, type_name + )); let receiver_internal = Address::random_tagged(&format!( - "TcpPortalRecvProcessor.{}.receiver_internal", - type_name + "TcpPortalRecvProcessor.{}.{}.receiver_internal", + ebpf_str, type_name )); let receiver_remote = Address::random_tagged(&format!( - "TcpPortalRecvProcessor.{}.receiver_remote", - type_name + "TcpPortalRecvProcessor.{}.{}.receiver_remote", + ebpf_str, type_name )); Self { diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs index 42ff2e53346..b9dd3a4b5e9 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs @@ -23,7 +23,7 @@ use tracing::{debug, error, instrument}; #[derive(Debug, Clone)] pub struct InletSharedState { pub route: Route, - pub is_paused: bool, + pub is_paused: bool, // FIXME: eBPF Not implemented } /// A TCP Portal Inlet listen processor @@ -34,7 +34,7 @@ pub struct InletSharedState { pub(crate) struct TcpInletListenProcessor { registry: TcpRegistry, inner: TcpListener, - outlet_shared_state: Arc>, + inlet_shared_state: Arc>, options: TcpInletOptions, } @@ -42,13 +42,13 @@ impl TcpInletListenProcessor { pub fn new( registry: TcpRegistry, inner: TcpListener, - outlet_shared_state: Arc>, + inlet_shared_state: Arc>, options: TcpInletOptions, ) -> Self { Self { registry, inner, - outlet_shared_state, + inlet_shared_state, options, } } @@ -73,20 +73,20 @@ impl TcpInletListenProcessor { } }; let socket_addr = inner.local_addr().map_err(TransportError::from)?; - let outlet_shared_state = InletSharedState { + let inlet_shared_state = InletSharedState { route: outlet_listener_route, is_paused: options.is_paused, }; - let outlet_shared_state = Arc::new(RwLock::new(outlet_shared_state)); - let processor = Self::new(registry, inner, outlet_shared_state.clone(), options); + let inlet_shared_state = Arc::new(RwLock::new(inlet_shared_state)); + let processor = Self::new(registry, inner, inlet_shared_state.clone(), options); ctx.start_processor(processor_address.clone(), processor) .await?; - Ok(TcpInlet::new( + Ok(TcpInlet::new_regular( socket_addr, processor_address, - outlet_shared_state, + inlet_shared_state, )) } @@ -190,9 +190,9 @@ impl Processor for TcpInletListenProcessor { let addresses = Addresses::generate(PortalType::Inlet); - let outlet_shared_state = self.outlet_shared_state.read().unwrap().clone(); + let inlet_shared_state = self.inlet_shared_state.read().unwrap().clone(); - if outlet_shared_state.is_paused { + if inlet_shared_state.is_paused { // Just drop the stream return Ok(true); } @@ -200,7 +200,7 @@ impl Processor for TcpInletListenProcessor { self.options.setup_flow_control( ctx.flow_controls(), &addresses, - outlet_shared_state.route.next()?, + inlet_shared_state.route.next()?, ); let streams = if let Some(certificate_provider) = &self.options.tls_certificate_provider { @@ -225,12 +225,13 @@ impl Processor for TcpInletListenProcessor { ) }; + // TODO: Make sure the connection can't be spoofed by someone having access to that Outlet TcpPortalWorker::start_new_inlet( ctx, self.registry.clone(), streams, HostnamePort::from(socket_addr), - outlet_shared_state.route, + inlet_shared_state.route, addresses, self.options.incoming_access_control.clone(), self.options.outgoing_access_control.clone(), diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs index 828f19aa94f..1112d6c4d21 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs @@ -1,4 +1,4 @@ -mod addresses; +pub mod addresses; mod inlet_listener; mod interceptor; pub mod options; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs index 29622296cb5..baaca7d8f7e 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs @@ -5,7 +5,7 @@ use ockam_core::flow_control::{FlowControlId, FlowControls}; use ockam_core::{Address, AllowAll, IncomingAccessControl, OutgoingAccessControl}; /// Trust Options for an Inlet -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct TcpInletOptions { pub(super) incoming_access_control: Arc, pub(super) outgoing_access_control: Arc, @@ -76,7 +76,7 @@ impl TcpInletOptions { self } - pub(super) fn setup_flow_control( + pub(crate) fn setup_flow_control( &self, flow_controls: &FlowControls, addresses: &Addresses, @@ -99,11 +99,11 @@ impl Default for TcpInletOptions { } /// Trust Options for an Outlet -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct TcpOutletOptions { pub(super) consumer: Vec, - pub(super) incoming_access_control: Arc, - pub(super) outgoing_access_control: Arc, + pub(crate) incoming_access_control: Arc, + pub(crate) outgoing_access_control: Arc, pub(super) tls: bool, } @@ -169,7 +169,7 @@ impl TcpOutletOptions { self } - pub(super) fn setup_flow_control_for_outlet_listener( + pub(crate) fn setup_flow_control_for_outlet_listener( &self, flow_controls: &FlowControls, address: &Address, @@ -179,7 +179,7 @@ impl TcpOutletOptions { } } - pub(super) fn setup_flow_control_for_outlet( + pub(crate) fn setup_flow_control_for_outlet( &self, flow_controls: &FlowControls, addresses: &Addresses, diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs index 6366bb75609..cfd4640f537 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs @@ -89,6 +89,8 @@ impl Worker for TcpOutletListenWorker { self.options .setup_flow_control_for_outlet(ctx.flow_controls(), &addresses, &src_addr); + // TODO: Make sure the connection can't be spoofed by someone having access to that Outlet + TcpPortalWorker::start_new_outlet( ctx, self.registry.clone(), diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/lifecycle.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/lifecycle.rs index c6effbfc276..5cfe5c626d0 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/lifecycle.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/lifecycle.rs @@ -21,10 +21,7 @@ impl TcpTransport { /// ``` #[instrument(name = "create tcp transport", skip_all)] pub async fn create(ctx: &Context) -> Result { - let tcp = Self { - ctx: Arc::new(ctx.async_try_clone().await?), - registry: TcpRegistry::default(), - }; + let tcp = Self::new(ctx.async_try_clone().await?); // make the TCP transport available in the list of supported transports for // later address resolution when socket addresses will need to be instantiated as TCP // worker addresses diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/mod.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/mod.rs index b57b5d4ecf0..2055fb1c5a3 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/mod.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/mod.rs @@ -58,6 +58,21 @@ use ockam_node::{Context, HasContext}; pub struct TcpTransport { ctx: Arc, registry: TcpRegistry, + + #[cfg(ebpf_alias)] + pub(crate) ebpf_support: crate::ebpf_portal::TcpTransportEbpfSupport, +} + +impl TcpTransport { + /// Constructor. + pub fn new(ctx: Context) -> Self { + Self { + ctx: Arc::new(ctx), + registry: TcpRegistry::default(), + #[cfg(ebpf_alias)] + ebpf_support: Default::default(), + } + } } /// This trait adds a `create_tcp_transport` method to any struct returning a Context. diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs index 8072cec8d23..a8a9b9bfb67 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/portals.rs @@ -4,7 +4,7 @@ use core::fmt; use core::fmt::{Debug, Formatter}; use ockam_core::compat::net::SocketAddr; use ockam_core::compat::sync::{Arc, RwLock}; -use ockam_core::{route, Address, Error, Result, Route}; +use ockam_core::{route, Address, Result, Route}; use ockam_node::Context; use ockam_transport_core::{parse_socket_addr, HostnamePort}; use tracing::instrument; @@ -76,24 +76,22 @@ impl TcpTransport { /// use ockam_transport_tcp::{TcpOutletOptions, TcpTransport}; /// # use ockam_node::Context; /// # use ockam_core::{AllowAll, Result}; - /// # async fn test(ctx: Context) -> Result<()> { + /// # use ockam_transport_core::HostnamePort; + /// + /// async fn test(ctx: Context) -> Result<()> { /// /// let tcp = TcpTransport::create(&ctx).await?; - /// tcp.create_outlet("outlet", "localhost:9000", TcpOutletOptions::new()).await?; + /// tcp.create_outlet("outlet", HostnamePort::new("localhost", 9000), TcpOutletOptions::new()).await?; /// # tcp.stop_outlet("outlet").await?; /// # Ok(()) } /// ``` - #[instrument(skip(self), fields(address = ? address.clone().into(), peer))] + #[instrument(skip(self), fields(address = ? address.clone().into(), peer=peer.clone().to_string()))] pub async fn create_outlet( &self, address: impl Into
+ Clone + Debug, - hostname_port: impl TryInto + Clone + Debug, + peer: HostnamePort, options: TcpOutletOptions, ) -> Result<()> { - // Resolve peer address as a host name and port - let peer = hostname_port.try_into()?; - tracing::Span::current().record("peer", peer.to_string()); - TcpOutletListenWorker::start( &self.ctx, self.registry.clone(), @@ -106,35 +104,17 @@ impl TcpTransport { Ok(()) } - /// Create Tcp Outlet Listener at address, that connects to peer using Tcp - #[instrument(skip(self))] - pub async fn create_tcp_outlet( - &self, - address: Address, - hostname_port: HostnamePort, - options: TcpOutletOptions, - ) -> Result<()> { - TcpOutletListenWorker::start( - &self.ctx, - self.registry.clone(), - address, - hostname_port, - options, - ) - .await?; - - Ok(()) - } - /// Stop outlet at addr /// ```rust /// use ockam_transport_tcp::{TcpOutletOptions, TcpTransport}; /// # use ockam_node::Context; /// # use ockam_core::{AllowAll, Result}; - /// # async fn test(ctx: Context) -> Result<()> { + /// # use ockam_transport_core::HostnamePort; + /// + /// async fn test(ctx: Context) -> Result<()> { /// /// let tcp = TcpTransport::create(&ctx).await?; - /// tcp.create_outlet("outlet", "127.0.0.1:5000", TcpOutletOptions::new()).await?; + /// tcp.create_outlet("outlet", HostnamePort::new("127.0.0.1", 5000), TcpOutletOptions::new()).await?; /// tcp.stop_outlet("outlet").await?; /// # Ok(()) } /// ``` @@ -149,31 +129,56 @@ impl TcpTransport { #[derive(Clone, Debug)] pub struct TcpInlet { socket_address: SocketAddr, - processor_address: Address, - outlet_state: Arc>, + state: TcpInletState, + inlet_shared_state: Arc>, +} + +#[derive(Clone, Debug)] +enum TcpInletState { + Ebpf, + Regular { processor_address: Address }, } impl fmt::Display for TcpInlet { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!( - f, - "Socket: {}, Processor: {}", - self.socket_address, self.processor_address - ) + match &self.state { + TcpInletState::Ebpf => { + write!(f, "Socket: {}. eBPF", self.socket_address) + } + TcpInletState::Regular { processor_address } => { + write!( + f, + "Socket: {}. Processor address: {}", + self.socket_address, processor_address + ) + } + } } } impl TcpInlet { /// Constructor - pub fn new( + pub fn new_regular( socket_address: SocketAddr, processor_address: Address, - outlet_state: Arc>, + inlet_shared_state: Arc>, ) -> Self { Self { socket_address, - processor_address, - outlet_state, + state: TcpInletState::Regular { processor_address }, + inlet_shared_state, + } + } + + /// Constructor + pub fn new_ebpf( + socket_address: SocketAddr, + inlet_shared_state: Arc>, + ) -> Self { + Self { + socket_address, + state: TcpInletState::Ebpf, + inlet_shared_state, } } @@ -183,8 +188,11 @@ impl TcpInlet { } /// Processor address - pub fn processor_address(&self) -> &Address { - &self.processor_address + pub fn processor_address(&self) -> Option<&Address> { + match &self.state { + TcpInletState::Ebpf => None, + TcpInletState::Regular { processor_address } => Some(processor_address), + } } fn build_new_full_route(new_route: Route, old_route: &Route) -> Result { @@ -198,32 +206,43 @@ impl TcpInlet { /// NOTE: Existing TCP connections will still use the old route, /// only newly accepted connections will use the new route. pub fn update_outlet_node_route(&self, new_route: Route) -> Result<()> { - let mut outlet_state = self.outlet_state.write().unwrap(); + let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); - outlet_state.route = Self::build_new_full_route(new_route, &outlet_state.route)?; + inlet_shared_state.route = + Self::build_new_full_route(new_route, &inlet_shared_state.route)?; Ok(()) } /// Pause TCP Inlet, all incoming TCP streams will be dropped. pub fn pause(&self) { - let mut outlet_state = self.outlet_state.write().unwrap(); + let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); - outlet_state.is_paused = true; + inlet_shared_state.is_paused = true; } /// Unpause TCP Inlet and update the outlet route. pub fn unpause(&self, new_route: Route) -> Result<()> { - let mut outlet_state = self.outlet_state.write().unwrap(); + let mut inlet_shared_state = self.inlet_shared_state.write().unwrap(); - outlet_state.route = Self::build_new_full_route(new_route, &outlet_state.route)?; - outlet_state.is_paused = false; + inlet_shared_state.route = + Self::build_new_full_route(new_route, &inlet_shared_state.route)?; + inlet_shared_state.is_paused = false; Ok(()) } /// Stop the Inlet pub async fn stop(&self, ctx: &Context) -> Result<()> { - ctx.stop_processor(self.processor_address.clone()).await + match &self.state { + TcpInletState::Ebpf => { + // TODO: eBPF + } + TcpInletState::Regular { processor_address } => { + ctx.stop_processor(processor_address.clone()).await?; + } + } + + Ok(()) } } diff --git a/implementations/rust/ockam/ockam_transport_tcp/tests/interceptor.rs b/implementations/rust/ockam/ockam_transport_tcp/tests/interceptor.rs index 93d3d4e5166..83abdddf974 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/tests/interceptor.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/tests/interceptor.rs @@ -51,8 +51,12 @@ async fn setup( let listener = { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let bind_address = listener.local_addr().unwrap().to_string(); - tcp.create_outlet("outlet", bind_address, TcpOutletOptions::new()) - .await?; + tcp.create_outlet( + "outlet", + bind_address.try_into().unwrap(), + TcpOutletOptions::new(), + ) + .await?; listener }; diff --git a/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs b/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs index 093be73e07a..b81607af37b 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs @@ -18,8 +18,12 @@ async fn setup(ctx: &Context) -> Result<(String, TcpListener)> { let listener = { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let bind_address = listener.local_addr().unwrap().to_string(); - tcp.create_outlet("outlet", bind_address, TcpOutletOptions::new()) - .await?; + tcp.create_outlet( + "outlet", + bind_address.try_into().unwrap(), + TcpOutletOptions::new(), + ) + .await?; listener }; @@ -136,7 +140,7 @@ async fn portal__tcp_connection__should_succeed(ctx: &mut Context) -> Result<()> let bind_address = listener.local_addr().unwrap().to_string(); tcp.create_outlet( "outlet", - bind_address, + bind_address.try_into().unwrap(), TcpOutletOptions::new().as_consumer(&outlet_flow_control_id), ) .await?; @@ -189,8 +193,12 @@ async fn portal__tcp_connection_with_invalid_message_flow__should_not_succeed( let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let bind_address = listener.local_addr().unwrap().to_string(); - tcp.create_outlet("outlet_invalid", bind_address, TcpOutletOptions::new()) - .await?; + tcp.create_outlet( + "outlet_invalid", + bind_address.try_into().unwrap(), + TcpOutletOptions::new(), + ) + .await?; let inlet = tcp .create_inlet( @@ -234,14 +242,15 @@ async fn portal__update_route__should_succeed(ctx: &mut Context) -> Result<()> { let tcp = TcpTransport::create(ctx).await?; let listener_outlet = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let listener_node = tcp - .listen("127.0.0.1:0", TcpListenerOptions::new()) - .await - .unwrap(); + let listener_node = tcp.listen("127.0.0.1:0", TcpListenerOptions::new()).await?; tcp.create_outlet( "outlet", - listener_outlet.local_addr().unwrap().to_string(), + listener_outlet + .local_addr() + .unwrap() + .to_string() + .try_into()?, TcpOutletOptions::new().as_consumer(listener_node.flow_control_id()), ) .await?; @@ -251,8 +260,7 @@ async fn portal__update_route__should_succeed(ctx: &mut Context) -> Result<()> { listener_node.socket_address().to_string(), TcpConnectionOptions::new(), ) - .await - .unwrap(); + .await?; let node_connection2 = tcp .connect( listener_node.socket_address().to_string(),