Skip to content

Commit

Permalink
feat(rust): add reliable TCP portals to ockam_api&ockam_command
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Sep 16, 2024
1 parent 0b5976e commit af10e83
Show file tree
Hide file tree
Showing 25 changed files with 153 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ repository = "https://github.com/ockam-network/ockam/implementations/rust/ockam/
description = "Ockam's request-response API"

[features]
default = ["std", "rust-crypto"]
default = ["std", "rust-crypto", "ebpf"]
std = [
"either/use_std",
"hex/std",
Expand All @@ -42,6 +42,10 @@ std = [
storage = ["ockam/storage"]
aws-lc = ["ockam_vault/aws-lc", "ockam_transport_tcp/aws-lc"]
rust-crypto = ["ockam_vault/rust-crypto", "ockam_transport_tcp/ring"]
ebpf = ["ockam_transport_tcp/ebpf"]

[build-dependencies]
cfg_aliases = "0.2.1"

[dependencies]
base64-url = "3.0.0"
Expand Down
4 changes: 4 additions & 0 deletions implementations/rust/ockam/ockam_api/build.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cfg_aliases::cfg_aliases;
use std::process::Command;

fn hash() {
Expand All @@ -11,4 +12,7 @@ fn hash() {

fn main() {
hash();
cfg_aliases! {
ebpf_alias: { all(target_os = "linux", feature = "ebpf") }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl KafkaInletController {
None,
false,
false,
false,
None,
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl KafkaOutletController {
Some(kafka_outlet_address(broker_id)),
false,
OutletAccessControl::WithPolicyExpression(self.policy_expression.clone()),
false,
)
.await
.map(|info| info.to)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use ockam_core::AllowAll;
use ockam_multiaddr::proto::Service;
use ockam_multiaddr::MultiAddr;
use ockam_node::compat::tokio;
use ockam_transport_core::HostnamePort;
use ockam_transport_tcp::PortalInletInterceptor;

// TODO: upgrade to 13 by adding a metadata request to map uuid<=>topic_name
Expand Down Expand Up @@ -160,7 +161,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
.tcp
.create_outlet(
"kafka_consumer_outlet",
format!("127.0.0.1:{}", consumer_mock_kafka.port),
HostnamePort::new("127.0.0.1", consumer_mock_kafka.port),
TcpOutletOptions::new(),
)
.await?;
Expand All @@ -180,7 +181,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
.tcp
.create_outlet(
"kafka_producer_outlet",
format!("127.0.0.1:{}", producer_mock_kafka.port),
HostnamePort::new("127.0.0.1", producer_mock_kafka.port),
TcpOutletOptions::new(),
)
.await?;
Expand Down Expand Up @@ -216,7 +217,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
.tcp
.create_outlet(
"kafka_consumer_outlet",
format!("127.0.0.1:{}", consumer_mock_kafka.port),
HostnamePort::new("127.0.0.1", consumer_mock_kafka.port),
TcpOutletOptions::new(),
)
.await?;
Expand Down
12 changes: 11 additions & 1 deletion implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ pub struct CreateInlet {
/// Disable fallback to TCP.
/// TCP won't be used to transfer data between the Inlet and the Outlet.
#[n(12)] pub(crate) disable_tcp_fallback: bool,
/// Use eBPF and RawSocket to access TCP packets instead of TCP data stream.
#[n(13)] pub(crate) ebpf: bool,
/// TLS certificate provider route.
#[n(13)] pub(crate) tls_certificate_provider: Option<MultiAddr>,
#[n(14)] pub(crate) tls_certificate_provider: Option<MultiAddr>,
}

impl CreateInlet {
Expand All @@ -73,6 +75,7 @@ impl CreateInlet {
wait_connection: bool,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
) -> Self {
Self {
listen_addr: listen,
Expand All @@ -87,6 +90,7 @@ impl CreateInlet {
secure_channel_identifier: None,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider: None,
}
}
Expand All @@ -102,6 +106,7 @@ impl CreateInlet {
wait_connection: bool,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
) -> Self {
Self {
listen_addr: listen,
Expand All @@ -116,6 +121,7 @@ impl CreateInlet {
secure_channel_identifier: None,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider: None,
}
}
Expand Down Expand Up @@ -183,6 +189,8 @@ pub struct CreateOutlet {
/// If not set, the policy set for the [TCP outlet resource type](ockam_abac::ResourceType::TcpOutlet)
/// will be used.
#[n(5)] pub policy_expression: Option<PolicyExpression>,
/// Use eBPF and RawSocket to access TCP packets instead of TCP data stream.
#[n(6)] pub ebpf: bool
}

impl CreateOutlet {
Expand All @@ -191,13 +199,15 @@ impl CreateOutlet {
tls: bool,
worker_addr: Option<Address>,
reachable_from_default_secure_channel: bool,
ebpf: bool,
) -> Self {
Self {
hostname_port,
tls,
worker_addr,
reachable_from_default_secure_channel,
policy_expression: None,
ebpf,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl InMemoryNode {
None,
false,
false,
false,
None,
)
.await?;
Expand Down Expand Up @@ -320,6 +321,7 @@ impl InMemoryNode {
Some(KAFKA_OUTLET_BOOTSTRAP_ADDRESS.into()),
false,
OutletAccessControl::WithPolicyExpression(outlet_policy_expression),
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impl Inlets for BackgroundNodeClient {
secure_channel_identifier: &Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: &Option<MultiAddr>,
) -> miette::Result<Reply<InletStatus>> {
let request = {
Expand All @@ -41,6 +42,7 @@ impl Inlets for BackgroundNodeClient {
wait_connection,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
)
} else {
CreateInlet::to_node(
Expand All @@ -53,6 +55,7 @@ impl Inlets for BackgroundNodeClient {
wait_connection,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
)
};
if let Some(e) = policy_expression.as_ref() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl InMemoryNode {
secure_channel_identifier: Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: Option<MultiAddr>,
) -> Result<InletStatus> {
self.node_manager
Expand All @@ -45,6 +46,7 @@ impl InMemoryNode {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub trait Inlets {
secure_channel_identifier: &Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: &Option<MultiAddr>,
) -> miette::Result<Reply<InletStatus>>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl NodeManager {
enable_udp_puncture: bool,
// TODO: Introduce mode enum
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: Option<MultiAddr>,
) -> Result<InletStatus> {
info!("Handling request to create inlet portal");
Expand Down Expand Up @@ -126,6 +127,7 @@ impl NodeManager {
additional_secure_channel: None,
udp_puncture: None,
additional_route: None,
ebpf,
};

let replacer = Arc::new(Mutex::new(replacer));
Expand Down Expand Up @@ -184,7 +186,10 @@ impl NodeManager {

let tcp_inlet_status = InletStatus::new(
listen_addr.to_string(),
outcome.clone().map(|s| s.worker.address().to_string()),
outcome.clone().map(|s| match s.worker {
Some(address) => address.address().to_string(),
None => "<>".to_string(),
}),
&alias,
None,
outcome.clone().map(|s| s.route.to_string()),
Expand Down Expand Up @@ -233,9 +238,14 @@ impl NodeManager {
drop(session);
if let Some(outcome) = outcome {
if let ReplacerOutputKind::Inlet(status) = outcome {
let address = match &status.worker {
Some(address) => address.address().to_string(),
None => "<>".to_string(),
};

Some(InletStatus::new(
inlet_info.bind_addr.to_string(),
status.worker.address().to_string(),
address,
alias,
None,
status.route.to_string(),
Expand Down Expand Up @@ -272,15 +282,22 @@ impl NodeManager {

let status = if let Some(outcome) = outcome {
match &outcome {
ReplacerOutputKind::Inlet(status) => InletStatus::new(
&info.bind_addr,
status.worker.address().to_string(),
alias,
None,
status.route.to_string(),
connection_status,
info.outlet_addr.to_string(),
),
ReplacerOutputKind::Inlet(status) => {
let address = match &status.worker {
Some(address) => address.address().to_string(),
None => "<>".to_string(),
};

InletStatus::new(
&info.bind_addr,
address,
alias,
None,
status.route.to_string(),
connection_status,
info.outlet_addr.to_string(),
)
}
_ => {
panic!("Unexpected outcome: {:?}", outcome)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl NodeManagerWorker {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
} = create_inlet;
match self
Expand All @@ -48,6 +49,7 @@ impl NodeManagerWorker {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
)
.await
Expand Down
Loading

0 comments on commit af10e83

Please sign in to comment.