diff --git a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs index 5fcd4c46229..65c1323f394 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs @@ -1,17 +1,20 @@ -use crate::kafka::key_exchange::{KafkaKeyExchangeController, TopicPartition}; +use crate::kafka::key_exchange::KafkaKeyExchangeController; use crate::kafka::protocol_aware::KafkaEncryptedContent; use crate::kafka::{ConsumerPublishing, ConsumerResolution}; use crate::nodes::models::relay::ReturnTiming; use crate::nodes::NodeManager; use ockam::identity::{ - DecryptionRequest, DecryptionResponse, EncryptionRequest, EncryptionResponse, Identifier, - SecureChannels, + utils, DecryptionRequest, DecryptionResponse, EncryptionRequest, EncryptionResponse, + SecureChannels, TimestampInSeconds, }; use ockam_abac::PolicyAccessControl; +use ockam_core::compat::clock::{Clock, ProductionClock}; use ockam_core::compat::collections::{HashMap, HashSet}; -use ockam_core::{async_trait, route, Address}; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::{async_trait, route, Address, Error}; use ockam_node::Context; -use std::sync::Arc; +use std::sync::{Arc, Weak}; +use time::Duration; use tokio::sync::Mutex; #[derive(Clone)] @@ -25,35 +28,28 @@ impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl { &self, context: &mut Context, topic_name: &str, - partition_index: i32, content: Vec, ) -> ockam_core::Result { - let secure_channel_entry = self - .get_or_create_secure_channel(context, topic_name, partition_index) - .await?; - - let consumer_decryptor_address = secure_channel_entry.their_decryptor_address(); - - trace!("encrypting content with {consumer_decryptor_address}"); + let topic_key_handler = self.get_or_exchange_key(context, topic_name).await?; let encryption_response: EncryptionResponse = context .send_and_receive( - route![secure_channel_entry.encryptor_api_address().clone()], - EncryptionRequest(content), + route![topic_key_handler.encryptor_api_address.clone()], + EncryptionRequest::Encrypt(content), ) .await?; let encrypted_content = match encryption_response { EncryptionResponse::Ok(p) => p, EncryptionResponse::Err(cause) => { - warn!("cannot encrypt kafka message"); + warn!("Cannot encrypt kafka message"); return Err(cause); } }; - trace!("encrypted content with {consumer_decryptor_address}"); Ok(KafkaEncryptedContent { content: encrypted_content, - consumer_decryptor_address, + consumer_decryptor_address: topic_key_handler.consumer_decryptor_address, + rekey_counter: topic_key_handler.rekey_counter, }) } @@ -61,6 +57,7 @@ impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl { &self, context: &mut Context, consumer_decryptor_address: &Address, + rekey_counter: u16, encrypted_content: Vec, ) -> ockam_core::Result> { let secure_channel_decryptor_api_address = self @@ -73,7 +70,7 @@ impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl { let decrypt_response = context .send_and_receive( route![secure_channel_decryptor_api_address], - DecryptionRequest(encrypted_content), + DecryptionRequest(encrypted_content, Some(rekey_counter)), ) .await?; @@ -92,21 +89,19 @@ impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl { &self, context: &mut Context, topic_name: &str, - partitions: Vec, ) -> ockam_core::Result<()> { let mut inner = self.inner.lock().await; match inner.consumer_publishing.clone() { ConsumerPublishing::None => {} ConsumerPublishing::Relay(where_to_publish) => { - for partition in partitions { - let topic_key: TopicPartition = (topic_name.to_string(), partition); - if inner.topic_relay_set.contains(&topic_key) { - continue; - } - let alias = format!("consumer_{topic_name}_{partition}"); - let relay_info = inner - .node_manager + if inner.topic_relay_set.contains(topic_name) { + return Ok(()); + } + let alias = format!("consumer_{topic_name}"); + + if let Some(node_manager) = inner.node_manager.upgrade() { + let relay_info = node_manager .create_relay( context, &where_to_publish.clone(), @@ -116,9 +111,8 @@ impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl { ReturnTiming::AfterConnection, ) .await?; - trace!("remote relay created: {relay_info:?}"); - inner.topic_relay_set.insert(topic_key); + inner.topic_relay_set.insert(topic_name.to_string()); } } } @@ -127,20 +121,119 @@ impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl { } } +#[derive(Debug, PartialEq)] +pub(crate) struct TopicEncryptionKey { + pub(crate) rekey_counter: u16, + pub(crate) encryptor_api_address: Address, + pub(crate) consumer_decryptor_address: Address, +} + +const ROTATION_RETRY_DELAY: Duration = Duration::minutes(5); +pub(crate) struct TopicEncryptionKeyState { + pub(crate) producer_encryptor_address: Address, + pub(crate) valid_until: TimestampInSeconds, + pub(crate) rotate_after: TimestampInSeconds, + pub(crate) last_rekey: TimestampInSeconds, + pub(crate) rekey_counter: u16, + pub(crate) rekey_period: Duration, + pub(crate) last_rotation_attempt: TimestampInSeconds, +} + +pub(crate) enum RequiredOperation { + Rekey, + ShouldRotate, + MustRotate, + None, +} + +impl TopicEncryptionKeyState { + /// Return the operation that should be performed on the key before using it + pub(crate) fn operation( + &self, + now: TimestampInSeconds, + ) -> ockam_core::Result { + if now >= self.valid_until { + return Ok(RequiredOperation::MustRotate); + } + + if now >= self.rotate_after + && now >= self.last_rotation_attempt + ROTATION_RETRY_DELAY.whole_seconds() as u64 + { + return Ok(RequiredOperation::ShouldRotate); + } + + if now >= self.last_rekey + self.rekey_period.whole_seconds() as u64 { + return Ok(RequiredOperation::Rekey); + } + + Ok(RequiredOperation::None) + } + + pub(crate) fn mark_rotation_attempt(&mut self) { + self.last_rotation_attempt = utils::now().unwrap(); + } + + pub(crate) async fn rekey( + &mut self, + context: &mut Context, + secure_channel: &SecureChannels, + now: TimestampInSeconds, + ) -> ockam_core::Result<()> { + if self.rekey_counter == u16::MAX { + return Err(Error::new( + Origin::Channel, + Kind::Unknown, + "Rekey counter overflow", + )); + } + + let encryptor_address = &self.producer_encryptor_address; + + let secure_channel_entry = secure_channel.secure_channel_registry().get_channel_by_encryptor_address( + encryptor_address, + ).ok_or_else(|| { + Error::new( + Origin::Channel, + Kind::Unknown, + format!("Cannot find secure channel address `{encryptor_address}` in local registry"), + ) + })?; + + let rekey_response: EncryptionResponse = context + .send_and_receive( + route![secure_channel_entry.encryptor_api_address().clone()], + EncryptionRequest::Rekey, + ) + .await?; + + match rekey_response { + EncryptionResponse::Ok(_) => {} + EncryptionResponse::Err(cause) => { + error!("Cannot rekey secure channel: {cause}"); + return Err(cause); + } + } + + self.last_rekey = now; + self.rekey_counter += 1; + + Ok(()) + } +} + +pub(crate) type TopicName = String; + pub struct InnerSecureChannelController { + pub(crate) clock: Box, // we identify the secure channel instance by using the decryptor address of the consumer // which is known to both parties - pub(crate) topic_encryptor_map: HashMap, - // since topic/partition is using a key exchange only secure channel, - // we need another secure channel for each consumer identifier - // to make sure the relative credential is properly updated - pub(crate) identity_encryptor_map: HashMap, - pub(crate) node_manager: Arc, + pub(crate) producer_topic_encryptor_map: HashMap, + pub(crate) node_manager: Weak, // describes how to reach the consumer node pub(crate) consumer_resolution: ConsumerResolution, // describes if/how to publish the consumer pub(crate) consumer_publishing: ConsumerPublishing, - pub(crate) topic_relay_set: HashSet, + pub(crate) topic_relay_set: HashSet, pub(crate) secure_channels: Arc, pub(crate) consumer_policy_access_control: PolicyAccessControl, pub(crate) producer_policy_access_control: PolicyAccessControl, @@ -154,13 +247,33 @@ impl KafkaKeyExchangeControllerImpl { consumer_publishing: ConsumerPublishing, consumer_policy_access_control: PolicyAccessControl, producer_policy_access_control: PolicyAccessControl, + ) -> KafkaKeyExchangeControllerImpl { + Self::new_extended( + ProductionClock, + node_manager, + secure_channels, + consumer_resolution, + consumer_publishing, + consumer_policy_access_control, + producer_policy_access_control, + ) + } + + pub(crate) fn new_extended( + clock: impl Clock, + node_manager: Arc, + secure_channels: Arc, + consumer_resolution: ConsumerResolution, + consumer_publishing: ConsumerPublishing, + consumer_policy_access_control: PolicyAccessControl, + producer_policy_access_control: PolicyAccessControl, ) -> KafkaKeyExchangeControllerImpl { Self { inner: Arc::new(Mutex::new(InnerSecureChannelController { - topic_encryptor_map: Default::default(), - identity_encryptor_map: Default::default(), + clock: Box::new(clock), + producer_topic_encryptor_map: Default::default(), topic_relay_set: Default::default(), - node_manager, + node_manager: Arc::downgrade(&node_manager), secure_channels, consumer_resolution, consumer_publishing, @@ -170,3 +283,209 @@ impl KafkaKeyExchangeControllerImpl { } } } + +#[cfg(test)] +mod test { + use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl; + use crate::kafka::{ConsumerPublishing, ConsumerResolution}; + use crate::test_utils::{AuthorityConfiguration, TestNode}; + use ockam::identity::Identifier; + use ockam_abac::{Action, Env, Resource, ResourceType}; + use ockam_core::compat::clock::test::TestClock; + use ockam_multiaddr::MultiAddr; + use std::sync::Arc; + use std::time::Duration; + use tokio::runtime::Runtime; + use tokio::time::timeout; + + #[test] + pub fn rekey_rotation() -> ockam_core::Result<()> { + let runtime = Arc::new(Runtime::new().unwrap()); + let runtime_cloned = runtime.clone(); + std::env::set_var("OCKAM_LOGGING", "false"); + + runtime_cloned.block_on(async move { + let test_body = async move { + TestNode::clean().await?; + let authority = TestNode::create_extended( + runtime.clone(), + None, + AuthorityConfiguration::SelfReferencing, + ) + .await; + + let mut consumer_node = TestNode::create_extended( + runtime.clone(), + None, + AuthorityConfiguration::Node(&authority), + ) + .await; + let mut producer_node = TestNode::create_extended( + runtime.clone(), + None, + AuthorityConfiguration::Node(&authority), + ) + .await; + + consumer_node + .node_manager + .start_key_exchanger_service( + &consumer_node.context, + crate::DefaultAddress::KEY_EXCHANGER_LISTENER.into(), + ) + .await?; + + let test_clock = TestClock::new(0); + + let destination = consumer_node.listen_address().await.multi_addr().unwrap(); + let producer_secure_channel_controller = create_secure_channel_controller( + test_clock.clone(), + &mut producer_node, + destination.clone(), + authority.node_manager.identifier(), + ) + .await; + + let _consumer_secure_channel_controller = create_secure_channel_controller( + test_clock.clone(), + &mut consumer_node, + destination.clone(), + authority.node_manager.identifier(), + ) + .await; + + let first_key = producer_secure_channel_controller + .get_or_exchange_key(&mut producer_node.context, "topic_name") + .await?; + + assert_eq!(first_key.rekey_counter, 0); + + // 00:10 - nothing should change + test_clock.add_seconds(10); + + let second_key = producer_secure_channel_controller + .get_or_exchange_key(&mut producer_node.context, "topic_name") + .await?; + + assert_eq!(first_key, second_key); + + // 01:00 - the default rekeying period is 1 minute + test_clock.add_seconds(50); + + let third_key = producer_secure_channel_controller + .get_or_exchange_key(&mut producer_node.context, "topic_name") + .await?; + + assert_eq!(third_key.rekey_counter, 1); + assert_eq!( + first_key.consumer_decryptor_address, + third_key.consumer_decryptor_address + ); + + // 04:00 - yet another rekey should happen, but no rotation + test_clock.add_seconds(60 * 3); + + let fourth_key = producer_secure_channel_controller + .get_or_exchange_key(&mut producer_node.context, "topic_name") + .await?; + + assert_eq!(fourth_key.rekey_counter, 2); + assert_eq!( + first_key.consumer_decryptor_address, + fourth_key.consumer_decryptor_address + ); + + // 05:00 - the default duration of the key is 10 minutes, + // but the rotation should happen after 5 minutes + test_clock.add_seconds(60); + + let fifth_key = producer_secure_channel_controller + .get_or_exchange_key(&mut producer_node.context, "topic_name") + .await?; + + assert_ne!( + third_key.consumer_decryptor_address, + fifth_key.consumer_decryptor_address + ); + assert_eq!(fifth_key.rekey_counter, 0); + + // Now let's simulate a failure to rekey by shutting down the consumer + consumer_node.context.stop().await?; + drop(consumer_node); + + // 06:00 - The producer should still be able to rekey + test_clock.add_seconds(60); + let sixth_key = producer_secure_channel_controller + .get_or_exchange_key(&mut producer_node.context, "topic_name") + .await?; + + assert_eq!(sixth_key.rekey_counter, 1); + assert_eq!( + fifth_key.consumer_decryptor_address, + sixth_key.consumer_decryptor_address + ); + + // 10:00 - Rotation fails, but the existing key is still valid + // and needs to be rekeyed + // (since we exchanged key at 05:00, it should be valid until 15:00) + test_clock.add_seconds(60 * 4); + let seventh_key = producer_secure_channel_controller + .get_or_exchange_key(&mut producer_node.context, "topic_name") + .await?; + + assert_eq!(seventh_key.rekey_counter, 2); + assert_eq!( + fifth_key.consumer_decryptor_address, + seventh_key.consumer_decryptor_address + ); + + // 15:00 - Rotation fails, and the existing key is no longer valid + test_clock.add_seconds(60 * 5); + let result = producer_secure_channel_controller + .get_or_exchange_key(&mut producer_node.context, "topic_name") + .await; + + assert!(result.is_err()); + + Ok(()) + }; + + timeout(Duration::from_secs(10), test_body).await.unwrap() + }) + } + + async fn create_secure_channel_controller( + test_clock: TestClock, + node: &mut TestNode, + destination: MultiAddr, + authority: Identifier, + ) -> KafkaKeyExchangeControllerImpl { + let consumer_policy_access_control = + node.node_manager.policies().make_policy_access_control( + node.secure_channels.identities().identities_attributes(), + Resource::new("arbitrary-resource-name", ResourceType::KafkaConsumer), + Action::HandleMessage, + Env::new(), + Some(authority.clone()), + ); + + let producer_policy_access_control = + node.node_manager.policies().make_policy_access_control( + node.secure_channels.identities().identities_attributes(), + Resource::new("arbitrary-resource-name", ResourceType::KafkaProducer), + Action::HandleMessage, + Env::new(), + Some(authority), + ); + + KafkaKeyExchangeControllerImpl::new_extended( + test_clock, + (*node.node_manager).clone(), + node.node_manager.secure_channels(), + ConsumerResolution::SingleNode(destination), + ConsumerPublishing::None, + consumer_policy_access_control, + producer_policy_access_control, + ) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs index 57747f6db7d..fb4dc81de38 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs @@ -25,13 +25,8 @@ pub enum ConsumerPublishing { #[n(2)] Relay(#[n(1)] MultiAddr), } -type TopicPartition = (String, i32); - /// Offer simple APIs to encrypt and decrypt kafka messages. -/// Underneath it creates secure channels for each topic/partition -/// and uses them to encrypt the content. -/// Multiple secure channels may be created for the same topic/partition -/// but each will be explicitly labeled. +/// Underneath it creates secure channels for each topic uses them to encrypt the content. #[async_trait] pub(crate) trait KafkaKeyExchangeController: Send + Sync + 'static { /// Encrypts the content specifically for the consumer waiting for that topic name and @@ -43,7 +38,6 @@ pub(crate) trait KafkaKeyExchangeController: Send + Sync + 'static { &self, context: &mut Context, topic_name: &str, - partition_index: i32, content: Vec, ) -> ockam_core::Result; @@ -53,16 +47,15 @@ pub(crate) trait KafkaKeyExchangeController: Send + Sync + 'static { &self, context: &mut Context, consumer_decryptor_address: &Address, + rekey_counter: u16, encrypted_content: Vec, ) -> ockam_core::Result>; - /// Starts relays in the orchestrator for each {topic_name}_{partition} combination - /// should be used only by the consumer. + /// Starts relays in the orchestrator for each topic name, should be used only by the consumer. /// does nothing if they were already created, but fails it they already exist. async fn publish_consumer( &self, context: &mut Context, topic_name: &str, - partitions: Vec, ) -> ockam_core::Result<()>; } diff --git a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs index a8317e06330..4ac9fce7d68 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs @@ -1,40 +1,20 @@ use crate::kafka::key_exchange::controller::{ - InnerSecureChannelController, KafkaKeyExchangeControllerImpl, + InnerSecureChannelController, KafkaKeyExchangeControllerImpl, RequiredOperation, + TopicEncryptionKey, TopicEncryptionKeyState, }; use crate::kafka::ConsumerResolution; use crate::nodes::service::SecureChannelType; use crate::DefaultAddress; -use ockam::identity::SecureChannelRegistryEntry; +use ockam::identity::{SecureChannelRegistryEntry, TimestampInSeconds}; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Address, Error, Result}; use ockam_multiaddr::proto::{Secure, Service}; use ockam_multiaddr::MultiAddr; use ockam_node::Context; +use time::Duration; use tokio::sync::MutexGuard; impl KafkaKeyExchangeControllerImpl { - /// Creates a secure channel for the given destination. - async fn create_secure_channel( - inner: &MutexGuard<'_, InnerSecureChannelController>, - context: &Context, - mut destination: MultiAddr, - ) -> Result
{ - destination.push_back(Service::new(DefaultAddress::SECURE_CHANNEL_LISTENER))?; - let secure_channel = inner - .node_manager - .create_secure_channel( - context, - destination, - None, - None, - None, - None, - SecureChannelType::KeyExchangeAndMessages, - ) - .await?; - Ok(secure_channel.encryptor_address().clone()) - } - /// Creates a secure channel for the given destination, for key exchange only. async fn create_key_exchange_only_secure_channel( inner: &MutexGuard<'_, InnerSecureChannelController>, @@ -42,132 +22,100 @@ impl KafkaKeyExchangeControllerImpl { mut destination: MultiAddr, ) -> Result
{ destination.push_back(Service::new(DefaultAddress::KEY_EXCHANGER_LISTENER))?; - let secure_channel = inner - .node_manager - .create_secure_channel( - context, - destination, - None, - None, - None, - None, - SecureChannelType::KeyExchangeOnly, - ) - .await?; - Ok(secure_channel.encryptor_address().clone()) + if let Some(node_manager) = inner.node_manager.upgrade() { + let secure_channel = node_manager + .create_secure_channel( + context, + destination, + None, + None, + None, + None, + SecureChannelType::KeyExchangeOnly, + ) + .await?; + + // TODO: temporary workaround until the secure channel persistence is changed + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + Ok(secure_channel.encryptor_address().clone()) + } else { + Err(Error::new( + Origin::Transport, + Kind::Internal, + "Node Manager is not available", + )) + } } /// Creates a secure channel from the producer to the consumer needed to encrypt messages. - /// Returns the relative secure channel entry. - pub(crate) async fn get_or_create_secure_channel( + /// Returns the relative encryption key information. + pub(crate) async fn get_or_exchange_key( &self, context: &mut Context, topic_name: &str, - partition: i32, - ) -> Result { + ) -> Result { let mut inner = self.inner.lock().await; - // TODO: it may be better to exchange a new key for each partition - // when we have only one consumer, we use the same secure channel for all topics - let topic_partition_key = match &inner.consumer_resolution { - ConsumerResolution::SingleNode(_) | ConsumerResolution::None => ("".to_string(), 0i32), - ConsumerResolution::ViaRelay(_) => (topic_name.to_string(), partition), - }; + let rekey_counter; + let encryptor_address; - let encryptor_address = { - if let Some(encryptor_address) = inner.topic_encryptor_map.get(&topic_partition_key) { - encryptor_address.clone() - } else { - // destination is without the final service - let destination = match inner.consumer_resolution.clone() { - ConsumerResolution::SingleNode(mut destination) => { - debug!("creating new direct secure channel to consumer: {destination}"); - // remove /secure/api service from the destination if present - if let Some(service) = destination.last() { - let service: Option = service.cast(); - if let Some(service) = service { - if service.as_bytes() - == DefaultAddress::SECURE_CHANNEL_LISTENER.as_bytes() - { - destination.pop_back(); - } - } + let now = TimestampInSeconds(inner.clock.now()?); + let secure_channels = inner.secure_channels.clone(); + if let Some(encryption_key) = inner.producer_topic_encryptor_map.get_mut(topic_name) { + // before using it, check if it's still valid + match encryption_key.operation(now)? { + RequiredOperation::None => { + // the key is still valid + rekey_counter = encryption_key.rekey_counter; + encryptor_address = encryption_key.producer_encryptor_address.clone(); + } + RequiredOperation::Rekey => { + encryption_key.rekey(context, &secure_channels, now).await?; + rekey_counter = encryption_key.rekey_counter; + encryptor_address = encryption_key.producer_encryptor_address.clone(); + } + RequiredOperation::ShouldRotate => { + encryption_key.mark_rotation_attempt(); + // the key is still valid, but it's time to rotate it + let result = self.exchange_key(context, topic_name, &mut inner).await; + match result { + Ok(producer_encryptor_address) => { + rekey_counter = 0; + encryptor_address = producer_encryptor_address; } - destination - } - ConsumerResolution::ViaRelay(mut destination) => { - // consumer_ is the arbitrary chosen prefix by both parties - let topic_partition_address = - format!("forward_to_consumer_{topic_name}_{partition}"); - debug!( - "creating new secure channel via relay to {topic_partition_address}" - ); - destination.push_back(Service::new(topic_partition_address))?; - destination - } - ConsumerResolution::None => { - return Err(Error::new( - Origin::Transport, - Kind::Invalid, - "cannot encrypt messages with consumer key when consumer route resolution is not set", - )); - } - }; + Err(error) => { + warn!( + "Failed to rotate encryption key for topic `{topic_name}`: {error}. The current key will be used instead." + ); - let producer_encryptor_address = Self::create_key_exchange_only_secure_channel( - &inner, - context, - destination.clone(), - ) - .await?; + // borrow it again to satisfy the borrow checker + let encryption_key = inner + .producer_topic_encryptor_map + .get_mut(topic_name) + .expect("key should be present"); - if let Some(entry) = inner - .secure_channels - .secure_channel_registry() - .get_channel_by_encryptor_address(&producer_encryptor_address) - { - if let Err(error) = Self::validate_consumer_credentials(&inner, &entry).await { - inner - .node_manager - .delete_secure_channel(context, &producer_encryptor_address) - .await?; - return Err(error); - }; + // we might still need to rekey + if let RequiredOperation::Rekey = encryption_key.operation(now)? { + encryption_key.rekey(context, &secure_channels, now).await?; + } - // creates a dedicated secure channel to the consumer to keep the - // credentials up to date - if !inner.identity_encryptor_map.contains_key(entry.their_id()) { - if let Err(err) = - Self::create_secure_channel(&inner, context, destination).await - { - inner - .node_manager - .delete_secure_channel(context, &producer_encryptor_address) - .await?; - return Err(err); + encryptor_address = encryption_key.producer_encryptor_address.clone(); + rekey_counter = encryption_key.rekey_counter; } } - } else { - return Err(Error::new( - Origin::Transport, - Kind::Internal, - format!( - "cannot find secure channel address `{producer_encryptor_address}` in local registry" - ), - )); } - - inner - .topic_encryptor_map - .insert(topic_partition_key, producer_encryptor_address.clone()); - - debug!("created secure channel"); - producer_encryptor_address - } + RequiredOperation::MustRotate => { + // the key is no longer valid, must not be reused + rekey_counter = 0; + encryptor_address = self.exchange_key(context, topic_name, &mut inner).await?; + } + }; + } else { + rekey_counter = 0; + encryptor_address = self.exchange_key(context, topic_name, &mut inner).await?; }; - inner - .secure_channels + let entry = secure_channels .secure_channel_registry() .get_channel_by_encryptor_address(&encryptor_address) .ok_or_else(|| { @@ -176,7 +124,103 @@ impl KafkaKeyExchangeControllerImpl { Kind::Unknown, format!("cannot find secure channel address `{encryptor_address}` in local registry"), ) - }) + })?; + + Ok(TopicEncryptionKey { + rekey_counter, + encryptor_api_address: entry.encryptor_api_address().clone(), + consumer_decryptor_address: entry.their_decryptor_address().clone(), + }) + } + + async fn exchange_key( + &self, + context: &mut Context, + topic_name: &str, + inner: &mut MutexGuard<'_, InnerSecureChannelController>, + ) -> Result
{ + // destination is without the final service + let destination = match inner.consumer_resolution.clone() { + ConsumerResolution::SingleNode(mut destination) => { + debug!("creating new direct secure channel to consumer: {destination}"); + // remove /secure/api service from the destination if present + if let Some(service) = destination.last() { + let service: Option = service.cast(); + if let Some(service) = service { + if service.as_bytes() == DefaultAddress::SECURE_CHANNEL_LISTENER.as_bytes() + { + destination.pop_back(); + } + } + } + destination + } + ConsumerResolution::ViaRelay(mut destination) => { + // consumer_ is the arbitrary chosen prefix by both parties + let topic_address = format!("forward_to_consumer_{topic_name}"); + debug!("creating new secure channel via relay to {topic_address}"); + destination.push_back(Service::new(topic_address))?; + destination + } + ConsumerResolution::None => { + return Err(Error::new( + Origin::Transport, + Kind::Invalid, + "cannot encrypt messages with consumer key when consumer route resolution is not set", + )); + } + }; + + let producer_encryptor_address = + Self::create_key_exchange_only_secure_channel(inner, context, destination.clone()) + .await?; + + if let Some(entry) = inner + .secure_channels + .secure_channel_registry() + .get_channel_by_encryptor_address(&producer_encryptor_address) + { + if let Err(error) = Self::validate_consumer_credentials(inner, &entry).await { + if let Some(node_manager) = inner.node_manager.upgrade() { + node_manager + .delete_secure_channel(context, &producer_encryptor_address) + .await?; + } + return Err(error); + }; + } else { + return Err(Error::new( + Origin::Transport, + Kind::Internal, + format!( + "cannot find secure channel address `{producer_encryptor_address}` in local registry" + ), + )); + } + + let now = TimestampInSeconds(inner.clock.now()?); + + // TODO: retrieve these values from the other party + let valid_until = now + TimestampInSeconds(10 * 60); // 10 minutes + let rotate_after = now + TimestampInSeconds(5 * 60); // 5 minutes + let rekey_period = Duration::minutes(1); + + let encryption_key = TopicEncryptionKeyState { + producer_encryptor_address: producer_encryptor_address.clone(), + valid_until, + rotate_after, + rekey_period, + last_rekey: now, + last_rotation_attempt: TimestampInSeconds(0), + rekey_counter: 0, + }; + + inner + .producer_topic_encryptor_map + .insert(topic_name.to_string(), encryption_key); + + info!("Successfully exchanged new key with {destination} for topic {topic_name}"); + Ok(producer_encryptor_address) } async fn validate_consumer_credentials( diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/request.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/request.rs index e12667732a9..8459cc41f80 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/request.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/request.rs @@ -4,7 +4,7 @@ use crate::kafka::protocol_aware::RequestInfo; use crate::kafka::protocol_aware::{InterceptError, KafkaMessageRequestInterceptor}; use bytes::{Bytes, BytesMut}; use kafka_protocol::messages::fetch_request::FetchRequest; -use kafka_protocol::messages::produce_request::{PartitionProduceData, ProduceRequest}; +use kafka_protocol::messages::produce_request::ProduceRequest; use kafka_protocol::messages::request_header::RequestHeader; use kafka_protocol::messages::{ApiKey, ApiVersionsRequest, TopicName}; use kafka_protocol::protocol::buf::ByteBuf; @@ -173,14 +173,8 @@ impl InletInterceptorImpl { })? }; - let partitions: Vec = topic - .partitions - .iter() - .map(|partition| partition.partition) - .collect(); - self.key_exchange_controller - .publish_consumer(context, &topic_id, partitions) + .publish_consumer(context, &topic_id) .await .map_err(InterceptError::Ockam)? } @@ -221,15 +215,10 @@ impl InletInterceptorImpl { let buffer = if !self.encrypted_fields.is_empty() { // if we encrypt only specific fields, we assume the record must be // valid JSON map - self.encrypt_specific_fields( - context, - &topic.name, - data, - &record_value, - ) - .await? + self.encrypt_specific_fields(context, &topic.name, &record_value) + .await? } else { - self.encrypt_whole_record(context, &topic.name, data, record_value) + self.encrypt_whole_record(context, &topic.name, record_value) .await? }; record.value = Some(buffer.into()); @@ -265,12 +254,11 @@ impl InletInterceptorImpl { &self, context: &mut Context, topic_name: &TopicName, - data: &mut PartitionProduceData, record_value: Bytes, ) -> Result, InterceptError> { let encrypted_content = self .key_exchange_controller - .encrypt_content(context, topic_name, data.index, record_value.to_vec()) + .encrypt_content(context, topic_name, record_value.to_vec()) .await .map_err(InterceptError::Ockam)?; @@ -287,7 +275,6 @@ impl InletInterceptorImpl { &self, context: &mut Context, topic_name: &TopicName, - data: &mut PartitionProduceData, record_value: &Bytes, ) -> Result, InterceptError> { let mut record_value = serde_json::from_slice::(record_value)?; @@ -300,7 +287,6 @@ impl InletInterceptorImpl { .encrypt_content( context, topic_name, - data.index, serde_json::to_vec(value).map_err(|_| InterceptError::InvalidData)?, ) .await diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/response.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/response.rs index c0b221cf503..3f1fc6b4a0c 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/response.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/response.rs @@ -345,6 +345,7 @@ impl InletInterceptorImpl { .decrypt_content( context, &message_wrapper.consumer_decryptor_address, + message_wrapper.rekey_counter, message_wrapper.content, ) .await @@ -378,6 +379,7 @@ impl InletInterceptorImpl { .decrypt_content( context, &message_wrapper.consumer_decryptor_address, + message_wrapper.rekey_counter, message_wrapper.content, ) .await diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/tests.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/tests.rs index b7db0eb0119..c4714bdb0f9 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/tests.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/tests.rs @@ -31,7 +31,6 @@ impl KafkaKeyExchangeController for MockKafkaKeyExchangeController { &self, _context: &mut Context, _topic_name: &str, - _partition_index: i32, content: Vec, ) -> ockam_core::Result { let mut new_content = ENCRYPTED_PREFIX.to_vec(); @@ -39,6 +38,7 @@ impl KafkaKeyExchangeController for MockKafkaKeyExchangeController { Ok(KafkaEncryptedContent { consumer_decryptor_address: Address::from_string("mock"), content: new_content, + rekey_counter: u16::MAX, }) } @@ -46,6 +46,7 @@ impl KafkaKeyExchangeController for MockKafkaKeyExchangeController { &self, _context: &mut Context, _consumer_decryptor_address: &Address, + _rekey_counter: u16, encrypted_content: Vec, ) -> ockam_core::Result> { Ok(encrypted_content[PREFIX_LEN..].to_vec()) @@ -55,7 +56,6 @@ impl KafkaKeyExchangeController for MockKafkaKeyExchangeController { &self, _context: &mut Context, _topic_name: &str, - _partitions: Vec, ) -> ockam_core::Result<()> { Ok(()) } @@ -198,6 +198,7 @@ pub fn encode_field_value(value: serde_json::Value) -> String { .encode(KafkaEncryptedContent { consumer_decryptor_address: Address::from_string("mock"), content: encrypted_content, + rekey_counter: u16::MAX, }) .unwrap(); diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs index 2e1644dba45..0d002fcb7d3 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs @@ -150,7 +150,9 @@ pub(crate) struct KafkaEncryptedContent { /// The secure channel identifier used to encrypt the content #[n(0)] pub consumer_decryptor_address: Address, /// The encrypted content - #[n(1)] pub content: Vec + #[n(1)] pub content: Vec, + /// Number of times rekey was performed before encrypting the content + #[n(2)] pub rekey_counter: u16, } /// By default, kafka supports up to 1MB messages. 16MB is the maximum suggested diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs index afdd9b25173..c8a4e24b015 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs @@ -9,7 +9,7 @@ mod test { }; use crate::kafka::{ConsumerPublishing, ConsumerResolution}; use crate::port_range::PortRange; - use crate::test_utils::TestNode; + use crate::test_utils::{AuthorityConfiguration, TestNode}; use kafka_protocol::messages::ApiKey; use kafka_protocol::messages::BrokerId; use kafka_protocol::messages::{ApiVersionsRequest, MetadataRequest, MetadataResponse}; @@ -27,7 +27,12 @@ mod test { context: &mut Context, ) -> ockam::Result<()> { TestNode::clean().await?; - let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; + let handle = crate::test_utils::start_manager_for_tests( + context, + None, + AuthorityConfiguration::SelfReferencing, + ) + .await?; let inlet_map = KafkaInletController::new( (*handle.node_manager).clone(), diff --git a/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs index 016fc7f062d..1761bc0b20f 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs @@ -27,7 +27,7 @@ use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl; use crate::kafka::protocol_aware::inlet::KafkaInletInterceptorFactory; use crate::kafka::protocol_aware::utils::{encode_request, encode_response}; use crate::kafka::{ConsumerPublishing, ConsumerResolution, KafkaInletController}; -use crate::test_utils::{NodeManagerHandle, TestNode}; +use crate::test_utils::{AuthorityConfiguration, NodeManagerHandle, TestNode}; use ockam::compat::tokio::io::DuplexStream; use ockam::tcp::{TcpInletOptions, TcpOutletOptions}; use ockam::Context; @@ -131,7 +131,12 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption( context: &mut Context, ) -> ockam::Result<()> { TestNode::clean().await?; - let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; + let handle = crate::test_utils::start_manager_for_tests( + context, + None, + AuthorityConfiguration::SelfReferencing, + ) + .await?; let consumer_bootstrap_port = create_kafka_service( context, @@ -150,8 +155,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption( .await?; // for the consumer to become available to the producer, the consumer has to issue a Fetch - // request first, so the sidecar can react by creating the relay for partition - // 1 of 'my-topic' + // request first, so the sidecar can react by creating the relay for topic 'my-topic' { let mut consumer_mock_kafka = TcpServerSimulator::start("127.0.0.1:0").await; handle diff --git a/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs index bea6e701729..6df8cf4d8bf 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs @@ -31,7 +31,7 @@ use crate::kafka::protocol_aware::KafkaMessageInterceptorWrapper; use crate::kafka::protocol_aware::MAX_KAFKA_MESSAGE_SIZE; use crate::kafka::{ConsumerPublishing, ConsumerResolution}; use crate::port_range::PortRange; -use crate::test_utils::{NodeManagerHandle, TestNode}; +use crate::test_utils::{AuthorityConfiguration, NodeManagerHandle, TestNode}; const TEST_MAX_KAFKA_MESSAGE_SIZE: u32 = 128 * 1024; const TEST_KAFKA_API_VERSION: i16 = 13; @@ -66,7 +66,12 @@ async fn kafka_portal_worker__pieces_of_kafka_message__message_assembled( context: &mut Context, ) -> ockam::Result<()> { TestNode::clean().await?; - let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; + let handle = crate::test_utils::start_manager_for_tests( + context, + None, + AuthorityConfiguration::SelfReferencing, + ) + .await?; let portal_inlet_address = setup_only_worker(context, &handle).await; let mut request_buffer = BytesMut::new(); @@ -109,7 +114,12 @@ async fn kafka_portal_worker__double_kafka_message__message_assembled( context: &mut Context, ) -> ockam::Result<()> { TestNode::clean().await?; - let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; + let handle = crate::test_utils::start_manager_for_tests( + context, + None, + AuthorityConfiguration::SelfReferencing, + ) + .await?; let portal_inlet_address = setup_only_worker(context, &handle).await; let mut request_buffer = BytesMut::new(); @@ -147,7 +157,12 @@ async fn kafka_portal_worker__bigger_than_limit_kafka_message__error( context: &mut Context, ) -> ockam::Result<()> { TestNode::clean().await?; - let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; + let handle = crate::test_utils::start_manager_for_tests( + context, + None, + AuthorityConfiguration::SelfReferencing, + ) + .await?; let portal_inlet_address = setup_only_worker(context, &handle).await; // with the message container it goes well over the max allowed message kafka size @@ -194,7 +209,12 @@ async fn kafka_portal_worker__almost_over_limit_than_limit_kafka_message__two_ka context: &mut Context, ) -> ockam::Result<()> { TestNode::clean().await?; - let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; + let handle = crate::test_utils::start_manager_for_tests( + context, + None, + AuthorityConfiguration::SelfReferencing, + ) + .await?; let portal_inlet_address = setup_only_worker(context, &handle).await; // let's build the message to 90% of max. size @@ -363,7 +383,12 @@ async fn kafka_portal_worker__metadata_exchange__response_changed( context: &mut Context, ) -> ockam::Result<()> { TestNode::clean().await?; - let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?; + let handle = crate::test_utils::start_manager_for_tests( + context, + None, + AuthorityConfiguration::SelfReferencing, + ) + .await?; let project_authority = handle .node_manager .node_manager diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index 916fc710ce6..8501636d484 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -303,7 +303,7 @@ impl NodeManager { /// SECURE CHANNEL LISTENERS impl NodeManager { - pub(super) async fn start_key_exchanger_service( + pub(crate) async fn start_key_exchanger_service( &self, context: &Context, address: Address, diff --git a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs index eb866a05fe0..e9ad1436994 100644 --- a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs @@ -2,6 +2,13 @@ use crate::config::lookup::InternetAddress; use crate::nodes::service::{NodeManagerCredentialRetrieverOptions, NodeManagerTrustOptions}; +use ockam::identity::utils::AttributesBuilder; +use ockam::identity::SecureChannels; +use ockam::tcp::{TcpListenerOptions, TcpTransport}; +use ockam::transport::HostnamePort; +use ockam::Result; +use ockam_core::AsyncTryClone; +use ockam_node::database::{DatabaseConfiguration, SqlxDatabase}; use ockam_node::{Context, NodeBuilder}; use sqlx::__rt::timeout; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -15,14 +22,6 @@ use tokio::net::{TcpListener, TcpSocket, TcpStream}; use tokio::runtime::Runtime; use tracing::{error, info}; -use ockam::identity::utils::AttributesBuilder; -use ockam::identity::SecureChannels; -use ockam::tcp::{TcpListenerOptions, TcpTransport}; -use ockam::transport::HostnamePort; -use ockam::Result; -use ockam_core::AsyncTryClone; -use ockam_node::database::{DatabaseConfiguration, SqlxDatabase}; - use crate::authenticator::credential_issuer::{DEFAULT_CREDENTIAL_VALIDITY, PROJECT_MEMBER_SCHEMA}; use crate::cli_state::{random_name, CliState}; use crate::nodes::service::{NodeManagerGeneralOptions, NodeManagerTransportOptions}; @@ -49,14 +48,21 @@ impl Drop for NodeManagerHandle { } } +/// Authority configuration for the node manager. +pub enum AuthorityConfiguration<'a> { + None, + Node(&'a TestNode), + SelfReferencing, +} + /// Starts a local node manager and returns a handle to it. /// -/// Be careful: if you drop the returned handle before the end of the test +/// Be careful: if you drop the returned handle before the end of the test, /// things *will* break. pub async fn start_manager_for_tests( context: &mut Context, bind_addr: Option<&str>, - trust_options: Option, + authority_configuration: AuthorityConfiguration<'_>, ) -> Result { let tcp = TcpTransport::create(context).await?; let tcp_listener = tcp @@ -80,18 +86,83 @@ pub async fn start_manager_for_tests( let vault = cli_state.make_vault(named_vault).await?; let identities = cli_state.make_identities(vault).await?; - let attributes = AttributesBuilder::with_schema(PROJECT_MEMBER_SCHEMA).build(); - let credential = identities - .credentials() - .credentials_creation() - .issue_credential( - &identifier, - &identifier, - attributes, - DEFAULT_CREDENTIAL_VALIDITY, - ) - .await - .unwrap(); + let trust_options = match authority_configuration { + AuthorityConfiguration::None => NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::None, + NodeManagerCredentialRetrieverOptions::None, + None, + NodeManagerCredentialRetrieverOptions::None, + ), + AuthorityConfiguration::Node(authority) => { + // if we have a third-party authority, we need to manually exchange identities + // since no actual secure channel is established to exchange credentials + authority + .secure_channels + .identities() + .identities_verification() + .import_from_change_history( + Some(&identifier), + identities + .identities_verification() + .get_change_history(&identifier) + .await?, + ) + .await?; + + let authority_identifier = authority.node_manager_handle.node_manager.identifier(); + identities + .identities_verification() + .import_from_change_history( + Some(&authority_identifier), + authority + .secure_channels + .identities() + .identities_verification() + .get_change_history(&authority_identifier) + .await?, + ) + .await?; + + let credential = authority + .secure_channels + .identities() + .credentials() + .credentials_creation() + .issue_credential( + &authority_identifier, + &identifier, + AttributesBuilder::with_schema(PROJECT_MEMBER_SCHEMA).build(), + DEFAULT_CREDENTIAL_VALIDITY, + ) + .await?; + + NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::InMemory(credential), + NodeManagerCredentialRetrieverOptions::None, + Some(authority_identifier), + NodeManagerCredentialRetrieverOptions::None, + ) + } + AuthorityConfiguration::SelfReferencing => { + let credential = identities + .credentials() + .credentials_creation() + .issue_credential( + &identifier, + &identifier, + AttributesBuilder::with_schema(PROJECT_MEMBER_SCHEMA).build(), + DEFAULT_CREDENTIAL_VALIDITY, + ) + .await?; + + NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::InMemory(credential), + NodeManagerCredentialRetrieverOptions::None, + Some(identifier), + NodeManagerCredentialRetrieverOptions::None, + ) + } + }; let node_manager = InMemoryNode::new( context, @@ -101,14 +172,7 @@ pub async fn start_manager_for_tests( tcp.async_try_clone().await?, None, ), - trust_options.unwrap_or_else(|| { - NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::InMemory(credential), - NodeManagerCredentialRetrieverOptions::None, - Some(identifier), - NodeManagerCredentialRetrieverOptions::None, - ) - }), + trust_options, ) .await?; @@ -218,22 +282,22 @@ impl TestNode { } pub async fn create(runtime: Arc, listen_addr: Option<&str>) -> Self { + Self::create_extended(runtime, listen_addr, AuthorityConfiguration::None).await + } + + pub async fn create_extended( + runtime: Arc, + listen_addr: Option<&str>, + authority_configuration: AuthorityConfiguration<'_>, + ) -> Self { let (mut context, mut executor) = NodeBuilder::new().with_runtime(runtime.clone()).build(); runtime.spawn(async move { executor.start_router().await.expect("cannot start router"); }); - let node_manager_handle = start_manager_for_tests( - &mut context, - listen_addr, - Some(NodeManagerTrustOptions::new( - NodeManagerCredentialRetrieverOptions::None, - NodeManagerCredentialRetrieverOptions::None, - None, - NodeManagerCredentialRetrieverOptions::None, - )), - ) - .await - .expect("cannot start node manager"); + let node_manager_handle = + start_manager_for_tests(&mut context, listen_addr, authority_configuration) + .await + .expect("cannot start node manager"); Self { context, diff --git a/implementations/rust/ockam/ockam_api/tests/latency.rs b/implementations/rust/ockam/ockam_api/tests/latency.rs index 4eb7bf974ee..7ed6b3d5a24 100644 --- a/implementations/rust/ockam/ockam_api/tests/latency.rs +++ b/implementations/rust/ockam/ockam_api/tests/latency.rs @@ -1,3 +1,5 @@ +#![recursion_limit = "256"] + use ockam_api::nodes::service::SecureChannelType; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -10,8 +12,7 @@ use tokio::time::timeout; use ockam_api::nodes::models::portal::OutletAccessControl; use ockam_api::test_utils::{start_tcp_echo_server, TestNode}; use ockam_core::env::FromString; -use ockam_core::errcode::{Kind, Origin}; -use ockam_core::{route, Address, AllowAll, Error, NeutralMessage}; +use ockam_core::{route, Address, AllowAll, NeutralMessage}; use ockam_multiaddr::MultiAddr; use ockam_transport_core::HostnamePort; @@ -21,12 +22,12 @@ use ockam_transport_core::HostnamePort; /// `cargo test --test latency --release -- --ignored --show-output` #[ignore] #[test] -pub fn measure_message_latency_two_nodes() { +pub fn measure_message_latency_two_nodes() -> ockam_core::Result<()> { let runtime = Arc::new(Runtime::new().unwrap()); let runtime_cloned = runtime.clone(); std::env::remove_var("OCKAM_LOG_LEVEL"); - let result: ockam::Result<()> = runtime_cloned.block_on(async move { + runtime_cloned.block_on(async move { let test_body = async move { TestNode::clean().await?; let mut first_node = TestNode::create(runtime.clone(), None).await; @@ -109,20 +110,17 @@ pub fn measure_message_latency_two_nodes() { }; timeout(Duration::from_secs(30), test_body).await.unwrap() - }); - - result.unwrap(); - drop(runtime_cloned); + }) } #[ignore] #[test] -pub fn measure_buffer_latency_two_nodes_portal() { +pub fn measure_buffer_latency_two_nodes_portal() -> ockam_core::Result<()> { let runtime = Arc::new(Runtime::new().unwrap()); let runtime_cloned = runtime.clone(); std::env::remove_var("OCKAM_LOG_LEVEL"); - let result: ockam::Result<()> = runtime_cloned.block_on(async move { + runtime_cloned.block_on(async move { let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; @@ -200,11 +198,6 @@ pub fn measure_buffer_latency_two_nodes_portal() { Ok(()) }; - timeout(Duration::from_secs(30), test_body) - .await - .unwrap_or_else(|_| Err(Error::new(Origin::Node, Kind::Timeout, "Test timed out"))) - }); - - result.unwrap(); - drop(runtime_cloned); + timeout(Duration::from_secs(30), test_body).await.unwrap() + }) } diff --git a/implementations/rust/ockam/ockam_api/tests/portals.rs b/implementations/rust/ockam/ockam_api/tests/portals.rs index b86a303bfc8..e6cf766e1e7 100644 --- a/implementations/rust/ockam/ockam_api/tests/portals.rs +++ b/implementations/rust/ockam/ockam_api/tests/portals.rs @@ -1,7 +1,8 @@ use ockam_api::config::lookup::InternetAddress; use ockam_api::nodes::models::portal::OutletAccessControl; use ockam_api::test_utils::{ - start_manager_for_tests, start_passthrough_server, start_tcp_echo_server, Disruption, TestNode, + start_manager_for_tests, start_passthrough_server, start_tcp_echo_server, + AuthorityConfiguration, Disruption, TestNode, }; use ockam_api::ConnectionStatus; use ockam_core::compat::rand::RngCore; @@ -24,7 +25,8 @@ use tracing::info; async fn inlet_outlet_local_successful(context: &mut Context) -> ockam::Result<()> { TestNode::clean().await?; let echo_server_handle = start_tcp_echo_server().await; - let node_manager_handle = start_manager_for_tests(context, None, None).await?; + let node_manager_handle = + start_manager_for_tests(context, None, AuthorityConfiguration::SelfReferencing).await?; let outlet_status = node_manager_handle .node_manager @@ -221,7 +223,7 @@ fn portal_node_goes_down_reconnect() { } #[test] -fn portal_low_bandwidth_connection_keep_working_for_60s() { +fn portal_low_bandwidth_connection_keep_working_for_60s() -> ockam_core::Result<()> { // in this test we use two nodes, connected through a passthrough server // which limits the bandwidth to 170kb per second // @@ -240,7 +242,7 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { let runtime_cloned = runtime.clone(); std::env::remove_var("OCKAM_LOG_LEVEL"); - let result: ockam::Result<()> = handle.block_on(async move { + handle.block_on(async move { let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; @@ -346,22 +348,18 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { Ok(()) }; - timeout(Duration::from_secs(90), test_body) - .await - .unwrap_or_else(|_| Err(Error::new(Origin::Node, Kind::Timeout, "Test timed out"))) - }); - - result.unwrap(); + timeout(Duration::from_secs(90), test_body).await.unwrap() + }) } #[test] -fn portal_heavy_load_exchanged() { +fn portal_heavy_load_exchanged() -> ockam_core::Result<()> { let runtime = Arc::new(Runtime::new().unwrap()); let handle = runtime.handle(); let runtime_cloned = runtime.clone(); std::env::remove_var("OCKAM_LOG_LEVEL"); - let result: ockam::Result<()> = handle.block_on(async move { + handle.block_on(async move { let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; @@ -457,17 +455,13 @@ fn portal_heavy_load_exchanged() { Ok(()) }; - timeout(Duration::from_secs(90), test_body) - .await - .unwrap_or_else(|_| Err(Error::new(Origin::Node, Kind::Timeout, "Test timed out"))) - }); - - result.unwrap(); + timeout(Duration::from_secs(90), test_body).await.unwrap() + }) } #[ignore] #[test] -fn portal_connection_drop_packets() { +fn portal_connection_drop_packets() -> ockam_core::Result<()> { // Drop even packets after 32 packets (to allow for the initial // handshake to complete). // This test checks that: @@ -475,22 +469,25 @@ fn portal_connection_drop_packets() { // - the portion of the received data matches with the sent data. // - test_portal_payload_transfer(Disruption::DropPacketsAfter(32), Disruption::None); + test_portal_payload_transfer(Disruption::DropPacketsAfter(32), Disruption::None) } #[ignore] #[test] -fn portal_connection_change_packet_order() { +fn portal_connection_change_packet_order() -> ockam_core::Result<()> { // Change packet order after 32 packets (to allow for the initial // handshake to complete). // This test checks that: // - connection is interrupted when a failure is detected // - the portion of the received data matches with the sent data. - test_portal_payload_transfer(Disruption::PacketsOutOfOrderAfter(32), Disruption::None); + test_portal_payload_transfer(Disruption::PacketsOutOfOrderAfter(32), Disruption::None) } -fn test_portal_payload_transfer(outgoing_disruption: Disruption, incoming_disruption: Disruption) { +fn test_portal_payload_transfer( + outgoing_disruption: Disruption, + incoming_disruption: Disruption, +) -> ockam_core::Result<()> { // we use two nodes, connected through a passthrough server // ┌────────┐ ┌───────────┐ ┌────────┐ // │ Node └─────► TCP └────────► Node │ @@ -507,7 +504,7 @@ fn test_portal_payload_transfer(outgoing_disruption: Disruption, incoming_disrup let runtime_cloned = runtime.clone(); std::env::remove_var("OCKAM_LOG_LEVEL"); - let result: ockam::Result<_> = handle.block_on(async move { + handle.block_on(async move { let test_body = async move { let echo_server_handle = start_tcp_echo_server().await; @@ -609,10 +606,6 @@ fn test_portal_payload_transfer(outgoing_disruption: Disruption, incoming_disrup Ok(()) }; - timeout(Duration::from_secs(60), test_body) - .await - .unwrap_or_else(|_| Err(Error::new(Origin::Node, Kind::Timeout, "Test timed out"))) - }); - - result.unwrap(); + timeout(Duration::from_secs(60), test_body).await.unwrap() + }) } diff --git a/implementations/rust/ockam/ockam_core/src/compat.rs b/implementations/rust/ockam/ockam_core/src/compat.rs index 68136336506..06616baade7 100644 --- a/implementations/rust/ockam/ockam_core/src/compat.rs +++ b/implementations/rust/ockam/ockam_core/src/compat.rs @@ -317,6 +317,67 @@ pub mod vec { pub type Vec = heapless::Vec; } +/// Provides [`Clock`] and [`test::TestClock`]. +/// These are useful for testing time-dependent code. +pub mod clock { + /// A trait for providing the current time. + pub trait Clock: Send + Sync + 'static { + /// Returns the current time in seconds since the Unix epoch. + fn now(&self) -> crate::Result; + } + + /// A production implementation of the [`Clock`] trait. + /// Unless you are writing testing code, this is the implementation you should use. + #[derive(Clone, Default, Debug)] + pub struct ProductionClock; + + impl Clock for ProductionClock { + fn now(&self) -> crate::Result { + super::time::now() + } + } + + #[cfg(feature = "std")] + #[allow(dead_code, missing_docs)] + pub mod test { + use crate::compat::clock::Clock; + use core::fmt::Debug; + use std::sync::atomic::AtomicU64; + use std::sync::Arc; + + #[derive(Clone)] + pub struct TestClock { + pub time: Arc, + } + + impl Clock for TestClock { + fn now(&self) -> crate::Result { + Ok(self.time.load(std::sync::atomic::Ordering::Relaxed)) + } + } + + impl Debug for TestClock { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{:?}", self.time) + } + } + + impl TestClock { + pub fn new(time: u64) -> Self { + Self { + time: Arc::new(AtomicU64::new(time)), + } + } + + pub fn add_seconds(&self, seconds: u64) { + let time = self.time.load(std::sync::atomic::Ordering::Relaxed); + self.time + .store(time + seconds, std::sync::atomic::Ordering::Relaxed); + } + } + } +} + /// Provides `std::time` for `std` targets. #[cfg(feature = "std")] pub mod time { diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/api.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/api.rs index a0daab91b6f..936aa03bde3 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/api.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/api.rs @@ -5,7 +5,12 @@ use serde::{Deserialize, Serialize}; /// Request type for `EncryptorWorker` API Address #[derive(Serialize, Deserialize, Message)] -pub struct EncryptionRequest(pub Vec); +pub enum EncryptionRequest { + /// Encrypt data + Encrypt(Vec), + /// Trigger a manual rekey + Rekey, +} /// Response type for `EncryptorWorker` API Address #[derive(Serialize, Deserialize, Message)] @@ -18,7 +23,7 @@ pub enum EncryptionResponse { /// Request type for `Decryptor` API Address (the `Decryptor` is accessible through the `HandshakeWorker`) #[derive(Serialize, Deserialize, Message)] -pub struct DecryptionRequest(pub Vec); +pub struct DecryptionRequest(pub Vec, pub Option); /// Response type for `Decryptor` API Address (the `Decryptor` is accessible through the `HandshakeWorker`) #[derive(Serialize, Deserialize, Message)] diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs index d37ce85fbac..42027b53f72 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs @@ -5,7 +5,7 @@ use ockam_core::{Decodable, LocalMessage}; use ockam_node::Context; use crate::models::Identifier; -use crate::secure_channel::encryptor::{Encryptor, KEY_RENEWAL_INTERVAL}; +use crate::secure_channel::encryptor::KEY_RENEWAL_INTERVAL; use crate::secure_channel::handshake::handshake_state_machine::CommonStateMachine; use crate::secure_channel::key_tracker::KeyTracker; use crate::secure_channel::nonce_tracker::NonceTracker; @@ -17,6 +17,7 @@ use crate::{ }; use crate::secure_channel::encryptor_worker::SecureChannelSharedState; +use ockam_core::errcode::{Kind, Origin}; use ockam_vault::{AeadSecretKeyHandle, VaultForSecureChannels}; use tracing::{debug, info, trace, warn}; use tracing_attributes::instrument; @@ -80,9 +81,13 @@ impl DecryptorHandler { // Decode raw payload binary let mut request = DecryptionRequest::decode(&msg.payload)?; - - // Decrypt the binary - let decrypted_payload = self.decryptor.decrypt(request.0.as_mut_slice()).await; + let decrypted_payload = if let Some(rekey_counter) = request.1 { + self.decryptor + .decrypt_with_rekey_counter(&mut request.0, rekey_counter) + .await + } else { + self.decryptor.decrypt(request.0.as_mut_slice()).await + }; let response = match decrypted_payload { Ok((payload, _nonce)) => DecryptionResponse::Ok(payload.to_vec()), @@ -231,6 +236,7 @@ pub(crate) struct Decryptor { vault: Arc, key_tracker: KeyTracker, nonce_tracker: Option, + rekey_cache: Option<(u16, AeadSecretKeyHandle)>, } impl Decryptor { @@ -239,6 +245,7 @@ impl Decryptor { vault, key_tracker: KeyTracker::new(key, KEY_RENEWAL_INTERVAL), nonce_tracker: Some(NonceTracker::new()), + rekey_cache: None, } } @@ -248,6 +255,7 @@ impl Decryptor { vault, key_tracker: KeyTracker::new(key, KEY_RENEWAL_INTERVAL), nonce_tracker: None, + rekey_cache: None, } } @@ -273,7 +281,7 @@ impl Decryptor { if let Some(key) = self.key_tracker.get_key(nonce)? { key } else { - rekey_key = Encryptor::rekey(&self.vault, &self.key_tracker.current_key).await?; + rekey_key = self.vault.rekey(&self.key_tracker.current_key, 1).await?; &rekey_key } } else { @@ -305,15 +313,111 @@ impl Decryptor { } } + #[instrument(skip_all)] + pub async fn decrypt_with_rekey_counter<'a>( + &mut self, + payload: &'a mut [u8], + rekey_counter: u16, + ) -> Result<(&'a [u8], Nonce)> { + if payload.len() < 8 { + return Err(IdentityError::InvalidNonce)?; + } + + let nonce = Nonce::try_from(&payload[..8])?; + let nonce_tracker = if let Some(nonce_tracker) = &self.nonce_tracker { + Some(nonce_tracker.mark(nonce)?) + } else { + None + }; + + let key_handle = + if let Some((cached_rekey_counter, cached_key_handle)) = self.rekey_cache.clone() { + if cached_rekey_counter == rekey_counter { + Some(cached_key_handle) + } else { + self.rekey_cache = None; + self.vault + .delete_aead_secret_key(cached_key_handle.clone()) + .await?; + None + } + } else { + None + }; + + let key_handle = match key_handle { + Some(key) => key, + None => { + let current_number_of_rekeys = self.key_tracker.number_of_rekeys(); + if current_number_of_rekeys > rekey_counter as u64 { + return Err(ockam_core::Error::new( + Origin::Channel, + Kind::Invalid, + "cannot rekey backwards", + )); + } else if current_number_of_rekeys > u16::MAX as u64 { + return Err(ockam_core::Error::new( + Origin::Channel, + Kind::Invalid, + "rekey counter overflow", + )); + } else { + let n_rekying = rekey_counter - current_number_of_rekeys as u16; + if n_rekying > 0 { + let key_handle = self + .vault + .rekey(&self.key_tracker.current_key, n_rekying) + .await?; + self.rekey_cache = Some((rekey_counter, key_handle.clone())); + key_handle + } else { + self.key_tracker.current_key.clone() + } + } + } + }; + + // to improve protection against connection disruption attacks, we want to validate the + // message with a decryption _before_ committing to the new state + let result = self + .vault + .aead_decrypt( + &key_handle, + &mut payload[NOISE_NONCE_LEN..], + &nonce.to_aes_gcm_nonce(), + &[], + ) + .await; + + if result.is_ok() { + self.nonce_tracker = nonce_tracker; + if let Some(key_to_delete) = self.key_tracker.update_key(&key_handle)? { + self.vault.delete_aead_secret_key(key_to_delete).await?; + } + } + + result.map(|payload| (&*payload, nonce)) + } + /// Remove the channel keys on shutdown #[instrument(skip_all)] pub(crate) async fn shutdown(&self) -> Result<()> { self.vault .delete_aead_secret_key(self.key_tracker.current_key.clone()) .await?; - if let Some(previous_key) = self.key_tracker.previous_key.clone() { - self.vault.delete_aead_secret_key(previous_key).await?; + + if let Some(previous_key) = &self.key_tracker.previous_key { + self.vault + .delete_aead_secret_key(previous_key.clone()) + .await?; }; + + if let Some((_, key_handle)) = &self.rekey_cache { + self.vault + .delete_aead_secret_key(key_handle.clone()) + .await?; + } + Ok(()) } } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor.rs index b7e7db73b28..7a6fa90cec7 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor.rs @@ -4,8 +4,7 @@ use ockam_core::{Error, Result}; use ockam_vault::{AeadSecretKeyHandle, VaultForSecureChannels}; use tracing_attributes::instrument; -use crate::secure_channel::handshake::handshake::AES_GCM_TAGSIZE; -use crate::{Nonce, MAX_NONCE, NOISE_NONCE_LEN}; +use crate::{Nonce, NOISE_NONCE_LEN}; pub(crate) struct Encryptor { key: AeadSecretKeyHandle, @@ -20,28 +19,6 @@ pub(crate) struct Encryptor { pub(crate) const KEY_RENEWAL_INTERVAL: u64 = 32; impl Encryptor { - #[instrument(skip_all)] - pub async fn rekey( - vault: &Arc, - key: &AeadSecretKeyHandle, - ) -> Result { - let mut new_key_buffer = vec![0u8; 32 + AES_GCM_TAGSIZE]; - vault - .aead_encrypt( - key, - new_key_buffer.as_mut_slice(), - &MAX_NONCE.to_aes_gcm_nonce(), - &[], - ) - .await?; - - let buffer = vault - .import_secret_buffer(new_key_buffer[0..32].to_vec()) - .await?; - - vault.convert_secret_buffer_to_aead_key(buffer).await - } - #[instrument(skip_all)] pub async fn encrypt(&mut self, payload: &mut [u8]) -> Result<()> { let current_nonce = self.nonce; @@ -52,7 +29,7 @@ impl Encryptor { && current_nonce.value() > 0 && current_nonce.value() % KEY_RENEWAL_INTERVAL == 0 { - let new_key = Self::rekey(&self.vault, &self.key).await?; + let new_key = self.vault.rekey(&self.key, 1).await?; let old_key = core::mem::replace(&mut self.key, new_key); self.vault.delete_aead_secret_key(old_key).await?; } @@ -71,6 +48,14 @@ impl Encryptor { Ok(()) } + #[instrument(skip_all)] + pub async fn manual_rekey(&mut self) -> Result<()> { + let new_key = self.vault.rekey(&self.key, 1).await?; + let old_key = core::mem::replace(&mut self.key, new_key); + self.vault.delete_aead_secret_key(old_key).await?; + Ok(()) + } + pub fn new( key: AeadSecretKeyHandle, nonce: Nonce, diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs index b958fdac970..c0587a6aa5f 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs @@ -133,28 +133,46 @@ impl EncryptorWorker { // Decode raw payload binary let request = EncryptionRequest::decode(&msg.payload)?; + // If encryption fails, that means we have some internal error, + // and we may be in an invalid state, it's better to stop the Worker let mut should_stop = false; - let len = NOISE_NONCE_LEN + request.0.len() + AES_GCM_TAGSIZE; - let mut encrypted_payload = vec![0u8; len]; - encrypted_payload[NOISE_NONCE_LEN..len - AES_GCM_TAGSIZE].copy_from_slice(&request.0); - - // Encrypt the message - let response = match self - .encryptor - .encrypt(encrypted_payload.as_mut_slice()) - .await - { - Ok(()) => EncryptionResponse::Ok(encrypted_payload), - // If encryption failed, that means we have some internal error, - // and we may be in an invalid state, it's better to stop the Worker - Err(err) => { - should_stop = true; - error!( - "Error while encrypting: {err} at: {}", - self.addresses.encryptor - ); - EncryptionResponse::Err(err) + let response = match request { + EncryptionRequest::Encrypt(plaintext) => { + let len = NOISE_NONCE_LEN + plaintext.len() + AES_GCM_TAGSIZE; + let mut encrypted_payload = vec![0u8; len]; + encrypted_payload[NOISE_NONCE_LEN..len - AES_GCM_TAGSIZE] + .copy_from_slice(&plaintext); + + // Encrypt the message + match self + .encryptor + .encrypt(encrypted_payload.as_mut_slice()) + .await + { + Ok(()) => EncryptionResponse::Ok(encrypted_payload), + // If encryption failed, that means we have some internal error, + // and we may be in an invalid state, it's better to stop the Worker + Err(err) => { + should_stop = true; + error!( + "Error while encrypting: {err} at: {}", + self.addresses.encryptor + ); + EncryptionResponse::Err(err) + } + } } + EncryptionRequest::Rekey => match self.encryptor.manual_rekey().await { + Ok(()) => EncryptionResponse::Ok(Vec::new()), + Err(err) => { + should_stop = true; + error!( + "Error while rekeying: {err} at: {}", + self.addresses.encryptor + ); + EncryptionResponse::Err(err) + } + }, }; // Send the reply to the caller diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/key_tracker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/key_tracker.rs index d0c48339e65..d600a4c7837 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/key_tracker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/key_tracker.rs @@ -23,6 +23,10 @@ impl KeyTracker { renewal_interval, } } + + pub(crate) fn number_of_rekeys(&self) -> u64 { + self.number_of_rekeys + } } impl KeyTracker { diff --git a/implementations/rust/ockam/ockam_identity/tests/channel.rs b/implementations/rust/ockam/ockam_identity/tests/channel.rs index bcf55d3112d..2086b1e5986 100644 --- a/implementations/rust/ockam/ockam_identity/tests/channel.rs +++ b/implementations/rust/ockam/ockam_identity/tests/channel.rs @@ -10,9 +10,9 @@ use ockam_identity::models::{CredentialSchemaIdentifier, Identifier}; use ockam_identity::secure_channels::secure_channels; use ockam_identity::utils::AttributesBuilder; use ockam_identity::{ - DecryptionResponse, EncryptionRequest, EncryptionResponse, IdentityAccessControlBuilder, - SecureChannelListenerOptions, SecureChannelOptions, SecureChannels, TrustEveryonePolicy, - TrustIdentifierPolicy, Vault, + DecryptionRequest, DecryptionResponse, EncryptionRequest, EncryptionResponse, + IdentityAccessControlBuilder, SecureChannelListenerOptions, SecureChannelOptions, + SecureChannels, TrustEveryonePolicy, TrustIdentifierPolicy, Vault, }; use ockam_node::{Context, MessageReceiveOptions, WorkerBuilder}; use ockam_vault::{ @@ -464,7 +464,7 @@ async fn test_channel_api(ctx: &mut Context) -> Result<()> { let encrypted_alice: EncryptionResponse = ctx .send_and_receive( route![alice_channel_data.encryptor_api_address().clone()], - EncryptionRequest(b"Ping".to_vec()), + EncryptionRequest::Encrypt(b"Ping".to_vec()), ) .await?; let encrypted_alice = match encrypted_alice { @@ -475,7 +475,7 @@ async fn test_channel_api(ctx: &mut Context) -> Result<()> { let encrypted_bob: EncryptionResponse = ctx .send_and_receive( route![bob_channel_data.encryptor_api_address().clone()], - EncryptionRequest(b"Pong".to_vec()), + EncryptionRequest::Encrypt(b"Pong".to_vec()), ) .await?; let encrypted_bob = match encrypted_bob { @@ -486,7 +486,7 @@ async fn test_channel_api(ctx: &mut Context) -> Result<()> { let decrypted_alice: DecryptionResponse = ctx .send_and_receive( route![alice_channel_data.decryptor_api_address().clone()], - encrypted_bob, + DecryptionRequest(encrypted_bob, None), ) .await?; let decrypted_alice = match decrypted_alice { @@ -497,7 +497,7 @@ async fn test_channel_api(ctx: &mut Context) -> Result<()> { let decrypted_bob: DecryptionResponse = ctx .send_and_receive( route![bob_channel_data.decryptor_api_address().clone()], - encrypted_alice, + DecryptionRequest(encrypted_alice, None), ) .await?; let decrypted_bob = match decrypted_bob { diff --git a/implementations/rust/ockam/ockam_identity/tests/persistence.rs b/implementations/rust/ockam/ockam_identity/tests/persistence.rs index 3e859728d64..b9b22860c99 100644 --- a/implementations/rust/ockam/ockam_identity/tests/persistence.rs +++ b/implementations/rust/ockam/ockam_identity/tests/persistence.rs @@ -53,7 +53,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { let EncryptionResponse::Ok(encrypted_msg1_alice) = ctx .send_and_receive( route![alice_channel.encryptor_api_address().clone()], - EncryptionRequest(msg1_alice.clone()), + EncryptionRequest::Encrypt(msg1_alice.clone()), ) .await? else { @@ -62,7 +62,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { let EncryptionResponse::Ok(encrypted_msg2_alice) = ctx .send_and_receive( route![alice_channel.encryptor_api_address().clone()], - EncryptionRequest(msg2_alice.clone()), + EncryptionRequest::Encrypt(msg2_alice.clone()), ) .await? else { @@ -71,7 +71,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { let EncryptionResponse::Ok(encrypted_msg1_bob) = ctx .send_and_receive( route![bob_channel.encryptor_api_address().clone()], - EncryptionRequest(msg1_bob.clone()), + EncryptionRequest::Encrypt(msg1_bob.clone()), ) .await? else { @@ -80,7 +80,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { let EncryptionResponse::Ok(encrypted_msg2_bob) = ctx .send_and_receive( route![bob_channel.encryptor_api_address().clone()], - EncryptionRequest(msg2_bob.clone()), + EncryptionRequest::Encrypt(msg2_bob.clone()), ) .await? else { @@ -90,7 +90,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { let DecryptionResponse::Ok(decrypted_msg1_alice) = ctx .send_and_receive( route![bob_channel.decryptor_api_address().clone()], - DecryptionRequest(encrypted_msg1_alice), + DecryptionRequest(encrypted_msg1_alice, None), ) .await? else { @@ -100,7 +100,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { let DecryptionResponse::Ok(decrypted_msg2_alice) = ctx .send_and_receive( route![bob_channel.decryptor_api_address().clone()], - DecryptionRequest(encrypted_msg2_alice), + DecryptionRequest(encrypted_msg2_alice, None), ) .await? else { @@ -110,7 +110,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { let DecryptionResponse::Ok(decrypted_msg1_bob) = ctx .send_and_receive( route![alice_channel.decryptor_api_address().clone()], - DecryptionRequest(encrypted_msg1_bob), + DecryptionRequest(encrypted_msg1_bob, None), ) .await? else { @@ -120,7 +120,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { let DecryptionResponse::Ok(decrypted_msg2_bob) = ctx .send_and_receive( route![alice_channel.decryptor_api_address().clone()], - DecryptionRequest(encrypted_msg2_bob), + DecryptionRequest(encrypted_msg2_bob, None), ) .await? else { @@ -222,7 +222,7 @@ fn test_persistence() -> ockam_core::Result<()> { let EncryptionResponse::Ok(encrypted_msg1_alice) = ctx1 .send_and_receive( route![alice_channel.encryptor_api_address().clone()], - EncryptionRequest(msg1_alice.clone()), + EncryptionRequest::Encrypt(msg1_alice.clone()), ) .await? else { @@ -231,7 +231,7 @@ fn test_persistence() -> ockam_core::Result<()> { let EncryptionResponse::Ok(encrypted_msg2_alice) = ctx1 .send_and_receive( route![alice_channel.encryptor_api_address().clone()], - EncryptionRequest(msg2_alice.clone()), + EncryptionRequest::Encrypt(msg2_alice.clone()), ) .await? else { @@ -240,7 +240,7 @@ fn test_persistence() -> ockam_core::Result<()> { let EncryptionResponse::Ok(encrypted_msg1_bob) = ctx1 .send_and_receive( route![bob_channel.encryptor_api_address().clone()], - EncryptionRequest(msg1_bob.clone()), + EncryptionRequest::Encrypt(msg1_bob.clone()), ) .await? else { @@ -249,7 +249,7 @@ fn test_persistence() -> ockam_core::Result<()> { let EncryptionResponse::Ok(encrypted_msg2_bob) = ctx1 .send_and_receive( route![bob_channel.encryptor_api_address().clone()], - EncryptionRequest(msg2_bob.clone()), + EncryptionRequest::Encrypt(msg2_bob.clone()), ) .await? else { @@ -326,7 +326,7 @@ fn test_persistence() -> ockam_core::Result<()> { let DecryptionResponse::Ok(decrypted_msg1_alice) = ctx2 .send_and_receive( route![data.decryptor_api_address_bob.clone()], - DecryptionRequest(data.encrypted_msg1_alice), + DecryptionRequest(data.encrypted_msg1_alice, None), ) .await? else { @@ -336,7 +336,7 @@ fn test_persistence() -> ockam_core::Result<()> { let DecryptionResponse::Ok(decrypted_msg2_alice) = ctx2 .send_and_receive( route![data.decryptor_api_address_bob.clone()], - DecryptionRequest(data.encrypted_msg2_alice), + DecryptionRequest(data.encrypted_msg2_alice, None), ) .await? else { @@ -346,7 +346,7 @@ fn test_persistence() -> ockam_core::Result<()> { let DecryptionResponse::Ok(decrypted_msg1_bob) = ctx2 .send_and_receive( route![data.decryptor_api_address_alice.clone()], - DecryptionRequest(data.encrypted_msg1_bob), + DecryptionRequest(data.encrypted_msg1_bob, None), ) .await? else { @@ -356,7 +356,7 @@ fn test_persistence() -> ockam_core::Result<()> { let DecryptionResponse::Ok(decrypted_msg2_bob) = ctx2 .send_and_receive( route![data.decryptor_api_address_alice.clone()], - DecryptionRequest(data.encrypted_msg2_bob), + DecryptionRequest(data.encrypted_msg2_bob, None), ) .await? else { diff --git a/implementations/rust/ockam/ockam_vault/src/error.rs b/implementations/rust/ockam/ockam_vault/src/error.rs index 687431f691a..aea6087e635 100644 --- a/implementations/rust/ockam/ockam_vault/src/error.rs +++ b/implementations/rust/ockam/ockam_vault/src/error.rs @@ -35,6 +35,8 @@ pub enum VaultError { InvalidSignatureSize, /// Aead secret was not found in the storage AeadSecretNotFound, + /// Invalid rekey count + InvalidRekeyCount, /// Buffer is too short during encryption InsufficientEncryptBuffer, /// Buffer is too short during decryption @@ -61,6 +63,7 @@ impl core::fmt::Display for VaultError { Self::InvalidSha256Len => write!(f, "invalid sha256 len"), Self::InvalidSignatureSize => write!(f, "invalid signature len"), Self::AeadSecretNotFound => write!(f, "aead secret was not found in the storage"), + Self::InvalidRekeyCount => write!(f, "invalid rekey count"), Self::InsufficientEncryptBuffer => write!(f, "insufficient encrypt buffer"), Self::InsufficientDecryptBuffer => write!(f, "insufficient decrypt buffer"), } diff --git a/implementations/rust/ockam/ockam_vault/src/software/vault_for_secure_channels/vault_for_secure_channels.rs b/implementations/rust/ockam/ockam_vault/src/software/vault_for_secure_channels/vault_for_secure_channels.rs index 0cce4e24ef5..96340d96f59 100644 --- a/implementations/rust/ockam/ockam_vault/src/software/vault_for_secure_channels/vault_for_secure_channels.rs +++ b/implementations/rust/ockam/ockam_vault/src/software/vault_for_secure_channels/vault_for_secure_channels.rs @@ -22,6 +22,8 @@ use crate::{ use super::make_aes; +const AES_GCM_TAGSIZE: usize = 16; + /// [`SecureChannelVault`] implementation using software pub struct SoftwareVaultForSecureChannels { ephemeral_buffer_secrets: Arc>>, @@ -307,6 +309,36 @@ impl VaultForSecureChannels for SoftwareVaultForSecureChannels { Ok(plaintext) } + #[instrument(skip_all)] + async fn rekey( + &self, + secret_key_handle: &AeadSecretKeyHandle, + n: u16, + ) -> Result { + if n == 0 { + return Err(VaultError::InvalidRekeyCount)?; + } + + const MAX_NONCE: [u8; 12] = [ + 0x00, 0x00, 0x00, 0x00, // we only use 8 bytes of nonce + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, // u64::MAX + ]; + + let mut new_key_buffer = vec![0u8; 32 + AES_GCM_TAGSIZE]; + let mut counter = n; + + while counter > 0 { + let secret = self.get_aead_secret(secret_key_handle).await?; + let aes = make_aes(&secret); + aes.encrypt_message(&mut new_key_buffer, &MAX_NONCE, &[])?; + + counter -= 1; + } + + let buffer = self.import_secret_buffer(new_key_buffer).await?; + self.convert_secret_buffer_to_aead_key(buffer).await + } + #[instrument(skip_all)] async fn persist_aead_key(&self, secret_key_handle: &AeadSecretKeyHandle) -> Result<()> { let secret = self.get_aead_secret(secret_key_handle).await?; diff --git a/implementations/rust/ockam/ockam_vault/src/traits/vault_for_secure_channels.rs b/implementations/rust/ockam/ockam_vault/src/traits/vault_for_secure_channels.rs index 4edec6998e1..8c3a310e76d 100644 --- a/implementations/rust/ockam/ockam_vault/src/traits/vault_for_secure_channels.rs +++ b/implementations/rust/ockam/ockam_vault/src/traits/vault_for_secure_channels.rs @@ -58,6 +58,14 @@ pub trait VaultForSecureChannels: Send + Sync + 'static { aad: &[u8], ) -> Result<&'a mut [u8]>; + /// Perform rekey `n`. times. `n` must be greater than 0. + /// [1]: http://www.noiseprotocol.org/noise.html#cipher-functions + async fn rekey( + &self, + secret_key_handle: &AeadSecretKeyHandle, + n: u16, + ) -> Result; + /// Persist an existing AEAD key. async fn persist_aead_key(&self, secret_key_handle: &AeadSecretKeyHandle) -> Result<()>;