diff --git a/implementations/rust/ockam/ockam_identity/src/error.rs b/implementations/rust/ockam/ockam_identity/src/error.rs index 1efab301ae5..eca2591885f 100644 --- a/implementations/rust/ockam/ockam_identity/src/error.rs +++ b/implementations/rust/ockam/ockam_identity/src/error.rs @@ -24,6 +24,8 @@ pub enum IdentityError { UnknownTimestamp, /// Unknown Authority UnknownAuthority, + /// No CredentialsRetriever + NoCredentialsRetriever, /// Unknown version of the Credential UnknownCredentialVersion, /// Unknown version of the Identity diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/addresses.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/addresses.rs index 7de61dc8ebd..92b7a70338a 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/addresses.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/addresses.rs @@ -20,6 +20,8 @@ pub(crate) struct Addresses { pub(crate) encryptor: Address, // Used to decrypt messages that were received though some channel other than Ockam Routing from the other end of the channel pub(crate) encryptor_api: Address, + // Used by the encryptor itself for timer notifications (to force credentials refresh) + pub(crate) encryptor_internal: Address, } impl Addresses { @@ -35,6 +37,8 @@ impl Addresses { let encryptor = Address::random_tagged(&format!("SecureChannel.{}.encryptor", role_str)); let encryptor_api = Address::random_tagged(&format!("SecureChannel.{}.encryptor.api", role_str)); + let encryptor_internal = + Address::random_tagged(&format!("SecureChannel.{}.encryptor.internal", role_str)); Self { decryptor_internal, @@ -42,6 +46,7 @@ impl Addresses { decryptor_api, encryptor, encryptor_api, + encryptor_internal, } } } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs index a8e3dbf9864..b30ae49147b 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/decryptor.rs @@ -6,16 +6,18 @@ use ockam_node::Context; use crate::models::Identifier; use crate::secure_channel::encryptor::{Encryptor, KEY_RENEWAL_INTERVAL}; +use crate::secure_channel::handshake::handshake_state_machine::CommonStateMachine; use crate::secure_channel::key_tracker::KeyTracker; use crate::secure_channel::nonce_tracker::NonceTracker; use crate::secure_channel::Addresses; use crate::{ - DecryptionRequest, DecryptionResponse, IdentityError, IdentitySecureChannelLocalInfo, - SecureChannelMessage, + DecryptionRequest, DecryptionResponse, Identities, IdentityError, + IdentitySecureChannelLocalInfo, PlaintextPayloadMessage, RefreshCredentialsMessage, + SecureChannelMessage, TrustContext, }; use ockam_vault::{AeadSecretKeyHandle, VaultForSecureChannels}; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; pub(crate) struct DecryptorHandler { //for debug purposes only @@ -23,10 +25,15 @@ pub(crate) struct DecryptorHandler { pub(crate) addresses: Addresses, pub(crate) their_identity_id: Identifier, pub(crate) decryptor: Decryptor, + + identities: Arc, + trust_context: Option, } impl DecryptorHandler { pub fn new( + identities: Arc, + trust_context: Option, role: &'static str, addresses: Addresses, key: AeadSecretKeyHandle, @@ -38,6 +45,8 @@ impl DecryptorHandler { addresses, their_identity_id, decryptor: Decryptor::new(key, vault), + identities, + trust_context, } } @@ -71,32 +80,11 @@ impl DecryptorHandler { Ok(()) } - pub(crate) async fn handle_decrypt( + async fn handle_payload( &mut self, ctx: &mut Context, - msg: Routed, + mut msg: PlaintextPayloadMessage, ) -> Result<()> { - debug!( - "SecureChannel {} received Decrypt {}", - self.role, &self.addresses.decryptor_remote - ); - - // Decode raw payload binary - let payload = msg.into_transport_message().payload; - let payload = Vec::::decode(&payload)?; - - // Decrypt the binary - let decrypted_payload = self.decryptor.decrypt(&payload).await?; - - let msg: SecureChannelMessage = minicbor::decode(&decrypted_payload)?; - - let mut msg = match msg { - SecureChannelMessage::Payload(msg) => msg, - SecureChannelMessage::Close => { - todo!() - } - }; - // Add encryptor hop in the return_route (instead of our address) msg.return_route .modify() @@ -127,6 +115,71 @@ impl DecryptorHandler { } } + async fn handle_close(&mut self, ctx: &mut Context) -> Result<()> { + // Should be enough to stop the encryptor, since it will stop the decryptor + ctx.stop_worker(self.addresses.encryptor.clone()).await + } + + async fn handle_refresh_credentials( + &mut self, + _ctx: &mut Context, + msg: RefreshCredentialsMessage, + ) -> Result<()> { + debug!( + "Handling credentials refresh request for {}", + self.addresses.decryptor_remote + ); + + CommonStateMachine::process_identity_payload_static( + self.identities.clone(), + None, + self.trust_context.clone(), + Some(self.their_identity_id.clone()), + msg.change_history, + None, + msg.credentials, + None, + ) + .await?; + + info!( + "Successfully handled credentials refresh request for {}", + self.addresses.decryptor_remote + ); + + Ok(()) + } + + pub(crate) async fn handle_decrypt( + &mut self, + ctx: &mut Context, + msg: Routed, + ) -> Result<()> { + debug!( + "SecureChannel {} received Decrypt {}", + self.role, &self.addresses.decryptor_remote + ); + + // Decode raw payload binary + let payload = msg.into_transport_message().payload; + let payload = Vec::::decode(&payload)?; + + // Decrypt the binary + let decrypted_payload = self.decryptor.decrypt(&payload).await?; + + let msg: SecureChannelMessage = minicbor::decode(&decrypted_payload)?; + + match msg { + SecureChannelMessage::Payload(msg) => self.handle_payload(ctx, msg).await?, + SecureChannelMessage::RefreshCredentials(msg) => { + self.handle_refresh_credentials(ctx, msg).await? + } + SecureChannelMessage::Close => self.handle_close(ctx).await?, + }; + + Ok(()) + } + /// Remove the channel keys on shutdown pub(crate) async fn shutdown(&self) -> Result<()> { self.decryptor.shutdown().await diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs index 0d184cd6955..509d6605bd7 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs @@ -1,13 +1,22 @@ use ockam_core::compat::boxed::Box; +use ockam_core::compat::sync::Arc; use ockam_core::{async_trait, Decodable, Route}; use ockam_core::{Any, Result, Routed, Worker}; -use ockam_node::Context; -use tracing::{debug, error}; +use ockam_node::{Context, DelayedEvent}; +use std::time::Duration; +use tracing::{debug, error, info}; +use crate::models::CredentialData; use crate::secure_channel::addresses::Addresses; use crate::secure_channel::api::{EncryptionRequest, EncryptionResponse}; use crate::secure_channel::encryptor::Encryptor; -use crate::{IdentityError, PlaintextPayloadMessage, SecureChannelMessage}; +use crate::utils::now; +use crate::{ + AuthorityService, Identifier, IdentitiesReader, IdentityError, PlaintextPayloadMessage, + RefreshCredentialsMessage, SecureChannelMessage, TimestampInSeconds, +}; + +pub const DEFAULT_REFRESH_CREDENTIAL_TIME_GAP: Duration = Duration::from_secs(60); pub(crate) struct EncryptorWorker { //for debug purposes only @@ -15,20 +24,39 @@ pub(crate) struct EncryptorWorker { addresses: Addresses, remote_route: Route, encryptor: Encryptor, + my_identifier: Identifier, + identities_reader: Arc, + min_credential_expiration: Option, + refresh_credential_time_gap: Duration, + credential_refresh_event: Option>, + // TODO: Should be CredentialsRetriever + credentials_retriever: Option, } impl EncryptorWorker { + #[allow(clippy::too_many_arguments)] pub fn new( role: &'static str, addresses: Addresses, remote_route: Route, encryptor: Encryptor, + my_identifier: Identifier, + identities_reader: Arc, + min_credential_expiration: Option, + refresh_credential_time_gap: Duration, + credentials_retriever: Option, ) -> Self { Self { role, addresses, remote_route, encryptor, + my_identifier, + identities_reader, + min_credential_expiration, + refresh_credential_time_gap, + credential_refresh_event: None, + credentials_retriever, } } @@ -121,6 +149,112 @@ impl EncryptorWorker { Ok(()) } + + async fn handle_refresh_credentials(&mut self, ctx: &::Context) -> Result<()> { + debug!( + "Started credentials refresh for {}", + self.addresses.encryptor + ); + + let change_history = self + .identities_reader + .get_identity(&self.my_identifier) + .await?; + + let credential = if let Some(credentials_retriever) = &self.credentials_retriever { + credentials_retriever + .credential(ctx, &self.my_identifier) + .await? + } else { + return Err(IdentityError::NoCredentialsRetriever.into()); + }; + + let versioned_data = credential.credential.get_versioned_data()?; + let data = CredentialData::get_data(&versioned_data)?; + self.min_credential_expiration = Some(data.expires_at); + + let msg = RefreshCredentialsMessage { + change_history, + credentials: vec![credential], + }; + let msg = SecureChannelMessage::RefreshCredentials(msg); + + // Encrypt the message + let msg = match self.encryptor.encrypt(&minicbor::to_vec(&msg)?).await { + Ok(encrypted_payload) => encrypted_payload, + // If encryption failed, that means we have some internal error, + // and we may be in an invalid state, it's better to stop the Worker + Err(err) => { + let address = self.addresses.encryptor.clone(); + error!("Error while encrypting: {err} at: {address}"); + ctx.stop_worker(address).await?; + return Ok(()); + } + }; + + info!( + "Sending credentials refresh for {}", + self.addresses.encryptor + ); + + // Send the message to the decryptor on the other side + ctx.send_from_address( + self.remote_route.clone(), + msg, + self.addresses.encryptor.clone(), + ) + .await?; + + self.schedule_credentials_refresh(ctx).await?; + + Ok(()) + } + + async fn send_close_channel(&mut self, ctx: &Context) -> Result<()> { + let msg = SecureChannelMessage::Close; + + // Encrypt the message + let msg = self.encryptor.encrypt(&minicbor::to_vec(&msg)?).await?; + + // Send the message to the decryptor on the other side + ctx.send_from_address( + self.remote_route.clone(), + msg, + self.addresses.encryptor.clone(), + ) + .await?; + + Ok(()) + } + + async fn schedule_credentials_refresh(&mut self, ctx: &Context) -> Result<()> { + if let Some(min_credential_expiration) = self.min_credential_expiration { + self.credential_refresh_event = None; + + let mut credential_refresh_event = + DelayedEvent::create(ctx, self.addresses.encryptor_internal.clone(), ()).await?; + + let now = now()?; + + let duration = if min_credential_expiration + < now + self.refresh_credential_time_gap.as_secs().into() + { + 0 + } else { + *(min_credential_expiration + - self.refresh_credential_time_gap.as_secs().into() + - now) + }; + + debug!(duration = %duration, "Scheduling credentials refresh for {}", self.addresses.encryptor); + let duration = Duration::from_secs(duration); + credential_refresh_event.schedule(duration).await?; + + self.credential_refresh_event = Some(credential_refresh_event); + } + + Ok(()) + } } #[async_trait] @@ -128,6 +262,10 @@ impl Worker for EncryptorWorker { type Message = Any; type Context = Context; + async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> { + self.schedule_credentials_refresh(ctx).await + } + async fn handle_message( &mut self, ctx: &mut Self::Context, @@ -139,6 +277,8 @@ impl Worker for EncryptorWorker { self.handle_encrypt(ctx, msg).await?; } else if msg_addr == self.addresses.encryptor_api { self.handle_encrypt_api(ctx, msg).await?; + } else if msg_addr == self.addresses.encryptor_internal { + self.handle_refresh_credentials(ctx).await?; } else { return Err(IdentityError::UnknownChannelMsgDestination.into()); } @@ -150,6 +290,7 @@ impl Worker for EncryptorWorker { let _ = context .stop_worker(self.addresses.decryptor_internal.clone()) .await; + let _ = self.send_close_channel(context).await; self.encryptor.shutdown().await } } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_state_machine.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_state_machine.rs index 179e349641d..71f1b6c4432 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_state_machine.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_state_machine.rs @@ -61,7 +61,7 @@ pub(super) struct HandshakeResults { } /// This struct implements functions common to both initiator and the responder state machines -pub(super) struct CommonStateMachine { +pub(crate) struct CommonStateMachine { pub(super) identities: Arc, pub(super) identifier: Identifier, pub(super) purpose_key_attestation: PurposeKeyAttestation, @@ -116,77 +116,144 @@ impl CommonStateMachine { /// Verify the identity sent by the other party: the Purpose Key and the credentials must be valid /// If everything is valid, store the identity identifier which will used to make the /// final state machine result - pub(super) async fn verify_identity( + pub(super) async fn process_identity_payload( &mut self, peer: IdentityAndCredentials, - peer_public_key: &X25519PublicKey, + peer_public_key: X25519PublicKey, ) -> Result<()> { - let identity = Identity::import_from_change_history( + let identifier = Self::process_identity_payload_static( + self.identities.clone(), + Some(self.trust_policy.clone()), + self.trust_context.clone(), None, - peer.change_history.clone(), - self.identities.vault().verifying_vault, + peer.change_history, + Some(peer.purpose_key_attestation), + peer.credentials, + Some(peer_public_key), ) .await?; - self.identities + self.their_identifier = Some(identifier); + + Ok(()) + } + + /// Return the results of the full handshake + /// - the other party identity + /// - the encryption and decryption keys to use on the next messages to exchange + pub(super) fn make_handshake_results( + &self, + handshake_keys: Option, + ) -> Option { + match (self.their_identifier.clone(), handshake_keys) { + (Some(their_identifier), Some(handshake_keys)) => Some(HandshakeResults { + their_identifier, + handshake_keys, + }), + _ => None, + } + } +} + +impl CommonStateMachine { + /// Verify the identity sent by the other party: the Purpose Key and the credentials must be valid + /// If everything is valid, store the identity identifier which will used to make the + /// final state machine result + #[allow(clippy::too_many_arguments)] + pub(crate) async fn process_identity_payload_static( + identities: Arc, + trust_policy: Option>, + trust_context: Option, + expected_identifier: Option, + change_history: ChangeHistory, + purpose_key_attestation: Option, + credentials: Vec, + peer_public_key: Option, + ) -> Result { + let identity = Identity::import_from_change_history( + expected_identifier.as_ref(), + change_history, + identities.vault().verifying_vault, + ) + .await?; + + identities .identities_creation() .update_identity(&identity) .await?; - let purpose_key = self - .identities - .purpose_keys() - .purpose_keys_verification() - .verify_purpose_key_attestation( - Some(identity.identifier()), - &peer.purpose_key_attestation, - ) - .await?; + match (purpose_key_attestation, peer_public_key) { + (Some(purpose_key_attestation), Some(peer_public_key)) => { + let purpose_key = identities + .purpose_keys() + .purpose_keys_verification() + .verify_purpose_key_attestation( + Some(identity.identifier()), + &purpose_key_attestation, + ) + .await?; - match &purpose_key.public_key { - PurposePublicKey::SecureChannelStatic(public_key) => { - if public_key.0 != peer_public_key.0 { - return Err(IdentityError::InvalidKeyData.into()); + match &purpose_key.public_key { + PurposePublicKey::SecureChannelStatic(public_key) => { + if public_key.0 != peer_public_key.0 { + return Err(IdentityError::InvalidKeyData.into()); + } + } + PurposePublicKey::CredentialSigning(_) => { + return Err(IdentityError::InvalidKeyType.into()) + } } } - PurposePublicKey::CredentialSigning(_) => { - return Err(IdentityError::InvalidKeyType.into()) + (None, None) => {} + _ => { + return Err(IdentityError::InvalidKeyData.into()); } } - self.verify_credentials(identity.identifier(), peer.credentials) - .await?; - self.their_identifier = Some(identity.identifier().clone()); - Ok(()) + let their_identifier = identity.identifier().clone(); + + Self::verify_credentials( + identities, + trust_policy, + trust_context, + &their_identifier, + credentials, + ) + .await?; + + Ok(their_identifier) } /// Verify that the credentials sent by the other party are valid using a trust context /// and store them async fn verify_credentials( - &self, + identities: Arc, + trust_policy: Option>, + trust_context: Option, their_identifier: &Identifier, credentials: Vec, ) -> Result<()> { - // check our TrustPolicy - let trust_info = SecureChannelTrustInfo::new(their_identifier.clone()); - let trusted = self.trust_policy.check(&trust_info).await?; - if !trusted { - // TODO: Shutdown? Communicate error? - return Err(IdentityError::SecureChannelTrustCheckFailed.into()); + if let Some(trust_policy) = trust_policy { + // check our TrustPolicy + let trust_info = SecureChannelTrustInfo::new(their_identifier.clone()); + let trusted = trust_policy.check(&trust_info).await?; + if !trusted { + // TODO: Shutdown? Communicate error? + return Err(IdentityError::SecureChannelTrustCheckFailed.into()); + } + debug!( + "Initiator checked trust policy for SecureChannel from: {}", + their_identifier + ); } - debug!( - "Initiator checked trust policy for SecureChannel from: {}", - their_identifier - ); - if let Some(trust_context) = &self.trust_context { + if let Some(trust_context) = &trust_context { debug!( "got a trust context to check the credentials. There are {} credentials to check", credentials.len() ); for credential in &credentials { - let result = self - .identities + let result = identities .credentials() .credentials_verification() .receive_presented_credential( @@ -212,36 +279,19 @@ impl CommonStateMachine { Ok(()) } - - /// Return the results of the full handshake - /// - the other party identity - /// - the encryption and decryption keys to use on the next messages to exchange - pub(super) fn make_handshake_results( - &self, - handshake_keys: Option, - ) -> Option { - match (self.their_identifier.clone(), handshake_keys) { - (Some(their_identifier), Some(handshake_keys)) => Some(HandshakeResults { - their_identifier, - handshake_keys, - }), - _ => None, - } - } } /// This internal structure is used as a payload in the XX protocol #[derive(Debug, Clone, Encode, Decode)] #[rustfmt::skip] -#[cbor(map)] pub(super) struct IdentityAndCredentials { /// Exported identity - #[n(1)] pub(super) change_history: ChangeHistory, + #[n(0)] pub(super) change_history: ChangeHistory, /// The Purpose Key guarantees that the other end has access to the private key of the identity /// The Purpose Key here is also the static key of the noise ('x') and is issued with the static /// key of the identity - #[n(2)] pub(super) purpose_key_attestation: PurposeKeyAttestation, + #[n(1)] pub(super) purpose_key_attestation: PurposeKeyAttestation, /// Credentials associated to the identity along with corresponding Credentials Purpose Keys /// to verify those Credentials - #[n(3)] pub(super) credentials: Vec, + #[n(2)] pub(super) credentials: Vec, } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs index 6df7165fc73..793aa421da5 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs @@ -11,7 +11,7 @@ use ockam_node::callback::CallbackSender; use ockam_node::{Context, WorkerBuilder}; use tracing::{debug, info}; -use crate::models::{CredentialAndPurposeKey, Identifier}; +use crate::models::{CredentialAndPurposeKey, CredentialData, Identifier}; use crate::secure_channel::decryptor::DecryptorHandler; use crate::secure_channel::encryptor::Encryptor; use crate::secure_channel::encryptor_worker::EncryptorWorker; @@ -26,12 +26,12 @@ use crate::secure_channel::handshake::initiator_state_machine::InitiatorStateMac use crate::secure_channel::handshake::responder_state_machine::ResponderStateMachine; use crate::secure_channel::{Addresses, Role}; use crate::{ - IdentityError, SecureChannelPurposeKey, SecureChannelRegistryEntry, SecureChannels, - TrustContext, TrustPolicy, + IdentitiesReader, IdentityError, SecureChannelPurposeKey, SecureChannelRegistryEntry, + SecureChannels, TimestampInSeconds, TrustContext, TrustPolicy, }; /// This struct implements a Worker receiving and sending messages -/// on one side of the secure channel creation as specified with its role: INITIATOR or REPSONDER +/// on one side of the secure channel creation as specified with its role: INITIATOR or RESPONDER pub(crate) struct HandshakeWorker { secure_channels: Arc, callback_sender: Option>, @@ -41,6 +41,10 @@ pub(crate) struct HandshakeWorker { role: Role, remote_route: Option, decryptor_handler: Option, + min_credential_expiration: Option, + refresh_credential_time_gap: Duration, + trust_context: Option, + identities_reader: Arc, } #[ockam_core::worker] @@ -158,6 +162,7 @@ impl HandshakeWorker { trust_policy: Arc, decryptor_outgoing_access_control: Arc, credentials: Vec, + refresh_credential_time_gap: Duration, trust_context: Option, remote_route: Option, timeout: Option, @@ -165,16 +170,24 @@ impl HandshakeWorker { ) -> Result<()> { let vault = secure_channels.identities.vault().secure_channel_vault; let identities = secure_channels.identities(); + + let min_credential_expiration = credentials + .iter() + .filter_map(|credential| credential.credential.get_versioned_data().ok()) + .filter_map(|data| CredentialData::get_data(&data).ok()) + .map(|data| data.expires_at) + .min(); + let state_machine: Box = if role.is_initiator() { Box::new( InitiatorStateMachine::new( vault, - identities, + identities.clone(), identifier.clone(), purpose_key, credentials, trust_policy, - trust_context, + trust_context.clone(), ) .await?, ) @@ -182,12 +195,12 @@ impl HandshakeWorker { Box::new( ResponderStateMachine::new( vault, - identities, + identities.clone(), identifier.clone(), purpose_key, credentials, trust_policy, - trust_context, + trust_context.clone(), ) .await?, ) @@ -209,6 +222,10 @@ impl HandshakeWorker { remote_route: remote_route.clone(), addresses: addresses.clone(), decryptor_handler: None, + min_credential_expiration, + refresh_credential_time_gap, + trust_context, + identities_reader: identities.identities_reader(), }; WorkerBuilder::new(worker) @@ -290,6 +307,8 @@ impl HandshakeWorker { ) -> Result { // create a decryptor to delegate the processing of all messages after the handshake let decryptor = DecryptorHandler::new( + self.secure_channels.identities.clone(), + self.trust_context.clone(), self.role.str(), self.addresses.clone(), handshake_results.handshake_keys.decryption_key, @@ -297,6 +316,12 @@ impl HandshakeWorker { handshake_results.their_identifier.clone(), ); + // FIXME + let credentials_retriever = self + .trust_context + .clone() + .and_then(|x| x.authority().cloned().ok()); + // create a separate encryptor worker which will be started independently { let encryptor = EncryptorWorker::new( @@ -308,6 +333,11 @@ impl HandshakeWorker { 0, self.secure_channels.identities.vault().secure_channel_vault, ), + self.identifier.clone(), + self.identities_reader.clone(), + self.min_credential_expiration, + self.refresh_credential_time_gap, + credentials_retriever, ); let next_hop = self.remote_route()?.next()?.clone(); @@ -321,9 +351,17 @@ impl HandshakeWorker { Arc::new(AllowAll), Arc::new(AllowAll), ); + let internal_mailbox = Mailbox::new( + self.addresses.encryptor_internal.clone(), + Arc::new(AllowAll), + Arc::new(DenyAll), + ); WorkerBuilder::new(encryptor) - .with_mailboxes(Mailboxes::new(main_mailbox, vec![api_mailbox])) + .with_mailboxes(Mailboxes::new( + main_mailbox, + vec![api_mailbox, internal_mailbox], + )) .start(context) .await?; } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/initiator_state_machine.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/initiator_state_machine.rs index b5d8b82ab12..77c3de2e9e4 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/initiator_state_machine.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/initiator_state_machine.rs @@ -39,8 +39,11 @@ impl StateMachine for InitiatorStateMachine { let message2_payload = self.decode_message2(&message).await?; let their_identity_payload: IdentityAndCredentials = minicbor::decode(&message2_payload)?; - self.verify_identity(their_identity_payload, &self.handshake.state.rs()?.clone()) - .await?; + self.process_identity_payload( + their_identity_payload, + self.handshake.state.rs()?.clone(), + ) + .await?; let identity_payload = self .identity_payload .take() @@ -77,7 +80,7 @@ pub(super) struct InitiatorStateMachine { impl InitiatorStateMachine { delegate! { to self.common { - async fn verify_identity(&mut self, peer: IdentityAndCredentials, peer_public_key: &X25519PublicKey) -> Result<()>; + async fn process_identity_payload(&mut self, peer: IdentityAndCredentials, peer_public_key: X25519PublicKey) -> Result<()>; fn make_handshake_results(&self, handshake_keys: Option) -> Option; } } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/mod.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/mod.rs index 66f4c881326..f02bc20cf2a 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/mod.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/mod.rs @@ -4,9 +4,10 @@ mod error; // on a little endian system since it is not supporting a big endian one at the moment #[cfg(not(target_endian = "little"))] compile_error!("Key Exchange is only supported on little-endian machines"); + #[allow(clippy::module_inception)] mod handshake; -mod handshake_state_machine; +pub(crate) mod handshake_state_machine; pub(crate) mod handshake_worker; mod initiator_state_machine; mod responder_state_machine; diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/responder_state_machine.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/responder_state_machine.rs index 11482e592bb..9551b5fe139 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/responder_state_machine.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/responder_state_machine.rs @@ -48,8 +48,11 @@ impl StateMachine for ResponderStateMachine { let message3_payload = self.decode_message3(&message).await?; let their_identity_payload: IdentityAndCredentials = minicbor::decode(&message3_payload)?; - self.verify_identity(their_identity_payload, &self.handshake.state.rs()?.clone()) - .await?; + self.process_identity_payload( + their_identity_payload, + self.handshake.state.rs()?.clone(), + ) + .await?; self.set_final_state(Responder).await?; Ok(NoAction) } @@ -80,7 +83,7 @@ pub struct ResponderStateMachine { impl ResponderStateMachine { delegate! { to self.common { - async fn verify_identity(&mut self, peer: IdentityAndCredentials, peer_public_key: &X25519PublicKey) -> Result<()>; + async fn process_identity_payload(&mut self, peer: IdentityAndCredentials, peer_public_key: X25519PublicKey) -> Result<()>; fn make_handshake_results(&self, handshake_keys: Option) -> Option; } } diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/listener.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/listener.rs index 41a4868f54c..c835816b9b2 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/listener.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/listener.rs @@ -48,7 +48,7 @@ impl IdentityChannelListener { /// If credentials are not provided via list in options /// get them from the trust context - async fn get_credentials(&self, ctx: &mut Context) -> Result> { + async fn get_credentials(&self, ctx: &Context) -> Result> { let credentials = if self.options.credentials.is_empty() { if let Some(trust_context) = &self.options.trust_context { vec![ @@ -87,7 +87,8 @@ impl Worker for IdentityChannelListener { .options .create_access_control(ctx.flow_controls(), flow_control_id); - let credentials = self.get_credentials(ctx).await?; + // FIXME + let credentials = self.get_credentials(ctx).await.ok().unwrap_or(vec![]); // TODO: Allow manual PurposeKey management let purpose_key = self @@ -107,6 +108,7 @@ impl Worker for IdentityChannelListener { self.options.trust_policy.clone(), access_control.decryptor_outgoing_access_control, credentials, + self.options.refresh_credential_time_gap, self.options.trust_context.clone(), None, None, diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/message.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/message.rs index e9a6c4c2138..ce5879137e6 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/message.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/message.rs @@ -1,3 +1,4 @@ +use crate::models::{ChangeHistory, CredentialAndPurposeKey}; use minicbor::{Decode, Encode}; use ockam_core::Route; @@ -7,8 +8,10 @@ use ockam_core::Route; pub enum SecureChannelMessage { /// Encrypted payload message. #[n(0)] Payload(#[n(0)] PlaintextPayloadMessage), + /// Present credentials one more time. + #[n(1)] RefreshCredentials(#[n(0)] RefreshCredentialsMessage), /// Close the channel. - #[n(1)] Close, + #[n(2)] Close, } /// Secure Channel Message format. @@ -22,3 +25,14 @@ pub struct PlaintextPayloadMessage { /// Untyped binary payload. #[n(2)] pub payload: Vec, } + +/// Secure Channel Message format. +#[derive(Debug, Encode, Decode, Clone)] +#[rustfmt::skip] +pub struct RefreshCredentialsMessage { + /// Exported identity + #[n(0)] pub change_history: ChangeHistory, + /// Credentials associated to the identity along with corresponding Credentials Purpose Keys + /// to verify those Credentials + #[n(1)] pub credentials: Vec, +} diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/mod.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/mod.rs index e0838bce856..3670ac8a6d5 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/mod.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/mod.rs @@ -5,7 +5,7 @@ mod api; mod decryptor; mod encryptor; mod encryptor_worker; -mod handshake; +pub(crate) mod handshake; mod key_tracker; mod listener; mod local_info; diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs index 0dc6a3b7da4..59e061c4386 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/options.rs @@ -7,6 +7,7 @@ use crate::models::CredentialAndPurposeKey; use crate::secure_channel::Addresses; use crate::{TrustContext, TrustEveryonePolicy, TrustPolicy}; +use crate::secure_channel::encryptor_worker::DEFAULT_REFRESH_CREDENTIAL_TIME_GAP; use core::fmt; use core::fmt::Formatter; use core::time::Duration; @@ -21,6 +22,7 @@ pub struct SecureChannelOptions { pub(crate) trust_context: Option, pub(crate) credentials: Vec, pub(crate) timeout: Duration, + pub(crate) refresh_credential_time_gap: Duration, } impl fmt::Debug for SecureChannelOptions { @@ -43,6 +45,7 @@ impl SecureChannelOptions { trust_context: None, credentials: vec![], timeout: DEFAULT_TIMEOUT, + refresh_credential_time_gap: DEFAULT_REFRESH_CREDENTIAL_TIME_GAP, } } @@ -80,6 +83,15 @@ impl SecureChannelOptions { pub fn producer_flow_control_id(&self) -> FlowControlId { self.flow_control_id.clone() } + + /// Sets refresh_credential_time_gap + pub fn with_refresh_credential_time_gap( + mut self, + refresh_credential_time_gap: Duration, + ) -> Self { + self.refresh_credential_time_gap = refresh_credential_time_gap; + self + } } impl SecureChannelOptions { @@ -130,6 +142,7 @@ pub struct SecureChannelListenerOptions { pub(crate) trust_policy: Arc, pub(crate) trust_context: Option, pub(crate) credentials: Vec, + pub(crate) refresh_credential_time_gap: Duration, } impl fmt::Debug for SecureChannelListenerOptions { @@ -150,6 +163,7 @@ impl SecureChannelListenerOptions { trust_policy: Arc::new(TrustEveryonePolicy), trust_context: None, credentials: vec![], + refresh_credential_time_gap: DEFAULT_REFRESH_CREDENTIAL_TIME_GAP, } } @@ -190,6 +204,15 @@ impl SecureChannelListenerOptions { pub fn spawner_flow_control_id(&self) -> FlowControlId { self.flow_control_id.clone() } + + /// Sets refresh_credential_time_gap + pub fn with_refresh_credential_time_gap( + mut self, + refresh_credential_time_gap: Duration, + ) -> Self { + self.refresh_credential_time_gap = refresh_credential_time_gap; + self + } } impl SecureChannelListenerOptions { diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_channels.rs b/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_channels.rs index 3d023617518..8b94dafbb04 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_channels.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channels/secure_channels.rs @@ -4,7 +4,7 @@ use ockam_core::{Address, Route}; use ockam_node::Context; use crate::identities::Identities; -use crate::models::Identifier; +use crate::models::{CredentialAndPurposeKey, Identifier}; use crate::secure_channel::handshake_worker::HandshakeWorker; use crate::secure_channel::{ Addresses, IdentityChannelListener, Role, SecureChannelListenerOptions, SecureChannelOptions, @@ -80,6 +80,30 @@ impl SecureChannels { Ok(SecureChannelListener::new(address, flow_control_id)) } + /// If credentials are not provided via list in options + /// get them from the trust context + pub(crate) async fn get_credentials( + identifier: &Identifier, + options: &SecureChannelOptions, + ctx: &Context, + ) -> Result> { + let credentials = if options.credentials.is_empty() { + if let Some(trust_context) = &options.trust_context { + vec![ + trust_context + .authority()? + .credential(ctx, identifier) + .await?, + ] + } else { + vec![] + } + } else { + options.credentials.clone() + }; + Ok(credentials) + } + /// Initiate a SecureChannel using `Route` to the SecureChannel listener and [`SecureChannelOptions`] pub async fn create_secure_channel( &self, @@ -105,6 +129,8 @@ impl SecureChannels { .get_or_create_secure_channel_purpose_key(identifier) .await?; + let credentials = Self::get_credentials(identifier, &options, ctx).await?; + HandshakeWorker::create( ctx, Arc::new(self.clone()), @@ -113,7 +139,8 @@ impl SecureChannels { purpose_key, options.trust_policy, access_control.decryptor_outgoing_access_control, - options.credentials, + credentials, + options.refresh_credential_time_gap, options.trust_context, Some(route), Some(options.timeout), diff --git a/implementations/rust/ockam/ockam_identity/tests/channel.rs b/implementations/rust/ockam/ockam_identity/tests/channel.rs index fa680eef9a8..910239d85e5 100644 --- a/implementations/rust/ockam/ockam_identity/tests/channel.rs +++ b/implementations/rust/ockam/ockam_identity/tests/channel.rs @@ -1063,28 +1063,34 @@ async fn should_stop_encryptor__and__decryptor__in__secure_channel( let secure_channels = secure_channels(); let identities_creation = secure_channels.identities().identities_creation(); - let alice = identities_creation.create_identity().await?; - let bob = identities_creation.create_identity().await?; - - let bob_trust_policy = TrustIdentifierPolicy::new(alice.identifier().clone()); - let alice_trust_policy = TrustIdentifierPolicy::new(bob.identifier().clone()); + let alice = identities_creation + .create_identity() + .await? + .identifier() + .clone(); + let bob = identities_creation + .create_identity() + .await? + .identifier() + .clone(); - let identity_options = SecureChannelListenerOptions::new().with_trust_policy(bob_trust_policy); let _bob_listener = secure_channels - .create_secure_channel_listener(ctx, bob.identifier(), "bob_listener", identity_options) + .create_secure_channel_listener( + ctx, + &bob, + "bob_listener", + SecureChannelListenerOptions::new(), + ) .await?; - let _alice_sc = { - let alice_options = SecureChannelOptions::new().with_trust_policy(alice_trust_policy); - secure_channels - .create_secure_channel( - ctx, - alice.identifier(), - route!["bob_listener"], - alice_options, - ) - .await? - }; + secure_channels + .create_secure_channel( + ctx, + &alice, + route!["bob_listener"], + SecureChannelOptions::new(), + ) + .await?; ctx.sleep(Duration::from_millis(100)).await; @@ -1094,32 +1100,13 @@ async fn should_stop_encryptor__and__decryptor__in__secure_channel( let channel1 = sc_list[0].clone(); let channel2 = sc_list[1].clone(); + // This will stop both ends of the channel secure_channels .stop_secure_channel(ctx, channel1.encryptor_messaging_address()) .await?; ctx.sleep(Duration::from_millis(100)).await; - assert_eq!( - secure_channels - .secure_channel_registry() - .get_channel_list() - .len(), - 1 - ); - - let workers = ctx.list_workers().await?; - assert!(!workers.contains(channel1.decryptor_messaging_address())); - assert!(!workers.contains(channel1.encryptor_messaging_address())); - assert!(workers.contains(channel2.decryptor_messaging_address())); - assert!(workers.contains(channel2.encryptor_messaging_address())); - - secure_channels - .stop_secure_channel(ctx, channel2.encryptor_messaging_address()) - .await?; - - ctx.sleep(Duration::from_millis(100)).await; - assert_eq!( secure_channels .secure_channel_registry() diff --git a/implementations/rust/ockam/ockam_identity/tests/credentials.rs b/implementations/rust/ockam/ockam_identity/tests/credentials.rs index 829f439ddbc..97623033c5d 100644 --- a/implementations/rust/ockam/ockam_identity/tests/credentials.rs +++ b/implementations/rust/ockam/ockam_identity/tests/credentials.rs @@ -1,15 +1,16 @@ -use std::sync::atomic::{AtomicI8, Ordering}; +use std::sync::atomic::{AtomicI8, AtomicU8, Ordering}; use std::time::Duration; use ockam_core::compat::sync::Arc; use ockam_core::{async_trait, Any, DenyAll}; use ockam_core::{route, Result, Routed, Worker}; -use ockam_identity::models::CredentialSchemaIdentifier; +use ockam_identity::models::{CredentialAndPurposeKey, CredentialSchemaIdentifier}; use ockam_identity::secure_channels::secure_channels; use ockam_identity::utils::AttributesBuilder; use ockam_identity::{ - AuthorityService, CredentialAccessControl, CredentialsMemoryRetriever, - SecureChannelListenerOptions, SecureChannelOptions, TrustContext, TrustIdentifierPolicy, + AuthorityService, CredentialAccessControl, Credentials, CredentialsMemoryRetriever, + CredentialsRetriever, Identifier, SecureChannelListenerOptions, SecureChannelOptions, + TrustContext, TrustIdentifierPolicy, }; use ockam_node::{Context, WorkerBuilder}; @@ -329,3 +330,118 @@ impl Worker for CountingWorker { Ok(()) } } + +#[ockam_macros::test] +async fn autorefresh(ctx: &mut Context) -> Result<()> { + let secure_channels = secure_channels(); + let identities = secure_channels.identities(); + let identities_creation = identities.identities_creation(); + let credentials = identities.credentials(); + + let authority = identities_creation + .create_identity() + .await? + .identifier() + .clone(); + let client1 = identities_creation + .create_identity() + .await? + .identifier() + .clone(); + let client2 = identities_creation + .create_identity() + .await? + .identifier() + .clone(); + + struct LocalCredentialsRetriever { + credentials: Arc, + authority: Identifier, + client: Identifier, + ttl: Duration, + call_counter: Arc, + } + + #[async_trait] + impl CredentialsRetriever for LocalCredentialsRetriever { + async fn retrieve( + &self, + _ctx: &Context, + _for_identity: &Identifier, + ) -> Result { + let attributes = AttributesBuilder::with_schema(CredentialSchemaIdentifier(1)) + .with_attribute(b"name".to_vec(), b"client1".to_vec()) + .build(); + let credential = self + .credentials + .credentials_creation() + .issue_credential(&self.authority, &self.client, attributes, self.ttl) + .await?; + self.call_counter.fetch_add(1, Ordering::Relaxed); + Ok(credential) + } + } + + let call_counter2 = Arc::new(AtomicU8::new(0)); + let retriever2 = LocalCredentialsRetriever { + credentials: credentials.clone(), + authority: authority.clone(), + client: client2.clone(), + ttl: Duration::from_secs(5), + call_counter: call_counter2.clone(), + }; + let authority_service2 = AuthorityService::new( + credentials.clone(), + authority.clone(), + Some(Arc::new(retriever2)), + ); + let trust_context2 = TrustContext::new( + "test_trust_context_id".to_string(), + Some(authority_service2), + ); + let _listener = secure_channels + .create_secure_channel_listener( + ctx, + &client2, + "listener", + SecureChannelListenerOptions::new() + .with_trust_context(trust_context2) + .with_refresh_credential_time_gap(Duration::from_secs(1)), + ) + .await?; + + let call_counter1 = Arc::new(AtomicU8::new(0)); + let retriever1 = LocalCredentialsRetriever { + credentials: credentials.clone(), + authority: authority.clone(), + client: client1.clone(), + ttl: Duration::from_secs(4), + call_counter: call_counter1.clone(), + }; + let authority_service1 = AuthorityService::new( + credentials.clone(), + authority.clone(), + Some(Arc::new(retriever1)), + ); + let trust_context1 = TrustContext::new( + "test_trust_context_id".to_string(), + Some(authority_service1), + ); + let _channel = secure_channels + .create_secure_channel( + ctx, + &client1, + route!["listener"], + SecureChannelOptions::new() + .with_trust_context(trust_context1) + .with_refresh_credential_time_gap(Duration::from_secs(1)), + ) + .await?; + + ctx.sleep(Duration::from_secs(10)).await; + + assert_eq!(call_counter1.load(Ordering::Relaxed), 4); + assert_eq!(call_counter2.load(Ordering::Relaxed), 3); + + ctx.stop().await +}