From 1823bc3ae95516a9886e58df2d54e9520e63a275 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Thu, 12 Sep 2024 20:05:22 +0200 Subject: [PATCH] feat(rust): wip add lessor processor to automatically revoke expired tokens --- .../ockam_api/src/influxdb/lease_token.rs | 4 +- .../rust/ockam/ockam_api/src/influxdb/mod.rs | 1 + .../src/influxdb/token_lessor_node_service.rs | 53 ++++++++------ .../src/influxdb/token_lessor_processor.rs | 69 +++++++++++++++++++ .../src/influxdb/token_lessor_worker.rs | 61 +++++++--------- .../ockam_api/src/nodes/service/worker.rs | 4 +- 6 files changed, 131 insertions(+), 61 deletions(-) create mode 100644 implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_processor.rs diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/lease_token.rs b/implementations/rust/ockam/ockam_api/src/influxdb/lease_token.rs index aea07b0435f..163219efcb9 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/lease_token.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/lease_token.rs @@ -11,7 +11,7 @@ use std::fmt::{Display, Formatter}; use strum::{Display, EnumString}; use time::OffsetDateTime; -#[derive(Encode, Decode, CborLen, Serialize, Deserialize, Debug, PartialEq)] +#[derive(Encode, Decode, CborLen, Serialize, Deserialize, Debug, Clone, PartialEq)] #[cbor(map)] pub struct LeaseToken { #[cbor(n(1))] @@ -76,7 +76,7 @@ impl Output for LeaseToken { } #[derive( - Encode, Decode, CborLen, Serialize, Deserialize, PartialEq, Debug, EnumString, Display, + Encode, Decode, CborLen, Serialize, Deserialize, PartialEq, Debug, Clone, EnumString, Display, )] pub enum TokenStatus { #[n(0)] diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/mod.rs b/implementations/rust/ockam/ockam_api/src/influxdb/mod.rs index 6137bee6482..2274cc51617 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/mod.rs @@ -1,5 +1,6 @@ mod lease_token; pub mod token_lessor_node_service; mod token_lessor_worker; +mod token_lessor_processor; pub use token_lessor_node_service::StartInfluxDBLeaseManagerRequest; diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_node_service.rs b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_node_service.rs index 0ed0f980423..067fd3eb8da 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_node_service.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_node_service.rs @@ -1,3 +1,5 @@ +use crate::influxdb::lease_token::LeaseToken; +use crate::influxdb::token_lessor_processor::InfluxDBTokenLessorProcessor; use crate::influxdb::token_lessor_worker::InfluxDBTokenLessorWorker; use crate::nodes::models::services::{DeleteServiceRequest, StartServiceRequest}; use crate::nodes::service::messages::Messages; @@ -9,12 +11,11 @@ use ockam_abac::{Action, PolicyExpression, Resource, ResourceType}; use ockam_core::api::{Error, Request, Response}; use ockam_core::{async_trait, Address}; use ockam_multiaddr::MultiAddr; -use ockam_node::{Context, WorkerBuilder}; +use ockam_node::{Context, ProcessorBuilder, WorkerBuilder}; use serde::{Deserialize, Serialize}; -use crate::influxdb::lease_token::LeaseToken; impl NodeManagerWorker { - pub(crate) async fn start_influxdb_token_lease_manager_service( + pub(crate) async fn start_influxdb_token_lessor_service( &self, context: &Context, body: StartServiceRequest, @@ -22,7 +23,7 @@ impl NodeManagerWorker { let request = body.request().clone(); match self .node_manager - .start_influxdb_token_lease_manager_service( + .start_influxdb_token_lessor_service( context, Address::from_string(body.address()), request, @@ -34,7 +35,7 @@ impl NodeManagerWorker { } } - pub(crate) async fn delete_influxdb_token_lease_manager_service( + pub(crate) async fn delete_influxdb_token_lessor_service( &self, context: &Context, req: DeleteServiceRequest, @@ -42,7 +43,7 @@ impl NodeManagerWorker { let address = req.address(); match self .node_manager - .delete_influxdb_token_lease_manager_service(context, address.clone()) + .delete_influxdb_token_lessor_service(context, address.clone()) .await { Ok(Some(_)) => Ok(Response::ok()), @@ -55,7 +56,7 @@ impl NodeManagerWorker { } impl InMemoryNode { - async fn start_influxdb_token_lease_manager_service( + async fn start_influxdb_token_lessor_service( &self, context: &Context, address: Address, @@ -84,32 +85,37 @@ impl InMemoryNode { ) .await?; - WorkerBuilder::new( - InfluxDBTokenLessorWorker::new( - address.clone(), - req.influxdb_address, - req.influxdb_org_id, - req.influxdb_token, - req.token_permissions, - req.token_ttl, - ) - .await?, + let worker = InfluxDBTokenLessorWorker::new( + address.clone(), + req.influxdb_address, + req.influxdb_org_id, + req.influxdb_token, + req.token_permissions, + req.token_ttl, ) - .with_address(address.clone()) - .with_incoming_access_control_arc(incoming_ac) - .with_outgoing_access_control_arc(outgoing_ac) - .start(context) .await?; + let processor = InfluxDBTokenLessorProcessor::new(worker.state.clone()); + WorkerBuilder::new(worker) + .with_address(address.clone()) + .with_incoming_access_control_arc(incoming_ac) + .with_outgoing_access_control_arc(outgoing_ac) + .start(context) + .await?; self.registry .influxdb_services .insert(address.clone(), ()) .await; + ProcessorBuilder::new(processor) + .with_address(format!("{address}-processor")) + .start(context) + .await?; + Ok(()) } - async fn delete_influxdb_token_lease_manager_service( + async fn delete_influxdb_token_lessor_service( &self, context: &Context, address: Address, @@ -119,6 +125,9 @@ impl InMemoryNode { None => Ok(None), Some(_) => { context.stop_worker(address.clone()).await?; + context + .stop_processor(format!("{address}-processor")) + .await?; self.registry.influxdb_services.remove(&address).await; Ok(Some(())) } diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_processor.rs b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_processor.rs new file mode 100644 index 00000000000..d7ebcecb90d --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_processor.rs @@ -0,0 +1,69 @@ +use crate::influxdb::lease_token::LeaseToken; +use crate::influxdb::token_lessor_worker::InfluxDBTokenLessorState; +use ockam_core::{async_trait, Processor}; +use ockam_node::Context; +use std::sync::Arc; +use tokio::sync::RwLock; + +pub(crate) struct InfluxDBTokenLessorProcessor { + state: Arc>, +} + +impl InfluxDBTokenLessorProcessor { + pub(crate) fn new(state: Arc>) -> Self { + Self { state } + } + + async fn list_tokens(&self) -> ockam_core::Result> { + debug!("Listing tokens"); + Ok(vec![]) + } + + async fn revoke_outstanding_tokens(&self) -> ockam_core::Result<()> { + debug!("Revoking outstanding tokens"); + let mut state = self.state.write().await; + let expired_tokens = state + .active_tokens + .iter() + .filter(|token| token.is_expired().unwrap_or(false)) + .collect::>(); + let mut to_remove = vec![]; + for token in expired_tokens { + to_remove.push(token.id.clone()); + } + state + .active_tokens + .retain(|token| !to_remove.contains(&token.id)); + Ok(()) + } +} + +#[async_trait] +impl Processor for InfluxDBTokenLessorProcessor { + type Context = Context; + + async fn initialize(&mut self, _context: &mut Self::Context) -> ockam_core::Result<()> { + loop { + if let Ok(tokens) = self.list_tokens().await { + let mut state = self.state.write().await; + state.active_tokens = tokens; + break; + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + Ok(()) + } + + async fn shutdown(&mut self, _context: &mut Self::Context) -> ockam_core::Result<()> { + debug!("Shutting down InfluxDBTokenLeaseManagerWorker"); + Ok(()) + } + + async fn process(&mut self, _context: &mut Self::Context) -> ockam_core::Result { + if let Err(err) = self.revoke_outstanding_tokens().await { + error!("Failed to revoke outstanding tokens: {err}"); + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + Ok(true) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_worker.rs b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_worker.rs index c7288b6051a..4d8318d5f0c 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/token_lessor_worker.rs @@ -17,7 +17,7 @@ use tokio::sync::RwLock; #[derive(Clone)] pub(crate) struct InfluxDBTokenLessorWorker { - state: Arc>, + pub(crate) state: Arc>, } impl InfluxDBTokenLessorWorker { @@ -29,7 +29,7 @@ impl InfluxDBTokenLessorWorker { token_permissions: String, token_ttl: i32, ) -> ockam_core::Result { - debug!("Creating InfluxDBTokenLeaseManagerWorker"); + debug!("Creating InfluxDBTokenLessorWorker"); let http_client = reqwest::ClientBuilder::new().build().map_err(|e| { ockam_core::Error::new( @@ -47,23 +47,16 @@ impl InfluxDBTokenLessorWorker { token_permissions, token_ttl: Duration::seconds(token_ttl as i64), http_client, + active_tokens: vec![], })), }; - _self.expire_outstanding_tokens().await?; Ok(_self) } - async fn expire_outstanding_tokens(&self) -> ockam_core::Result<()> { - //TODO: list all (OCKAM-generated) tokens on influxdb, and revoke from influxdb the ones - // that must be expired. This because we might have generated tokens, and then the - // node got restarted, need to check which tokens must expire. - Ok(()) - } - #[instrument(skip_all, fields(method = ?req.method(), path = req.path()))] async fn handle_request( &mut self, - ctx: &mut Context, + _ctx: &mut Context, requester: &Identifier, req: &RequestHeader, _dec: &mut Decoder<'_>, @@ -85,13 +78,11 @@ impl InfluxDBTokenLessorWorker { debug!(path_segments = ?path_segments.as_slice().iter().map(|s| s.to_string()).collect::>(), "Handling request"); let r = match (method, path_segments.as_slice()) { - (Post, [""]) => encode_response(req, self.create_token(ctx, requester).await)?, - (Get, [""]) => encode_response(req, self.list_tokens(ctx, requester).await)?, - (Get, [token_id]) => { - encode_response(req, self.get_token(ctx, requester, token_id).await)? - } + (Post, [""]) => encode_response(req, self.create_token(requester).await)?, + (Get, [""]) => encode_response(req, self.list_tokens(requester).await)?, + (Get, [token_id]) => encode_response(req, self.get_token(requester, token_id).await)?, (Delete, [token_id]) => { - encode_response(req, self.revoke_token(ctx, requester, token_id).await)? + encode_response(req, self.revoke_token(requester, token_id).await)? } // ==*== Catch-all for Unimplemented APIs ==*== _ => { @@ -110,11 +101,11 @@ impl Worker for InfluxDBTokenLessorWorker { type Context = Context; async fn shutdown(&mut self, _ctx: &mut Self::Context) -> ockam_core::Result<()> { - debug!("Shutting down InfluxDBTokenLeaseManagerWorker"); + debug!("Shutting down InfluxDBTokenLessorWorker"); Ok(()) } - #[instrument(skip_all, name = "InfluxDBTokenLeaseWorker::handle_message")] + #[instrument(skip_all, name = "InfluxDBTokenLessorWorker::handle_message")] async fn handle_message( &mut self, ctx: &mut Context, @@ -163,40 +154,37 @@ impl Worker for InfluxDBTokenLessorWorker { } pub(crate) struct InfluxDBTokenLessorState { - address: Address, - influxdb_address: String, - influxdb_org_id: String, - influxdb_token: String, - token_permissions: String, - token_ttl: Duration, - http_client: Client, + pub(super) address: Address, + pub(super) influxdb_address: String, + pub(super) influxdb_org_id: String, + pub(super) influxdb_token: String, + pub(super) token_permissions: String, + pub(super) token_ttl: Duration, + pub(super) http_client: Client, + pub(super) active_tokens: Vec, } #[async_trait] pub trait InfluxDBTokenLessorWorkerTrait { async fn create_token( &self, - ctx: &Context, requester: &Identifier, ) -> Result, Response>; async fn get_token( &self, - ctx: &Context, requester: &Identifier, token_id: &str, ) -> Result, Response>; async fn revoke_token( &self, - ctx: &Context, requester: &Identifier, token_id: &str, ) -> Result>; async fn list_tokens( &self, - ctx: &Context, requester: &Identifier, ) -> Result>, Response>; } @@ -205,7 +193,6 @@ pub trait InfluxDBTokenLessorWorkerTrait { impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker { async fn create_token( &self, - _ctx: &Context, requester: &Identifier, ) -> Result, Response> { debug!(%requester, "Creating token"); @@ -250,7 +237,13 @@ impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker { )) })?; match lease_token { - Some(lease_token) => Ok(Response::ok().body(lease_token)), + Some(lease_token) => { + { + let mut state = self.state.write().await; + state.active_tokens.push(lease_token.clone()); + } + Ok(Response::ok().body(lease_token)) + } None => { warn!("Token does not contain Ockam metadata, ignoring"); Err(Response::bad_request_no_request( @@ -270,7 +263,6 @@ impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker { async fn get_token( &self, - _ctx: &Context, requester: &Identifier, token_id: &str, ) -> Result, Response> { @@ -327,7 +319,6 @@ impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker { async fn revoke_token( &self, - ctx: &Context, requester: &Identifier, token_id: &str, ) -> Result> { @@ -335,12 +326,12 @@ impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker { // NOTE!!!: first retrieve the token, check if the identifier that created it is the same // one deleting it, and if so revoke it. Otherwise the user is not authorized for doing it. debug!(%requester, %token_id, "Revoking token"); + // TODO: remove token from active_tokens Ok(Response::ok()) } async fn list_tokens( &self, - _ctx: &Context, requester: &Identifier, ) -> Result>, Response> { debug!(%requester, "Listing tokens"); diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs index 3b931b92f07..91d7818d761 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs @@ -144,13 +144,13 @@ impl NodeManagerWorker { )?, (Post, ["node", "services", DefaultAddress::INFLUXDB_TOKEN_LESSOR]) => encode_response( req, - self.start_influxdb_token_lease_manager_service(ctx, dec.decode()?) + self.start_influxdb_token_lessor_service(ctx, dec.decode()?) .await, )?, (Delete, ["node", "services", DefaultAddress::INFLUXDB_TOKEN_LESSOR]) => { encode_response( req, - self.delete_influxdb_token_lease_manager_service(ctx, dec.decode()?) + self.delete_influxdb_token_lessor_service(ctx, dec.decode()?) .await, )? }