Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Provider Actuation #73

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ jemallocator = { version = "0.5.0", optional = true }
lazy_static = "1.4.0"
thiserror = "1.0.47"

futures = { version = "0.3.28" }
async-trait = "0.1.82"

# VISS
axum = { version = "0.6.20", optional = true, features = ["ws"] }
futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }

Expand All @@ -74,7 +76,7 @@ sd-notify = "0.4.1"
default = ["tls"]
tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
viss = ["dep:axum", "dep:chrono", "dep:uuid"]
libtest = []

[build-dependencies]
Expand Down
274 changes: 261 additions & 13 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ use tracing::{debug, info, warn};

use crate::glob;

#[derive(Debug)]
pub enum ActuationError {
NotFound,
WrongType,
OutOfBounds,
UnsupportedType,
PermissionDenied,
PermissionExpired,
ProviderNotAvailable,
ProviderAlreadyExists,
TransmissionFailure,
}

#[derive(Debug)]
pub enum UpdateError {
NotFound,
Expand Down Expand Up @@ -101,6 +114,7 @@ pub struct Database {

#[derive(Default)]
pub struct Subscriptions {
actuation_subscriptions: Vec<ActuationSubscription>,
query_subscriptions: Vec<QuerySubscription>,
change_subscriptions: Vec<ChangeSubscription>,
}
Expand Down Expand Up @@ -148,6 +162,27 @@ pub struct DataBroker {
shutdown_trigger: broadcast::Sender<()>,
}

#[async_trait::async_trait]
pub trait ActuationProvider {
async fn actuate(
&self,
actuation_changes: Vec<ActuationChange>,
) -> Result<(), (ActuationError, String)>;
fn is_available(&self) -> bool;
}

#[derive(Clone)]
pub struct ActuationChange {
pub id: i32,
pub data_value: DataValue,
}

pub struct ActuationSubscription {
vss_ids: Vec<i32>,
wba2hi marked this conversation as resolved.
Show resolved Hide resolved
actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
permissions: Permissions,
wba2hi marked this conversation as resolved.
Show resolved Hide resolved
}
wba2hi marked this conversation as resolved.
Show resolved Hide resolved

pub struct QuerySubscription {
query: query::CompiledQuery,
sender: mpsc::Sender<QueryResponse>,
Expand Down Expand Up @@ -599,6 +634,10 @@ pub enum SuccessfulUpdate {
}

impl Subscriptions {
pub fn add_actuation_subscription(&mut self, subscription: ActuationSubscription) {
self.actuation_subscriptions.push(subscription);
}

pub fn add_query_subscription(&mut self, subscription: QuerySubscription) {
self.query_subscriptions.push(subscription)
}
Expand Down Expand Up @@ -648,6 +687,7 @@ impl Subscriptions {
}

pub fn clear(&mut self) {
self.actuation_subscriptions.clear();
self.query_subscriptions.clear();
self.change_subscriptions.clear();
}
Expand All @@ -665,18 +705,23 @@ impl Subscriptions {
if sub.sender.is_closed() {
info!("Subscriber gone: removing subscription");
false
} else if sub.permissions.is_expired() {
info!("Permissions of Subscriber expired: removing subscription");
false
} else {
match &sub.permissions.expired() {
Ok(()) => true,
Err(PermissionError::Expired) => {
info!("Token expired: removing subscription");
false
}
Err(err) => {
info!("Error: {:?} -> removing subscription", err);
false
}
}
true
}
});

self.actuation_subscriptions.retain(|sub| {
if !sub.actuation_provider.is_available() {
info!("Provider gone: removing subscription");
false
} else if sub.permissions.is_expired() {
info!("Permissions of Provider expired: removing subscription");
false
} else {
true
}
});
}
Expand Down Expand Up @@ -1530,6 +1575,208 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
Err(e) => Err(QueryError::CompilationError(format!("{e:?}"))),
}
}

pub async fn provide_actuation(
&self,
vss_ids: Vec<i32>,
actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
) -> Result<(), (ActuationError, String)> {
for vss_id in vss_ids.clone() {
self.can_write_actuator_target(&vss_id).await?;
}

let provided_vss_ids: Vec<i32> = self
.broker
.subscriptions
.read()
.await
.actuation_subscriptions
.iter()
.flat_map(|subscription| subscription.vss_ids.clone())
.collect();
let intersection: Vec<&i32> = vss_ids
.iter()
.filter(|&x| provided_vss_ids.contains(x))
.collect();
if !intersection.is_empty() {
let message = format!(
"Providers for the following vss_ids already registered: {:?}",
intersection
);
return Err((ActuationError::ProviderAlreadyExists, message));
}

let actuation_subscription: ActuationSubscription = ActuationSubscription {
vss_ids,
actuation_provider,
permissions: self.permissions.clone(),
};
self.broker
.subscriptions
.write()
.await
.add_actuation_subscription(actuation_subscription);

Ok(())
}

async fn map_actuation_changes_by_vss_id(
&self,
actuation_changes: Vec<ActuationChange>,
) -> HashMap<i32, Vec<ActuationChange>> {
let mut actuation_changes_per_vss_id: HashMap<i32, Vec<ActuationChange>> =
HashMap::with_capacity(actuation_changes.len());
for actuation_change in actuation_changes {
let vss_id = actuation_change.id;

let opt_vss_ids = actuation_changes_per_vss_id.get_mut(&vss_id);
match opt_vss_ids {
Some(vss_ids) => {
vss_ids.push(actuation_change.clone());
}
None => {
let vec = vec![actuation_change.clone()];
actuation_changes_per_vss_id.insert(vss_id, vec);
}
}
}
actuation_changes_per_vss_id
}

pub async fn batch_actuate(
&self,
actuation_changes: Vec<ActuationChange>,
) -> Result<(), (ActuationError, String)> {
let read_subscription_guard = self.broker.subscriptions.read().await;
let actuation_subscriptions = &read_subscription_guard.actuation_subscriptions;

for actuation_change in &actuation_changes {
let vss_id = actuation_change.id;
self.can_write_actuator_target(&vss_id).await?;
}

let actuation_changes_per_vss_id = &self
.map_actuation_changes_by_vss_id(actuation_changes)
.await;
for actuation_change_per_vss_id in actuation_changes_per_vss_id {
let vss_id = *actuation_change_per_vss_id.0;
let actuation_changes = actuation_change_per_vss_id.1.clone();

let opt_actuation_subscription = actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(actuation_subscription) => {
let is_expired = actuation_subscription.permissions.is_expired();
if is_expired {
let message = format!(
"Permission for vss_ids {:?} expired",
actuation_subscription.vss_ids
);
return Err((ActuationError::PermissionExpired, message));
}

if !actuation_subscription.actuation_provider.is_available() {
let message = format!("Provider for vss_id {} does not exist", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}

actuation_subscription
.actuation_provider
.actuate(actuation_changes)
.await?
}
None => {
let message = format!("Provider for vss_id {} not available", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}
}
}

wba2hi marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

pub async fn actuate(
&self,
vss_id: &i32,
data_value: &DataValue,
) -> Result<(), (ActuationError, String)> {
let vss_id = *vss_id;

self.can_write_actuator_target(&vss_id).await?;

let read_subscription_guard = self.broker.subscriptions.read().await;
let opt_actuation_subscription = &read_subscription_guard
.actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(actuation_subscription) => {
let is_expired = actuation_subscription.permissions.is_expired();
if is_expired {
let message = format!(
"Permission for vss_ids {:?} expired",
actuation_subscription.vss_ids
);
return Err((ActuationError::PermissionExpired, message));
}

if !actuation_subscription.actuation_provider.is_available() {
let message = format!("Provider for vss_id {} does not exist", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}

actuation_subscription
.actuation_provider
.actuate(vec![ActuationChange {
id: vss_id,
data_value: data_value.clone(),
}])
.await
}
None => {
let message = format!("Provider for vss_id {} does not exist", vss_id);
Err((ActuationError::ProviderNotAvailable, message))
}
}
}

async fn can_write_actuator_target(
&self,
vss_id: &i32,
) -> Result<(), (ActuationError, String)> {
let result_entry = self.get_entry_by_id(*vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
match result_can_write_actuator {
Ok(_) => Ok(()),
Err(PermissionError::Denied) => {
let message = format!("Permission denied for vss_path {}", vss_path);
Err((ActuationError::PermissionDenied, message))
}
Err(PermissionError::Expired) => Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
)),
}
}
Err(ReadError::NotFound) => {
let message = format!("Could not resolve vss_path of vss_id {}", vss_id);
Err((ActuationError::NotFound, message))
}
Err(ReadError::PermissionDenied) => {
let message = format!("Permission denied for vss_id {}", vss_id);
Err((ActuationError::PermissionDenied, message))
}
Err(ReadError::PermissionExpired) => Err((
ActuationError::PermissionExpired,
"Permission expired".to_string(),
)),
}
}
}

impl DataBroker {
Expand Down Expand Up @@ -1557,13 +1804,14 @@ impl DataBroker {
pub fn start_housekeeping_task(&self) {
info!("Starting housekeeping task");
let subscriptions = self.subscriptions.clone();

tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));

loop {
interval.tick().await;
// Cleanup dropped subscriptions
subscriptions.write().await.cleanup();

subscriptions.write().await.cleanup(); // Cleanup dropped subscriptions
}
});
}
Expand Down
Loading