Skip to content

Commit

Permalink
feat(rust): take into account token expiration
Browse files Browse the repository at this point in the history
improve token refresh
  • Loading branch information
polvorin committed Aug 30, 2024
1 parent 75607ce commit 22d5988
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 103 deletions.
38 changes: 25 additions & 13 deletions implementations/rust/ockam/ockam_api/src/http_auth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use std::io::Write;
use std::sync::RwLock;

use httparse::{Header, Status};
use ockam_core::async_trait;
use ockam_node::Context;
use ockam_transport_tcp::{Direction, PortalInterceptor, PortalInterceptorFactory};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::Mutex;

use ockam::errcode::{Kind, Origin};

use tracing::{debug, error};

use crate::TokenLeaseRefresher;

#[derive(Debug, Clone, PartialEq)]
enum RequestState {
ParsingHeader(Option<Vec<u8>>),
Expand All @@ -25,38 +27,41 @@ struct HttpAuthInterceptorState {

struct HttpAuthInterceptor {
state: Arc<Mutex<HttpAuthInterceptorState>>,
token: Arc<RwLock<String>>,
token_refresher: TokenLeaseRefresher,
upstream: String,
}

impl HttpAuthInterceptor {
fn new(token: Arc<RwLock<String>>, upstream: String) -> Self {
fn new(token_refresher: TokenLeaseRefresher, upstream: String) -> Self {
let state = HttpAuthInterceptorState {
state: RequestState::ParsingHeader(None),
};
Self {
state: Arc::new(Mutex::new(state)),
token,
token_refresher,
upstream,
}
}
}

pub struct HttpAuthInterceptorFactory {
token: Arc<RwLock<String>>,
token_refresher: TokenLeaseRefresher,
upstream: String,
}

impl HttpAuthInterceptorFactory {
pub fn new(token: Arc<RwLock<String>>, upstream: String) -> Self {
Self { token, upstream }
pub fn new(token_refresher: TokenLeaseRefresher, upstream: String) -> Self {
Self {
token_refresher,
upstream,
}
}
}

impl PortalInterceptorFactory for HttpAuthInterceptorFactory {
fn create(&self) -> Arc<dyn PortalInterceptor> {
Arc::new(HttpAuthInterceptor::new(
self.token.clone(),
self.token_refresher.clone(),
self.upstream.clone(),
))
}
Expand Down Expand Up @@ -247,10 +252,17 @@ impl PortalInterceptor for HttpAuthInterceptor {
Direction::FromOutletToInlet => ockam_core::Result::Ok(Some(buffer.to_vec())),

Direction::FromInletToOutlet => {
let mut guard = self.state.lock().unwrap();
let token = self.token.read().unwrap();
let (new_state, out) =
copy_body(guard.state.clone(), buffer, &token, &self.upstream)?;
let mut guard = self.state.lock().await;
let token = self.token_refresher.get_token().await;
if token.is_none() {
error!("No authorization token available");
}
let (new_state, out) = copy_body(
guard.state.clone(),
buffer,
&token.unwrap_or_default(),
&self.upstream,
)?;
guard.state = new_state;
Ok(Some(out))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#[allow(clippy::module_inception)]
mod influxdb_token_lease;
mod token_lease_refresher;

pub use influxdb_token_lease::*;
pub use token_lease_refresher::*;
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use chrono::DateTime;
use ockam::{compat::time::now, Address, Mailboxes};
use ockam_core::{api::Error, AllowAll, DenyAll};
use ockam_node::Context;
use std::{cmp::max, sync::Arc, time::Duration};
use tokio::sync::RwLock;

use crate::{
cloud::{CredentialsEnabled, ProjectNodeClient},
nodes::InMemoryNode,
InfluxDbTokenLease,
};

// The default timeouts are too high. Set shorter timeouts, so if
// the project is unresponsible or connection got lost for some reason
// (like in a project restart), it recover faster.
const SECURE_CHANNEL_TIMEOUT: Duration = Duration::from_secs(10);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);

#[derive(Clone)]
pub struct TokenLeaseRefresher {
token: Arc<RwLock<Option<String>>>,
}

impl TokenLeaseRefresher {
pub async fn new(
ctx: &Context,
node_manager: Arc<InMemoryNode>,
) -> Result<TokenLeaseRefresher, Error> {
let token = Arc::new(RwLock::new(None));
let project = node_manager
.cli_state
.projects()
.get_default_project()
.await
.map_err(|e| {
Error::new_without_path().with_message(format!("No default project {}", e))
})?;
let project_identifier = project
.project_identifier()
.ok_or(Error::new_without_path().with_message("Project not configured"))?;
let project_multiaddr = project.project_multiaddr().map_err(|e| {
Error::new_without_path()
.with_message(format!("Project multiaddr not configured {:}", e))
})?;
let identity_name = node_manager
.cli_state
.get_default_identity_name()
.await
.map_err(|e| {
Error::new_without_path().with_message(format!("Error retrieving identity {:}", e))
})?;
let caller_identifier = node_manager
.cli_state
.get_identifier_by_name(&identity_name)
.await
.map_err(|e| {
Error::new_without_path()
.with_message(format!("Error retrieving identifier {:}", e))
})?;

let project_client = node_manager
.make_project_node_client(
&project_identifier,
project_multiaddr,
&caller_identifier,
CredentialsEnabled::On,
)
.await
.map_err(|e| {
Error::new_without_path()
.with_message(format!("Error creating project client {:}", e))
})?
.with_request_timeout(&REQUEST_TIMEOUT)
.with_secure_channel_timeout(&SECURE_CHANNEL_TIMEOUT);
let mailboxes = Mailboxes::main(
Address::random_tagged("LeaseRetriever"),
Arc::new(DenyAll),
Arc::new(AllowAll),
);
let new_ctx = ctx.new_detached_with_mailboxes(mailboxes).await?;

let token_clone = token.clone();
ockam_node::spawn(async move {
refresh_loop(token_clone, new_ctx, project_client).await;
});
Ok(Self { token })
}

pub async fn get_token(&self) -> Option<String> {
self.token.read().await.clone()
}
}

async fn refresh_loop(
token: Arc<RwLock<Option<String>>>,
ctx: Context,
project_client: ProjectNodeClient,
) {
loop {
debug!("refreshing token");
let token_result = project_client.create_token(&ctx).await;
let now_t = now().unwrap();
let wait_secs = match token_result {
Ok(new_token) => {
let expires = DateTime::parse_from_rfc3339(&new_token.expires).unwrap();
let expires_unix = expires.timestamp() as u64;
let duration = expires_unix - now_t;

info!("Auth Token obtained expires at {}", new_token.expires);
let mut t = token.write().await;
*t = Some(new_token.token);
// We request a new token once reaching half its duration, with a minimum
// of 5 seconds.
max(duration / 2, 5)
}
Err(err) => {
warn!("Error retrieving token {:}", err);
15
}
};
debug!("waiting for {} seconds before refreshing token", wait_secs);
ctx.sleep_long_until(now_t + wait_secs).await;
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use std::sync::{Arc, RwLock};
use std::sync::Arc;

use ockam::compat::time::now;
use ockam::{route, Address, Mailboxes, Result};
use ockam::{route, Address, Result};
use ockam_core::api::{Error, Response};
use ockam_core::{AllowAll, DenyAll};
use ockam_core::AllowAll;
use ockam_node::Context;
use ockam_transport_core::HostnamePort;
use ockam_transport_tcp::PortalInletInterceptor;

use crate::cloud::CredentialsEnabled;
use crate::http_auth::HttpAuthInterceptorFactory;
use crate::nodes::models::portal::{CreateInlet, InletStatus};
use crate::nodes::NodeManagerWorker;
use crate::InfluxDbTokenLease;
use tracing::warn;
use crate::TokenLeaseRefresher;

impl NodeManagerWorker {
pub(crate) async fn get_inlets(&self) -> Result<Response<Vec<InletStatus>>, Response<Error>> {
Expand Down Expand Up @@ -86,90 +83,9 @@ impl NodeManagerWorker {
upstream: HostnamePort,
inlet_alias: &String,
) -> Result<Address, Error> {
let project = self
.node_manager
.cli_state
.projects()
.get_default_project()
.await
.unwrap();
let project_identifier = project
.project_identifier()
.ok_or(Error::new_without_path().with_message("Project not configured"))?;
let project_multiaddr = project.project_multiaddr().map_err(|e| {
Error::new_without_path()
.with_message(format!("Project multiaddr not configured {:}", e))
})?;
let identity_name = self
.node_manager
.cli_state
.get_default_identity_name()
.await
.map_err(|e| {
Error::new_without_path().with_message(format!("Error retrieving identity {:}", e))
})?;
let caller_identifier = self
.node_manager
.cli_state
.get_identifier_by_name(&identity_name)
.await
.map_err(|e| {
Error::new_without_path()
.with_message(format!("Error retrieving identifier {:}", e))
})?;

let project_client = self
.node_manager
.make_project_node_client(
&project_identifier,
project_multiaddr,
&caller_identifier,
CredentialsEnabled::On,
)
.await
.map_err(|e| {
Error::new_without_path()
.with_message(format!("Error creating project client {:}", e))
})?;

let mailboxes = Mailboxes::main(
Address::random_tagged("LeaseRetriever"),
Arc::new(DenyAll),
Arc::new(AllowAll),
);
let new_ctx = ctx.new_detached_with_mailboxes(mailboxes).await?;
let token = Arc::new(RwLock::new("".to_string()));
let tt = token.clone();

//TODO: * find a better way other than spawining this here.
// * it should take the token' lease expiration to figure out when to request a new one,
// instead of using a fixed time
// * there is code that perform pretty similar task (at its core at least) for retrieving
// credentials (remote_retriever.rs). Maybe we can think on how to generalize the common pieces
// * I'm not sure if the project client survive disconnections/projects restarts. Probably
// better to just create the client on every iteration, as it isn't that frequently (tokens should be
// refreshed each couple minutes I guess). This assuming the client doesn't leak any resources (like addresses being created or something)
// * ideally also should be stopped if the http interceptor factory is stopped
// (is there a way to do that? do removing the inlet remove the interceptor?) *and*
// there are no more live inlets using it neither.
ockam_node::spawn(async move {
loop {
match project_client.create_token(&new_ctx).await {
Ok(new_token) => {
debug!("Auth Token obtained");
let mut t = token.write().unwrap();
*t = new_token.token
}
Err(err) => {
warn!("Error retrieving token {:}", err);
}
}
let t = now().unwrap();
new_ctx.sleep_long_until(t + 60).await;
}
});
let token_refresher = TokenLeaseRefresher::new(ctx, self.node_manager.clone()).await?;
let http_interceptor_factory = Arc::new(HttpAuthInterceptorFactory::new(
tt,
token_refresher,
format!("{}:{}", upstream.hostname(), upstream.port()).to_string(),
));

Expand Down

0 comments on commit 22d5988

Please sign in to comment.