Skip to content

Commit

Permalink
feat(rust): add reliable TCP portals to ockam_api&ockam_command
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Sep 17, 2024
1 parent 072237a commit e910617
Show file tree
Hide file tree
Showing 26 changed files with 160 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ repository = "https://github.com/ockam-network/ockam/implementations/rust/ockam/
description = "Ockam's request-response API"

[features]
default = ["std", "rust-crypto"]
default = ["std", "rust-crypto", "ebpf"]
std = [
"either/use_std",
"hex/std",
Expand All @@ -42,6 +42,10 @@ std = [
storage = ["ockam/storage"]
aws-lc = ["ockam_vault/aws-lc", "ockam_transport_tcp/aws-lc"]
rust-crypto = ["ockam_vault/rust-crypto", "ockam_transport_tcp/ring"]
ebpf = ["ockam_transport_tcp/ebpf"]

[build-dependencies]
cfg_aliases = "0.2.1"

[dependencies]
base64-url = "3.0.0"
Expand Down
4 changes: 4 additions & 0 deletions implementations/rust/ockam/ockam_api/build.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use cfg_aliases::cfg_aliases;
use std::process::Command;

fn hash() {
Expand All @@ -11,4 +12,7 @@ fn hash() {

fn main() {
hash();
cfg_aliases! {
ebpf_alias: { all(target_os = "linux", feature = "ebpf") }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl KafkaInletController {
None,
false,
false,
false,
None,
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl KafkaOutletController {
Some(kafka_outlet_address(broker_id)),
false,
OutletAccessControl::WithPolicyExpression(self.policy_expression.clone()),
false,
)
.await
.map(|info| info.to)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use ockam_core::AllowAll;
use ockam_multiaddr::proto::Service;
use ockam_multiaddr::MultiAddr;
use ockam_node::compat::tokio;
use ockam_transport_core::HostnamePort;
use ockam_transport_tcp::PortalInletInterceptor;

// TODO: upgrade to 13 by adding a metadata request to map uuid<=>topic_name
Expand Down Expand Up @@ -160,7 +161,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
.tcp
.create_outlet(
"kafka_consumer_outlet",
format!("127.0.0.1:{}", consumer_mock_kafka.port),
HostnamePort::new("127.0.0.1", consumer_mock_kafka.port),
TcpOutletOptions::new(),
)
.await?;
Expand All @@ -180,7 +181,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
.tcp
.create_outlet(
"kafka_producer_outlet",
format!("127.0.0.1:{}", producer_mock_kafka.port),
HostnamePort::new("127.0.0.1", producer_mock_kafka.port),
TcpOutletOptions::new(),
)
.await?;
Expand Down Expand Up @@ -216,7 +217,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
.tcp
.create_outlet(
"kafka_consumer_outlet",
format!("127.0.0.1:{}", consumer_mock_kafka.port),
HostnamePort::new("127.0.0.1", consumer_mock_kafka.port),
TcpOutletOptions::new(),
)
.await?;
Expand Down
12 changes: 11 additions & 1 deletion implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ pub struct CreateInlet {
/// Disable fallback to TCP.
/// TCP won't be used to transfer data between the Inlet and the Outlet.
#[n(11)] pub(crate) disable_tcp_fallback: bool,
/// Use eBPF and RawSocket to access TCP packets instead of TCP data stream.
#[n(12)] pub(crate) ebpf: bool,
/// TLS certificate provider route.
#[n(12)] pub(crate) tls_certificate_provider: Option<MultiAddr>,
#[n(13)] pub(crate) tls_certificate_provider: Option<MultiAddr>,
}

impl CreateInlet {
Expand All @@ -68,6 +70,7 @@ impl CreateInlet {
wait_connection: bool,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
suffix_route: Route,
) -> Self {
Self {
Expand All @@ -81,6 +84,7 @@ impl CreateInlet {
secure_channel_identifier: None,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider: None,
suffix_route,
}
Expand All @@ -95,6 +99,7 @@ impl CreateInlet {
wait_connection: bool,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
suffix_route: Route,
) -> Self {
Self {
Expand All @@ -108,6 +113,7 @@ impl CreateInlet {
secure_channel_identifier: None,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider: None,
suffix_route,
}
Expand Down Expand Up @@ -168,6 +174,8 @@ pub struct CreateOutlet {
/// If not set, the policy set for the [TCP outlet resource type](ockam_abac::ResourceType::TcpOutlet)
/// will be used.
#[n(5)] pub policy_expression: Option<PolicyExpression>,
/// Use eBPF and RawSocket to access TCP packets instead of TCP data stream.
#[n(6)] pub ebpf: bool
}

impl CreateOutlet {
Expand All @@ -176,13 +184,15 @@ impl CreateOutlet {
tls: bool,
worker_addr: Option<Address>,
reachable_from_default_secure_channel: bool,
ebpf: bool,
) -> Self {
Self {
hostname_port,
tls,
worker_addr,
reachable_from_default_secure_channel,
policy_expression: None,
ebpf,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl NodeManagerWorker {
worker_addr,
reachable_from_default_secure_channel,
policy_expression,
ebpf,
tls,
} = body.tcp_outlet;
let interceptor_addr = self
Expand Down Expand Up @@ -61,6 +62,7 @@ impl NodeManagerWorker {
Some(outlet_addr),
reachable_from_default_secure_channel,
OutletAccessControl::WithPolicyExpression(policy_expression),
ebpf,
)
.await
{
Expand All @@ -85,6 +87,7 @@ impl NodeManagerWorker {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
suffix_route,
} = body.tcp_inlet.clone();
Expand Down Expand Up @@ -113,6 +116,7 @@ impl NodeManagerWorker {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
)
.await
Expand Down Expand Up @@ -259,7 +263,7 @@ impl InfluxDBPortals for BackgroundNodeClient {
policy_expression: Option<PolicyExpression>,
token_leaser: MultiAddr,
) -> miette::Result<OutletStatus> {
let mut outlet_payload = CreateOutlet::new(to, tls, from.cloned(), true);
let mut outlet_payload = CreateOutlet::new(to, tls, from.cloned(), true, false);
if let Some(policy_expression) = policy_expression {
outlet_payload.set_policy_expression(policy_expression);
}
Expand Down Expand Up @@ -299,6 +303,7 @@ impl InfluxDBPortals for BackgroundNodeClient {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
false,
tls_certificate_provider,
suffix_route,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl InMemoryNode {
None,
false,
false,
false,
None,
)
.await?;
Expand Down Expand Up @@ -320,6 +321,7 @@ impl InMemoryNode {
Some(KAFKA_OUTLET_BOOTSTRAP_ADDRESS.into()),
false,
OutletAccessControl::WithPolicyExpression(outlet_policy_expression),
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub fn create_inlet_payload(
secure_channel_identifier: &Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: &Option<MultiAddr>,
outlet_suffix_address: Route,
) -> CreateInlet {
Expand All @@ -37,6 +38,7 @@ pub fn create_inlet_payload(
wait_connection,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
outlet_suffix_address,
)
} else {
Expand All @@ -48,6 +50,7 @@ pub fn create_inlet_payload(
wait_connection,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
outlet_suffix_address,
)
};
Expand Down Expand Up @@ -79,6 +82,7 @@ impl Inlets for BackgroundNodeClient {
secure_channel_identifier: &Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: &Option<MultiAddr>,
outlet_suffix_addr: Route,
) -> miette::Result<Reply<InletStatus>> {
Expand All @@ -94,6 +98,7 @@ impl Inlets for BackgroundNodeClient {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
outlet_suffix_addr,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl InMemoryNode {
secure_channel_identifier: Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: Option<MultiAddr>,
) -> Result<InletStatus> {
self.node_manager
Expand All @@ -45,6 +46,7 @@ impl InMemoryNode {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub trait Inlets {
secure_channel_identifier: &Option<Identifier>,
enable_udp_puncture: bool,
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: &Option<MultiAddr>,
outlet_suffix_address: Route,
) -> miette::Result<Reply<InletStatus>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl NodeManager {
enable_udp_puncture: bool,
// TODO: Introduce mode enum
disable_tcp_fallback: bool,
ebpf: bool,
tls_certificate_provider: Option<MultiAddr>,
) -> Result<InletStatus> {
info!("Handling request to create inlet portal");
Expand Down Expand Up @@ -126,6 +127,7 @@ impl NodeManager {
additional_secure_channel: None,
udp_puncture: None,
additional_route: None,
ebpf,
};

let replacer = Arc::new(Mutex::new(replacer));
Expand Down Expand Up @@ -184,7 +186,9 @@ impl NodeManager {

let tcp_inlet_status = InletStatus::new(
listen_addr.to_string(),
outcome.clone().map(|s| s.worker.address().to_string()),
outcome
.clone()
.and_then(|s| s.worker.map(|address| address.address().to_string())),
&alias,
None,
outcome.clone().map(|s| s.route.to_string()),
Expand Down Expand Up @@ -233,9 +237,14 @@ impl NodeManager {
drop(session);
if let Some(outcome) = outcome {
if let ReplacerOutputKind::Inlet(status) = outcome {
let address = match &status.worker {
Some(address) => address.address().to_string(),
None => "<>".to_string(),
};

Some(InletStatus::new(
inlet_info.bind_addr.to_string(),
status.worker.address().to_string(),
address,
alias,
None,
status.route.to_string(),
Expand Down Expand Up @@ -272,15 +281,22 @@ impl NodeManager {

let status = if let Some(outcome) = outcome {
match &outcome {
ReplacerOutputKind::Inlet(status) => InletStatus::new(
&info.bind_addr,
status.worker.address().to_string(),
alias,
None,
status.route.to_string(),
connection_status,
info.outlet_addr.to_string(),
),
ReplacerOutputKind::Inlet(status) => {
let address = match &status.worker {
Some(address) => address.address().to_string(),
None => "<>".to_string(),
};

InletStatus::new(
&info.bind_addr,
address,
alias,
None,
status.route.to_string(),
connection_status,
info.outlet_addr.to_string(),
)
}
_ => {
panic!("Unexpected outcome: {:?}", outcome)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl NodeManagerWorker {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
suffix_route,
} = create_inlet;
Expand All @@ -47,6 +48,7 @@ impl NodeManagerWorker {
secure_channel_identifier,
enable_udp_puncture,
disable_tcp_fallback,
ebpf,
tls_certificate_provider,
)
.await
Expand Down
Loading

0 comments on commit e910617

Please sign in to comment.