feat(rust): implemented key rotation and rekeying for the kafka use case
davide-baldo committed Oct 31, 2024
1 parent fdb5469 commit e92a7d5
Showing 25 changed files with 1,025 additions and 385 deletions.

Large diffs are not rendered by default.

13 changes: 3 additions & 10 deletions implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs
@@ -25,13 +25,8 @@ pub enum ConsumerPublishing {
#[n(2)] Relay(#[n(1)] MultiAddr),
}

type TopicPartition = (String, i32);

/// Offer simple APIs to encrypt and decrypt kafka messages.
/// Underneath it creates secure channels for each topic/partition
/// and uses them to encrypt the content.
/// Multiple secure channels may be created for the same topic/partition
/// but each will be explicitly labeled.
/// Underneath it creates secure channels for each topic and uses them to encrypt the content.
#[async_trait]
pub(crate) trait KafkaKeyExchangeController: Send + Sync + 'static {
/// Encrypts the content specifically for the consumer waiting for that topic name and
@@ -43,7 +38,6 @@ pub(crate) trait KafkaKeyExchangeController: Send + Sync + 'static {
&self,
context: &mut Context,
topic_name: &str,
partition_index: i32,
content: Vec<u8>,
) -> ockam_core::Result<KafkaEncryptedContent>;

@@ -53,16 +47,15 @@ pub(crate) trait KafkaKeyExchangeController: Send + Sync + 'static {
&self,
context: &mut Context,
consumer_decryptor_address: &Address,
rekey_counter: u16,
encrypted_content: Vec<u8>,
) -> ockam_core::Result<Vec<u8>>;

/// Starts relays in the orchestrator for each {topic_name}_{partition} combination
/// should be used only by the consumer.
/// Starts relays in the orchestrator for each topic name; it should be used only by the consumer.
/// Does nothing if they were already created, but fails if they already exist.
async fn publish_consumer(
&self,
context: &mut Context,
topic_name: &str,
partitions: Vec<i32>,
) -> ockam_core::Result<()>;
}
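The `rekey_counter` added to `decrypt_content` tells the consumer how many times the producer rekeyed before encrypting, so the consumer can fast-forward its own key to the same epoch before decrypting. A minimal self-contained sketch of that idea (the `SymmetricKey` type and its derivation step are hypothetical stand-ins, not the ockam API):

```rust
/// Hypothetical symmetric key that supports deterministic rekeying.
#[derive(Clone)]
struct SymmetricKey(Vec<u8>);

impl SymmetricKey {
    /// Derive the next key in the chain; a real implementation would
    /// use a proper KDF/ratchet step instead of this placeholder.
    fn rekey(&self) -> SymmetricKey {
        let mut next = self.0.clone();
        next.push(0x01);
        SymmetricKey(next)
    }
}

/// Fast-forward `key` by `rekey_counter` steps so the consumer's key
/// epoch matches the one the producer used when encrypting.
fn key_for_epoch(mut key: SymmetricKey, rekey_counter: u16) -> SymmetricKey {
    for _ in 0..rekey_counter {
        key = key.rekey();
    }
    key
}
```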

Large diffs are not rendered by default.

@@ -4,7 +4,7 @@ use crate::kafka::protocol_aware::RequestInfo;
use crate::kafka::protocol_aware::{InterceptError, KafkaMessageRequestInterceptor};
use bytes::{Bytes, BytesMut};
use kafka_protocol::messages::fetch_request::FetchRequest;
use kafka_protocol::messages::produce_request::{PartitionProduceData, ProduceRequest};
use kafka_protocol::messages::produce_request::ProduceRequest;
use kafka_protocol::messages::request_header::RequestHeader;
use kafka_protocol::messages::{ApiKey, TopicName};
use kafka_protocol::protocol::buf::ByteBuf;
@@ -145,14 +145,8 @@ impl InletInterceptorImpl {
})?
};

let partitions: Vec<i32> = topic
.partitions
.iter()
.map(|partition| partition.partition)
.collect();

self.key_exchange_controller
.publish_consumer(context, &topic_id, partitions)
.publish_consumer(context, &topic_id)
.await
.map_err(InterceptError::Ockam)?
}
@@ -190,15 +184,10 @@ impl InletInterceptorImpl {
let buffer = if !self.encrypted_fields.is_empty() {
// if we encrypt only specific fields, we assume the record must be
// valid JSON map
self.encrypt_specific_fields(
context,
topic_name,
data,
&record_value,
)
.await?
self.encrypt_specific_fields(context, topic_name, &record_value)
.await?
} else {
self.encrypt_whole_record(context, topic_name, data, record_value)
self.encrypt_whole_record(context, topic_name, record_value)
.await?
};
record.value = Some(buffer.into());
@@ -233,12 +222,11 @@ impl InletInterceptorImpl {
&self,
context: &mut Context,
topic_name: &TopicName,
data: &mut PartitionProduceData,
record_value: Bytes,
) -> Result<Vec<u8>, InterceptError> {
let encrypted_content = self
.key_exchange_controller
.encrypt_content(context, topic_name, data.index, record_value.to_vec())
.encrypt_content(context, topic_name, record_value.to_vec())
.await
.map_err(InterceptError::Ockam)?;

@@ -255,7 +243,6 @@
&self,
context: &mut Context,
topic_name: &TopicName,
data: &mut PartitionProduceData,
record_value: &Bytes,
) -> Result<Vec<u8>, InterceptError> {
let mut record_value = serde_json::from_slice::<serde_json::Value>(record_value)?;
@@ -268,7 +255,6 @@
.encrypt_content(
context,
topic_name,
data.index,
serde_json::to_vec(value).map_err(|_| InterceptError::InvalidData)?,
)
.await
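For the field-level branch above, the record is parsed as a JSON map and only the configured fields are replaced with ciphertext. A simplified, self-contained sketch of that flow (`encrypt` stands in for `encrypt_content`, and the hex encoding is an assumption made here to keep the record valid JSON):

```rust
use serde_json::Value;

/// Replace each configured field's value with its encrypted bytes,
/// leaving the rest of the JSON map untouched.
fn encrypt_specific_fields(
    record_value: &[u8],
    encrypted_fields: &[String],
    mut encrypt: impl FnMut(Vec<u8>) -> Vec<u8>,
) -> Result<Vec<u8>, serde_json::Error> {
    let mut record: Value = serde_json::from_slice(record_value)?;
    if let Value::Object(map) = &mut record {
        for field in encrypted_fields {
            if let Some(value) = map.get_mut(field.as_str()) {
                let plaintext = serde_json::to_vec(value)?;
                let ciphertext = encrypt(plaintext);
                // Hex-encode so the field still holds a JSON string.
                *value = Value::String(hex_encode(&ciphertext));
            }
        }
    }
    serde_json::to_vec(&record)
}

fn hex_encode(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}
```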
@@ -274,6 +274,7 @@ impl InletInterceptorImpl {
.decrypt_content(
context,
&message_wrapper.consumer_decryptor_address,
message_wrapper.rekey_counter,
message_wrapper.content,
)
.await
@@ -307,6 +308,7 @@
.decrypt_content(
context,
&message_wrapper.consumer_decryptor_address,
message_wrapper.rekey_counter,
message_wrapper.content,
)
.await
@@ -33,21 +33,22 @@ impl KafkaKeyExchangeController for MockKafkaKeyExchangeController {
&self,
_context: &mut Context,
_topic_name: &str,
_partition_index: i32,
content: Vec<u8>,
) -> ockam_core::Result<KafkaEncryptedContent> {
let mut new_content = ENCRYPTED_PREFIX.to_vec();
new_content.extend_from_slice(&content);
Ok(KafkaEncryptedContent {
consumer_decryptor_address: Address::from_string("mock"),
content: new_content,
rekey_counter: u16::MAX,
})
}

async fn decrypt_content(
&self,
_context: &mut Context,
_consumer_decryptor_address: &Address,
_rekey_counter: u16,
encrypted_content: Vec<u8>,
) -> ockam_core::Result<Vec<u8>> {
Ok(encrypted_content[PREFIX_LEN..].to_vec())
@@ -57,7 +58,6 @@ impl KafkaKeyExchangeController for MockKafkaKeyExchangeController {
&self,
_context: &mut Context,
_topic_name: &str,
_partitions: Vec<i32>,
) -> ockam_core::Result<()> {
Ok(())
}
@@ -243,6 +243,7 @@ pub fn encode_field_value(value: serde_json::Value) -> String {
.encode(KafkaEncryptedContent {
consumer_decryptor_address: Address::from_string("mock"),
content: encrypted_content,
rekey_counter: u16::MAX,
})
.unwrap();

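The mock controller swaps real encryption for a fixed prefix, which keeps the protocol tests focused on message framing rather than cryptography. A round-trip sketch of that scheme (the prefix value here is assumed, not copied from the test module):

```rust
// Assumed prefix; the real ENCRYPTED_PREFIX lives in the test module.
const ENCRYPTED_PREFIX: &[u8] = b"encrypted:";
const PREFIX_LEN: usize = ENCRYPTED_PREFIX.len();

fn mock_encrypt(content: &[u8]) -> Vec<u8> {
    let mut out = ENCRYPTED_PREFIX.to_vec();
    out.extend_from_slice(content);
    out
}

fn mock_decrypt(encrypted: &[u8]) -> Vec<u8> {
    encrypted[PREFIX_LEN..].to_vec()
}

fn main() {
    let plaintext = b"hello".to_vec();
    assert_eq!(mock_decrypt(&mock_encrypt(&plaintext)), plaintext);
}
```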
@@ -150,7 +150,9 @@ pub(crate) struct KafkaEncryptedContent {
/// The secure channel identifier used to encrypt the content
#[n(0)] pub consumer_decryptor_address: Address,
/// The encrypted content
#[n(1)] pub content: Vec<u8>
#[n(1)] pub content: Vec<u8>,
/// Number of times rekey was performed before encrypting the content
#[n(2)] pub rekey_counter: u16,
}

/// By default, kafka supports up to 1MB messages. 16MB is the maximum suggested
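Giving the counter its own `#[n(2)]` index is what keeps the change wire-compatible: with index-based CBOR encoding, a decoder skips indices it does not know. A round-trip sketch assuming the wrapper is encoded with minicbor, as the `#[n(..)]` attributes suggest (the `Wrapper` struct is an illustrative stand-in, not the real type):

```rust
use minicbor::{Decode, Encode};

/// Illustrative stand-in for KafkaEncryptedContent; the field
/// indices mirror the diff above.
#[derive(Debug, PartialEq, Encode, Decode)]
struct Wrapper {
    #[n(0)] consumer_decryptor_address: String,
    #[n(1)] content: Vec<u8>,
    #[n(2)] rekey_counter: u16,
}

fn main() {
    let wrapper = Wrapper {
        consumer_decryptor_address: "mock".into(),
        content: b"ciphertext".to_vec(),
        rekey_counter: 3,
    };
    let bytes = minicbor::to_vec(&wrapper).unwrap();
    let decoded: Wrapper = minicbor::decode(&bytes).unwrap();
    assert_eq!(wrapper, decoded);
}
```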
@@ -9,7 +9,7 @@ mod test {
};
use crate::kafka::{ConsumerPublishing, ConsumerResolution};
use crate::port_range::PortRange;
use crate::test_utils::TestNode;
use crate::test_utils::{AuthorityConfiguration, TestNode};
use kafka_protocol::messages::ApiKey;
use kafka_protocol::messages::BrokerId;
use kafka_protocol::messages::{ApiVersionsRequest, MetadataRequest, MetadataResponse};
@@ -27,7 +27,12 @@
context: &mut Context,
) -> ockam::Result<()> {
TestNode::clean().await?;
let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?;
let handle = crate::test_utils::start_manager_for_tests(
context,
None,
AuthorityConfiguration::SelfReferencing,
)
.await?;

let inlet_map = KafkaInletController::new(
(*handle.node_manager).clone(),
@@ -30,7 +30,7 @@ use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl;
use crate::kafka::protocol_aware::inlet::KafkaInletInterceptorFactory;
use crate::kafka::protocol_aware::utils::{encode_request, encode_response};
use crate::kafka::{ConsumerPublishing, ConsumerResolution, KafkaInletController};
use crate::test_utils::{NodeManagerHandle, TestNode};
use crate::test_utils::{AuthorityConfiguration, NodeManagerHandle, TestNode};
use ockam::compat::tokio::io::DuplexStream;
use ockam::tcp::{TcpInletOptions, TcpOutletOptions};
use ockam::Context;
@@ -134,7 +134,12 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
context: &mut Context,
) -> ockam::Result<()> {
TestNode::clean().await?;
let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?;
let handle = crate::test_utils::start_manager_for_tests(
context,
None,
AuthorityConfiguration::SelfReferencing,
)
.await?;

let consumer_bootstrap_port = create_kafka_service(
context,
@@ -153,8 +158,7 @@
.await?;

// for the consumer to become available to the producer, the consumer has to issue a Fetch
// request first, so the sidecar can react by creating the relay for partition
// 1 of 'my-topic'
// request first, so the sidecar can react by creating the relay for topic 'my-topic'
{
let mut consumer_mock_kafka = TcpServerSimulator::start("127.0.0.1:0").await;
handle
@@ -33,7 +33,7 @@ use crate::kafka::protocol_aware::KafkaMessageInterceptorWrapper;
use crate::kafka::protocol_aware::MAX_KAFKA_MESSAGE_SIZE;
use crate::kafka::{ConsumerPublishing, ConsumerResolution};
use crate::port_range::PortRange;
use crate::test_utils::{NodeManagerHandle, TestNode};
use crate::test_utils::{AuthorityConfiguration, NodeManagerHandle, TestNode};

const TEST_MAX_KAFKA_MESSAGE_SIZE: u32 = 128 * 1024;
const TEST_KAFKA_API_VERSION: i16 = 13;
@@ -68,7 +68,12 @@ async fn kafka_portal_worker__pieces_of_kafka_message__message_assembled(
context: &mut Context,
) -> ockam::Result<()> {
TestNode::clean().await?;
let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?;
let handle = crate::test_utils::start_manager_for_tests(
context,
None,
AuthorityConfiguration::SelfReferencing,
)
.await?;
let portal_inlet_address = setup_only_worker(context, &handle).await;

let mut request_buffer = BytesMut::new();
@@ -111,7 +116,12 @@ async fn kafka_portal_worker__double_kafka_message__message_assembled(
context: &mut Context,
) -> ockam::Result<()> {
TestNode::clean().await?;
let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?;
let handle = crate::test_utils::start_manager_for_tests(
context,
None,
AuthorityConfiguration::SelfReferencing,
)
.await?;
let portal_inlet_address = setup_only_worker(context, &handle).await;

let mut request_buffer = BytesMut::new();
@@ -149,7 +159,12 @@ async fn kafka_portal_worker__bigger_than_limit_kafka_message__error(
context: &mut Context,
) -> ockam::Result<()> {
TestNode::clean().await?;
let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?;
let handle = crate::test_utils::start_manager_for_tests(
context,
None,
AuthorityConfiguration::SelfReferencing,
)
.await?;
let portal_inlet_address = setup_only_worker(context, &handle).await;

// with the message container it goes well over the max allowed message kafka size
@@ -203,7 +218,12 @@ async fn kafka_portal_worker__almost_over_limit_than_limit_kafka_message__two_ka
context: &mut Context,
) -> ockam::Result<()> {
TestNode::clean().await?;
let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?;
let handle = crate::test_utils::start_manager_for_tests(
context,
None,
AuthorityConfiguration::SelfReferencing,
)
.await?;
let portal_inlet_address = setup_only_worker(context, &handle).await;

// let's build the message to 90% of max. size
@@ -382,7 +402,12 @@ async fn kafka_portal_worker__metadata_exchange__response_changed(
context: &mut Context,
) -> ockam::Result<()> {
TestNode::clean().await?;
let handle = crate::test_utils::start_manager_for_tests(context, None, None).await?;
let handle = crate::test_utils::start_manager_for_tests(
context,
None,
AuthorityConfiguration::SelfReferencing,
)
.await?;
let project_authority = handle
.node_manager
.node_manager
@@ -301,7 +301,7 @@ impl NodeManager {

/// SECURE CHANNEL LISTENERS
impl NodeManager {
pub(super) async fn start_key_exchanger_service(
pub(crate) async fn start_key_exchanger_service(
&self,
context: &Context,
address: Address,