Skip to content

Commit

Permalink
feat(rust): switching key negotiation to a double secure channel
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed Dec 23, 2024
1 parent c85eb64 commit 24244ca
Show file tree
Hide file tree
Showing 17 changed files with 261 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ mod test {
pub fn rekey_rotation() -> ockam_core::Result<()> {
let runtime = Arc::new(Runtime::new().unwrap());
let runtime_cloned = runtime.clone();
std::env::set_var("OCKAM_LOGGING", "false");
std::env::set_var("OCKAM_LOGGING", "true");
std::env::set_var("OCKAM_LOG_LEVEL", "debug");

runtime_cloned.block_on(async move {
let test_body = async move {
Expand Down Expand Up @@ -304,6 +305,15 @@ mod test {
.secure_channels
.vault()
.encryption_at_rest_vault,
consumer_node
.node_manager
.secure_channels
.vault()
.secure_channel_vault,
consumer_node
.node_manager
.secure_channels
.secure_channel_registry(),
Duration::from_secs(60),
Duration::from_secs(60),
Duration::from_secs(60),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use crate::DefaultAddress;
use minicbor::{CborLen, Decode, Encode};
use ockam::identity::TimestampInSeconds;
use ockam::identity::{
SecureChannelApiRequest, SecureChannelApiResponse, SecureChannelRegistry, TimestampInSeconds,
};
use ockam_core::flow_control::FlowControlId;
use ockam_core::{
async_trait, Address, Decodable, Encodable, Encoded, IncomingAccessControl, Message,
async_trait, route, Address, Decodable, Encodable, Encoded, IncomingAccessControl, Message,
OutgoingAccessControl, Routed, Worker,
};
use ockam_node::{Context, WorkerBuilder};
use ockam_vault::VaultForEncryptionAtRest;
use rand::Rng;
use ockam_vault::{VaultForEncryptionAtRest, VaultForSecureChannels};
use std::sync::Arc;
use std::time::Duration;

pub(crate) struct KafkaKeyExchangeListener {
encryption_at_rest: Arc<dyn VaultForEncryptionAtRest>,
secure_channel_vault: Arc<dyn VaultForSecureChannels>,
secure_channel_registry: SecureChannelRegistry,
rekey_period: Duration,
key_validity: Duration,
key_rotation: Duration,
Expand All @@ -22,16 +25,16 @@ pub(crate) struct KafkaKeyExchangeListener {
#[derive(Debug, CborLen, Encode, Decode)]
#[rustfmt::skip]
pub(crate) struct KeyExchangeRequest {
#[n(1)] pub local_decryptor_address: Address,
}

#[derive(Debug, CborLen, Encode, Decode)]
#[rustfmt::skip]
pub(crate) struct KeyExchangeResponse {
#[n(0)] pub key_identifier_for_consumer: Vec<u8>,
#[n(1)] pub secret_key: [u8; 32],
#[n(2)] pub valid_until: TimestampInSeconds,
#[n(3)] pub rotate_after: TimestampInSeconds,
#[n(4)] pub rekey_period: Duration,
#[n(1)] pub valid_until: TimestampInSeconds,
#[n(2)] pub rotate_after: TimestampInSeconds,
#[n(3)] pub rekey_period: Duration,
}

impl Encodable for KeyExchangeRequest {
Expand Down Expand Up @@ -70,12 +73,41 @@ impl Worker for KafkaKeyExchangeListener {
context: &mut Self::Context,
message: Routed<Self::Message>,
) -> ockam_core::Result<()> {
let mut secret_key = [0u8; 32];
rand::thread_rng().fill(&mut secret_key[..]);
let handle = self
.encryption_at_rest
.import_aead_key(secret_key.to_vec())
.await?;
let request: KeyExchangeRequest = minicbor::decode(message.payload())?;
let local_decryptor = Address::from_string(request.local_decryptor_address);

let entry = self
.secure_channel_registry
.get_channel_by_encryptor_address(&local_decryptor);
let handle = match entry {
None => {
warn!("No secure channel found for local decryptor {local_decryptor}",);
return Ok(());
}
Some(entry) => {
let response: SecureChannelApiResponse = context
.send_and_receive(
route![entry.decryptor_api_address().clone()],
SecureChannelApiRequest::ExtractKey,
)
.await?;

let key_identifier = match response {
SecureChannelApiResponse::Ok(key_identifier) => key_identifier,
SecureChannelApiResponse::Err(error) => {
error!("Error extracting key: {error}");
return Ok(());
}
};

let secret = self
.secure_channel_vault
.export_rekey(&key_identifier)
.await?;

self.encryption_at_rest.import_aead_key(secret).await?
}
};

let now = TimestampInSeconds(ockam_core::compat::time::now()?);
let valid_until = now + self.key_validity;
Expand All @@ -86,7 +118,6 @@ impl Worker for KafkaKeyExchangeListener {
message.return_route().clone(),
KeyExchangeResponse {
key_identifier_for_consumer: handle.into_vec(),
secret_key,
valid_until,
rotate_after,
rekey_period: self.rekey_period,
Expand All @@ -103,6 +134,8 @@ impl KafkaKeyExchangeListener {
pub async fn create(
context: &Context,
encryption_at_rest: Arc<dyn VaultForEncryptionAtRest>,
secure_channel_vault: Arc<dyn VaultForSecureChannels>,
secure_channel_registry: SecureChannelRegistry,
key_rotation: Duration,
key_validity: Duration,
rekey_period: Duration,
Expand All @@ -117,9 +150,11 @@ impl KafkaKeyExchangeListener {

WorkerBuilder::new(KafkaKeyExchangeListener {
encryption_at_rest,
secure_channel_vault,
key_rotation,
key_validity,
rekey_period,
secure_channel_registry,
})
.with_address(address)
.with_incoming_access_control(incoming_access_control)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use crate::kafka::key_exchange::controller::{
use crate::kafka::key_exchange::listener::{KeyExchangeRequest, KeyExchangeResponse};
use crate::kafka::ConsumerResolution;
use crate::DefaultAddress;
use ockam::identity::TimestampInSeconds;
use ockam::identity::{SecureChannelApiRequest, SecureChannelApiResponse, TimestampInSeconds};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{Error, Result};
use ockam_core::{route, Error, Result};
use ockam_multiaddr::proto::{Secure, Service};
use ockam_multiaddr::MultiAddr;
use ockam_node::{Context, MessageSendReceiveOptions};
Expand Down Expand Up @@ -35,6 +35,65 @@ impl KafkaKeyExchangeControllerImpl {
destination.push_back(Secure::new(DefaultAddress::SECURE_CHANNEL_LISTENER))?;
destination.push_back(Service::new(DefaultAddress::KAFKA_CUSTODIAN))?;
if let Some(node_manager) = inner.node_manager.upgrade() {
// create a second secure channel to be used for key exchange
let (aead_secret_key_handle, their_decryptor_address) = {
let second_connection = node_manager
.make_connection(context, &destination, node_manager.identifier(), None, None)
.await?;

let entry = node_manager
.secure_channels
.secure_channel_registry()
.get_channel_by_encryptor_address(
second_connection
.secure_channel_encryptors
.first()
.expect("encryptor should be present"),
)
.expect("channel should be present");

if !inner
.consumer_policy_access_control
.is_identity_authorized(entry.their_id())
.await?
{
second_connection.close(context, &node_manager).await?;
return Err(Error::new(
Origin::Channel,
Kind::Invalid,
"Consumer is not authorized to use the secure channel",
));
}

let response: SecureChannelApiResponse = context
.send_and_receive(
route![entry.encryptor_api_address().clone()],
SecureChannelApiRequest::ExtractKey,
)
.await?;

match response {
SecureChannelApiResponse::Ok(secret_handle) => {
let secret = node_manager
.secure_channels
.vault()
.secure_channel_vault
.export_rekey(&secret_handle)
.await?;

second_connection.close(context, &node_manager).await?;
(
self.encryption_at_rest.import_aead_key(secret).await?,
entry.their_decryptor_address(),
)
}
SecureChannelApiResponse::Err(error) => {
second_connection.close(context, &node_manager).await?;
return Err(error);
}
}
};

let connection = node_manager
.make_connection(context, &destination, node_manager.identifier(), None, None)
.await?;
Expand All @@ -52,14 +111,17 @@ impl KafkaKeyExchangeControllerImpl {

let route = connection.route()?;
let response: KeyExchangeResponse = context
.send_and_receive_extended(route, KeyExchangeRequest {}, send_and_receive_options)
.send_and_receive_extended(
route,
KeyExchangeRequest {
local_decryptor_address: their_decryptor_address,
},
send_and_receive_options,
)
.await?
.into_body()?;

let aead_secret_key_handle = self
.encryption_at_rest
.import_aead_key(response.secret_key.to_vec())
.await?;
connection.close(context, &node_manager).await?;

Ok(ExchangedKey {
secret_key_handler: aead_secret_key_handle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ impl InMemoryNode {
KafkaKeyExchangeListener::create(
context,
vault.encryption_at_rest_vault,
self.secure_channels.vault().secure_channel_vault,
self.secure_channels.secure_channel_registry(),
std::time::Duration::from_secs(60 * 60 * 24),
std::time::Duration::from_secs(60 * 60 * 30),
std::time::Duration::from_secs(60 * 60),
Expand Down Expand Up @@ -397,6 +399,8 @@ impl InMemoryNode {
KafkaKeyExchangeListener::create(
context,
vault.encryption_at_rest_vault,
self.secure_channels.vault().secure_channel_vault,
self.secure_channels.secure_channel_registry(),
request.key_rotation,
request.key_validity,
request.rekey_period,
Expand Down
46 changes: 38 additions & 8 deletions implementations/rust/ockam/ockam_api/src/proxy_vault/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ pub mod vault_for_secure_channels {
use minicbor::{CborLen, Decode, Encode};
use ockam_core::{async_trait, cbor_encode_preallocate, MaybeZeroizeOnDrop};
use ockam_vault::{
AeadSecretKeyHandle, HKDFNumberOfOutputs, HashOutput, HkdfOutput, SecretBufferHandle,
VaultForSecureChannels, X25519PublicKey, X25519SecretKeyHandle,
AeadSecretKeyHandle, HKDFNumberOfOutputs, HashOutput, HkdfOutput, SecretBuffer,
SecretBufferHandle, VaultForSecureChannels, X25519PublicKey, X25519SecretKeyHandle,
};

pub(super) async fn handle_request(
Expand Down Expand Up @@ -617,7 +617,7 @@ pub mod vault_for_secure_channels {
}
Request::DeleteAeadSecretKey { secret_key_handle } => {
trace!("delete_aead_secret_key request for {secret_key_handle:?}");
let result = vault.delete_aead_secret_key(secret_key_handle).await;
let result = vault.delete_aead_secret_key(&secret_key_handle).await;
Response::DeleteAeadSecretKey(result.map_err(Into::into))
}
Request::Rekey {
Expand All @@ -628,6 +628,11 @@ pub mod vault_for_secure_channels {
let result = vault.rekey(&secret_key_handle, n).await;
Response::Rekey(result.map_err(Into::into))
}
Request::ExportRekey { secret_key_handle } => {
trace!("export_rekey request for {secret_key_handle:?}");
let result = vault.export_rekey(&secret_key_handle).await;
Response::ExportRekey(result.map_err(Into::into))
}
};
cbor_encode_preallocate(response)
}
Expand Down Expand Up @@ -695,6 +700,9 @@ pub mod vault_for_secure_channels {
#[n(0)] secret_key_handle: AeadSecretKeyHandle,
#[n(1)] n: u16,
},
#[n(18)] ExportRekey {
#[n(0)] secret_key_handle: AeadSecretKeyHandle
},
}

#[derive(Encode, Decode, CborLen)]
Expand All @@ -718,6 +726,7 @@ pub mod vault_for_secure_channels {
#[n(15)] ConvertSecretBufferToAeadKey(#[n(0)] Result<AeadSecretKeyHandle, ProxyError>),
#[n(16)] DeleteAeadSecretKey(#[n(0)] Result<bool, ProxyError>),
#[n(17)] Rekey(#[n(0)] Result<AeadSecretKeyHandle, ProxyError>),
#[n(18)] ExportRekey(#[n(0)] Result<SecretBuffer,ProxyError>),
}

#[async_trait]
Expand Down Expand Up @@ -866,6 +875,25 @@ pub mod vault_for_secure_channels {
Ok(result)
}

async fn export_rekey(
&self,
secret_key_handle: &AeadSecretKeyHandle,
) -> ockam_core::Result<SecretBuffer> {
trace!("sending export_rekey request for {secret_key_handle:?}");
let response: Response = self
.send_and_receive(Request::ExportRekey {
secret_key_handle: secret_key_handle.clone(),
})
.await?;

let result = match response {
Response::ExportRekey(result) => result?,
_ => Err(ProxyError::Protocol)?,
};

Ok(result)
}

async fn persist_aead_key(
&self,
secret_key_handle: &AeadSecretKeyHandle,
Expand Down Expand Up @@ -1067,11 +1095,13 @@ pub mod vault_for_secure_channels {

async fn delete_aead_secret_key(
&self,
secret_key_handle: AeadSecretKeyHandle,
secret_key_handle: &AeadSecretKeyHandle,
) -> ockam_core::Result<bool> {
trace!("sending delete_aead_secret_key request for {secret_key_handle:?}");
let response: Response = self
.send_and_receive(Request::DeleteAeadSecretKey { secret_key_handle })
.send_and_receive(Request::DeleteAeadSecretKey {
secret_key_handle: secret_key_handle.clone(),
})
.await?;

let result = match response {
Expand Down Expand Up @@ -1183,7 +1213,7 @@ pub mod vault_for_encryption_at_rest {
use crate::proxy_vault::protocol::{ProxyError, SpecificClient};
use minicbor::{CborLen, Decode, Encode};
use ockam_core::{async_trait, cbor_encode_preallocate, MaybeZeroizeOnDrop};
use ockam_vault::{AeadSecretKeyHandle, VaultForEncryptionAtRest};
use ockam_vault::{AeadSecretKeyHandle, SecretBuffer, VaultForEncryptionAtRest};

pub(super) async fn handle_request(
vault: &dyn VaultForEncryptionAtRest,
Expand Down Expand Up @@ -1246,7 +1276,7 @@ pub mod vault_for_encryption_at_rest {
#[n(0)] secret_key_handle: AeadSecretKeyHandle,
},
#[n(3)] ImportAeadKey {
#[n(0)] secret: Vec<u8>,
#[n(0)] secret: SecretBuffer,
},
}

Expand Down Expand Up @@ -1343,7 +1373,7 @@ pub mod vault_for_encryption_at_rest {

async fn import_aead_key(
&self,
secret: Vec<u8>,
secret: SecretBuffer,
) -> ockam_core::Result<AeadSecretKeyHandle> {
trace!("sending import_aead_key request");
let response: Response = self
Expand Down
Loading

0 comments on commit 24244ca

Please sign in to comment.