From be0ca4239d319d75f9e5d460adcdae7320356ae0 Mon Sep 17 00:00:00 2001 From: Andre Weber Date: Tue, 1 Oct 2024 08:06:36 +0200 Subject: [PATCH] Fix Architectural Issues --- databroker/Cargo.toml | 7 +- databroker/src/broker.rs | 206 +++++++++++++++++------- databroker/src/grpc/kuksa_val_v2/val.rs | 56 +++++-- 3 files changed, 195 insertions(+), 74 deletions(-) diff --git a/databroker/Cargo.toml b/databroker/Cargo.toml index 50b6c0cd..60dcfc46 100644 --- a/databroker/Cargo.toml +++ b/databroker/Cargo.toml @@ -60,9 +60,10 @@ jemallocator = { version = "0.5.0", optional = true } lazy_static = "1.4.0" thiserror = "1.0.47" +futures = { version = "0.3.28" } + # VISS axum = { version = "0.6.20", optional = true, features = ["ws"] } -futures = { version = "0.3.28", optional = true } chrono = { version = "0.4.31", optional = true, features = ["std"] } uuid = { version = "1.4.1", optional = true, features = ["v4"] } async-trait = "0.1.82" @@ -72,10 +73,10 @@ async-trait = "0.1.82" sd-notify = "0.4.1" [features] -default = ["tls", "dep:futures"] +default = ["tls"] tls = ["tonic/tls"] jemalloc = ["dep:jemallocator"] -viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"] +viss = ["dep:axum", "dep:chrono", "dep:uuid"] libtest = [] [build-dependencies] diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 77684569..95a5e8d7 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -33,6 +33,19 @@ use tracing::{debug, info, warn}; use crate::glob; +#[derive(Debug)] +pub enum ActuationError { + NotFound, + WrongType, + OutOfBounds, + UnsupportedType, + PermissionDenied, + PermissionExpired, + ProviderNotAvailable, + ProviderAlreadyExists, + TransmissionFailure, +} + #[derive(Debug)] pub enum UpdateError { NotFound, @@ -182,7 +195,11 @@ pub struct DataBroker { #[async_trait::async_trait] pub trait ActuationProvider { - async fn actuate(&self, actuation_changes: Vec) -> Result<(), tonic::Status>; + async fn actuate( + &self, + actuation_changes: Vec, + ) -> Result<(), (ActuationError, String)>; + fn is_available(&self) -> bool; } #[derive(Clone)] @@ -707,7 +724,6 @@ impl Subscriptions { } pub fn cleanup(&mut self) { - // TODO how to cleanup actuation_subscriptions? self.query_subscriptions.retain(|sub| { if sub.sender.is_closed() { info!("Subscriber gone: removing subscription"); @@ -734,6 +750,28 @@ impl Subscriptions { } } }); + + self.actuation_subscriptions.retain(|sub| { + if !sub.actuation_provider.is_available() { + info!("actuation provider gone: removing subscription"); + false + } else { + match sub.permissions.expired() { + Ok(()) => true, + Err(PermissionError::Expired) => { + info!("Token expired for actuation provider: removing subscription"); + false + } + Err(err) => { + info!( + "actuation provider error: {:?} -> removing subscription", + err + ); + false + } + } + } + }); } } @@ -1590,7 +1628,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { &self, vss_ids: Vec, actuation_provider: Box, - ) -> Result<(), tonic::Status> { + ) -> Result<(), (ActuationError, String)> { for vss_id in vss_ids.clone() { let result_entry = self.get_entry_by_id(vss_id).await; match result_entry { @@ -1598,18 +1636,33 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let vss_path = entry.metadata.path; let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); - if result_can_write_actuator.is_err() { - let can_write_actuator = result_can_write_actuator.unwrap_err(); - let message = format!("Can not provide actuation for vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator); - return Err(tonic::Status::permission_denied(message)); + match result_can_write_actuator { + Ok(_) => {} + Err(PermissionError::Denied) => { + let message = format!("Permission denied for vss_path {}", vss_path); + return Err((ActuationError::PermissionDenied, message)); + } + Err(PermissionError::Expired) => { + return Err(( + ActuationError::PermissionExpired, + "Permission expired".to_string(), + )) + } } } - Err(error) => { - let message = format!( - "Could not resolve vss_path for vss_id {}: {:?}", - vss_id, error - ); - return Err(tonic::Status::invalid_argument(message)); + Err(ReadError::NotFound) => { + let message = format!("Could not resolve vss_path of vss_id {}", vss_id); + return Err((ActuationError::NotFound, message)); + } + Err(ReadError::PermissionDenied) => { + let message = format!("Permission denied for vss_id {}", vss_id); + return Err((ActuationError::PermissionDenied, message)); + } + Err(ReadError::PermissionExpired) => { + return Err(( + ActuationError::PermissionExpired, + "Permission expired".to_string(), + )) } } } @@ -1629,10 +1682,10 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .collect(); if !intersection.is_empty() { let message = format!( - "ActuationProvider(s) for the following vss_ids already registered: {:?}", + "actuation providers for the following vss_ids already registered: {:?}", intersection ); - return Err(tonic::Status::invalid_argument(message)); + return Err((ActuationError::ProviderAlreadyExists, message)); } let actuation_subscription: ActuationSubscription = ActuationSubscription { @@ -1674,7 +1727,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn batch_actuate( &self, actuation_changes: Vec, - ) -> Result<(), tonic::Status> { + ) -> Result<(), (ActuationError, String)> { let actuation_subscriptions = &self .broker .subscriptions @@ -1690,21 +1743,33 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let vss_path = entry.metadata.path; let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); - if result_can_write_actuator.is_err() { - let can_write_actuator = result_can_write_actuator.unwrap_err(); - let message = format!( - "Can not actuate vss_path '{}' due to permission error {:?}", - vss_path, can_write_actuator - ); - return Err(tonic::Status::permission_denied(message)); + match result_can_write_actuator { + Ok(_) => {} + Err(PermissionError::Denied) => { + let message = format!("Permission denied for vss_path {}", vss_path); + return Err((ActuationError::PermissionDenied, message)); + } + Err(PermissionError::Expired) => { + return Err(( + ActuationError::PermissionExpired, + "Permission expired".to_string(), + )) + } } } - Err(error) => { - let message = format!( - "Could not resolve vss_path for vss_id {}: {:?}", - vss_id, error - ); - return Err(tonic::Status::invalid_argument(message)); + Err(ReadError::NotFound) => { + let message = format!("Could not resolve vss_path of vss_id {}", vss_id); + return Err((ActuationError::NotFound, message)); + } + Err(ReadError::PermissionDenied) => { + let message = format!("Permission denied for vss_id {}", vss_id); + return Err((ActuationError::PermissionDenied, message)); + } + Err(ReadError::PermissionExpired) => { + return Err(( + ActuationError::PermissionExpired, + "Permission expired".to_string(), + )) } } } @@ -1727,8 +1792,8 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .await? } None => { - let message = format!("No actuation provider available for vss_id: {}", vss_id); - return Err(tonic::Status::unavailable(message)); + let message = format!("actuation provider for vss_id {} not available", vss_id); + return Err((ActuationError::ProviderNotAvailable, message)); } } } @@ -1736,7 +1801,11 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { Ok(()) } - pub async fn actuate(&self, vss_id: &i32, data_value: &DataValue) -> Result<(), tonic::Status> { + pub async fn actuate( + &self, + vss_id: &i32, + data_value: &DataValue, + ) -> Result<(), (ActuationError, String)> { let vss_id = *vss_id; let result_entry = self.get_entry_by_id(vss_id).await; match result_entry { @@ -1744,21 +1813,33 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let vss_path = entry.metadata.path; let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); - if result_can_write_actuator.is_err() { - let can_write_actuator = result_can_write_actuator.unwrap_err(); - let message = format!( - "Can not actuate vss_path '{}' due to permission error {:?}", - vss_path, can_write_actuator - ); - return Err(tonic::Status::permission_denied(message)); + match result_can_write_actuator { + Ok(_) => {} + Err(PermissionError::Denied) => { + let message = format!("Permission denied for vss_path {}", vss_path); + return Err((ActuationError::PermissionDenied, message)); + } + Err(PermissionError::Expired) => { + return Err(( + ActuationError::PermissionExpired, + "Permission expired".to_string(), + )) + } } } - Err(error) => { - let message = format!( - "Could not resolve vss_path for vss_id {}: {:?}", - vss_id, error - ); - return Err(tonic::Status::invalid_argument(message)); + Err(ReadError::NotFound) => { + let message = format!("Could not resolve vss_path of vss_id {}", vss_id); + return Err((ActuationError::NotFound, message)); + } + Err(ReadError::PermissionDenied) => { + let message = format!("Permission denied for vss_id {}", vss_id); + return Err((ActuationError::PermissionDenied, message)); + } + Err(ReadError::PermissionExpired) => { + return Err(( + ActuationError::PermissionExpired, + "Permission expired".to_string(), + )) } } @@ -1792,24 +1873,37 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { } } - let guard = self.broker.subscriptions.read().await; - let opt_actuation_subscription = guard + let mut guard = self.broker.subscriptions.write().await; + let opt_actuation_subscription = &guard .actuation_subscriptions .iter() .find(|subscription| subscription.vss_ids.contains(&vss_id)); match opt_actuation_subscription { Some(actuation_subscription) => { - actuation_subscription - .actuation_provider - .actuate(vec![ActuationChange { - id: vss_id, - data_value: data_value.clone(), - }]) - .await + let is_expired = actuation_subscription.permissions.expired(); + match is_expired { + Err(_) => { + let message = format!( + "Permission for vss_ids {:?} expired, removing actuation subscription", + actuation_subscription.vss_ids + ); + guard.cleanup(); + Err((ActuationError::PermissionExpired, message)) + } + Ok(_) => { + actuation_subscription + .actuation_provider + .actuate(vec![ActuationChange { + id: vss_id, + data_value: data_value.clone(), + }]) + .await + } + } } None => { - let message = format!("No actuation provider found for vss_id {}", vss_id); - Err(tonic::Status::new(tonic::Code::Unavailable, message)) + let message = format!("Provider for vss_id {} does not exist", vss_id); + Err((ActuationError::ProviderNotAvailable, message)) } } } diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index 6cc6054d..7bd7be32 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -48,7 +48,7 @@ impl ActuationProvider for Provider { async fn actuate( &self, actuation_changes: Vec, - ) -> Result<(), tonic::Status> { + ) -> Result<(), (broker::ActuationError, String)> { let mut actuation_requests: Vec = vec![]; for actuation_change in actuation_changes { let data_value = actuation_change.data_value; @@ -75,15 +75,22 @@ impl ActuationProvider for Provider { if result.is_err() { let send_error = result.unwrap_err().0; if send_error.is_err() { - let status = send_error.unwrap_err(); - return Err(status); + return Err(( + broker::ActuationError::TransmissionFailure, + "An error occured while sending the data".to_string(), + )); } - return Err(tonic::Status::cancelled( - "Could not send actuation changes to actuation provider", + return Err(( + broker::ActuationError::TransmissionFailure, + "An error occured while sending the data".to_string(), )); } return Ok(()); } + + fn is_available(&self) -> bool { + !self.sender.is_closed() + } } #[tonic::async_trait] @@ -239,7 +246,11 @@ impl proto::val_server::Val for broker::DataBroker { let result = broker.actuate(&id, &DataValue::from(value)).await; match result { Ok(_) => return Ok(tonic::Response::new(ActuateResponse {})), - Err(err) => return Err(err), + Err(error) => { + return Err(convert_actuation_error_to_status( + error.0, error.1, + )) + } }; } return Err(tonic::Status::not_found(format!( @@ -251,7 +262,9 @@ impl proto::val_server::Val for broker::DataBroker { let result = broker.actuate(&vss_id, &DataValue::from(value)).await; match result { Ok(_) => return Ok(tonic::Response::new(ActuateResponse {})), - Err(err) => return Err(err), + Err(error) => { + return Err(convert_actuation_error_to_status(error.0, error.1)) + } }; } None => { @@ -337,7 +350,7 @@ impl proto::val_server::Val for broker::DataBroker { let result = broker.batch_actuate(actuation_changes).await; match result { Ok(_) => Ok(tonic::Response::new(proto::BatchActuateResponse {})), - Err(error) => return Err(error), + Err(error) => return Err(convert_actuation_error_to_status(error.0, error.1)), } } @@ -663,13 +676,9 @@ impl proto::val_server::Val for broker::DataBroker { { debug!("Failed to provide actuation: {}", err) } - break; }, Some(BatchActuateStreamResponse(_batch_actuate_stream_response)) => { - if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { - debug!("Failed to send error response: {}", err); - } - break; + // TODO discuss and implement }, None => { @@ -776,7 +785,7 @@ async fn provide_actuation( Ok(response) } - Err(status) => Err(status), + Err(error) => Err(convert_actuation_error_to_status(error.0, error.1)), } } @@ -889,6 +898,23 @@ fn convert_to_proto_stream( }) } +fn convert_actuation_error_to_status( + actuation_error: broker::ActuationError, + message: String, +) -> tonic::Status { + match actuation_error { + broker::ActuationError::NotFound => tonic::Status::not_found(message), + broker::ActuationError::WrongType => tonic::Status::invalid_argument(message), + broker::ActuationError::OutOfBounds => tonic::Status::out_of_range(message), + broker::ActuationError::UnsupportedType => tonic::Status::invalid_argument(message), + broker::ActuationError::PermissionDenied => tonic::Status::permission_denied(message), + broker::ActuationError::PermissionExpired => tonic::Status::permission_denied(message), + broker::ActuationError::ProviderNotAvailable => tonic::Status::unavailable(message), + broker::ActuationError::ProviderAlreadyExists => tonic::Status::already_exists(message), + broker::ActuationError::TransmissionFailure => tonic::Status::data_loss(message), + } +} + #[cfg(test)] mod tests { use super::*; @@ -1651,7 +1677,7 @@ mod tests { let vss_ids = vec![vss_id_abs]; - let (sender, _) = mpsc::channel(10); + let (sender, _receiver) = mpsc::channel(10); let actuation_provider = Provider { sender }; authorized_access .provide_actuation(vss_ids, Box::new(actuation_provider))