From 9211db034f31417a6c20f151c384d57e0795e2e4 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Tue, 10 Sep 2024 18:43:00 +0200 Subject: [PATCH] feat(rust): implement influxdb token lessor service --- Cargo.lock | 6 +- .../rust/ockam/ockam_abac/Cargo.toml | 1 + .../rust/ockam/ockam_abac/src/policy_expr.rs | 65 +++- .../rust/ockam/ockam_abac/src/resource.rs | 3 + .../rust/ockam/ockam_api/Cargo.toml | 1 + .../ockam_api/src/cloud/lease_manager/mod.rs | 1 - .../cloud/lease_manager/models/influxdb.rs | 64 ---- .../src/cloud/lease_manager/models/mod.rs | 1 - .../rust/ockam/ockam_api/src/cloud/mod.rs | 1 - .../src/influxdb/influxdb_api_client.rs | 257 ++++++++++++++ .../ockam_api/src/influxdb/lease_token.rs | 170 ++++++++++ .../rust/ockam/ockam_api/src/influxdb/mod.rs | 7 + .../src/influxdb/token_lessor_node_service.rs | 293 ++++++++++++++++ .../src/influxdb/token_lessor_processor.rs | 106 ++++++ .../src/influxdb/token_lessor_worker.rs | 316 ++++++++++++++++++ .../influxdb_token_lease.rs | 64 ---- .../ockam_api/src/influxdb_token_lease/mod.rs | 4 - .../rust/ockam/ockam_api/src/lib.rs | 11 +- .../ockam/ockam_api/src/nodes/registry.rs | 1 + .../src/nodes/service/default_address.rs | 34 +- .../ockam_api/src/nodes/service/manager.rs | 4 +- .../ockam_api/src/nodes/service/worker.rs | 12 + .../ockam/ockam_command/src/lease/create.rs | 121 +++---- .../ockam/ockam_command/src/lease/list.rs | 95 +++--- .../rust/ockam/ockam_command/src/lease/mod.rs | 81 ++--- .../ockam/ockam_command/src/lease/revoke.rs | 93 ++++-- .../ockam/ockam_command/src/lease/show.rs | 82 +++-- .../ockam_command/src/node/create/config.rs | 7 + .../src/node/create/foreground.rs | 49 ++- .../src/run/parser/resource/node.rs | 5 + .../ockam/ockam_command/src/service/config.rs | 45 +-- .../ockam/ockam_command/src/subcommand.rs | 2 - .../rust/ockam/ockam_core/src/api.rs | 5 + 33 files changed, 1561 insertions(+), 446 deletions(-) delete mode 100644 implementations/rust/ockam/ockam_api/src/cloud/lease_manager/mod.rs delete mode 100644 implementations/rust/ockam/ockam_api/src/cloud/lease_manager/models/influxdb.rs delete mode 100644 implementations/rust/ockam/ockam_api/src/cloud/lease_manager/models/mod.rs create mode 100644 implementations/rust/ockam/ockam_api/src/influxdb/influxdb_api_client.rs create mode 100644 implementations/rust/ockam/ockam_api/src/influxdb/lease_token.rs create mode 100644 implementations/rust/ockam/ockam_api/src/influxdb/mod.rs create mode 100644 implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_node_service.rs create mode 100644 implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_processor.rs create mode 100644 implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_worker.rs delete mode 100644 implementations/rust/ockam/ockam_api/src/influxdb_token_lease/influxdb_token_lease.rs delete mode 100644 implementations/rust/ockam/ockam_api/src/influxdb_token_lease/mod.rs diff --git a/Cargo.lock b/Cargo.lock index bf530a90216..0181835a1d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4657,6 +4657,7 @@ dependencies = [ "rustyline", "rustyline-derive", "serde", + "serde_json", "sqlx-build-trust", "str-buf", "strum 0.26.3", @@ -4737,6 +4738,7 @@ dependencies = [ "sha2", "sqlx-build-trust", "strip-ansi-escapes", + "strum 0.26.3", "sysinfo", "tempfile", "thiserror", @@ -6698,9 +6700,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.127" +version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ "itoa", "memchr", diff --git a/implementations/rust/ockam/ockam_abac/Cargo.toml b/implementations/rust/ockam/ockam_abac/Cargo.toml index 9d15a6829aa..db3618cb1d5 100644 --- a/implementations/rust/ockam/ockam_abac/Cargo.toml +++ b/implementations/rust/ockam/ockam_abac/Cargo.toml @@ -61,6 +61,7 @@ winnow = { version = "0.6.18", default-features = false, optional = true, featur ockam_vault = { path = "../ockam_vault", default-features = false, features = ["rust-crypto"] } quickcheck = "1.0.3" rand = "0.8.5" +serde_json = "1.0.128" tempfile = "3.10.1" [[bin]] diff --git a/implementations/rust/ockam/ockam_abac/src/policy_expr.rs b/implementations/rust/ockam/ockam_abac/src/policy_expr.rs index b30d0cb252b..99e80efbd95 100644 --- a/implementations/rust/ockam/ockam_abac/src/policy_expr.rs +++ b/implementations/rust/ockam/ockam_abac/src/policy_expr.rs @@ -1,12 +1,15 @@ +cfg_if::cfg_if! { + if #[cfg(feature = "std")] { + use core::str::FromStr; + use ockam_core::compat::fmt::{Display, Formatter}; + use ockam_core::Result; + use serde::{Deserialize, Serialize}; + } +} + use crate::policy_expr::PolicyExpression::{BooleanExpression, FullExpression}; use crate::{BooleanExpr, Expr}; -#[cfg(feature = "std")] -use core::str::FromStr; use minicbor::{CborLen, Decode, Encode}; -#[cfg(feature = "std")] -use ockam_core::compat::fmt::{Display, Formatter}; -#[cfg(feature = "std")] -use ockam_core::Result; /// A Policy expression can either be represented with /// - A full expression with string valued attributes, contain operator, etc... @@ -93,6 +96,27 @@ impl TryFrom for PolicyExpression { } } +#[cfg(feature = "std")] +impl Serialize for PolicyExpression { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.to_string().serialize(serializer) + } +} + +#[cfg(feature = "std")] +impl<'d> Deserialize<'d> for PolicyExpression { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'d>, + { + let s = String::deserialize(deserializer)?; + PolicyExpression::from_str(s.as_str()).map_err(serde::de::Error::custom) + } +} + #[cfg(test)] mod tests { use crate::PolicyExpression::{BooleanExpression, FullExpression}; @@ -113,4 +137,33 @@ mod tests { BooleanExpression(BooleanExpr::from_str(boolean_expression).unwrap()) ); } + + #[test] + fn serde_json() { + let full_expression = "(= subject.test = \"true\")"; + let full_expression = FullExpression(Expr::from_str(full_expression).unwrap()); + let full_expression_str = serde_json::to_string(&full_expression).unwrap(); + assert_eq!(full_expression_str, "\"(= subject.test = \\\"true\\\")\""); + let full_expression: PolicyExpression = serde_json::from_str(&full_expression_str).unwrap(); + assert_eq!( + full_expression, + FullExpression(Expr::from_str("(= subject.test = \"true\")").unwrap()) + ); + + let boolean_expression = "test"; + let boolean_expression = + BooleanExpression(BooleanExpr::from_str(boolean_expression).unwrap()); + let boolean_expression_str = serde_json::to_string(&boolean_expression).unwrap(); + assert_eq!(boolean_expression_str, "\"test\""); + let boolean_expression: PolicyExpression = + serde_json::from_str(&boolean_expression_str).unwrap(); + assert_eq!( + boolean_expression, + BooleanExpression(BooleanExpr::from_str("test").unwrap()) + ); + + let invalid_expression = "\"( test = \"true\")\""; + let result: Result = serde_json::from_str(invalid_expression); + assert!(result.is_err()); + } } diff --git a/implementations/rust/ockam/ockam_abac/src/resource.rs b/implementations/rust/ockam/ockam_abac/src/resource.rs index fbc71784320..193ae03a535 100644 --- a/implementations/rust/ockam/ockam_abac/src/resource.rs +++ b/implementations/rust/ockam/ockam_abac/src/resource.rs @@ -56,6 +56,9 @@ pub enum ResourceType { #[n(6)] #[strum(serialize = "relay")] Relay, + #[n(7)] + #[strum(serialize = "lessor")] + InfluxDBLessor, } impl ResourceType { diff --git a/implementations/rust/ockam/ockam_api/Cargo.toml b/implementations/rust/ockam/ockam_api/Cargo.toml index 5159d958192..dbc34df41e2 100644 --- a/implementations/rust/ockam/ockam_api/Cargo.toml +++ b/implementations/rust/ockam/ockam_api/Cargo.toml @@ -90,6 +90,7 @@ serde_json = "1.0.118" sha2 = "0.10.8" sqlx-build-trust = { version = "0.7.8" } strip-ansi-escapes = "0.2" +strum = { version = "0.26.3", default-features = false, features = ["derive"] } sysinfo = "0.30" thiserror = "1.0" time = { version = "0.3.36", default-features = false, features = ["std", "formatting", "local-offset", "macros"] } diff --git a/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/mod.rs b/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/mod.rs deleted file mode 100644 index c446ac88338..00000000000 --- a/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod models; diff --git a/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/models/influxdb.rs b/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/models/influxdb.rs deleted file mode 100644 index d509ecede79..00000000000 --- a/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/models/influxdb.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::colors::OckamColor; -use crate::error::ParseError; -use crate::output::Output; -use crate::Result; -use colorful::Colorful; -use minicbor::{CborLen, Decode, Encode}; -use serde::{Deserialize, Serialize}; -use std::fmt::Write; -use time::format_description::well_known::Iso8601; -use time::PrimitiveDateTime; - -#[derive(Encode, Decode, CborLen, Serialize, Deserialize, Debug)] -#[cbor(map)] -pub struct Token { - #[cbor(n(1))] - pub id: String, - - #[cbor(n(2))] - pub issued_for: String, - - #[cbor(n(3))] - pub created_at: String, - - #[cbor(n(4))] - pub expires: String, - - #[cbor(n(5))] - pub token: String, - - #[cbor(n(6))] - pub status: String, -} - -impl Output for Token { - fn item(&self) -> Result { - let mut output = String::new(); - let status = match self.status.as_str() { - "active" => self - .status - .to_uppercase() - .color(OckamColor::Success.color()), - _ => self - .status - .to_uppercase() - .color(OckamColor::Failure.color()), - }; - let expires_at = { - PrimitiveDateTime::parse(&self.expires, &Iso8601::DEFAULT) - .map_err(ParseError::Time)? - .to_string() - .color(OckamColor::PrimaryResource.color()) - }; - let id = self - .id - .to_string() - .color(OckamColor::PrimaryResource.color()); - - writeln!(output, "Token {id}")?; - writeln!(output, "Expires {expires_at} {status}")?; - write!(output, "{}", self.token)?; - - Ok(output) - } -} diff --git a/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/models/mod.rs b/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/models/mod.rs deleted file mode 100644 index ae50cade700..00000000000 --- a/implementations/rust/ockam/ockam_api/src/cloud/lease_manager/models/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod influxdb; diff --git a/implementations/rust/ockam/ockam_api/src/cloud/mod.rs b/implementations/rust/ockam/ockam_api/src/cloud/mod.rs index c7c523b82d4..365641cc0d8 100644 --- a/implementations/rust/ockam/ockam_api/src/cloud/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/cloud/mod.rs @@ -3,7 +3,6 @@ pub use secure_clients::*; pub mod addon; pub mod email_address; pub mod enroll; -pub mod lease_manager; pub mod operation; pub mod project; pub mod secure_clients; diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/influxdb_api_client.rs b/implementations/rust/ockam/ockam_api/src/influxdb/influxdb_api_client.rs new file mode 100644 index 00000000000..faf01bfde3a --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/influxdb/influxdb_api_client.rs @@ -0,0 +1,257 @@ +use crate::influxdb::lease_token::{LeaseToken, TokenStatus}; +use crate::ApiError; +use ockam::identity::Identifier; +use ockam_core::async_trait; +use reqwest::Client; +use std::str::FromStr; +use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; + +#[async_trait] +pub trait InfluxDBApi { + async fn create_token( + &self, + req: InfluxDBCreateTokenRequest, + ) -> ockam_core::Result; + + async fn get_token(&self, token_id: &str) -> ockam_core::Result; + + async fn revoke_token(&self, token_id: &str) -> ockam_core::Result<()>; + + async fn list_tokens(&self) -> ockam_core::Result>; +} + +#[derive(Debug, Clone)] +pub struct InfluxDBApiClient { + http_client: Client, + base_url: String, + token: String, +} + +impl InfluxDBApiClient { + pub fn new(base_url: impl Into, token: impl Into) -> ockam_core::Result { + let http_client = reqwest::ClientBuilder::new() + .build() + .map_err(|e| ApiError::core(format!("Failed to create http client: {e}")))?; + Ok(Self { + http_client, + base_url: base_url.into(), + token: token.into(), + }) + } +} + +#[async_trait] +impl InfluxDBApi for InfluxDBApiClient { + async fn create_token( + &self, + req: InfluxDBCreateTokenRequest, + ) -> ockam_core::Result { + let req = self + .http_client + .post(format!("{}/api/v2/authorizations", self.base_url)) + .header("Authorization", format!("Token {}", self.token)) + .header("Content-Type", "application/json") + .body(format!( + "{{\"description\": \"{}\", \"orgID\": \"{}\", \"permissions\":{}}}", + req.description, req.org_id, req.permissions + )); + match req.send().await { + Ok(res) => Ok(res.json::().await.map_err(|e| { + ApiError::core(format!("Failed to parse InfluxDB token from json: {e}")) + })?), + Err(e) => { + error!("Failed to create token: {e:?}"); + Err(ApiError::core(format!("Failed to create token: {e:?}"))) + } + } + } + + async fn get_token(&self, token_id: &str) -> ockam_core::Result { + let req = self + .http_client + .get(format!( + "{}/api/v2/authorizations/{}", + self.base_url, token_id + )) + .header("Authorization", format!("Token {}", self.token)) + .header("Content-Type", "application/json"); + match req.send().await { + Ok(res) => Ok(res.json::().await.map_err(|e| { + ApiError::core(format!("Failed to parse InfluxDB token from json: {e}")) + })?), + Err(e) => { + error!("Failed to create token: {e:?}"); + Err(ApiError::core(format!("Failed to create token: {e:?}"))) + } + } + } + + async fn revoke_token(&self, token_id: &str) -> ockam_core::Result<()> { + let req = self + .http_client + .delete(format!( + "{}/api/v2/authorizations/{}", + self.base_url, token_id + )) + .header("Authorization", format!("Token {}", self.token)) + .header("Content-Type", "application/json"); + match req.send().await { + Ok(res) => { + if res.status().is_success() { + info!(%token_id, "Revoked token"); + Ok(()) + } else { + error!(%token_id, "Failed to revoke token: {}", res.status()); + Err(ApiError::core(format!( + "Failed to revoke token: {}", + res.status() + ))) + } + } + Err(e) => { + error!("Failed to revoke token: {e:?}"); + Err(ApiError::core(format!("Failed to revoke token: {e:?}"))) + } + } + } + + async fn list_tokens(&self) -> ockam_core::Result> { + let req = self + .http_client + .get(format!("{}/api/v2/authorizations", self.base_url)) + .header("Authorization", format!("Token {}", self.token)) + .header("Content-Type", "application/json"); + match req.send().await { + Ok(res) => { + let influxdb_tokens = + res.json::() + .await + .map_err(|e| { + ApiError::core(format!( + "Failed to parse InfluxDB tokens from json: {e}" + )) + })?; + Ok(influxdb_tokens.tokens) + } + Err(e) => { + error!("Failed to list tokens: {e:?}"); + Err(ApiError::core(format!("Failed to list tokens: {e:?}"))) + } + } + } +} + +#[derive(Debug, PartialEq, Eq)] +pub struct InfluxDBCreateTokenRequest { + pub description: String, + pub org_id: String, + pub permissions: String, +} + +/// Token returned by InfluxDB API +#[derive(serde::Deserialize, Debug, PartialEq, Eq)] +pub struct InfluxDBTokenResponse { + pub id: String, + pub description: String, + pub token: String, + pub status: String, + #[serde(rename = "createdAt")] + pub created_at: String, +} + +/// Return a `LeaseToken` if it's an Ockam token (i.e., if the `description` contains a valid Ockam metadata). +/// If the metadata is not found, the token will be ignored. +impl TryFrom for Option { + type Error = ockam_core::Error; + + fn try_from(token: InfluxDBTokenResponse) -> Result { + match token.unpack_metadata()? { + Some((issued_for, expires)) => Ok(Some(LeaseToken { + id: token.id, + issued_for, + created_at: OffsetDateTime::parse(&token.created_at, &Rfc3339) + .map_err(|_| { + ApiError::core(format!( + "Expected Rfc3339 format for 'created_at' with value {}", + token.created_at + )) + })? + .unix_timestamp(), + expires_at: expires.unix_timestamp(), + status: TokenStatus::from_str(&token.status)?, + token: token.token, + })), + None => Ok(None), + } + } +} + +impl InfluxDBTokenResponse { + /// The InfluxDB tokens only have a description field that can be used to store metadata. + /// The Ockam `LeaseToken` will pack in the description field the identifier that created the token, + /// and its expiration time. + pub fn pack_metadata(requester: &Identifier, expires: OffsetDateTime) -> String { + format!("OCKAM:{}:{}", requester, expires.unix_timestamp()).to_string() + } + + /// Unpack the metadata from the description field. + pub fn unpack_metadata(&self) -> ockam_core::Result> { + let segments = self.description.split(':').collect::>(); + match segments[..] { + ["OCKAM", identifier, expires] => { + let identifier = Identifier::try_from(identifier)?; + let expires_timestamp: i64 = expires + .parse() + .map_err(|_| ApiError::core("Invalid 'expires' timestamp"))?; + let expires = OffsetDateTime::from_unix_timestamp(expires_timestamp) + .map_err(|_| ApiError::core("Invalid 'expires' timestamp"))?; + Ok(Some((identifier, expires))) + } + _ => Ok(None), + } + } +} + +#[derive(serde::Deserialize, Debug, PartialEq, Eq)] +pub struct InfluxDBListTokensResponse { + #[serde(rename = "authorizations")] + pub tokens: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::influxdb::lease_token::{LeaseToken, TokenStatus}; + use std::str::FromStr; + use time::OffsetDateTime; + + #[test] + fn lease_token_from_influxdb_token() { + let identifier = "I0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; + let expires_at = OffsetDateTime::now_utc() + core::time::Duration::from_secs(60); + let expires_at_timestamp = expires_at.unix_timestamp(); + let created_at = "2024-09-12T16:23:54Z"; + let created_at_timestamp = 1726158234; + let token = InfluxDBTokenResponse { + id: "token_id".to_string(), + description: format!("OCKAM:{identifier}:{expires_at_timestamp}"), + token: "token".to_string(), + status: "active".to_string(), + created_at: created_at.to_string(), + }; + let expected = LeaseToken { + id: "token_id".to_string(), + issued_for: Identifier::from_str(identifier).unwrap(), + created_at: created_at_timestamp, + expires_at: expires_at_timestamp, + token: "token".to_string(), + status: TokenStatus::Active, + }; + let got = { + let got: Option = token.try_into().unwrap(); + got.unwrap() + }; + assert_eq!(got, expected); + } +} diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/lease_token.rs b/implementations/rust/ockam/ockam_api/src/influxdb/lease_token.rs new file mode 100644 index 00000000000..3b08ed8454b --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/influxdb/lease_token.rs @@ -0,0 +1,170 @@ +use crate::colors::{color_error, color_primary}; +use crate::output::Output; +use crate::terminal::fmt; +use crate::ApiError; +use minicbor::{CborLen, Decode, Encode}; +use ockam::identity::Identifier; +use ockam_core::compat::fmt::Error as FmtError; +use serde::{Deserialize, Serialize}; +use std::cmp::PartialEq; +use std::fmt::{Display, Formatter}; +use strum::{Display, EnumString}; +use time::OffsetDateTime; + +#[derive(Encode, Decode, CborLen, Serialize, Deserialize, Debug, Clone)] +#[cbor(map)] +pub struct LeaseToken { + #[cbor(n(1))] + pub id: String, + + #[cbor(n(2))] + pub issued_for: Identifier, + + #[cbor(n(3))] + pub created_at: i64, + + #[cbor(n(4))] + pub expires_at: i64, + + #[cbor(n(5))] + pub token: String, + + #[cbor(n(6))] + pub status: TokenStatus, +} + +#[cfg(test)] +impl Default for LeaseToken { + fn default() -> Self { + use std::str::FromStr; + Self { + id: "token_id".to_string(), + issued_for: Identifier::from_str( + "I0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", + ) + .unwrap(), + created_at: OffsetDateTime::now_utc().unix_timestamp(), + expires_at: OffsetDateTime::now_utc().unix_timestamp(), + token: "token".to_string(), + status: TokenStatus::Active, + } + } +} + +impl LeaseToken { + pub fn is_active(&self) -> bool { + self.status == TokenStatus::Active + } + + pub fn expires_at(&self) -> ockam_core::Result { + OffsetDateTime::from_unix_timestamp(self.expires_at) + .map_err(|e| ApiError::core(e.to_string())) + } + + pub fn is_expired(&self) -> ockam_core::Result { + Ok(self.expires_at()? < OffsetDateTime::now_utc()) + } +} + +impl Display for LeaseToken { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "{}", color_primary(&self.id))?; + writeln!(f, "{}Token: {}", fmt::INDENTATION, color_primary(&self.id))?; + + let expires_at = self.expires_at().map_err(|_| FmtError)?.to_string(); + let expiration_time = if self.is_expired().map_err(|_| FmtError)? { + format!("Expired at {}", color_error(&expires_at)) + } else { + format!("Expires at {}", color_primary(&expires_at)) + }; + let status = if self.is_active() { + color_primary(self.status.to_string()) + } else { + color_error(self.status.to_string()) + }; + writeln!(f, "{}{expiration_time} ({status})", fmt::INDENTATION)?; + Ok(()) + } +} + +impl Output for LeaseToken { + fn item(&self) -> crate::Result { + Ok(self.padded_display()) + } +} + +impl Ord for LeaseToken { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.expires_at.cmp(&other.expires_at) + } +} + +impl PartialOrd for LeaseToken { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PartialEq for LeaseToken { + fn eq(&self, other: &Self) -> bool { + self.expires_at == other.expires_at + } +} + +impl Eq for LeaseToken {} + +#[derive( + Encode, Decode, CborLen, Serialize, Deserialize, PartialEq, Debug, Clone, EnumString, Display, +)] +pub enum TokenStatus { + #[n(0)] + #[strum(serialize = "active")] + Active, + + #[n(1)] + #[strum(serialize = "inactive")] + Revoked, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn lease_token_display() { + let token = LeaseToken { + created_at: OffsetDateTime::now_utc().unix_timestamp(), + expires_at: OffsetDateTime::now_utc().unix_timestamp(), + ..Default::default() + }; + assert!(token.expires_at().is_ok()); + assert!(token.is_expired().is_ok()); + assert!(token.item().is_ok()); + } + + #[test] + fn token_lease_is_expired() { + let mut token = LeaseToken { + expires_at: OffsetDateTime::now_utc().unix_timestamp() - 100, + ..Default::default() + }; + assert!(token.is_expired().unwrap()); + + token.expires_at = OffsetDateTime::now_utc().unix_timestamp() + 100; + assert!(!token.is_expired().unwrap()); + } + + #[test] + fn token_lease_ordering() { + let token1 = LeaseToken { + expires_at: OffsetDateTime::now_utc().unix_timestamp() + 100, + ..Default::default() + }; + let token2 = LeaseToken { + expires_at: OffsetDateTime::now_utc().unix_timestamp() + 200, + ..Default::default() + }; + // token1 expires before token2 + assert!(token1 < token2); + } +} diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/mod.rs b/implementations/rust/ockam/ockam_api/src/influxdb/mod.rs new file mode 100644 index 00000000000..77fd15b6bc2 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/influxdb/mod.rs @@ -0,0 +1,7 @@ +mod influxdb_api_client; +mod lease_token; +pub mod token_lessor_node_service; +mod token_lessor_processor; +mod token_lessor_worker; + +pub use token_lessor_node_service::StartInfluxDBLeaseManagerRequest; diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_node_service.rs b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_node_service.rs new file mode 100644 index 00000000000..f1cb720f066 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_node_service.rs @@ -0,0 +1,293 @@ +use crate::influxdb::influxdb_api_client::InfluxDBApiClient; +use crate::influxdb::lease_token::LeaseToken; +use crate::influxdb::token_lessor_processor::InfluxDBTokenLessorProcessor; +use crate::influxdb::token_lessor_worker::InfluxDBTokenLessorWorker; +use crate::nodes::models::services::{DeleteServiceRequest, StartServiceRequest}; +use crate::nodes::service::messages::Messages; +use crate::nodes::{InMemoryNode, NodeManagerWorker}; +use crate::{ApiError, DefaultAddress}; +use miette::IntoDiagnostic; +use minicbor::{CborLen, Decode, Encode}; +use ockam_abac::{Action, PolicyExpression, Resource, ResourceType}; +use ockam_core::api::{Error, Request, Response}; +use ockam_core::{async_trait, Address}; +use ockam_multiaddr::MultiAddr; +use ockam_node::{Context, ProcessorBuilder, WorkerBuilder}; +use serde::{Deserialize, Serialize}; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use time::Duration; + +impl NodeManagerWorker { + pub(crate) async fn start_influxdb_token_lessor_service( + &self, + context: &Context, + body: StartServiceRequest, + ) -> Result> { + let request = body.request().clone(); + match self + .node_manager + .start_influxdb_token_lessor_service( + context, + Address::from_string(body.address()), + request, + ) + .await + { + Ok(_) => Ok(Response::ok()), + Err(e) => Err(Response::internal_error_no_request(&e.to_string())), + } + } + + pub(crate) async fn delete_influxdb_token_lessor_service( + &self, + context: &Context, + req: DeleteServiceRequest, + ) -> Result> { + let address = req.address(); + match self + .node_manager + .delete_influxdb_token_lessor_service(context, address.clone()) + .await + { + Ok(Some(_)) => Ok(Response::ok()), + Ok(None) => Err(Response::not_found_no_request(&format!( + "Influxdb token lease manager service not found at address '{address}'" + ))), + Err(e) => Err(Response::internal_error_no_request(&e.to_string())), + } + } +} + +impl InMemoryNode { + async fn start_influxdb_token_lessor_service( + &self, + context: &Context, + address: Address, + req: StartInfluxDBLeaseManagerRequest, + ) -> Result<(), Error> { + debug!(address = %address.address(), "Starting influxdb token lease manager service"); + + let default_secure_channel_listener_flow_control_id = context + .flow_controls() + .get_flow_control_with_spawner(&DefaultAddress::SECURE_CHANNEL_LISTENER.into()) + .ok_or_else(|| { + ApiError::core("Unable to get flow control for secure channel listener") + })?; + context.flow_controls().add_consumer( + address.clone(), + &default_secure_channel_listener_flow_control_id, + ); + + let (incoming_ac, outgoing_ac) = self + .access_control( + context, + self.project_authority(), + Resource::new(address.address(), ResourceType::InfluxDBLessor), + Action::HandleMessage, + req.policy_expression, + ) + .await?; + + let worker = InfluxDBTokenLessorWorker::new( + address.clone(), + req.influxdb_address, + req.influxdb_org_id, + req.influxdb_token, + req.token_permissions, + req.token_ttl, + ) + .await?; + let processor = InfluxDBTokenLessorProcessor::new(worker.state.clone()); + + WorkerBuilder::new(worker) + .with_address(address.clone()) + .with_incoming_access_control_arc(incoming_ac) + .with_outgoing_access_control_arc(outgoing_ac) + .start(context) + .await?; + self.registry + .influxdb_services + .insert(address.clone(), ()) + .await; + + ProcessorBuilder::new(processor) + .with_address(format!("{address}-processor")) + .start(context) + .await?; + + Ok(()) + } + + async fn delete_influxdb_token_lessor_service( + &self, + context: &Context, + address: Address, + ) -> Result, Error> { + debug!(address = %address,"Deleting influxdb token lease manager service"); + match self.registry.influxdb_services.get(&address).await { + None => Ok(None), + Some(_) => { + context.stop_worker(address.clone()).await?; + context + .stop_processor(format!("{address}-processor")) + .await?; + self.registry.influxdb_services.remove(&address).await; + Ok(Some(())) + } + } + } +} + +#[derive(Debug, Clone, Encode, Decode, CborLen, Serialize, Deserialize, PartialEq)] +#[rustfmt::skip] +#[cbor(map)] +pub struct StartInfluxDBLeaseManagerRequest { + #[n(1)] influxdb_address: String, + #[n(2)] influxdb_org_id: String, + #[n(3)] influxdb_token: String, + #[n(4)] token_permissions: String, + #[n(5)] token_ttl: i32, + #[n(6)] policy_expression: Option, +} + +#[async_trait] +pub trait InfluxDBTokenLessorNodeServiceTrait { + async fn create_token(&self, ctx: &Context, at: &MultiAddr) -> miette::Result; + + async fn get_token( + &self, + ctx: &Context, + at: &MultiAddr, + token_id: &str, + ) -> miette::Result; + + async fn revoke_token( + &self, + ctx: &Context, + at: &MultiAddr, + token_id: &str, + ) -> miette::Result<()>; + + async fn list_tokens(&self, ctx: &Context, at: &MultiAddr) -> miette::Result>; +} + +#[async_trait] +impl InfluxDBTokenLessorNodeServiceTrait for InMemoryNode { + async fn create_token(&self, ctx: &Context, at: &MultiAddr) -> miette::Result { + let req = Request::post("").to_vec().into_diagnostic()?; + let bytes = self.send_message(ctx, at, req, None).await?; + let res = + Response::parse_response_reply::(bytes.as_slice()).into_diagnostic()?; + Ok(res.success().into_diagnostic()?) + } + + async fn get_token( + &self, + ctx: &Context, + at: &MultiAddr, + token_id: &str, + ) -> miette::Result { + let req = Request::get(format!("/{token_id}")) + .to_vec() + .into_diagnostic()?; + let bytes = self.send_message(ctx, at, req, None).await?; + let res = + Response::parse_response_reply::(bytes.as_slice()).into_diagnostic()?; + Ok(res.success().into_diagnostic()?) + } + + async fn revoke_token( + &self, + ctx: &Context, + at: &MultiAddr, + token_id: &str, + ) -> miette::Result<()> { + let req = Request::delete(format!("/{token_id}")) + .to_vec() + .into_diagnostic()?; + let bytes = self.send_message(ctx, at, req, None).await?; + let res = Response::parse_response_reply::<()>(bytes.as_slice()).into_diagnostic()?; + Ok(res.success().into_diagnostic()?) + } + + async fn list_tokens(&self, ctx: &Context, at: &MultiAddr) -> miette::Result> { + let req = Request::get("").to_vec().into_diagnostic()?; + let bytes = self.send_message(ctx, at, req, None).await?; + let res = Response::parse_response_reply::>(bytes.as_slice()) + .into_diagnostic()?; + Ok(res.success().into_diagnostic()?) + } +} + +pub(crate) struct InfluxDBTokenLessorState { + #[allow(dead_code)] + pub(super) address: Address, + pub(super) influxdb_api_client: InfluxDBApiClient, + pub(super) influxdb_org_id: String, + + /// Permissions for the created tokens + pub(super) token_permissions: String, + + /// Duration for which a token is valid + pub(super) token_ttl: Duration, + + /// Active tokens ordered by expiration time, earliest first + pub(super) active_tokens: BinaryHeap>, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn state_active_tokens_ordering() { + let mut state = InfluxDBTokenLessorState { + address: Address::from_string("test"), + influxdb_api_client: InfluxDBApiClient::new("http://localhost:8086", "token").unwrap(), + influxdb_org_id: "org_id".to_string(), + token_permissions: "permissions".to_string(), + token_ttl: Duration::seconds(60), + active_tokens: BinaryHeap::new(), + }; + + let token1 = LeaseToken { + expires_at: (time::OffsetDateTime::now_utc() + Duration::seconds(10)).unix_timestamp(), + ..Default::default() + }; + let token2 = LeaseToken { + expires_at: (time::OffsetDateTime::now_utc() + Duration::seconds(20)).unix_timestamp(), + ..Default::default() + }; + let token3 = LeaseToken { + expires_at: (time::OffsetDateTime::now_utc() + Duration::seconds(30)).unix_timestamp(), + ..Default::default() + }; + let token4 = LeaseToken { + expires_at: (time::OffsetDateTime::now_utc() + Duration::seconds(40)).unix_timestamp(), + ..Default::default() + }; + let token5 = LeaseToken { + expires_at: (time::OffsetDateTime::now_utc() + Duration::seconds(50)).unix_timestamp(), + ..Default::default() + }; + let token6 = LeaseToken { + expires_at: (time::OffsetDateTime::now_utc() + Duration::seconds(60)).unix_timestamp(), + ..Default::default() + }; + + state.active_tokens.push(Reverse(token4.clone())); + state.active_tokens.push(Reverse(token2.clone())); + state.active_tokens.push(Reverse(token6.clone())); + state.active_tokens.push(Reverse(token1.clone())); + state.active_tokens.push(Reverse(token5.clone())); + state.active_tokens.push(Reverse(token3.clone())); + + assert_eq!(state.active_tokens.pop().unwrap().0, token1); + assert_eq!(state.active_tokens.pop().unwrap().0, token2); + assert_eq!(state.active_tokens.pop().unwrap().0, token3); + assert_eq!(state.active_tokens.pop().unwrap().0, token4); + assert_eq!(state.active_tokens.pop().unwrap().0, token5); + assert_eq!(state.active_tokens.pop().unwrap().0, token6); + } +} diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_processor.rs b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_processor.rs new file mode 100644 index 00000000000..30de8703af0 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_processor.rs @@ -0,0 +1,106 @@ +use crate::influxdb::influxdb_api_client::InfluxDBApi; +use crate::influxdb::lease_token::LeaseToken; +use crate::token_lessor_node_service::InfluxDBTokenLessorState; +use ockam_core::{async_trait, Processor}; +use ockam_node::Context; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub(crate) struct InfluxDBTokenLessorProcessor { + state: Arc>, +} + +impl InfluxDBTokenLessorProcessor { + pub(crate) fn new(state: Arc>) -> Self { + Self { state } + } + + async fn list_tokens(&self) -> ockam_core::Result> { + debug!("Listing all tokens"); + let influxdb_tokens = { + let state = self.state.read().await; + state.influxdb_api_client.list_tokens().await? + }; + let lease_tokens: Vec = influxdb_tokens + .into_iter() + .filter_map(|token| { + let lease_token: Result, _> = token.try_into(); + lease_token.ok().flatten() + }) + .collect(); + info!("Found {} tokens", lease_tokens.len()); + Ok(lease_tokens) + } + + async fn revoke_outstanding_tokens(&self) -> ockam_core::Result<()> { + let to_remove = { + let state = self.state.read().await; + if state.active_tokens.is_empty() { + return Ok(()); + } + let influxdb_api_client = &state.influxdb_api_client; + let mut to_remove = vec![]; + for token in state.active_tokens.iter() { + if token.0.is_expired().unwrap_or(true) { + let token_id = &token.0.id; + if influxdb_api_client.revoke_token(token_id).await.is_ok() { + to_remove.push(token_id.clone()); + } + } else { + break; + } + } + to_remove + }; + { + let mut state = self.state.write().await; + state + .active_tokens + .retain(|token| !to_remove.contains(&token.0.id)); + } + Ok(()) + } +} + +#[async_trait] +impl Processor for InfluxDBTokenLessorProcessor { + type Context = Context; + + async fn initialize(&mut self, _context: &mut Self::Context) -> ockam_core::Result<()> { + let mut max_retries = 5; + loop { + match self.list_tokens().await { + Ok(tokens) => { + let mut state = self.state.write().await; + state.active_tokens = + BinaryHeap::from(tokens.into_iter().map(Reverse).collect::>()); + break; + } + Err(_) => { + max_retries -= 1; + if max_retries == 0 { + error!("Failed to initialize InfluxDBTokenLessorProcessor"); + break; + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + Ok(()) + } + + async fn shutdown(&mut self, _context: &mut Self::Context) -> ockam_core::Result<()> { + debug!("Shutting down InfluxDBTokenLessorProcessor"); + Ok(()) + } + + async fn process(&mut self, _context: &mut Self::Context) -> ockam_core::Result { + if let Err(err) = self.revoke_outstanding_tokens().await { + error!("Failed to revoke outstanding tokens: {err}"); + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(true) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_worker.rs b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_worker.rs new file mode 100644 index 00000000000..9144197a6dd --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_worker.rs @@ -0,0 +1,316 @@ +use crate::influxdb::influxdb_api_client::{ + InfluxDBApi, InfluxDBApiClient, InfluxDBCreateTokenRequest, InfluxDBTokenResponse, +}; +use crate::influxdb::lease_token::LeaseToken; +use crate::nodes::service::encode_response; +use crate::token_lessor_node_service::InfluxDBTokenLessorState; +use crate::ApiError; +use minicbor::Decoder; +use ockam::identity::{Identifier, IdentitySecureChannelLocalInfo}; +use ockam_core::api::Method::{Delete, Get, Post}; +use ockam_core::api::{RequestHeader, Response}; +use ockam_core::{async_trait, Address, Routed, Worker}; +use ockam_node::Context; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::error::Error; +use std::sync::Arc; +use time::{Duration, OffsetDateTime}; +use tokio::sync::RwLock; + +#[derive(Clone)] +pub(crate) struct InfluxDBTokenLessorWorker { + pub(crate) state: Arc>, +} + +impl InfluxDBTokenLessorWorker { + pub(crate) async fn new( + address: Address, + influxdb_address: String, + influxdb_org_id: String, + influxdb_token: String, + token_permissions: String, + token_ttl: i32, + ) -> ockam_core::Result { + debug!("Creating InfluxDBTokenLessorWorker"); + let _self = Self { + state: Arc::new(RwLock::new(InfluxDBTokenLessorState { + address, + influxdb_api_client: InfluxDBApiClient::new(influxdb_address, influxdb_token)?, + influxdb_org_id, + token_permissions, + token_ttl: Duration::seconds(token_ttl as i64), + active_tokens: BinaryHeap::new(), + })), + }; + Ok(_self) + } + + #[instrument(skip_all, fields(method = ?req.method(), path = req.path()))] + async fn handle_request( + &mut self, + _ctx: &mut Context, + requester: &Identifier, + req: &RequestHeader, + _dec: &mut Decoder<'_>, + ) -> ockam_core::Result> { + debug! { + id = %req.id(), + method = ?req.method(), + path = %req.path(), + body = %req.has_body(), + "request" + } + + let path = req.path(); + let path_segments = req.path_segments::<5>(); + let method = match req.method() { + Some(m) => m, + None => todo!(), + }; + debug!(path_segments = ?path_segments.as_slice().iter().map(|s| s.to_string()).collect::>(), "Handling request"); + + let r = match (method, path_segments.as_slice()) { + (Post, [""]) => encode_response(req, self.create_token(requester).await)?, + (Get, [""]) => encode_response(req, self.list_tokens(requester).await)?, + (Get, [token_id]) => encode_response(req, self.get_token(requester, token_id).await)?, + (Delete, [token_id]) => { + encode_response(req, self.revoke_token(requester, token_id).await)? + } + // ==*== Catch-all for Unimplemented APIs ==*== + _ => { + warn!(%method, %path, "Called invalid endpoint"); + Response::bad_request(req, &format!("Invalid endpoint: {} {}", method, path)) + .to_vec()? + } + }; + Ok(r) + } +} + +#[ockam::worker] +impl Worker for InfluxDBTokenLessorWorker { + type Message = Vec; + type Context = Context; + + async fn shutdown(&mut self, _ctx: &mut Self::Context) -> ockam_core::Result<()> { + debug!("Shutting down InfluxDBTokenLessorWorker"); + Ok(()) + } + + #[instrument(skip_all, name = "InfluxDBTokenLessorWorker::handle_message")] + async fn handle_message( + &mut self, + ctx: &mut Context, + msg: Routed>, + ) -> ockam_core::Result<()> { + let requester_identifier = + IdentitySecureChannelLocalInfo::find_info(msg.local_message())?.their_identity_id(); + + let return_route = msg.return_route(); + let body = msg.into_body()?; + let mut dec = Decoder::new(&body); + let req: RequestHeader = match dec.decode() { + Ok(r) => r, + Err(e) => { + error!("Failed to decode request: {:?}", e); + return Ok(()); + } + }; + + let r = match self + .handle_request(ctx, &requester_identifier, &req, &mut dec) + .await + { + Ok(r) => r, + Err(err) => { + error! { + re = %req.id(), + method = ?req.method(), + path = %req.path(), + code = %err.code(), + cause = ?err.source(), + "failed to handle request" + } + Response::internal_error(&req, &format!("failed to handle request: {err} {req:?}")) + .to_vec()? + } + }; + debug! { + re = %req.id(), + method = ?req.method(), + path = %req.path(), + "responding" + } + ctx.send(return_route, r).await + } +} + +#[async_trait] +pub trait InfluxDBTokenLessorWorkerApi { + async fn create_token( + &self, + requester: &Identifier, + ) -> Result, Response>; + + async fn get_token( + &self, + requester: &Identifier, + token_id: &str, + ) -> Result, Response>; + + async fn revoke_token( + &self, + requester: &Identifier, + token_id: &str, + ) -> Result>; + + async fn list_tokens( + &self, + requester: &Identifier, + ) -> Result>, Response>; +} + +#[async_trait] +impl InfluxDBTokenLessorWorkerApi for InfluxDBTokenLessorWorker { + async fn create_token( + &self, + requester: &Identifier, + ) -> Result, Response> { + debug!(%requester, "Creating token"); + let influxdb_token = { + let state = self.state.read().await; + let expires = OffsetDateTime::now_utc() + state.token_ttl; + let description = InfluxDBTokenResponse::pack_metadata(requester, expires); + state + .influxdb_api_client + .create_token(InfluxDBCreateTokenRequest { + description, + org_id: state.influxdb_org_id.clone(), + permissions: state.token_permissions.clone(), + }) + .await? + }; + let lease_token: Option = influxdb_token.try_into()?; + match lease_token { + Some(lease_token) => { + { + let mut state = self.state.write().await; + state.active_tokens.push(Reverse(lease_token.clone())); + } + Ok(Response::ok().body(lease_token)) + } + None => { + warn!("Token does not contain Ockam metadata, ignoring"); + Err(Response::bad_request_no_request( + "Token does not contain Ockam metadata", + )) + } + } + } + + async fn get_token( + &self, + requester: &Identifier, + token_id: &str, + ) -> Result, Response> { + debug!(%requester, %token_id, "Getting token"); + let influxdb_token = { + let state = self.state.read().await; + state.influxdb_api_client.get_token(token_id).await? + }; + debug!(%requester, %token_id, "Received token: {:?}", influxdb_token); + let lease_token: Option = influxdb_token.try_into().map_err(|e| { + ApiError::core(format!( + "Failed to parse InfluxDB token as a LeaseToken: {e}" + )) + })?; + match lease_token { + Some(lease_token) => { + if requester.eq(&lease_token.issued_for) { + Ok(Response::ok().body(lease_token)) + } else { + warn!(%requester, %token_id, "Token not authorized"); + Err(Response::unauthorized_no_request( + "Token not authorized for requester", + )) + } + } + None => { + warn!(%requester, %token_id, "Token not found"); + Err(Response::not_found_no_request( + "Token does not contain Ockam metadata", + )) + } + } + } + + async fn revoke_token( + &self, + requester: &Identifier, + token_id: &str, + ) -> Result> { + debug!(%requester, %token_id, "Revoking token"); + let is_authorized_to_revoke = self + .get_token(requester, token_id) + .await? + .into_parts() + .1 + .is_some(); + let revoked = if is_authorized_to_revoke { + let state = self.state.read().await; + state.influxdb_api_client.revoke_token(token_id).await + } else { + Err(ApiError::core("Not authorized to revoke token")) + }; + match revoked { + Ok(_) => { + info!(%requester, %token_id, "Token revoked"); + { + let mut state = self.state.write().await; + state.active_tokens.retain(|t| t.0.id != token_id); + } + Ok(Response::ok()) + } + Err(e) => { + error!("Failed to revoke token: {e:?}"); + Err(Response::internal_error_no_request( + "Failed to revoke token", + )) + } + } + } + + async fn list_tokens( + &self, + requester: &Identifier, + ) -> Result>, Response> { + debug!(%requester, "Listing tokens"); + let influxdb_tokens = { + let state = self.state.read().await; + state.influxdb_api_client.list_tokens().await? + }; + debug!("Received tokens list: {:?}", influxdb_tokens); + let lease_tokens: Vec = influxdb_tokens + .into_iter() + .filter_map(|token| { + let lease_token: Result, _> = token.try_into(); + if let Some(lease_token) = lease_token.ok().flatten() { + if requester.eq(&lease_token.issued_for) { + Some(lease_token) + } else { + None + } + } else { + None + } + }) + .collect(); + { + let mut state = self.state.write().await; + state.active_tokens = lease_tokens.iter().map(|t| Reverse(t.clone())).collect(); + } + info!("Found {} tokens", lease_tokens.len()); + Ok(Response::ok().body(lease_tokens)) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/influxdb_token_lease/influxdb_token_lease.rs b/implementations/rust/ockam/ockam_api/src/influxdb_token_lease/influxdb_token_lease.rs deleted file mode 100644 index 967015e380b..00000000000 --- a/implementations/rust/ockam/ockam_api/src/influxdb_token_lease/influxdb_token_lease.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::cloud::lease_manager::models::influxdb::Token; -use crate::cloud::{HasSecureClient, ProjectNodeClient}; -use miette::IntoDiagnostic; -use ockam_core::api::Request; -use ockam_core::async_trait; -use ockam_node::Context; - -#[async_trait] -pub trait InfluxDbTokenLease { - async fn create_token(&self, ctx: &Context) -> miette::Result; - - async fn get_token(&self, ctx: &Context, token_id: String) -> miette::Result; - - async fn revoke_token(&self, ctx: &Context, token_id: String) -> miette::Result<()>; - - async fn list_tokens(&self, ctx: &Context) -> miette::Result>; -} - -#[async_trait] -impl InfluxDbTokenLease for ProjectNodeClient { - async fn create_token(&self, ctx: &Context) -> miette::Result { - self.get_secure_client() - .ask(ctx, "influxdb_token_lease", Request::post("/")) - .await - .into_diagnostic()? - .success() - .into_diagnostic() - } - - async fn get_token(&self, ctx: &Context, token_id: String) -> miette::Result { - self.get_secure_client() - .ask( - ctx, - "influxdb_token_lease", - Request::get(format!("/{token_id}")), - ) - .await - .into_diagnostic()? - .success() - .into_diagnostic() - } - - async fn revoke_token(&self, ctx: &Context, token_id: String) -> miette::Result<()> { - self.get_secure_client() - .tell( - ctx, - "influxdb_token_lease", - Request::delete(format!("/{token_id}")), - ) - .await - .into_diagnostic()? - .success() - .into_diagnostic() - } - - async fn list_tokens(&self, ctx: &Context) -> miette::Result> { - self.get_secure_client() - .ask(ctx, "influxdb_token_lease", Request::get("/")) - .await - .into_diagnostic()? - .success() - .into_diagnostic() - } -} diff --git a/implementations/rust/ockam/ockam_api/src/influxdb_token_lease/mod.rs b/implementations/rust/ockam/ockam_api/src/influxdb_token_lease/mod.rs deleted file mode 100644 index 9f8a2f2f8fc..00000000000 --- a/implementations/rust/ockam/ockam_api/src/influxdb_token_lease/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[allow(clippy::module_inception)] -mod influxdb_token_lease; - -pub use influxdb_token_lease::*; diff --git a/implementations/rust/ockam/ockam_api/src/lib.rs b/implementations/rust/ockam/ockam_api/src/lib.rs index a757e15453c..ee9d5ddce5c 100644 --- a/implementations/rust/ockam/ockam_api/src/lib.rs +++ b/implementations/rust/ockam/ockam_api/src/lib.rs @@ -16,6 +16,9 @@ //! │ └─ ... //! ``` +/// This is a temporary workaround until the fixes done +/// in https://github.com/launchbadge/sqlx/pull/3298 are released +extern crate sqlx_build_trust as sqlx; #[macro_use] extern crate tracing; @@ -38,7 +41,7 @@ pub mod uppercase; mod version; pub mod authority_node; -mod influxdb_token_lease; +pub mod influxdb; pub mod logs; mod schema; @@ -51,14 +54,10 @@ mod util; pub use cli_state::CliState; pub use error::*; -pub use influxdb_token_lease::*; +pub use influxdb::*; pub use nodes::service::default_address::*; pub use rendezvous_healthcheck::*; pub use session::connection_status::ConnectionStatus; pub use ui::*; pub use util::*; pub use version::*; - -/// This is a temporary workaround until the fixes done -/// in https://github.com/launchbadge/sqlx/pull/3298 are released -extern crate sqlx_build_trust as sqlx; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs index 420cd1ee9cc..e646f8ff067 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs @@ -173,6 +173,7 @@ pub(crate) struct Registry { pub(crate) relays: RegistryOf, pub(crate) inlets: RegistryOf, pub(crate) outlets: RegistryOf, + pub(crate) influxdb_services: RegistryOf, // TODO: what should we persist here? } pub(crate) struct RegistryOf { diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs index 103ebbd5f82..5ecb401fd98 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs @@ -21,6 +21,7 @@ impl DefaultAddress { pub const OKTA_IDENTITY_PROVIDER: &'static str = "okta"; pub const KAFKA_OUTLET: &'static str = "kafka_outlet"; pub const KAFKA_INLET: &'static str = "kafka_inlet"; + pub const INFLUXDB_TOKEN_LESSOR: &'static str = "lessor"; pub fn get_rendezvous_server_address() -> Address { let server_address = @@ -42,7 +43,8 @@ impl DefaultAddress { | Self::ENROLLMENT_TOKEN_ACCEPTOR | Self::OKTA_IDENTITY_PROVIDER | Self::KAFKA_INLET - | Self::KAFKA_OUTLET) + | Self::KAFKA_OUTLET + | Self::INFLUXDB_TOKEN_LESSOR) } pub fn iter() -> impl Iterator { @@ -62,6 +64,7 @@ impl DefaultAddress { Self::OKTA_IDENTITY_PROVIDER, Self::KAFKA_INLET, Self::KAFKA_OUTLET, + Self::INFLUXDB_TOKEN_LESSOR, ] .iter() .copied() @@ -75,31 +78,8 @@ mod test { #[test] fn test_default_address_is_valid() { assert!(!DefaultAddress::is_valid("foo")); - assert!(DefaultAddress::is_valid(DefaultAddress::OUTLET_SERVICE)); - assert!(DefaultAddress::is_valid(DefaultAddress::RELAY_SERVICE)); - assert!(DefaultAddress::is_valid( - DefaultAddress::STATIC_RELAY_SERVICE - )); - assert!(DefaultAddress::is_valid(DefaultAddress::UPPERCASE_SERVICE)); - assert!(DefaultAddress::is_valid(DefaultAddress::ECHO_SERVICE)); - assert!(DefaultAddress::is_valid(DefaultAddress::HOP_SERVICE)); - assert!(DefaultAddress::is_valid( - DefaultAddress::SECURE_CHANNEL_LISTENER - )); - assert!(DefaultAddress::is_valid( - DefaultAddress::DIRECT_AUTHENTICATOR - )); - assert!(DefaultAddress::is_valid(DefaultAddress::CREDENTIAL_ISSUER)); - assert!(DefaultAddress::is_valid( - DefaultAddress::ENROLLMENT_TOKEN_ISSUER - )); - assert!(DefaultAddress::is_valid( - DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR - )); - assert!(DefaultAddress::is_valid( - DefaultAddress::OKTA_IDENTITY_PROVIDER - )); - assert!(DefaultAddress::is_valid(DefaultAddress::KAFKA_INLET)); - assert!(DefaultAddress::is_valid(DefaultAddress::KAFKA_OUTLET)); + for name in DefaultAddress::iter() { + assert!(DefaultAddress::is_valid(name)); + } } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs index b3416cf558d..495fabfc2b4 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/manager.rs @@ -51,7 +51,7 @@ pub struct NodeManager { pub(crate) cli_state: CliState, pub(super) node_name: String, pub(super) node_identifier: Identifier, - pub(super) api_transport_flow_control_id: FlowControlId, + pub(crate) api_transport_flow_control_id: FlowControlId, pub(crate) tcp_transport: TcpTransport, pub(crate) udp_transport: Option, pub(crate) secure_channels: Arc, @@ -491,7 +491,7 @@ impl NodeManager { .into_diagnostic() } - pub(super) async fn access_control( + pub(crate) async fn access_control( &self, ctx: &Context, authority: Option, diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs index 900d31689cb..91d7818d761 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs @@ -142,6 +142,18 @@ impl NodeManagerWorker { self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Inlet) .await, )?, + (Post, ["node", "services", DefaultAddress::INFLUXDB_TOKEN_LESSOR]) => encode_response( + req, + self.start_influxdb_token_lessor_service(ctx, dec.decode()?) + .await, + )?, + (Delete, ["node", "services", DefaultAddress::INFLUXDB_TOKEN_LESSOR]) => { + encode_response( + req, + self.delete_influxdb_token_lessor_service(ctx, dec.decode()?) + .await, + )? + } (Get, ["node", "services"]) => encode_response(req, self.list_services().await)?, (Get, ["node", "services", service_type]) => { encode_response(req, self.list_services_of_type(service_type).await)? diff --git a/implementations/rust/ockam/ockam_command/src/lease/create.rs b/implementations/rust/ockam/ockam_command/src/lease/create.rs index a458a215051..95e7c3d6bb4 100644 --- a/implementations/rust/ockam/ockam_command/src/lease/create.rs +++ b/implementations/rust/ockam/ockam_command/src/lease/create.rs @@ -1,98 +1,69 @@ +use crate::shared_args::{IdentityOpts, TimeoutArg, TrustOpts}; +use crate::{docs, Command, CommandGlobalOpts}; +use async_trait::async_trait; use clap::Args; -use colorful::Colorful; -use miette::IntoDiagnostic; -use time::format_description::well_known::Iso8601; -use time::PrimitiveDateTime; -use tokio::sync::Mutex; -use tokio::try_join; - -use ockam::Context; -use ockam_api::colors::OckamColor; -use ockam_api::{fmt_log, fmt_ok, InfluxDbTokenLease}; - -use crate::lease::create_project_client; -use crate::shared_args::{IdentityOpts, TrustOpts}; -use crate::util::async_cmd; -use crate::{docs, CommandGlobalOpts}; +use ockam_api::fmt_log; +use ockam_api::influxdb::token_lessor_node_service::InfluxDBTokenLessorNodeServiceTrait; +use ockam_api::nodes::InMemoryNode; +use ockam_api::output::Output; +use ockam_multiaddr::MultiAddr; +use ockam_node::Context; const HELP_DETAIL: &str = ""; /// Create a token within the lease token manager #[derive(Clone, Debug, Args)] #[command(help_template = docs::after_help(HELP_DETAIL))] -pub struct CreateCommand {} +pub struct CreateCommand { + /// The route to the node that will be used to create the token + #[arg(long, value_name = "ROUTE", default_value_t = super::lease_at_default_value())] + pub at: MultiAddr, -impl CreateCommand { - pub fn run( - self, - opts: CommandGlobalOpts, - identity_opts: IdentityOpts, - trust_opts: TrustOpts, - ) -> miette::Result<()> { - async_cmd(&self.name(), opts.clone(), |ctx| async move { - self.async_run(&ctx, opts, identity_opts, trust_opts).await - }) - } + #[command(flatten)] + pub timeout: TimeoutArg, - pub fn name(&self) -> String { - "lease create".into() - } + #[command(flatten)] + identity_opts: IdentityOpts, - async fn async_run( - &self, - ctx: &Context, - opts: CommandGlobalOpts, - identity_opts: IdentityOpts, - trust_opts: TrustOpts, - ) -> miette::Result<()> { - opts.terminal - .write_line(&fmt_log!("Creating influxdb token...\n"))?; + #[command(flatten)] + trust_opts: TrustOpts, +} - let project_node = create_project_client(ctx, &opts, &identity_opts, &trust_opts).await?; - let is_finished: Mutex = Mutex::new(false); +#[async_trait] +impl Command for CreateCommand { + const NAME: &'static str = "lease create"; - let send_req = async { - let token = project_node.create_token(ctx).await?; - *is_finished.lock().await = true; - Ok(token) - }; + async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { + let cmd = self.parse_args(&opts).await?; - let output_messages = vec!["Creating influxdb token...".to_string()]; + let node = InMemoryNode::start_with_identity_and_project_name( + ctx, + &opts.state, + cmd.identity_opts.identity_name.clone(), + cmd.trust_opts.project_name.clone(), + ) + .await? + .with_timeout(cmd.timeout.timeout); - let progress_output = opts.terminal.loop_messages(&output_messages, &is_finished); + opts.terminal + .write_line(&fmt_log!("Creating influxdb token...\n"))?; - let (resp_token, _) = try_join!(send_req, progress_output)?; + let res = node.create_token(ctx, &cmd.at).await?; opts.terminal .stdout() - .machine(resp_token.token.to_string()) - .json(serde_json::to_string(&resp_token).into_diagnostic()?) - .plain( - fmt_ok!("Created influxdb token\n") - + &fmt_log!( - "{}\n", - &resp_token - .token - .to_string() - .color(OckamColor::PrimaryResource.color()) - ) - + &fmt_log!( - "Id {}\n", - &resp_token - .id - .to_string() - .color(OckamColor::PrimaryResource.color()) - ) - + &fmt_log!( - "Expires at {}", - PrimitiveDateTime::parse(&resp_token.expires, &Iso8601::DEFAULT) - .into_diagnostic()? - .to_string() - .color(OckamColor::PrimaryResource.color()) - ), - ) + .machine(res.token.to_string()) + .plain(res.item()?) + .json_obj(res)? .write_line()?; Ok(()) } } + +impl CreateCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> crate::Result { + self.at = super::resolve_at_arg(&self.at, &opts.state).await?; + Ok(self) + } +} diff --git a/implementations/rust/ockam/ockam_command/src/lease/list.rs b/implementations/rust/ockam/ockam_command/src/lease/list.rs index 2e429ef8431..737eb805223 100644 --- a/implementations/rust/ockam/ockam_command/src/lease/list.rs +++ b/implementations/rust/ockam/ockam_command/src/lease/list.rs @@ -1,70 +1,71 @@ +use crate::shared_args::{IdentityOpts, TimeoutArg, TrustOpts}; +use crate::util::clean_nodes_multiaddr; +use crate::{docs, Command, CommandGlobalOpts}; +use async_trait::async_trait; use clap::Args; -use miette::IntoDiagnostic; use ockam::Context; -use ockam_api::cloud::lease_manager::models::influxdb::Token; -use ockam_api::InfluxDbTokenLease; -use tokio::sync::Mutex; -use tokio::try_join; - -use crate::lease::create_project_client; -use crate::shared_args::{IdentityOpts, TrustOpts}; -use crate::util::async_cmd; -use crate::{docs, CommandGlobalOpts}; +use ockam_api::fmt_log; +use ockam_api::nodes::InMemoryNode; +use ockam_api::token_lessor_node_service::InfluxDBTokenLessorNodeServiceTrait; +use ockam_multiaddr::MultiAddr; const HELP_DETAIL: &str = ""; /// List tokens within the lease token manager #[derive(Clone, Debug, Args)] #[command(help_template = docs::after_help(HELP_DETAIL))] -pub struct ListCommand; +pub struct ListCommand { + /// The route to the node that will be used to create the token + #[arg(long, value_name = "ROUTE", default_value_t = super::lease_at_default_value())] + pub at: MultiAddr, -impl ListCommand { - pub fn run( - self, - opts: CommandGlobalOpts, - identity_opts: IdentityOpts, - trust_opts: TrustOpts, - ) -> miette::Result<()> { - async_cmd(&self.name(), opts.clone(), |ctx| async move { - self.async_run(&ctx, opts, identity_opts, trust_opts).await - }) - } + #[command(flatten)] + pub timeout: TimeoutArg, - pub fn name(&self) -> String { - "lease list".into() - } + #[command(flatten)] + identity_opts: IdentityOpts, - async fn async_run( - &self, - ctx: &Context, - opts: CommandGlobalOpts, - identity_opts: IdentityOpts, - trust_opts: TrustOpts, - ) -> miette::Result<()> { - let is_finished: Mutex = Mutex::new(false); - let project_node = create_project_client(ctx, &opts, &identity_opts, &trust_opts).await?; + #[command(flatten)] + trust_opts: TrustOpts, +} + +#[async_trait] +impl Command for ListCommand { + const NAME: &'static str = "lease list"; - let send_req = async { - let tokens: Vec = project_node.list_tokens(ctx).await?; - *is_finished.lock().await = true; - Ok(tokens) - }; + async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { + let cmd = self.parse_args(&opts).await?; - let output_messages = vec![format!("Listing Tokens...\n")]; + let node = InMemoryNode::start_with_identity_and_project_name( + ctx, + &opts.state, + cmd.identity_opts.identity_name.clone(), + cmd.trust_opts.project_name.clone(), + ) + .await? + .with_timeout(cmd.timeout.timeout); - let progress_output = opts.terminal.loop_messages(&output_messages, &is_finished); + opts.terminal + .write_line(&fmt_log!("Listing influxdb tokens...\n"))?; - let (tokens, _) = try_join!(send_req, progress_output)?; + let (at, _meta) = clean_nodes_multiaddr(&cmd.at, &opts.state).await?; + let res = node.list_tokens(ctx, &at).await?; + + let plain = &opts.terminal.build_list(&res, "No tokens found")?; - let plain = opts - .terminal - .build_list(&tokens, "No active tokens found within service.")?; - let json = serde_json::to_string(&tokens).into_diagnostic()?; opts.terminal .stdout() .plain(plain) - .json(json) + .json_obj(res)? .write_line()?; + Ok(()) } } + +impl ListCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> crate::Result { + self.at = super::resolve_at_arg(&self.at, &opts.state).await?; + Ok(self) + } +} diff --git a/implementations/rust/ockam/ockam_command/src/lease/mod.rs b/implementations/rust/ockam/ockam_command/src/lease/mod.rs index 160b546954e..004f0f221ca 100644 --- a/implementations/rust/ockam/ockam_command/src/lease/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/lease/mod.rs @@ -1,16 +1,14 @@ +use self::revoke::RevokeCommand; +use crate::util::process_nodes_multiaddr; +use crate::{Command, CommandGlobalOpts, Error}; use clap::{Args, Subcommand}; -use miette::{miette, IntoDiagnostic}; - pub use create::CreateCommand; pub use list::ListCommand; -use ockam_api::cloud::{CredentialsEnabled, ProjectNodeClient}; -use ockam_api::nodes::InMemoryNode; +use miette::IntoDiagnostic; +use ockam_api::CliState; +use ockam_multiaddr::MultiAddr; pub use show::ShowCommand; - -use crate::shared_args::{IdentityOpts, TrustOpts}; -use crate::CommandGlobalOpts; - -use self::revoke::RevokeCommand; +use std::str::FromStr; mod create; mod list; @@ -22,12 +20,6 @@ mod show; pub struct LeaseCommand { #[command(subcommand)] subcommand: LeaseSubcommand, - - #[command(flatten)] - identity_opts: IdentityOpts, - - #[command(flatten)] - trust_opts: TrustOpts, } #[derive(Clone, Debug, Subcommand)] @@ -41,10 +33,10 @@ pub enum LeaseSubcommand { impl LeaseCommand { pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { match self.subcommand { - LeaseSubcommand::Create(c) => c.run(opts, self.identity_opts, self.trust_opts), - LeaseSubcommand::List(c) => c.run(opts, self.identity_opts, self.trust_opts), - LeaseSubcommand::Show(c) => c.run(opts, self.identity_opts, self.trust_opts), - LeaseSubcommand::Revoke(c) => c.run(opts, self.identity_opts, self.trust_opts), + LeaseSubcommand::Create(c) => c.run(opts), + LeaseSubcommand::List(c) => c.run(opts), + LeaseSubcommand::Show(c) => c.run(opts), + LeaseSubcommand::Revoke(c) => c.run(opts), } } @@ -58,37 +50,26 @@ impl LeaseCommand { } } -async fn create_project_client( - ctx: &ockam_node::Context, - opts: &CommandGlobalOpts, - identity_opts: &IdentityOpts, - trust_opts: &TrustOpts, -) -> miette::Result { - let node = InMemoryNode::start_with_identity_and_project_name( - ctx, - &opts.state, - identity_opts.identity_name.clone(), - trust_opts.project_name.clone(), - ) - .await?; +fn lease_at_default_value() -> MultiAddr { + // Backwards compatibility with the service running on the project node + MultiAddr::from_str("/project//service/influxdb_token_lease") + .expect("Invalid default value for at") +} - let identity = opts - .state - .get_identity_name_or_default(&identity_opts.identity_name) - .await?; - let project = opts - .state - .projects() - .get_project_by_name_or_default(&trust_opts.project_name) - .await?; +async fn resolve_at_arg(at: &MultiAddr, state: &CliState) -> miette::Result { + let mut at = at.to_string(); + if at.contains("") { + let project_name = state + .projects() + .get_default_project() + .await + .map(|p| p.name().to_string()) + .ok() + .ok_or(Error::arg_validation("at", &at, Some("No projects found")))?; + at = at.replace("", &project_name); + } - node.create_project_client( - &project - .project_identifier() - .ok_or(miette!("The project has no identifier"))?, - project.project_multiaddr().into_diagnostic()?, - Some(identity.clone()), - CredentialsEnabled::On, - ) - .await + // Parse "to" as a multiaddr again with all the values in place + let to = MultiAddr::from_str(&at).into_diagnostic()?; + Ok(process_nodes_multiaddr(&to, state).await?) } diff --git a/implementations/rust/ockam/ockam_command/src/lease/revoke.rs b/implementations/rust/ockam/ockam_command/src/lease/revoke.rs index 621d659519b..02cfb1a5bfc 100644 --- a/implementations/rust/ockam/ockam_command/src/lease/revoke.rs +++ b/implementations/rust/ockam/ockam_command/src/lease/revoke.rs @@ -1,12 +1,15 @@ +use crate::shared_args::{IdentityOpts, TimeoutArg, TrustOpts}; +use crate::util::clean_nodes_multiaddr; +use crate::{docs, Command, CommandGlobalOpts}; +use async_trait::async_trait; use clap::Args; - +use colorful::Colorful; use ockam::Context; -use ockam_api::InfluxDbTokenLease; - -use crate::lease::create_project_client; -use crate::shared_args::{IdentityOpts, TrustOpts}; -use crate::util::async_cmd; -use crate::{docs, CommandGlobalOpts}; +use ockam_api::colors::color_primary; +use ockam_api::nodes::InMemoryNode; +use ockam_api::token_lessor_node_service::InfluxDBTokenLessorNodeServiceTrait; +use ockam_api::{fmt_log, fmt_ok}; +use ockam_multiaddr::MultiAddr; const HELP_DETAIL: &str = ""; @@ -14,39 +17,63 @@ const HELP_DETAIL: &str = ""; #[derive(Clone, Debug, Args)] #[command(help_template = docs::after_help(HELP_DETAIL))] pub struct RevokeCommand { + /// The route to the node that will be used to create the token + #[arg(long, value_name = "ROUTE", default_value_t = super::lease_at_default_value())] + pub at: MultiAddr, + /// ID of the token to revoke #[arg(long, short, id = "token_id", value_name = "TOKEN_ID")] pub token_id: String, + + #[command(flatten)] + pub timeout: TimeoutArg, + + #[command(flatten)] + identity_opts: IdentityOpts, + + #[command(flatten)] + trust_opts: TrustOpts, } -impl RevokeCommand { - pub fn run( - self, - opts: CommandGlobalOpts, - identity_opts: IdentityOpts, - trust_opts: TrustOpts, - ) -> miette::Result<()> { - async_cmd(&self.name(), opts.clone(), |ctx| async move { - self.async_run(&ctx, opts, identity_opts, trust_opts).await - }) - } +#[async_trait] +impl Command for RevokeCommand { + const NAME: &'static str = "lease revoke"; - pub fn name(&self) -> String { - "lease revoke".into() - } + async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { + let cmd = self.parse_args(&opts).await?; + + let node = InMemoryNode::start_with_identity_and_project_name( + ctx, + &opts.state, + cmd.identity_opts.identity_name.clone(), + cmd.trust_opts.project_name.clone(), + ) + .await? + .with_timeout(cmd.timeout.timeout); + + opts.terminal + .write_line(&fmt_log!("Revoking influxdb token {}...\n", cmd.token_id))?; + + let (at, _meta) = clean_nodes_multiaddr(&cmd.at, &opts.state).await?; + node.revoke_token(ctx, &at, &cmd.token_id).await?; + + opts.terminal + .stdout() + .plain(fmt_ok!( + "Token with id {} has been revoked.", + color_primary(&cmd.token_id) + )) + .machine(&cmd.token_id) + .json(serde_json::json!({ "id": &cmd.token_id })) + .write_line()?; - async fn async_run( - &self, - ctx: &Context, - opts: CommandGlobalOpts, - identity_opts: IdentityOpts, - trust_opts: TrustOpts, - ) -> miette::Result<()> { - let project_node = create_project_client(ctx, &opts, &identity_opts, &trust_opts).await?; - project_node - .revoke_token(ctx, self.token_id.clone()) - .await?; - println!("Revoked influxdb token {}.", self.token_id); Ok(()) } } + +impl RevokeCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> crate::Result { + self.at = super::resolve_at_arg(&self.at, &opts.state).await?; + Ok(self) + } +} diff --git a/implementations/rust/ockam/ockam_command/src/lease/show.rs b/implementations/rust/ockam/ockam_command/src/lease/show.rs index d5ea602b882..d4900acc9fd 100644 --- a/implementations/rust/ockam/ockam_command/src/lease/show.rs +++ b/implementations/rust/ockam/ockam_command/src/lease/show.rs @@ -1,13 +1,14 @@ +use crate::shared_args::{IdentityOpts, TimeoutArg, TrustOpts}; +use crate::util::clean_nodes_multiaddr; +use crate::{docs, Command, CommandGlobalOpts}; +use async_trait::async_trait; use clap::Args; - use ockam::Context; -use ockam_api::InfluxDbTokenLease; - -use crate::lease::create_project_client; -use crate::shared_args::{IdentityOpts, TrustOpts}; -use crate::util::async_cmd; -use crate::{docs, CommandGlobalOpts}; +use ockam_api::fmt_log; +use ockam_api::nodes::InMemoryNode; use ockam_api::output::Output; +use ockam_api::token_lessor_node_service::InfluxDBTokenLessorNodeServiceTrait; +use ockam_multiaddr::MultiAddr; const HELP_DETAIL: &str = ""; @@ -15,43 +16,60 @@ const HELP_DETAIL: &str = ""; #[derive(Clone, Debug, Args)] #[command(help_template = docs::after_help(HELP_DETAIL))] pub struct ShowCommand { + /// The route to the node that will be used to create the token + #[arg(long, value_name = "ROUTE", default_value_t = super::lease_at_default_value())] + pub at: MultiAddr, + /// ID of the token to retrieve #[arg(short, long, value_name = "TOKEN_ID")] pub token_id: String, + + #[command(flatten)] + pub timeout: TimeoutArg, + + #[command(flatten)] + identity_opts: IdentityOpts, + + #[command(flatten)] + trust_opts: TrustOpts, } -impl ShowCommand { - pub fn run( - self, - opts: CommandGlobalOpts, - identity_opts: IdentityOpts, - trust_opts: TrustOpts, - ) -> miette::Result<()> { - async_cmd(&self.name(), opts.clone(), |ctx| async move { - self.async_run(&ctx, opts, identity_opts, trust_opts).await - }) - } +#[async_trait] +impl Command for ShowCommand { + const NAME: &'static str = "lease show"; - pub fn name(&self) -> String { - "lease show".into() - } + async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> { + let cmd = self.parse_args(&opts).await?; + + let node = InMemoryNode::start_with_identity_and_project_name( + ctx, + &opts.state, + cmd.identity_opts.identity_name.clone(), + cmd.trust_opts.project_name.clone(), + ) + .await? + .with_timeout(cmd.timeout.timeout); + + opts.terminal + .write_line(&fmt_log!("Retrieving influxdb token...\n"))?; - async fn async_run( - &self, - ctx: &Context, - opts: CommandGlobalOpts, - identity_opts: IdentityOpts, - trust_opts: TrustOpts, - ) -> miette::Result<()> { - let project_node = create_project_client(ctx, &opts, &identity_opts, &trust_opts).await?; - let token = project_node.get_token(ctx, self.token_id.clone()).await?; + let (at, _meta) = clean_nodes_multiaddr(&cmd.at, &opts.state).await?; + let res = node.create_token(ctx, &at).await?; opts.terminal .stdout() - .plain(token.item()?) - .json(serde_json::json!(&token)) + .machine(res.token.to_string()) + .plain(res.item()?) + .json_obj(res)? .write_line()?; Ok(()) } } + +impl ShowCommand { + async fn parse_args(mut self, opts: &CommandGlobalOpts) -> crate::Result { + self.at = super::resolve_at_arg(&self.at, &opts.state).await?; + Ok(self) + } +} diff --git a/implementations/rust/ockam/ockam_command/src/node/create/config.rs b/implementations/rust/ockam/ockam_command/src/node/create/config.rs index 4da115d95cf..ef49badc535 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/config.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/config.rs @@ -191,6 +191,13 @@ impl NodeConfig { if let Some(project) = &cmd.trust_opts.project_name { self.node.project = Some(project.clone().into()); } + if let Some(launch_config) = &cmd.launch_config { + self.node.launch_config = Some( + serde_json::to_string(launch_config) + .into_diagnostic()? + .into(), + ); + } if let Some(context) = &cmd.opentelemetry_context { self.node.opentelemetry_context = Some(serde_json::to_string(&context).into_diagnostic()?.into()); diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index 0c7a1547ee9..12f1a9215e0 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -3,27 +3,30 @@ use std::sync::Arc; use colorful::Colorful; use miette::{miette, IntoDiagnostic}; +use minicbor::Decoder; use tokio::time::{sleep, Duration}; use tracing::{debug, info, instrument}; +use crate::node::show::is_node_up; +use crate::node::CreateCommand; +use crate::secure_channel::listener::create as secure_channel_listener; +use crate::util::foreground_args::wait_for_exit_signal; +use crate::CommandGlobalOpts; use ockam::tcp::{TcpListenerOptions, TcpTransport}; use ockam::udp::UdpTransport; use ockam::{Address, Context}; use ockam_api::colors::color_primary; -use ockam_api::nodes::InMemoryNode; +use ockam_api::nodes::models::services::StartServiceRequest; use ockam_api::nodes::{ service::{NodeManagerGeneralOptions, NodeManagerTransportOptions}, NodeManagerWorker, NODEMANAGER_ADDR, }; +use ockam_api::nodes::{BackgroundNodeClient, InMemoryNode}; use ockam_api::terminal::notification::NotificationHandler; -use ockam_api::{fmt_log, fmt_ok}; +use ockam_api::{fmt_log, fmt_ok, DefaultAddress}; +use ockam_core::api::{Request, ResponseHeader}; use ockam_core::{route, LOCAL}; -use crate::node::CreateCommand; -use crate::secure_channel::listener::create as secure_channel_listener; -use crate::util::foreground_args::wait_for_exit_signal; -use crate::CommandGlobalOpts; - impl CreateCommand { #[instrument(skip_all, fields(node_name = self.name))] pub(super) async fn foreground_mode( @@ -172,6 +175,15 @@ impl CreateCommand { } async fn start_services(&self, ctx: &Context, opts: &CommandGlobalOpts) -> miette::Result<()> { + // Wait until the node is fully started + let mut node = + BackgroundNodeClient::create(ctx, &opts.state, &Some(self.name.clone())).await?; + if !is_node_up(ctx, &mut node, true).await? { + return Err(miette!( + "Couldn't start services because the node is not up" + )); + } + if let Some(config) = &self.launch_config { if let Some(startup_services) = &config.startup_services { if let Some(cfg) = startup_services.secure_channel_listener.clone() { @@ -188,6 +200,29 @@ impl CreateCommand { .await?; } } + if let Some(cfg) = startup_services.influxdb_token_lessor.clone() { + opts.terminal + .write_line(fmt_log!("Starting InfluxDB token lease manager ..."))?; + let service_name = DefaultAddress::INFLUXDB_TOKEN_LESSOR.to_string(); // TODO: should be configurable + let mut req = vec![]; + Request::post(format!("/node/services/{service_name}")) + .body(StartServiceRequest::new(cfg, service_name)) + .encode(&mut req) + .into_diagnostic()?; + + let resp: Vec = ctx + .send_and_receive(NODEMANAGER_ADDR, req) + .await + .into_diagnostic()?; + let mut dec = Decoder::new(&resp); + let response = dec.decode::().into_diagnostic()?; + if response.is_ok() { + opts.terminal + .write_line(fmt_ok!("InfluxDB token lease manager started"))?; + } else { + return Err(miette!("Failed to start InfluxDB token lease manager")); + } + } } } Ok(()) diff --git a/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs b/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs index 3fe71801785..9cf5243b903 100644 --- a/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs +++ b/implementations/rust/ockam/ockam_command/src/run/parser/resource/node.rs @@ -29,6 +29,8 @@ pub struct Node { pub http_server_port: Option, pub identity: Option, pub project: Option, + #[serde(alias = "launch-config")] + pub launch_config: Option, #[serde(alias = "opentelemetry-context")] pub opentelemetry_context: Option, } @@ -68,6 +70,9 @@ impl Resource for Node { if let Some(project) = self.project { args.insert("project".into(), project); } + if let Some(launch_config) = self.launch_config { + args.insert("launch-config".into(), launch_config); + } if let Some(opentelemetry_context) = self.opentelemetry_context { args.insert("opentelemetry-context".into(), opentelemetry_context); } diff --git a/implementations/rust/ockam/ockam_command/src/service/config.rs b/implementations/rust/ockam/ockam_command/src/service/config.rs index 3f102c9906a..e01eaf72d7a 100644 --- a/implementations/rust/ockam/ockam_command/src/service/config.rs +++ b/implementations/rust/ockam/ockam_command/src/service/config.rs @@ -3,30 +3,10 @@ use std::path::Path; use miette::{Context as _, IntoDiagnostic}; use serde::{Deserialize, Serialize}; +use crate::Result; use ockam::identity::Identifier; use ockam_api::nodes::service::default_address::DefaultAddress; - -use crate::Result; - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct SecureChannelListenerConfig { - #[serde(default = "sec_listener_default_addr")] - pub(crate) address: String, - - #[serde(default)] - pub(crate) authorized_identifiers: Option>, - - #[serde(default)] - pub(crate) disabled: bool, - - #[serde(default)] - pub(crate) identity: Option, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct ServiceConfigs { - pub(crate) secure_channel_listener: Option, -} +use ockam_api::StartInfluxDBLeaseManagerRequest; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Config { @@ -45,6 +25,27 @@ impl Config { } } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ServiceConfigs { + pub(crate) secure_channel_listener: Option, + pub(crate) influxdb_token_lessor: Option, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SecureChannelListenerConfig { + #[serde(default = "sec_listener_default_addr")] + pub(crate) address: String, + + #[serde(default)] + pub(crate) authorized_identifiers: Option>, + + #[serde(default)] + pub(crate) disabled: bool, + + #[serde(default)] + pub(crate) identity: Option, +} + fn sec_listener_default_addr() -> String { DefaultAddress::SECURE_CHANNEL_LISTENER.to_string() } diff --git a/implementations/rust/ockam/ockam_command/src/subcommand.rs b/implementations/rust/ockam/ockam_command/src/subcommand.rs index e4c65b85199..6643259974e 100644 --- a/implementations/rust/ockam/ockam_command/src/subcommand.rs +++ b/implementations/rust/ockam/ockam_command/src/subcommand.rs @@ -106,7 +106,6 @@ pub enum OckamSubcommand { Policy(PolicyCommand), Lease(LeaseCommand), - Run(RunCommand), Status(StatusCommand), Reset(ResetCommand), @@ -163,7 +162,6 @@ impl OckamSubcommand { OckamSubcommand::Policy(c) => c.run(opts), OckamSubcommand::Lease(c) => c.run(opts), - OckamSubcommand::Run(c) => c.run(opts), OckamSubcommand::Status(c) => c.run(opts), OckamSubcommand::Reset(c) => c.run(opts), diff --git a/implementations/rust/ockam/ockam_core/src/api.rs b/implementations/rust/ockam/ockam_core/src/api.rs index 3d187b72f4a..f37f176cd39 100644 --- a/implementations/rust/ockam/ockam_core/src/api.rs +++ b/implementations/rust/ockam/ockam_core/src/api.rs @@ -701,6 +701,11 @@ impl Response { Response::builder(re, Status::Unauthorized) } + pub fn unauthorized_no_request(msg: &str) -> Response { + let e = Error::new_without_path().with_message(msg); + Response::builder(Id::default(), Status::Unauthorized).body(e) + } + pub fn forbidden_no_request(re: Id) -> Response { Response::builder(re, Status::Forbidden) }