Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Provider Actuation #5

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ axum = { version = "0.6.20", optional = true, features = ["ws"] }
futures = { version = "0.3.28", optional = true }
chrono = { version = "0.4.31", optional = true, features = ["std"] }
uuid = { version = "1.4.1", optional = true, features = ["v4"] }
async-trait = "0.1.82"

# systemd related dependency, only relevant on linux systems
[target.'cfg(target_os = "linux")'.dependencies]
sd-notify = "0.4.1"

[features]
default = ["tls"]
default = ["tls", "dep:futures"]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

required for val.rs -> provide_actuation

    let future_vss_ids = vss_paths.iter().map(|vss_path| broker.get_id_by_path(&vss_path));
    let resolved_opt_vss_ids = futures::future::join_all(future_vss_ids).await;

tls = ["tonic/tls"]
jemalloc = ["dep:jemallocator"]
viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"]
Expand Down
257 changes: 255 additions & 2 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub struct Database {

#[derive(Default)]
pub struct Subscriptions {
actuation_subscriptions: Vec<ActuationSubscription>,
query_subscriptions: Vec<QuerySubscription>,
change_subscriptions: Vec<ChangeSubscription>,
}
Expand Down Expand Up @@ -179,6 +180,23 @@ pub struct DataBroker {
shutdown_trigger: broadcast::Sender<()>,
}

#[async_trait::async_trait]
pub trait ActuationProvider {
async fn actuate(&self, actuation_changes: Vec<ActuationChange>) -> Result<(), tonic::Status>;
}

#[derive(Clone)]
pub struct ActuationChange {
pub id: i32,
pub data_value: DataValue,
}

pub struct ActuationSubscription {
vss_ids: Vec<i32>,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vss_paths are normalized to vss_ids, so it's easier to keep a common list of available / registered actuators!

actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
permissions: Permissions,
}
Comment on lines +194 to +198
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right now some cleanup operation is missing, e.g. an ActuationProvider disconnects or crashes and can no longer actuate. However, we prevent multiple ActuationProviders to register for the same signal, meaning once the ActuationProvider disconnects, it can no longer re-register to it's signal.

I can not use the actuation_provider to check if it is still available. I maybe could check (e.g. Err during send()) in val.rs -> Provider -> actuate but there I would not have the relation between the ActuationProvider and the ActuationSubscription. Sounds like a lot of overhead and not sure if there isn't a better way


pub struct QuerySubscription {
query: query::CompiledQuery,
sender: mpsc::Sender<QueryResponse>,
Expand Down Expand Up @@ -630,6 +648,10 @@ pub enum SuccessfulUpdate {
}

impl Subscriptions {
pub fn add_actuation_subscription(&mut self, subscription: ActuationSubscription) {
self.actuation_subscriptions.push(subscription);
}

pub fn add_query_subscription(&mut self, subscription: QuerySubscription) {
self.query_subscriptions.push(subscription)
}
Expand Down Expand Up @@ -679,11 +701,13 @@ impl Subscriptions {
}

pub fn clear(&mut self) {
self.actuation_subscriptions.clear();
self.query_subscriptions.clear();
self.change_subscriptions.clear();
}

pub fn cleanup(&mut self) {
// TODO how to cleanup actuation_subscriptions?
self.query_subscriptions.retain(|sub| {
if sub.sender.is_closed() {
info!("Subscriber gone: removing subscription");
Expand Down Expand Up @@ -1561,6 +1585,234 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
Err(e) => Err(QueryError::CompilationError(format!("{e:?}"))),
}
}

pub async fn provide_actuation(
&self,
vss_ids: Vec<i32>,
actuation_provider: Box<dyn ActuationProvider + Send + Sync + 'static>,
) -> Result<(), tonic::Status> {
for vss_id in vss_ids.clone() {
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!("Can not provide actuation for vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator);
return Err(tonic::Status::permission_denied(message));
}
}
Err(error) => {
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
}
}
}

let provided_vss_ids: Vec<i32> = self
.broker
.subscriptions
.read()
.await
.actuation_subscriptions
.iter()
.flat_map(|subscription| subscription.vss_ids.clone())
.collect();
let intersection: Vec<_> = vss_ids
.iter()
.filter(|&x| provided_vss_ids.contains(x))
.collect();
if !intersection.is_empty() {
let message = format!(
"ActuationProvider(s) for the following vss_ids already registered: {:?}",
intersection
);
return Err(tonic::Status::invalid_argument(message));
}

let actuation_subscription: ActuationSubscription = ActuationSubscription {
vss_ids,
actuation_provider,
permissions: self.permissions.clone(),
};
self.broker
.subscriptions
.write()
.await
.add_actuation_subscription(actuation_subscription);

Ok(())
}

async fn map_actuation_changes_by_vss_id(
&self,
actuation_changes: Vec<ActuationChange>,
) -> HashMap<i32, Vec<ActuationChange>> {
let mut actuation_changes_per_vss_id: HashMap<i32, Vec<ActuationChange>> = HashMap::new();
for ele in actuation_changes {
let vss_id = ele.id;

let opt_vss_ids = actuation_changes_per_vss_id.get_mut(&vss_id);
match opt_vss_ids {
Some(vss_ids) => {
vss_ids.push(ele.clone());
}
None => {
let vec = vec![ele.clone()];
actuation_changes_per_vss_id.insert(vss_id, vec);
}
}
}
return actuation_changes_per_vss_id;
}

pub async fn batch_actuate(
&self,
actuation_changes: Vec<ActuationChange>,
) -> Result<(), tonic::Status> {
let actuation_subscriptions = &self
.broker
.subscriptions
.read()
.await
.actuation_subscriptions;

for actuation_change in &actuation_changes {
let vss_id = actuation_change.id;
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!(
"Can not actuate vss_path '{}' due to permission error {:?}",
vss_path, can_write_actuator
);
return Err(tonic::Status::permission_denied(message));
}
}
Err(error) => {
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
}
}
}

let actuation_changes_per_vss_id = &self
.map_actuation_changes_by_vss_id(actuation_changes)
.await;
for ele in actuation_changes_per_vss_id {
let vss_id = ele.0.clone();
let actuation_changes = ele.1.clone();

let opt_actuation_subscription = actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(actuation_subscription) => {
actuation_subscription
.actuation_provider
.actuate(actuation_changes)
.await?
}
None => {
let message = format!("No actuation provider available for vss_id: {}", vss_id);
return Err(tonic::Status::unavailable(message));
}
}
}

return Ok(());
}

pub async fn actuate(&self, vss_id: &i32, data_value: &DataValue) -> Result<(), tonic::Status> {
let vss_id = vss_id.clone();
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!(
"Can not actuate vss_path '{}' due to permission error {:?}",
vss_path, can_write_actuator
);
return Err(tonic::Status::permission_denied(message));
}
}
Err(error) => {
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
}
}

let entry_update = EntryUpdate {
path: None,
datapoint: None,
actuator_target: Some(Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: data_value.clone(),
})),
entry_type: None,
data_type: None,
description: None,
allowed: None,
unit: None,
};

let entry_updates = [(vss_id.clone(), entry_update)];
let legacy_result = self.update_entries(entry_updates).await;
if legacy_result.is_err() {
let update_errors = legacy_result.unwrap_err();
let opt_error = update_errors.get(0);
if opt_error.is_some() {
let error = opt_error.unwrap();
let message = format!(
"Could not set actuator target for vss_id {}: {:?}",
vss_id, error.1
);
warn!(message);
}
}

let guard = self.broker.subscriptions.read().await;
let opt_actuation_subscription = guard
.actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(ref actuation_subscription) => {
actuation_subscription
.actuation_provider
.actuate(vec![ActuationChange {
id: vss_id.clone(),
data_value: data_value.clone(),
}])
.await
}
None => {
let message = format!("No actuation provider found for vss_id {}", vss_id);
return Err(tonic::Status::new(tonic::Code::Unavailable, message));
}
}
}
}

impl DataBroker {
Expand Down Expand Up @@ -1588,13 +1840,14 @@ impl DataBroker {
pub fn start_housekeeping_task(&self) {
info!("Starting housekeeping task");
let subscriptions = self.subscriptions.clone();

tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));

loop {
interval.tick().await;
// Cleanup dropped subscriptions
subscriptions.write().await.cleanup();

subscriptions.write().await.cleanup(); // Cleanup dropped subscriptions
}
});
}
Expand Down
Loading
Loading