Skip to content

Commit

Permalink
Write Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wba2hi committed Sep 27, 2024
1 parent 1db79ba commit 575991d
Show file tree
Hide file tree
Showing 3 changed files with 683 additions and 203 deletions.
143 changes: 96 additions & 47 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1596,24 +1596,42 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path);
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!("Can not provide actuation for vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator);
return Err(tonic::Status::permission_denied(message));
}
},
}
Err(error) => {
let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error);
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
},
}
}
}

let provided_vss_ids: Vec<i32> = self.broker.subscriptions.blocking_read().actuation_subscriptions.iter().flat_map(|subscription| subscription.vss_ids.clone()).collect();
let intersection: Vec<_> = vss_ids.iter().filter(|&x| provided_vss_ids.contains(x)).collect();
let provided_vss_ids: Vec<i32> = self
.broker
.subscriptions
.read()
.await
.actuation_subscriptions
.iter()
.flat_map(|subscription| subscription.vss_ids.clone())
.collect();
let intersection: Vec<_> = vss_ids
.iter()
.filter(|&x| provided_vss_ids.contains(x))
.collect();
if !intersection.is_empty() {
let message = format!("ActuationProvider(s) for the following vss_ids already registered: {:?}", intersection);
let message = format!(
"ActuationProvider(s) for the following vss_ids already registered: {:?}",
intersection
);
return Err(tonic::Status::invalid_argument(message));
}

Expand All @@ -1622,14 +1640,18 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
actuation_provider,
permissions: self.permissions.clone(),
};
self.broker.subscriptions.blocking_write().add_actuation_subscription(actuation_subscription);
self.broker
.subscriptions
.write()
.await
.add_actuation_subscription(actuation_subscription);

Ok(())
}

async fn map_actuation_changes_by_vss_id(
&self,
actuation_changes: Vec<ActuationChange>
actuation_changes: Vec<ActuationChange>,
) -> HashMap<i32, Vec<ActuationChange>> {
let mut actuation_changes_per_vss_id: HashMap<i32, Vec<ActuationChange>> = HashMap::new();
for ele in actuation_changes {
Expand All @@ -1639,84 +1661,105 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
match opt_vss_ids {
Some(vss_ids) => {
vss_ids.push(ele.clone());
},
}
None => {
let vec = vec![ele.clone()];
actuation_changes_per_vss_id.insert(vss_id, vec);
},
}
}
}
return actuation_changes_per_vss_id;
}

pub async fn batch_actuate(
&self,
actuation_changes: Vec<ActuationChange>
actuation_changes: Vec<ActuationChange>,
) -> Result<(), tonic::Status> {
let actuation_subscriptions = &self.broker.subscriptions.blocking_read().actuation_subscriptions;
let actuation_subscriptions = &self
.broker
.subscriptions
.read()
.await
.actuation_subscriptions;

for actuation_change in &actuation_changes {
let vss_id = actuation_change.id;
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path);
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!("Can not actuate vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator);
let message = format!(
"Can not actuate vss_path '{}' due to permission error {:?}",
vss_path, can_write_actuator
);
return Err(tonic::Status::permission_denied(message));
}
},
}
Err(error) => {
let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error);
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
},
}
}
}

let actuation_changes_per_vss_id = &self.map_actuation_changes_by_vss_id(actuation_changes).await;
let actuation_changes_per_vss_id = &self
.map_actuation_changes_by_vss_id(actuation_changes)
.await;
for ele in actuation_changes_per_vss_id {
let vss_id = ele.0.clone();
let actuation_changes = ele.1.clone();

let opt_actuation_subscription = actuation_subscriptions.iter().find(|subscription| subscription.vss_ids.contains(&vss_id));
let opt_actuation_subscription = actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(actuation_subscription) => {
actuation_subscription.actuation_provider.actuate(actuation_changes).await?
},
actuation_subscription
.actuation_provider
.actuate(actuation_changes)
.await?
}
None => {
let message = format!("No actuation provider available for vss_id: {}", vss_id);
return Err(tonic::Status::unavailable(message));
},
}
}
}

return Ok(())
return Ok(());
}

pub async fn actuate(
&self,
vss_id: &i32,
data_value: &DataValue,
) -> Result<(), tonic::Status> {

pub async fn actuate(&self, vss_id: &i32, data_value: &DataValue) -> Result<(), tonic::Status> {
let vss_id = vss_id.clone();
let result_entry = self.get_entry_by_id(vss_id).await;
match result_entry {
Ok(entry) => {
let vss_path = entry.metadata.path;
let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path);
let result_can_write_actuator =
self.permissions.can_write_actuator_target(&vss_path);
if result_can_write_actuator.is_err() {
let can_write_actuator = result_can_write_actuator.unwrap_err();
let message = format!("Can not actuate vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator);
let message = format!(
"Can not actuate vss_path '{}' due to permission error {:?}",
vss_path, can_write_actuator
);
return Err(tonic::Status::permission_denied(message));
}
},
}
Err(error) => {
let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error);
let message = format!(
"Could not resolve vss_path for vss_id {}: {:?}",
vss_id, error
);
return Err(tonic::Status::invalid_argument(message));
},
}
}

let entry_update = EntryUpdate {
Expand All @@ -1741,26 +1784,32 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
let opt_error = update_errors.get(0);
if opt_error.is_some() {
let error = opt_error.unwrap();
let message = format!("Could not set actuator target for vss_id {}: {:?}", vss_id, error.1);
let message = format!(
"Could not set actuator target for vss_id {}: {:?}",
vss_id, error.1
);
warn!(message);
}
}

let guard = self.broker.subscriptions.blocking_read();
let opt_actuation_subscription = guard.actuation_subscriptions.iter().find(|subscription|subscription.vss_ids.contains(&vss_id));
let guard = self.broker.subscriptions.read().await;
let opt_actuation_subscription = guard
.actuation_subscriptions
.iter()
.find(|subscription| subscription.vss_ids.contains(&vss_id));
match opt_actuation_subscription {
Some(ref actuation_subscription) => {
actuation_subscription.actuation_provider.actuate(vec![
ActuationChange {
id: vss_id.clone(),
data_value: data_value.clone()
},
]
).await
},
actuation_subscription
.actuation_provider
.actuate(vec![ActuationChange {
id: vss_id.clone(),
data_value: data_value.clone(),
}])
.await
}
None => {
let message = format!("No actuation provider found for vss_id {}", vss_id);
return Err(tonic::Status::new(tonic::Code::Unavailable, message))
return Err(tonic::Status::new(tonic::Code::Unavailable, message));
}
}
}
Expand Down
Loading

0 comments on commit 575991d

Please sign in to comment.