Skip to content

Commit

Permalink
feat(rust): wip add lessor processor to automatically revoke expired …
Browse files Browse the repository at this point in the history
…tokens
  • Loading branch information
adrianbenavides committed Sep 12, 2024
1 parent 8b5013f commit 1823bc3
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::fmt::{Display, Formatter};
use strum::{Display, EnumString};
use time::OffsetDateTime;

#[derive(Encode, Decode, CborLen, Serialize, Deserialize, Debug, PartialEq)]
#[derive(Encode, Decode, CborLen, Serialize, Deserialize, Debug, Clone, PartialEq)]
#[cbor(map)]
pub struct LeaseToken {
#[cbor(n(1))]
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Output for LeaseToken {
}

#[derive(
Encode, Decode, CborLen, Serialize, Deserialize, PartialEq, Debug, EnumString, Display,
Encode, Decode, CborLen, Serialize, Deserialize, PartialEq, Debug, Clone, EnumString, Display,
)]
pub enum TokenStatus {
#[n(0)]
Expand Down
1 change: 1 addition & 0 deletions implementations/rust/ockam/ockam_api/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod lease_token;
pub mod token_lessor_node_service;
mod token_lessor_worker;
mod token_lessor_processor;

pub use token_lessor_node_service::StartInfluxDBLeaseManagerRequest;
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::influxdb::lease_token::LeaseToken;
use crate::influxdb::token_lessor_processor::InfluxDBTokenLessorProcessor;
use crate::influxdb::token_lessor_worker::InfluxDBTokenLessorWorker;
use crate::nodes::models::services::{DeleteServiceRequest, StartServiceRequest};
use crate::nodes::service::messages::Messages;
Expand All @@ -9,20 +11,19 @@ use ockam_abac::{Action, PolicyExpression, Resource, ResourceType};
use ockam_core::api::{Error, Request, Response};
use ockam_core::{async_trait, Address};
use ockam_multiaddr::MultiAddr;
use ockam_node::{Context, WorkerBuilder};
use ockam_node::{Context, ProcessorBuilder, WorkerBuilder};
use serde::{Deserialize, Serialize};
use crate::influxdb::lease_token::LeaseToken;

impl NodeManagerWorker {
pub(crate) async fn start_influxdb_token_lease_manager_service(
pub(crate) async fn start_influxdb_token_lessor_service(
&self,
context: &Context,
body: StartServiceRequest<StartInfluxDBLeaseManagerRequest>,
) -> Result<Response, Response<Error>> {
let request = body.request().clone();
match self
.node_manager
.start_influxdb_token_lease_manager_service(
.start_influxdb_token_lessor_service(
context,
Address::from_string(body.address()),
request,
Expand All @@ -34,15 +35,15 @@ impl NodeManagerWorker {
}
}

pub(crate) async fn delete_influxdb_token_lease_manager_service(
pub(crate) async fn delete_influxdb_token_lessor_service(
&self,
context: &Context,
req: DeleteServiceRequest,
) -> Result<Response, Response<Error>> {
let address = req.address();
match self
.node_manager
.delete_influxdb_token_lease_manager_service(context, address.clone())
.delete_influxdb_token_lessor_service(context, address.clone())
.await
{
Ok(Some(_)) => Ok(Response::ok()),
Expand All @@ -55,7 +56,7 @@ impl NodeManagerWorker {
}

impl InMemoryNode {
async fn start_influxdb_token_lease_manager_service(
async fn start_influxdb_token_lessor_service(
&self,
context: &Context,
address: Address,
Expand Down Expand Up @@ -84,32 +85,37 @@ impl InMemoryNode {
)
.await?;

WorkerBuilder::new(
InfluxDBTokenLessorWorker::new(
address.clone(),
req.influxdb_address,
req.influxdb_org_id,
req.influxdb_token,
req.token_permissions,
req.token_ttl,
)
.await?,
let worker = InfluxDBTokenLessorWorker::new(
address.clone(),
req.influxdb_address,
req.influxdb_org_id,
req.influxdb_token,
req.token_permissions,
req.token_ttl,
)
.with_address(address.clone())
.with_incoming_access_control_arc(incoming_ac)
.with_outgoing_access_control_arc(outgoing_ac)
.start(context)
.await?;
let processor = InfluxDBTokenLessorProcessor::new(worker.state.clone());

WorkerBuilder::new(worker)
.with_address(address.clone())
.with_incoming_access_control_arc(incoming_ac)
.with_outgoing_access_control_arc(outgoing_ac)
.start(context)
.await?;
self.registry
.influxdb_services
.insert(address.clone(), ())
.await;

ProcessorBuilder::new(processor)
.with_address(format!("{address}-processor"))
.start(context)
.await?;

Ok(())
}

async fn delete_influxdb_token_lease_manager_service(
async fn delete_influxdb_token_lessor_service(
&self,
context: &Context,
address: Address,
Expand All @@ -119,6 +125,9 @@ impl InMemoryNode {
None => Ok(None),
Some(_) => {
context.stop_worker(address.clone()).await?;
context
.stop_processor(format!("{address}-processor"))
.await?;
self.registry.influxdb_services.remove(&address).await;
Ok(Some(()))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::influxdb::lease_token::LeaseToken;
use crate::influxdb::token_lessor_worker::InfluxDBTokenLessorState;
use ockam_core::{async_trait, Processor};
use ockam_node::Context;
use std::sync::Arc;
use tokio::sync::RwLock;

pub(crate) struct InfluxDBTokenLessorProcessor {
state: Arc<RwLock<InfluxDBTokenLessorState>>,
}

impl InfluxDBTokenLessorProcessor {
pub(crate) fn new(state: Arc<RwLock<InfluxDBTokenLessorState>>) -> Self {
Self { state }
}

async fn list_tokens(&self) -> ockam_core::Result<Vec<LeaseToken>> {
debug!("Listing tokens");
Ok(vec![])
}

async fn revoke_outstanding_tokens(&self) -> ockam_core::Result<()> {
debug!("Revoking outstanding tokens");
let mut state = self.state.write().await;
let expired_tokens = state
.active_tokens
.iter()
.filter(|token| token.is_expired().unwrap_or(false))
.collect::<Vec<_>>();
let mut to_remove = vec![];
for token in expired_tokens {
to_remove.push(token.id.clone());
}
state
.active_tokens
.retain(|token| !to_remove.contains(&token.id));
Ok(())
}
}

#[async_trait]
impl Processor for InfluxDBTokenLessorProcessor {
type Context = Context;

async fn initialize(&mut self, _context: &mut Self::Context) -> ockam_core::Result<()> {
loop {
if let Ok(tokens) = self.list_tokens().await {
let mut state = self.state.write().await;
state.active_tokens = tokens;
break;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
}

async fn shutdown(&mut self, _context: &mut Self::Context) -> ockam_core::Result<()> {
debug!("Shutting down InfluxDBTokenLeaseManagerWorker");
Ok(())
}

async fn process(&mut self, _context: &mut Self::Context) -> ockam_core::Result<bool> {
if let Err(err) = self.revoke_outstanding_tokens().await {
error!("Failed to revoke outstanding tokens: {err}");
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::sync::RwLock;

#[derive(Clone)]
pub(crate) struct InfluxDBTokenLessorWorker {
state: Arc<RwLock<InfluxDBTokenLessorState>>,
pub(crate) state: Arc<RwLock<InfluxDBTokenLessorState>>,
}

impl InfluxDBTokenLessorWorker {
Expand All @@ -29,7 +29,7 @@ impl InfluxDBTokenLessorWorker {
token_permissions: String,
token_ttl: i32,
) -> ockam_core::Result<Self> {
debug!("Creating InfluxDBTokenLeaseManagerWorker");
debug!("Creating InfluxDBTokenLessorWorker");

let http_client = reqwest::ClientBuilder::new().build().map_err(|e| {
ockam_core::Error::new(
Expand All @@ -47,23 +47,16 @@ impl InfluxDBTokenLessorWorker {
token_permissions,
token_ttl: Duration::seconds(token_ttl as i64),
http_client,
active_tokens: vec![],
})),
};
_self.expire_outstanding_tokens().await?;
Ok(_self)
}

async fn expire_outstanding_tokens(&self) -> ockam_core::Result<()> {
//TODO: list all (OCKAM-generated) tokens on influxdb, and revoke from influxdb the ones
// that must be expired. This because we might have generated tokens, and then the
// node got restarted, need to check which tokens must expire.
Ok(())
}

#[instrument(skip_all, fields(method = ?req.method(), path = req.path()))]
async fn handle_request(
&mut self,
ctx: &mut Context,
_ctx: &mut Context,
requester: &Identifier,
req: &RequestHeader,
_dec: &mut Decoder<'_>,
Expand All @@ -85,13 +78,11 @@ impl InfluxDBTokenLessorWorker {
debug!(path_segments = ?path_segments.as_slice().iter().map(|s| s.to_string()).collect::<Vec<_>>(), "Handling request");

let r = match (method, path_segments.as_slice()) {
(Post, [""]) => encode_response(req, self.create_token(ctx, requester).await)?,
(Get, [""]) => encode_response(req, self.list_tokens(ctx, requester).await)?,
(Get, [token_id]) => {
encode_response(req, self.get_token(ctx, requester, token_id).await)?
}
(Post, [""]) => encode_response(req, self.create_token(requester).await)?,
(Get, [""]) => encode_response(req, self.list_tokens(requester).await)?,
(Get, [token_id]) => encode_response(req, self.get_token(requester, token_id).await)?,
(Delete, [token_id]) => {
encode_response(req, self.revoke_token(ctx, requester, token_id).await)?
encode_response(req, self.revoke_token(requester, token_id).await)?
}
// ==*== Catch-all for Unimplemented APIs ==*==
_ => {
Expand All @@ -110,11 +101,11 @@ impl Worker for InfluxDBTokenLessorWorker {
type Context = Context;

async fn shutdown(&mut self, _ctx: &mut Self::Context) -> ockam_core::Result<()> {
debug!("Shutting down InfluxDBTokenLeaseManagerWorker");
debug!("Shutting down InfluxDBTokenLessorWorker");
Ok(())
}

#[instrument(skip_all, name = "InfluxDBTokenLeaseWorker::handle_message")]
#[instrument(skip_all, name = "InfluxDBTokenLessorWorker::handle_message")]
async fn handle_message(
&mut self,
ctx: &mut Context,
Expand Down Expand Up @@ -163,40 +154,37 @@ impl Worker for InfluxDBTokenLessorWorker {
}

pub(crate) struct InfluxDBTokenLessorState {
address: Address,
influxdb_address: String,
influxdb_org_id: String,
influxdb_token: String,
token_permissions: String,
token_ttl: Duration,
http_client: Client,
pub(super) address: Address,
pub(super) influxdb_address: String,
pub(super) influxdb_org_id: String,
pub(super) influxdb_token: String,
pub(super) token_permissions: String,
pub(super) token_ttl: Duration,
pub(super) http_client: Client,
pub(super) active_tokens: Vec<LeaseToken>,
}

#[async_trait]
pub trait InfluxDBTokenLessorWorkerTrait {
async fn create_token(
&self,
ctx: &Context,
requester: &Identifier,
) -> Result<Response<LeaseToken>, Response<ockam_core::api::Error>>;

async fn get_token(
&self,
ctx: &Context,
requester: &Identifier,
token_id: &str,
) -> Result<Response<LeaseToken>, Response<ockam_core::api::Error>>;

async fn revoke_token(
&self,
ctx: &Context,
requester: &Identifier,
token_id: &str,
) -> Result<Response, Response<ockam_core::api::Error>>;

async fn list_tokens(
&self,
ctx: &Context,
requester: &Identifier,
) -> Result<Response<Vec<LeaseToken>>, Response<ockam_core::api::Error>>;
}
Expand All @@ -205,7 +193,6 @@ pub trait InfluxDBTokenLessorWorkerTrait {
impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker {
async fn create_token(
&self,
_ctx: &Context,
requester: &Identifier,
) -> Result<Response<LeaseToken>, Response<ockam_core::api::Error>> {
debug!(%requester, "Creating token");
Expand Down Expand Up @@ -250,7 +237,13 @@ impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker {
))
})?;
match lease_token {
Some(lease_token) => Ok(Response::ok().body(lease_token)),
Some(lease_token) => {
{
let mut state = self.state.write().await;
state.active_tokens.push(lease_token.clone());
}
Ok(Response::ok().body(lease_token))
}
None => {
warn!("Token does not contain Ockam metadata, ignoring");
Err(Response::bad_request_no_request(
Expand All @@ -270,7 +263,6 @@ impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker {

async fn get_token(
&self,
_ctx: &Context,
requester: &Identifier,
token_id: &str,
) -> Result<Response<LeaseToken>, Response<ockam_core::api::Error>> {
Expand Down Expand Up @@ -327,20 +319,19 @@ impl InfluxDBTokenLessorWorkerTrait for InfluxDBTokenLessorWorker {

async fn revoke_token(
&self,
ctx: &Context,
requester: &Identifier,
token_id: &str,
) -> Result<Response, Response<ockam_core::api::Error>> {
// TODO: https://docs.influxdata.com/influxdb/v2/api/#operation/DeleteAuthorizationsID
// NOTE!!!: first retrieve the token, check if the identifier that created it is the same
// one deleting it, and if so revoke it. Otherwise the user is not authorized for doing it.
debug!(%requester, %token_id, "Revoking token");
// TODO: remove token from active_tokens
Ok(Response::ok())
}

async fn list_tokens(
&self,
_ctx: &Context,
requester: &Identifier,
) -> Result<Response<Vec<LeaseToken>>, Response<ockam_core::api::Error>> {
debug!(%requester, "Listing tokens");
Expand Down
Loading

0 comments on commit 1823bc3

Please sign in to comment.