Skip to content

Commit

Permalink
Implement Provider Actuation
Browse files Browse the repository at this point in the history
Co-authored-by: John Argérus <[email protected]>
  • Loading branch information
2 people authored and erikbosch committed Oct 14, 2024
1 parent 4dce6f3 commit 35c0f83
Show file tree
Hide file tree
Showing 7 changed files with 1,127 additions and 82 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ jemallocator = { version = "0.5.0", optional = true }
lazy_static = "1.4.0"
thiserror = "1.0.47"

futures = { version = "0.3.28" }
async-trait = "0.1.82"

# VISS
axum = { version = "0.6.20", optional = true, features = ["ws"] }
futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }

Expand All @@ -74,7 +76,7 @@ sd-notify = "0.4.1"
default = ["tls"]
tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
viss = ["dep:axum", "dep:chrono", "dep:uuid"]
libtest = []

[build-dependencies]
Expand Down
274 changes: 261 additions & 13 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ use tracing::{debug, info, warn};

use crate::glob;

#[derive(Debug)]
pub enum ActuationError {
NotFound,
WrongType,
OutOfBounds,
UnsupportedType,
PermissionDenied,
PermissionExpired,
ProviderNotAvailable,
ProviderAlreadyExists,
TransmissionFailure,
}

#[derive(Debug)]
pub enum UpdateError {
NotFound,
Expand Down Expand Up @@ -101,6 +114,7 @@ pub struct Database {

#[derive(Default)]
pub struct Subscriptions {
actuation_subscriptions: Vec<ActuationSubscription>,
query_subscriptions: Vec<QuerySubscription>,
change_subscriptions: Vec<ChangeSubscription>,
}
Expand Down Expand Up @@ -148,6 +162,27 @@ pub struct DataBroker {
shutdown_trigger: broadcast::Sender<()>,
}

#[async_trait::async_trait]
pub trait ActuationProvider {
async fn actuate(
&self,
actuation_changes: Vec<ActuationChange>,
) -> Result<(), (ActuationError, String)>;
fn is_available(&self) -> bool;
}

#[derive(Clone)]
pub struct ActuationChange {
pub id: i32,
pub data_value: DataValue,
}

pub struct ActuationSubscription {
vss_ids: Vec<i32>,
actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
permissions: Permissions,
}

pub struct QuerySubscription {
query: query::CompiledQuery,
sender: mpsc::Sender<QueryResponse>,
Expand Down Expand Up @@ -599,6 +634,10 @@ pub enum SuccessfulUpdate {
}

impl Subscriptions {
pub fn add_actuation_subscription(&mut self, subscription: ActuationSubscription) {
self.actuation_subscriptions.push(subscription);
}

pub fn add_query_subscription(&mut self, subscription: QuerySubscription) {
self.query_subscriptions.push(subscription)
}
Expand Down Expand Up @@ -648,6 +687,7 @@ impl Subscriptions {
}

pub fn clear(&mut self) {
self.actuation_subscriptions.clear();
self.query_subscriptions.clear();
self.change_subscriptions.clear();
}
Expand All @@ -665,18 +705,23 @@ impl Subscriptions {
if sub.sender.is_closed() {
info!("Subscriber gone: removing subscription");
false
} else if sub.permissions.is_expired() {
info!("Permissions of Subscriber expired: removing subscription");
false
} else {
match &sub.permissions.expired() {
Ok(()) => true,
Err(PermissionError::Expired) => {
info!("Token expired: removing subscription");
false
}
Err(err) => {
info!("Error: {:?} -> removing subscription", err);
false
}
}
true
}
});

self.actuation_subscriptions.retain(|sub| {
if !sub.actuation_provider.is_available() {
info!("Provider gone: removing subscription");
false
} else if sub.permissions.is_expired() {
info!("Permissions of Provider expired: removing subscription");
false
} else {
true
}
});
}
Expand Down Expand Up @@ -1530,6 +1575,208 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
Err(e) => Err(QueryError::CompilationError(format!("{e:?}"))),
}
}

pub async fn provide_actuation(
&self,
vss_ids: Vec<i32>,
actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
) -> Result<(), (ActuationError, String)> {
for vss_id in vss_ids.clone() {
self.can_write_actuator_target(&vss_id).await?;
}

let provided_vss_ids: Vec<i32> = self
.broker
.subscriptions
.read()
.await
.actuation_subscriptions
.iter()
.flat_map(|subscription| subscription.vss_ids.clone())
.collect();
let intersection: Vec<&i32> = vss_ids
.iter()
.filter(|&x| provided_vss_ids.contains(x))
.collect();
if !intersection.is_empty() {
let message = format!(
"Providers for the following vss_ids already registered: {:?}",
intersection
);
return Err((ActuationError::ProviderAlreadyExists, message));
}

let actuation_subscription: ActuationSubscription = ActuationSubscription {
vss_ids,
actuation_provider,
permissions: self.permissions.clone(),
};
self.broker
.subscriptions
.write()
.await
.add_actuation_subscription(actuation_subscription);

Ok(())
}

async fn map_actuation_changes_by_vss_id(
&self,
actuation_changes: Vec<ActuationChange>,
) -> HashMap<i32, Vec<ActuationChange>> {
let mut actuation_changes_per_vss_id: HashMap<i32, Vec<ActuationChange>> =
HashMap::with_capacity(actuation_changes.len());
for actuation_change in actuation_changes {
let vss_id = actuation_change.id;

let opt_vss_ids = actuation_changes_per_vss_id.get_mut(&vss_id);
match opt_vss_ids {
Some(vss_ids) => {
vss_ids.push(actuation_change.clone());
}
None => {
let vec = vec![actuation_change.clone()];
actuation_changes_per_vss_id.insert(vss_id, vec);
}
}
}
actuation_changes_per_vss_id
}

pub async fn batch_actuate(
&self,
actuation_changes: Vec<ActuationChange>,
) -> Result<(), (ActuationError, String)> {
let read_subscription_guard = self.broker.subscriptions.read().await;
let actuation_subscriptions = &read_subscription_guard.actuation_subscriptions;

for actuation_change in &actuation_changes {
let vss_id = actuation_change.id;
self.can_write_actuator_target(&vss_id).await?;
}

let actuation_changes_per_vss_id = &self
.map_actuation_changes_by_vss_id(actuation_changes)
.await;
for actuation_change_per_vss_id in actuation_changes_per_vss_id {
let vss_id = *actuation_change_per_vss_id.0;
let actuation_changes = actuation_change_per_vss_id.1.clone();

let opt_actuation_subscription = actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(actuation_subscription) => {
let is_expired = actuation_subscription.permissions.is_expired();
if is_expired {
let message = format!(
"Permission for vss_ids {:?} expired",
actuation_subscription.vss_ids
);
return Err((ActuationError::PermissionExpired, message));
}

if !actuation_subscription.actuation_provider.is_available() {
let message = format!("Provider for vss_id {} does not exist", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}

actuation_subscription
.actuation_provider
.actuate(actuation_changes)
.await?
}
None => {
let message = format!("Provider for vss_id {} not available", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}
}
}

Ok(())
}

pub async fn actuate(
&self,
vss_id: &i32,
data_value: &DataValue,
) -> Result<(), (ActuationError, String)> {
let vss_id = *vss_id;

self.can_write_actuator_target(&vss_id).await?;

let read_subscription_guard = self.broker.subscriptions.read().await;
let opt_actuation_subscription = &read_subscription_guard
.actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(actuation_subscription) => {
let is_expired = actuation_subscription.permissions.is_expired();
if is_expired {
let message = format!(
"Permission for vss_ids {:?} expired",
actuation_subscription.vss_ids
);
return Err((ActuationError::PermissionExpired, message));
}

if !actuation_subscription.actuation_provider.is_available() {
let message = format!("Provider for vss_id {} does not exist", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}

actuation_subscription
.actuation_provider
.actuate(vec![ActuationChange {
id: vss_id,
data_value: data_value.clone(),
}])
.await
}
None => {
let message = format!("Provider for vss_id {} does not exist", vss_id);
Err((ActuationError::ProviderNotAvailable, message))
}
}
}

async fn can_write_actuator_target(
&self,
vss_id: &i32,
) -> Result<(), (ActuationError, String)> {
let result_entry = self.get_entry_by_id(*vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
match result_can_write_actuator {
Ok(_) => Ok(()),
Err(PermissionError::Denied) => {
let message = format!("Permission denied for vss_path {}", vss_path);
Err((ActuationError::PermissionDenied, message))
}
Err(PermissionError::Expired) => Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
)),
}
}
Err(ReadError::NotFound) => {
let message = format!("Could not resolve vss_path of vss_id {}", vss_id);
Err((ActuationError::NotFound, message))
}
Err(ReadError::PermissionDenied) => {
let message = format!("Permission denied for vss_id {}", vss_id);
Err((ActuationError::PermissionDenied, message))
}
Err(ReadError::PermissionExpired) => Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
)),
}
}
}

impl DataBroker {
Expand Down Expand Up @@ -1557,13 +1804,14 @@ impl DataBroker {
pub fn start_housekeeping_task(&self) {
info!("Starting housekeeping task");
let subscriptions = self.subscriptions.clone();

tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));

loop {
interval.tick().await;
// Cleanup dropped subscriptions
subscriptions.write().await.cleanup();

subscriptions.write().await.cleanup(); // Cleanup dropped subscriptions
}
});
}
Expand Down
Loading

0 comments on commit 35c0f83

Please sign in to comment.