Skip to content

Commit

Permalink
feat(rust): handle close and refresh credentials secure channel messages
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Nov 7, 2023
1 parent c03d33a commit 486de90
Show file tree
Hide file tree
Showing 16 changed files with 618 additions and 153 deletions.
2 changes: 2 additions & 0 deletions implementations/rust/ockam/ockam_identity/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub enum IdentityError {
UnknownTimestamp,
/// Unknown Authority
UnknownAuthority,
/// No CredentialsRetriever
NoCredentialsRetriever,
/// Unknown version of the Credential
UnknownCredentialVersion,
/// Unknown version of the Identity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub(crate) struct Addresses {
pub(crate) encryptor: Address,
// Used to decrypt messages that were received though some channel other than Ockam Routing from the other end of the channel
pub(crate) encryptor_api: Address,
// Used by the encryptor itself for timer notifications (to force credentials refresh)
pub(crate) encryptor_internal: Address,
}

impl Addresses {
Expand All @@ -35,13 +37,16 @@ impl Addresses {
let encryptor = Address::random_tagged(&format!("SecureChannel.{}.encryptor", role_str));
let encryptor_api =
Address::random_tagged(&format!("SecureChannel.{}.encryptor.api", role_str));
let encryptor_internal =
Address::random_tagged(&format!("SecureChannel.{}.encryptor.internal", role_str));

Self {
decryptor_internal,
decryptor_remote,
decryptor_api,
encryptor,
encryptor_api,
encryptor_internal,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,34 @@ use ockam_node::Context;

use crate::models::Identifier;
use crate::secure_channel::encryptor::{Encryptor, KEY_RENEWAL_INTERVAL};
use crate::secure_channel::handshake::handshake_state_machine::CommonStateMachine;
use crate::secure_channel::key_tracker::KeyTracker;
use crate::secure_channel::nonce_tracker::NonceTracker;
use crate::secure_channel::Addresses;
use crate::{
DecryptionRequest, DecryptionResponse, IdentityError, IdentitySecureChannelLocalInfo,
SecureChannelMessage,
DecryptionRequest, DecryptionResponse, Identities, IdentityError,
IdentitySecureChannelLocalInfo, PlaintextPayloadMessage, RefreshCredentialsMessage,
SecureChannelMessage, TrustContext,
};

use ockam_vault::{AeadSecretKeyHandle, VaultForSecureChannels};
use tracing::{debug, warn};
use tracing::{debug, info, warn};

pub(crate) struct DecryptorHandler {
//for debug purposes only
pub(crate) role: &'static str,
pub(crate) addresses: Addresses,
pub(crate) their_identity_id: Identifier,
pub(crate) decryptor: Decryptor,

identities: Arc<Identities>,
trust_context: Option<TrustContext>,
}

impl DecryptorHandler {
pub fn new(
identities: Arc<Identities>,
trust_context: Option<TrustContext>,
role: &'static str,
addresses: Addresses,
key: AeadSecretKeyHandle,
Expand All @@ -38,6 +45,8 @@ impl DecryptorHandler {
addresses,
their_identity_id,
decryptor: Decryptor::new(key, vault),
identities,
trust_context,
}
}

Expand Down Expand Up @@ -71,32 +80,11 @@ impl DecryptorHandler {
Ok(())
}

pub(crate) async fn handle_decrypt(
async fn handle_payload(
&mut self,
ctx: &mut Context,
msg: Routed<Any>,
mut msg: PlaintextPayloadMessage,
) -> Result<()> {
debug!(
"SecureChannel {} received Decrypt {}",
self.role, &self.addresses.decryptor_remote
);

// Decode raw payload binary
let payload = msg.into_transport_message().payload;
let payload = Vec::<u8>::decode(&payload)?;

// Decrypt the binary
let decrypted_payload = self.decryptor.decrypt(&payload).await?;

let msg: SecureChannelMessage = minicbor::decode(&decrypted_payload)?;

let mut msg = match msg {
SecureChannelMessage::Payload(msg) => msg,
SecureChannelMessage::Close => {
todo!()
}
};

// Add encryptor hop in the return_route (instead of our address)
msg.return_route
.modify()
Expand Down Expand Up @@ -127,6 +115,71 @@ impl DecryptorHandler {
}
}

async fn handle_close(&mut self, ctx: &mut Context) -> Result<()> {
// Should be enough to stop the encryptor, since it will stop the decryptor
ctx.stop_worker(self.addresses.encryptor.clone()).await
}

async fn handle_refresh_credentials(
&mut self,
_ctx: &mut Context,
msg: RefreshCredentialsMessage,
) -> Result<()> {
debug!(
"Handling credentials refresh request for {}",
self.addresses.decryptor_remote
);

CommonStateMachine::process_identity_payload_static(
self.identities.clone(),
None,
self.trust_context.clone(),
Some(self.their_identity_id.clone()),
msg.change_history,
None,
msg.credentials,
None,
)
.await?;

info!(
"Successfully handled credentials refresh request for {}",
self.addresses.decryptor_remote
);

Ok(())
}

pub(crate) async fn handle_decrypt(
&mut self,
ctx: &mut Context,
msg: Routed<Any>,
) -> Result<()> {
debug!(
"SecureChannel {} received Decrypt {}",
self.role, &self.addresses.decryptor_remote
);

// Decode raw payload binary
let payload = msg.into_transport_message().payload;
let payload = Vec::<u8>::decode(&payload)?;

// Decrypt the binary
let decrypted_payload = self.decryptor.decrypt(&payload).await?;

let msg: SecureChannelMessage = minicbor::decode(&decrypted_payload)?;

match msg {
SecureChannelMessage::Payload(msg) => self.handle_payload(ctx, msg).await?,
SecureChannelMessage::RefreshCredentials(msg) => {
self.handle_refresh_credentials(ctx, msg).await?
}
SecureChannelMessage::Close => self.handle_close(ctx).await?,
};

Ok(())
}

/// Remove the channel keys on shutdown
pub(crate) async fn shutdown(&self) -> Result<()> {
self.decryptor.shutdown().await
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,62 @@
use ockam_core::compat::boxed::Box;
use ockam_core::compat::sync::Arc;
use ockam_core::{async_trait, Decodable, Route};
use ockam_core::{Any, Result, Routed, Worker};
use ockam_node::Context;
use tracing::{debug, error};
use ockam_node::{Context, DelayedEvent};
use std::time::Duration;
use tracing::{debug, error, info};

use crate::models::CredentialData;
use crate::secure_channel::addresses::Addresses;
use crate::secure_channel::api::{EncryptionRequest, EncryptionResponse};
use crate::secure_channel::encryptor::Encryptor;
use crate::{IdentityError, PlaintextPayloadMessage, SecureChannelMessage};
use crate::utils::now;
use crate::{
AuthorityService, Identifier, IdentitiesReader, IdentityError, PlaintextPayloadMessage,
RefreshCredentialsMessage, SecureChannelMessage, TimestampInSeconds,
};

pub const DEFAULT_REFRESH_CREDENTIAL_TIME_GAP: Duration = Duration::from_secs(60);

pub(crate) struct EncryptorWorker {
//for debug purposes only
role: &'static str,
addresses: Addresses,
remote_route: Route,
encryptor: Encryptor,
my_identifier: Identifier,
identities_reader: Arc<dyn IdentitiesReader>,
min_credential_expiration: Option<TimestampInSeconds>,
refresh_credential_time_gap: Duration,
credential_refresh_event: Option<DelayedEvent<()>>,
// TODO: Should be CredentialsRetriever
credentials_retriever: Option<AuthorityService>,
}

impl EncryptorWorker {
#[allow(clippy::too_many_arguments)]
pub fn new(
role: &'static str,
addresses: Addresses,
remote_route: Route,
encryptor: Encryptor,
my_identifier: Identifier,
identities_reader: Arc<dyn IdentitiesReader>,
min_credential_expiration: Option<TimestampInSeconds>,
refresh_credential_time_gap: Duration,
credentials_retriever: Option<AuthorityService>,
) -> Self {
Self {
role,
addresses,
remote_route,
encryptor,
my_identifier,
identities_reader,
min_credential_expiration,
refresh_credential_time_gap,
credential_refresh_event: None,
credentials_retriever,
}
}

Expand Down Expand Up @@ -121,13 +149,123 @@ impl EncryptorWorker {

Ok(())
}

async fn handle_refresh_credentials(&mut self, ctx: &<Self as Worker>::Context) -> Result<()> {
debug!(
"Started credentials refresh for {}",
self.addresses.encryptor
);

let change_history = self
.identities_reader
.get_identity(&self.my_identifier)
.await?;

let credential = if let Some(credentials_retriever) = &self.credentials_retriever {
credentials_retriever
.credential(ctx, &self.my_identifier)
.await?
} else {
return Err(IdentityError::NoCredentialsRetriever.into());
};

let versioned_data = credential.credential.get_versioned_data()?;
let data = CredentialData::get_data(&versioned_data)?;
self.min_credential_expiration = Some(data.expires_at);

let msg = RefreshCredentialsMessage {
change_history,
credentials: vec![credential],
};
let msg = SecureChannelMessage::RefreshCredentials(msg);

// Encrypt the message
let msg = match self.encryptor.encrypt(&minicbor::to_vec(&msg)?).await {
Ok(encrypted_payload) => encrypted_payload,
// If encryption failed, that means we have some internal error,
// and we may be in an invalid state, it's better to stop the Worker
Err(err) => {
let address = self.addresses.encryptor.clone();
error!("Error while encrypting: {err} at: {address}");
ctx.stop_worker(address).await?;
return Ok(());
}
};

info!(
"Sending credentials refresh for {}",
self.addresses.encryptor
);

// Send the message to the decryptor on the other side
ctx.send_from_address(
self.remote_route.clone(),
msg,
self.addresses.encryptor.clone(),
)
.await?;

self.schedule_credentials_refresh(ctx).await?;

Ok(())
}

async fn send_close_channel(&mut self, ctx: &Context) -> Result<()> {
let msg = SecureChannelMessage::Close;

// Encrypt the message
let msg = self.encryptor.encrypt(&minicbor::to_vec(&msg)?).await?;

// Send the message to the decryptor on the other side
ctx.send_from_address(
self.remote_route.clone(),
msg,
self.addresses.encryptor.clone(),
)
.await?;

Ok(())
}

async fn schedule_credentials_refresh(&mut self, ctx: &Context) -> Result<()> {
if let Some(min_credential_expiration) = self.min_credential_expiration {
self.credential_refresh_event = None;

let mut credential_refresh_event =
DelayedEvent::create(ctx, self.addresses.encryptor_internal.clone(), ()).await?;

let now = now()?;

let duration = if min_credential_expiration
< now + self.refresh_credential_time_gap.as_secs().into()
{
0
} else {
*(min_credential_expiration
- self.refresh_credential_time_gap.as_secs().into()
- now)
};

debug!(duration = %duration, "Scheduling credentials refresh for {}", self.addresses.encryptor);
let duration = Duration::from_secs(duration);
credential_refresh_event.schedule(duration).await?;

self.credential_refresh_event = Some(credential_refresh_event);
}

Ok(())
}
}

#[async_trait]
impl Worker for EncryptorWorker {
type Message = Any;
type Context = Context;

async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
self.schedule_credentials_refresh(ctx).await
}

async fn handle_message(
&mut self,
ctx: &mut Self::Context,
Expand All @@ -139,6 +277,8 @@ impl Worker for EncryptorWorker {
self.handle_encrypt(ctx, msg).await?;
} else if msg_addr == self.addresses.encryptor_api {
self.handle_encrypt_api(ctx, msg).await?;
} else if msg_addr == self.addresses.encryptor_internal {
self.handle_refresh_credentials(ctx).await?;
} else {
return Err(IdentityError::UnknownChannelMsgDestination.into());
}
Expand All @@ -150,6 +290,7 @@ impl Worker for EncryptorWorker {
let _ = context
.stop_worker(self.addresses.decryptor_internal.clone())
.await;
let _ = self.send_close_channel(context).await;
self.encryptor.shutdown().await
}
}
Loading

0 comments on commit 486de90

Please sign in to comment.