Skip to content

Commit

Permalink
Minor Refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
wba2hi committed Oct 7, 2024
1 parent 254b692 commit a712ad9
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 84 deletions.
110 changes: 43 additions & 67 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,40 +705,23 @@ impl Subscriptions {
if sub.sender.is_closed() {
info!("Subscriber gone: removing subscription");
false
} else if sub.permissions.expired() {
info!("Permissions of Subscriber expired: removing subscription");
false
} else {
match &sub.permissions.expired() {
Ok(()) => true,
Err(PermissionError::Expired) => {
info!("Token expired: removing subscription");
false
}
Err(err) => {
info!("Error: {:?} -> removing subscription", err);
false
}
}
true
}
});

self.actuation_subscriptions.retain(|sub| {
if !sub.actuation_provider.is_available() {
info!("actuation provider gone: removing subscription");
info!("Provider gone: removing subscription");
false
} else if sub.permissions.expired() {
info!("Permissions of Provider expired: removing subscription");
false
} else {
match sub.permissions.expired() {
Ok(()) => true,
Err(PermissionError::Expired) => {
info!("Token expired for actuation provider: removing subscription");
false
}
Err(err) => {
info!(
"actuation provider error: {:?} -> removing subscription",
err
);
false
}
}
true
}
});
}
Expand Down Expand Up @@ -1752,30 +1735,26 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
match opt_actuation_subscription {
Some(actuation_subscription) => {
let is_expired = actuation_subscription.permissions.expired();
match is_expired {
Err(_) => {
let message = format!(
"Permission for vss_ids {:?} expired",
actuation_subscription.vss_ids
);
return Err((ActuationError::PermissionExpired, message));
}
Ok(_) => {
if !actuation_subscription.actuation_provider.is_available() {
let message =
format!("Provider for vss_id {} does not exist", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}
if is_expired {
let message = format!(
"Permission for vss_ids {:?} expired",
actuation_subscription.vss_ids
);
return Err((ActuationError::PermissionExpired, message));
}

actuation_subscription
.actuation_provider
.actuate(actuation_changes)
.await?
}
if !actuation_subscription.actuation_provider.is_available() {
let message = format!("Provider for vss_id {} does not exist", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}

actuation_subscription
.actuation_provider
.actuate(actuation_changes)
.await?
}
None => {
let message = format!("actuation provider for vss_id {} not available", vss_id);
let message = format!("Provider for vss_id {} not available", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}
}
Expand Down Expand Up @@ -1834,29 +1813,26 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
match opt_actuation_subscription {
Some(actuation_subscription) => {
let is_expired = actuation_subscription.permissions.expired();
match is_expired {
Err(_) => {
let message = format!(
"Permission for vss_ids {:?} expired",
actuation_subscription.vss_ids
);
Err((ActuationError::PermissionExpired, message))
}
Ok(_) => {
if !actuation_subscription.actuation_provider.is_available() {
let message = format!("Provider for vss_id {} does not exist", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}
if is_expired {
let message = format!(
"Permission for vss_ids {:?} expired",
actuation_subscription.vss_ids
);
return Err((ActuationError::PermissionExpired, message));
}

actuation_subscription
.actuation_provider
.actuate(vec![ActuationChange {
id: vss_id,
data_value: data_value.clone(),
}])
.await
}
if !actuation_subscription.actuation_provider.is_available() {
let message = format!("Provider for vss_id {} does not exist", vss_id);
return Err((ActuationError::ProviderNotAvailable, message));
}

actuation_subscription
.actuation_provider
.actuate(vec![ActuationChange {
id: vss_id,
data_value: data_value.clone(),
}])
.await
}
None => {
let message = format!("Provider for vss_id {} does not exist", vss_id);
Expand Down
11 changes: 2 additions & 9 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ impl ActuationProvider for Provider {

let result = self.sender.send(Ok(response)).await;
if result.is_err() {
let send_error = result.unwrap_err().0;
if send_error.is_err() {
return Err((
broker::ActuationError::TransmissionFailure,
"An error occured while sending the data".to_string(),
));
}
return Err((
broker::ActuationError::TransmissionFailure,
"An error occured while sending the data".to_string(),
Expand Down Expand Up @@ -197,7 +190,7 @@ impl proto::val_server::Val for broker::DataBroker {
//
// Returns (GRPC error code):
// NOT_FOUND if the actuator does not exist.
// PERMISSION_DENIED if access is denied for of the actuator.
// PERMISSION_DENIED if access is denied for the actuator.
// UNAVAILABLE if there is no provider currently providing the actuator
// INVALID_ARGUMENT
// - if the data type used in the request does not match
Expand Down Expand Up @@ -813,7 +806,7 @@ async fn provide_actuation(
for (index, opt_vss_id) in resolved_opt_vss_ids.iter().enumerate() {
if opt_vss_id.is_none() {
let message = format!(
"could not resolve id of vss_path: {}",
"Could not resolve id of vss_path: {}",
vss_paths.get(index).unwrap()
);
return Err(tonic::Status::not_found(message));
Expand Down
22 changes: 15 additions & 7 deletions databroker/src/permissions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ impl Permissions {
}

pub fn can_read(&self, path: &str) -> Result<(), PermissionError> {
self.expired()?;
if self.expired() {
return Err(PermissionError::Expired);
}

if self.read.is_match(path) {
return Ok(());
Expand All @@ -187,7 +189,9 @@ impl Permissions {
}

pub fn can_write_actuator_target(&self, path: &str) -> Result<(), PermissionError> {
self.expired()?;
if self.expired() {
return Err(PermissionError::Expired);
}

if self.actuate.is_match(path) {
return Ok(());
Expand All @@ -196,7 +200,9 @@ impl Permissions {
}

pub fn can_write_datapoint(&self, path: &str) -> Result<(), PermissionError> {
self.expired()?;
if self.expired() {
return Err(PermissionError::Expired);
}

if self.provide.is_match(path) {
return Ok(());
Expand All @@ -205,7 +211,9 @@ impl Permissions {
}

pub fn can_create(&self, path: &str) -> Result<(), PermissionError> {
self.expired()?;
if self.expired() {
return Err(PermissionError::Expired);
}

if self.create.is_match(path) {
return Ok(());
Expand All @@ -214,13 +222,13 @@ impl Permissions {
}

#[inline]
pub fn expired(&self) -> Result<(), PermissionError> {
pub fn expired(&self) -> bool {
if let Some(expires_at) = self.expires_at {
if expires_at < SystemTime::now() {
return Err(PermissionError::Expired);
return true;
}
}
Ok(())
false
}
}

Expand Down
2 changes: 1 addition & 1 deletion proto/kuksa/val/v2/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ service VAL {
//
// Returns (GRPC error code):
// NOT_FOUND if the actuator does not exist.
// PERMISSION_DENIED if access is denied for of the actuator.
// PERMISSION_DENIED if access is denied for the actuator.
// UNAVAILABLE if there is no provider currently providing the actuator
// INVALID_ARGUMENT
// - if the data type used in the request does not match
Expand Down

0 comments on commit a712ad9

Please sign in to comment.