Skip to content

Commit

Permalink
Fix Architectural Issues
Browse files Browse the repository at this point in the history
  • Loading branch information
wba2hi committed Oct 1, 2024
1 parent 0cb9856 commit be0ca42
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 74 deletions.
7 changes: 4 additions & 3 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ jemallocator = { version = "0.5.0", optional = true }
lazy_static = "1.4.0"
thiserror = "1.0.47"

futures = { version = "0.3.28" }

# VISS
axum = { version = "0.6.20", optional = true, features = ["ws"] }
futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }
async-trait = "0.1.82"
Expand All @@ -72,10 +73,10 @@ async-trait = "0.1.82"
sd-notify = "0.4.1"

[features]
default = ["tls", "dep:futures"]
default = ["tls"]
tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
viss = ["dep:axum", "dep:chrono", "dep:uuid"]
libtest = []

[build-dependencies]
Expand Down
206 changes: 150 additions & 56 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ use tracing::{debug, info, warn};

use crate::glob;

#[derive(Debug)]
pub enum ActuationError {
NotFound,
WrongType,
OutOfBounds,
UnsupportedType,
PermissionDenied,
PermissionExpired,
ProviderNotAvailable,
ProviderAlreadyExists,
TransmissionFailure,
}

#[derive(Debug)]
pub enum UpdateError {
NotFound,
Expand Down Expand Up @@ -182,7 +195,11 @@ pub struct DataBroker {

#[async_trait::async_trait]
pub trait ActuationProvider {
async fn actuate(&self, actuation_changes: Vec<ActuationChange>) -> Result<(), tonic::Status>;
async fn actuate(
&self,
actuation_changes: Vec<ActuationChange>,
) -> Result<(), (ActuationError, String)>;
fn is_available(&self) -> bool;
}

#[derive(Clone)]
Expand Down Expand Up @@ -707,7 +724,6 @@ impl Subscriptions {
}

pub fn cleanup(&mut self) {
// TODO how to cleanup actuation_subscriptions?
self.query_subscriptions.retain(|sub| {
if sub.sender.is_closed() {
info!("Subscriber gone: removing subscription");
Expand All @@ -734,6 +750,28 @@ impl Subscriptions {
}
}
});

self.actuation_subscriptions.retain(|sub| {
if !sub.actuation_provider.is_available() {
info!("actuation provider gone: removing subscription");
false
} else {
match sub.permissions.expired() {
Ok(()) => true,
Err(PermissionError::Expired) => {
info!("Token expired for actuation provider: removing subscription");
false
}
Err(err) => {
info!(
"actuation provider error: {:?} -> removing subscription",
err
);
false
}
}
}
});
}
}

Expand Down Expand Up @@ -1590,26 +1628,41 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
&self,
vss_ids: Vec<i32>,
actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
) -> Result<(), tonic::Status> {
) -> Result<(), (ActuationError, String)> {
for vss_id in vss_ids.clone() {
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!("Can not provide actuation for vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator);
return Err(tonic::Status::permission_denied(message));
match result_can_write_actuator {
Ok(_) => {}
Err(PermissionError::Denied) => {
let message = format!("Permission denied for vss_path {}", vss_path);
return Err((ActuationError::PermissionDenied, message));
}
Err(PermissionError::Expired) => {
return Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
))
}
}
}
Err(error) => {
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
Err(ReadError::NotFound) => {
let message = format!("Could not resolve vss_path of vss_id {}", vss_id);
return Err((ActuationError::NotFound, message));
}
Err(ReadError::PermissionDenied) => {
let message = format!("Permission denied for vss_id {}", vss_id);
return Err((ActuationError::PermissionDenied, message));
}
Err(ReadError::PermissionExpired) => {
return Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
))
}
}
}
Expand All @@ -1629,10 +1682,10 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.collect();
if !intersection.is_empty() {
let message = format!(
"ActuationProvider(s) for the following vss_ids already registered: {:?}",
"actuation providers for the following vss_ids already registered: {:?}",
intersection
);
return Err(tonic::Status::invalid_argument(message));
return Err((ActuationError::ProviderAlreadyExists, message));
}

let actuation_subscription: ActuationSubscription = ActuationSubscription {
Expand Down Expand Up @@ -1674,7 +1727,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
pub async fn batch_actuate(
&self,
actuation_changes: Vec<ActuationChange>,
) -> Result<(), tonic::Status> {
) -> Result<(), (ActuationError, String)> {
let actuation_subscriptions = &self
.broker
.subscriptions
Expand All @@ -1690,21 +1743,33 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
let vss_path = entry.metadata.path;
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!(
"Can not actuate vss_path '{}' due to permission error {:?}",
vss_path, can_write_actuator
);
return Err(tonic::Status::permission_denied(message));
match result_can_write_actuator {
Ok(_) => {}
Err(PermissionError::Denied) => {
let message = format!("Permission denied for vss_path {}", vss_path);
return Err((ActuationError::PermissionDenied, message));
}
Err(PermissionError::Expired) => {
return Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
))
}
}
}
Err(error) => {
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
Err(ReadError::NotFound) => {
let message = format!("Could not resolve vss_path of vss_id {}", vss_id);
return Err((ActuationError::NotFound, message));
}
Err(ReadError::PermissionDenied) => {
let message = format!("Permission denied for vss_id {}", vss_id);
return Err((ActuationError::PermissionDenied, message));
}
Err(ReadError::PermissionExpired) => {
return Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
))
}
}
}
Expand All @@ -1727,38 +1792,54 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.await?
}
None => {
let message = format!("No actuation provider available for vss_id: {}", vss_id);
return Err(tonic::Status::unavailable(message));
let message = format!("actuation provider for vss_id {} not available", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}
}
}

Ok(())
}

pub async fn actuate(&self, vss_id: &i32, data_value: &DataValue) -> Result<(), tonic::Status> {
pub async fn actuate(
&self,
vss_id: &i32,
data_value: &DataValue,
) -> Result<(), (ActuationError, String)> {
let vss_id = *vss_id;
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!(
"Can not actuate vss_path '{}' due to permission error {:?}",
vss_path, can_write_actuator
);
return Err(tonic::Status::permission_denied(message));
match result_can_write_actuator {
Ok(_) => {}
Err(PermissionError::Denied) => {
let message = format!("Permission denied for vss_path {}", vss_path);
return Err((ActuationError::PermissionDenied, message));
}
Err(PermissionError::Expired) => {
return Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
))
}
}
}
Err(error) => {
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
Err(ReadError::NotFound) => {
let message = format!("Could not resolve vss_path of vss_id {}", vss_id);
return Err((ActuationError::NotFound, message));
}
Err(ReadError::PermissionDenied) => {
let message = format!("Permission denied for vss_id {}", vss_id);
return Err((ActuationError::PermissionDenied, message));
}
Err(ReadError::PermissionExpired) => {
return Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
))
}
}

Expand Down Expand Up @@ -1792,24 +1873,37 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
}
}

let guard = self.broker.subscriptions.read().await;
let opt_actuation_subscription = guard
let mut guard = self.broker.subscriptions.write().await;
let opt_actuation_subscription = &guard
.actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(actuation_subscription) => {
actuation_subscription
.actuation_provider
.actuate(vec![ActuationChange {
id: vss_id,
data_value: data_value.clone(),
}])
.await
let is_expired = actuation_subscription.permissions.expired();
match is_expired {
Err(_) => {
let message = format!(
"Permission for vss_ids {:?} expired, removing actuation subscription",
actuation_subscription.vss_ids
);
guard.cleanup();
Err((ActuationError::PermissionExpired, message))
}
Ok(_) => {
actuation_subscription
.actuation_provider
.actuate(vec![ActuationChange {
id: vss_id,
data_value: data_value.clone(),
}])
.await
}
}
}
None => {
let message = format!("No actuation provider found for vss_id {}", vss_id);
Err(tonic::Status::new(tonic::Code::Unavailable, message))
let message = format!("Provider for vss_id {} does not exist", vss_id);
Err((ActuationError::ProviderNotAvailable, message))
}
}
}
Expand Down
Loading

0 comments on commit be0ca42

Please sign in to comment.