Skip to content

Commit

Permalink
feat(rust): wip add lessor processor to automatically revoke expired …
Browse files Browse the repository at this point in the history
…tokens
  • Loading branch information
adrianbenavides committed Sep 12, 2024
1 parent 8b5013f commit bb8e191
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 176 deletions.
121 changes: 121 additions & 0 deletions implementations/rust/ockam/ockam_api/src/influxdb/influxdb_models.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use crate::influxdb::lease_token::{LeaseToken, TokenStatus};
use crate::ApiError;
use ockam::identity::Identifier;
use std::str::FromStr;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

/// Token returned by InfluxDB API
#[derive(serde::Deserialize, Debug, PartialEq, Eq)]
pub struct InfluxDBTokenResponse {
pub id: String,
pub description: String,
pub token: String,
pub status: String,
#[serde(rename = "createdAt")]
pub created_at: String,
}

/// Return a `LeaseToken` if it's an Ockam token (i.e., if the `description` contains a valid Ockam metadata).
/// If the metadata is not found, the token will be ignored.
impl TryFrom<InfluxDBTokenResponse> for Option<LeaseToken> {
type Error = ockam_core::Error;

fn try_from(token: InfluxDBTokenResponse) -> Result<Self, Self::Error> {
match token.unpack_metadata()? {
Some((issued_for, expires)) => Ok(Some(LeaseToken {
id: token.id,
issued_for,
created_at: OffsetDateTime::parse(&token.created_at, &Rfc3339)
.map_err(|_| {
ApiError::core(format!(
"Expected Rfc3339 format for 'created_at' with value {}",
token.created_at
))
})?
.unix_timestamp(),
expires_at: expires.unix_timestamp(),
status: TokenStatus::from_str(&token.status)?,
token: token.token,
})),
None => Ok(None),
}
}
}

impl InfluxDBTokenResponse {
/// The InfluxDB tokens only have a description field that can be used to store metadata.
/// The Ockam `LeaseToken` will pack in the description field the identifier that created the token,
/// and its expiration time.
pub fn pack_metadata(requester: &Identifier, expires: OffsetDateTime) -> String {
format!("OCKAM:{}:{}", requester, expires.unix_timestamp()).to_string()
}

/// Unpack the metadata from the description field.
pub fn unpack_metadata(&self) -> ockam_core::Result<Option<(Identifier, OffsetDateTime)>> {
let segments = self.description.split(':').collect::<Vec<_>>();
match segments[..] {
["OCKAM", identifier, expires] => {
let identifier = Identifier::try_from(identifier)?;
let expires_timestamp: i64 = expires
.parse()
.map_err(|_| ApiError::core("Invalid 'expires' timestamp"))?;
let expires = OffsetDateTime::from_unix_timestamp(expires_timestamp)
.map_err(|_| ApiError::core("Invalid 'expires' timestamp"))?;
Ok(Some((identifier, expires)))
}
_ => Ok(None),
}
}
}

#[derive(serde::Deserialize, Debug, PartialEq, Eq)]
pub struct InfluxDBListTokensResponse {
#[serde(rename = "authorizations")]
pub tokens: Vec<InfluxDBTokenResponse>,
#[serde(rename = "links")]
pub pagination: InfluxDBPagination,
}

#[derive(serde::Deserialize, Debug, PartialEq, Eq)]
pub struct InfluxDBPagination {
pub next: Option<String>,
pub prev: Option<String>,
}

#[cfg(test)]
mod tests {
use super::*;
use crate::influxdb::lease_token::{LeaseToken, TokenStatus};
use std::str::FromStr;
use time::OffsetDateTime;

#[test]
fn lease_token_from_influxdb_token() {
let identifier = "I0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
let expires_at = OffsetDateTime::now_utc() + core::time::Duration::from_secs(60);
let expires_at_timestamp = expires_at.unix_timestamp();
let created_at = "2024-09-12T16:23:54Z";
let created_at_timestamp = 1726158234;
let token = InfluxDBTokenResponse {
id: "token_id".to_string(),
description: format!("OCKAM:{identifier}:{expires_at_timestamp}"),
token: "token".to_string(),
status: "active".to_string(),
created_at: created_at.to_string(),
};
let expected = LeaseToken {
id: "token_id".to_string(),
issued_for: Identifier::from_str(identifier).unwrap(),
created_at: created_at_timestamp,
expires_at: expires_at_timestamp,
token: "token".to_string(),
status: TokenStatus::Active,
};
let got = {
let got: Option<LeaseToken> = token.try_into().unwrap();
got.unwrap()
};
assert_eq!(got, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::fmt::{Display, Formatter};
use strum::{Display, EnumString};
use time::OffsetDateTime;

#[derive(Encode, Decode, CborLen, Serialize, Deserialize, Debug, PartialEq)]
#[derive(Encode, Decode, CborLen, Serialize, Deserialize, Debug, Clone, PartialEq)]
#[cbor(map)]
pub struct LeaseToken {
#[cbor(n(1))]
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Output for LeaseToken {
}

#[derive(
Encode, Decode, CborLen, Serialize, Deserialize, PartialEq, Debug, EnumString, Display,
Encode, Decode, CborLen, Serialize, Deserialize, PartialEq, Debug, Clone, EnumString, Display,
)]
pub enum TokenStatus {
#[n(0)]
Expand Down
2 changes: 2 additions & 0 deletions implementations/rust/ockam/ockam_api/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod influxdb_models;
mod lease_token;
pub mod token_lessor_node_service;
mod token_lessor_processor;
mod token_lessor_worker;

pub use token_lessor_node_service::StartInfluxDBLeaseManagerRequest;
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::influxdb::lease_token::LeaseToken;
use crate::influxdb::token_lessor_processor::InfluxDBTokenLessorProcessor;
use crate::influxdb::token_lessor_worker::InfluxDBTokenLessorWorker;
use crate::nodes::models::services::{DeleteServiceRequest, StartServiceRequest};
use crate::nodes::service::messages::Messages;
Expand All @@ -9,20 +11,19 @@ use ockam_abac::{Action, PolicyExpression, Resource, ResourceType};
use ockam_core::api::{Error, Request, Response};
use ockam_core::{async_trait, Address};
use ockam_multiaddr::MultiAddr;
use ockam_node::{Context, WorkerBuilder};
use ockam_node::{Context, ProcessorBuilder, WorkerBuilder};
use serde::{Deserialize, Serialize};
use crate::influxdb::lease_token::LeaseToken;

impl NodeManagerWorker {
pub(crate) async fn start_influxdb_token_lease_manager_service(
pub(crate) async fn start_influxdb_token_lessor_service(
&self,
context: &Context,
body: StartServiceRequest<StartInfluxDBLeaseManagerRequest>,
) -> Result<Response, Response<Error>> {
let request = body.request().clone();
match self
.node_manager
.start_influxdb_token_lease_manager_service(
.start_influxdb_token_lessor_service(
context,
Address::from_string(body.address()),
request,
Expand All @@ -34,15 +35,15 @@ impl NodeManagerWorker {
}
}

pub(crate) async fn delete_influxdb_token_lease_manager_service(
pub(crate) async fn delete_influxdb_token_lessor_service(
&self,
context: &Context,
req: DeleteServiceRequest,
) -> Result<Response, Response<Error>> {
let address = req.address();
match self
.node_manager
.delete_influxdb_token_lease_manager_service(context, address.clone())
.delete_influxdb_token_lessor_service(context, address.clone())
.await
{
Ok(Some(_)) => Ok(Response::ok()),
Expand All @@ -55,7 +56,7 @@ impl NodeManagerWorker {
}

impl InMemoryNode {
async fn start_influxdb_token_lease_manager_service(
async fn start_influxdb_token_lessor_service(
&self,
context: &Context,
address: Address,
Expand Down Expand Up @@ -84,32 +85,37 @@ impl InMemoryNode {
)
.await?;

WorkerBuilder::new(
InfluxDBTokenLessorWorker::new(
address.clone(),
req.influxdb_address,
req.influxdb_org_id,
req.influxdb_token,
req.token_permissions,
req.token_ttl,
)
.await?,
let worker = InfluxDBTokenLessorWorker::new(
address.clone(),
req.influxdb_address,
req.influxdb_org_id,
req.influxdb_token,
req.token_permissions,
req.token_ttl,
)
.with_address(address.clone())
.with_incoming_access_control_arc(incoming_ac)
.with_outgoing_access_control_arc(outgoing_ac)
.start(context)
.await?;
let processor = InfluxDBTokenLessorProcessor::new(worker.state.clone());

WorkerBuilder::new(worker)
.with_address(address.clone())
.with_incoming_access_control_arc(incoming_ac)
.with_outgoing_access_control_arc(outgoing_ac)
.start(context)
.await?;
self.registry
.influxdb_services
.insert(address.clone(), ())
.await;

ProcessorBuilder::new(processor)
.with_address(format!("{address}-processor"))
.start(context)
.await?;

Ok(())
}

async fn delete_influxdb_token_lease_manager_service(
async fn delete_influxdb_token_lessor_service(
&self,
context: &Context,
address: Address,
Expand All @@ -119,6 +125,9 @@ impl InMemoryNode {
None => Ok(None),
Some(_) => {
context.stop_worker(address.clone()).await?;
context
.stop_processor(format!("{address}-processor"))
.await?;
self.registry.influxdb_services.remove(&address).await;
Ok(Some(()))
}
Expand Down
Loading

0 comments on commit bb8e191

Please sign in to comment.