diff --git a/Cargo.lock b/Cargo.lock index 4dc45ed1..6ea73d49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -606,6 +606,7 @@ name = "databroker" version = "0.4.7-dev.0" dependencies = [ "anyhow", + "async-trait", "axum", "chrono", "clap", diff --git a/databroker/Cargo.toml b/databroker/Cargo.toml index 492b0db7..e57719de 100644 --- a/databroker/Cargo.toml +++ b/databroker/Cargo.toml @@ -60,9 +60,11 @@ jemallocator = { version = "0.5.0", optional = true } lazy_static = "1.4.0" thiserror = "1.0.47" +futures = { version = "0.3.28" } +async-trait = "0.1.82" + # VISS axum = { version = "0.6.20", optional = true, features = ["ws"] } -futures = { version = "0.3.28", optional = true } chrono = { version = "0.4.31", optional = true, features = ["std"] } uuid = { version = "1.4.1", optional = true, features = ["v4"] } @@ -74,7 +76,7 @@ sd-notify = "0.4.1" default = ["tls"] tls = ["tonic/tls"] jemalloc = ["dep:jemallocator"] -viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"] +viss = ["dep:axum", "dep:chrono", "dep:uuid"] libtest = [] [build-dependencies] diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 4e098067..b50dcaf7 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -33,6 +33,19 @@ use tracing::{debug, info, warn}; use crate::glob; +#[derive(Debug)] +pub enum ActuationError { + NotFound, + WrongType, + OutOfBounds, + UnsupportedType, + PermissionDenied, + PermissionExpired, + ProviderNotAvailable, + ProviderAlreadyExists, + TransmissionFailure, +} + #[derive(Debug)] pub enum UpdateError { NotFound, @@ -101,6 +114,7 @@ pub struct Database { #[derive(Default)] pub struct Subscriptions { + actuation_subscriptions: Vec, query_subscriptions: Vec, change_subscriptions: Vec, } @@ -148,6 +162,27 @@ pub struct DataBroker { shutdown_trigger: broadcast::Sender<()>, } +#[async_trait::async_trait] +pub trait ActuationProvider { + async fn actuate( + &self, + actuation_changes: Vec, + ) -> Result<(), (ActuationError, String)>; + fn is_available(&self) -> bool; +} + +#[derive(Clone)] +pub struct ActuationChange { + pub id: i32, + pub data_value: DataValue, +} + +pub struct ActuationSubscription { + vss_ids: Vec, + actuation_provider: Box, + permissions: Permissions, +} + pub struct QuerySubscription { query: query::CompiledQuery, sender: mpsc::Sender, @@ -599,6 +634,10 @@ pub enum SuccessfulUpdate { } impl Subscriptions { + pub fn add_actuation_subscription(&mut self, subscription: ActuationSubscription) { + self.actuation_subscriptions.push(subscription); + } + pub fn add_query_subscription(&mut self, subscription: QuerySubscription) { self.query_subscriptions.push(subscription) } @@ -648,6 +687,7 @@ impl Subscriptions { } pub fn clear(&mut self) { + self.actuation_subscriptions.clear(); self.query_subscriptions.clear(); self.change_subscriptions.clear(); } @@ -665,18 +705,23 @@ impl Subscriptions { if sub.sender.is_closed() { info!("Subscriber gone: removing subscription"); false + } else if sub.permissions.is_expired() { + info!("Permissions of Subscriber expired: removing subscription"); + false } else { - match &sub.permissions.expired() { - Ok(()) => true, - Err(PermissionError::Expired) => { - info!("Token expired: removing subscription"); - false - } - Err(err) => { - info!("Error: {:?} -> removing subscription", err); - false - } - } + true + } + }); + + self.actuation_subscriptions.retain(|sub| { + if !sub.actuation_provider.is_available() { + info!("Provider gone: removing subscription"); + false + } else if sub.permissions.is_expired() { + info!("Permissions of Provider expired: removing subscription"); + false + } else { + true } }); } @@ -1530,6 +1575,208 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { Err(e) => Err(QueryError::CompilationError(format!("{e:?}"))), } } + + pub async fn provide_actuation( + &self, + vss_ids: Vec, + actuation_provider: Box, + ) -> Result<(), (ActuationError, String)> { + for vss_id in vss_ids.clone() { + self.can_write_actuator_target(&vss_id).await?; + } + + let provided_vss_ids: Vec = self + .broker + .subscriptions + .read() + .await + .actuation_subscriptions + .iter() + .flat_map(|subscription| subscription.vss_ids.clone()) + .collect(); + let intersection: Vec<&i32> = vss_ids + .iter() + .filter(|&x| provided_vss_ids.contains(x)) + .collect(); + if !intersection.is_empty() { + let message = format!( + "Providers for the following vss_ids already registered: {:?}", + intersection + ); + return Err((ActuationError::ProviderAlreadyExists, message)); + } + + let actuation_subscription: ActuationSubscription = ActuationSubscription { + vss_ids, + actuation_provider, + permissions: self.permissions.clone(), + }; + self.broker + .subscriptions + .write() + .await + .add_actuation_subscription(actuation_subscription); + + Ok(()) + } + + async fn map_actuation_changes_by_vss_id( + &self, + actuation_changes: Vec, + ) -> HashMap> { + let mut actuation_changes_per_vss_id: HashMap> = + HashMap::with_capacity(actuation_changes.len()); + for actuation_change in actuation_changes { + let vss_id = actuation_change.id; + + let opt_vss_ids = actuation_changes_per_vss_id.get_mut(&vss_id); + match opt_vss_ids { + Some(vss_ids) => { + vss_ids.push(actuation_change.clone()); + } + None => { + let vec = vec![actuation_change.clone()]; + actuation_changes_per_vss_id.insert(vss_id, vec); + } + } + } + actuation_changes_per_vss_id + } + + pub async fn batch_actuate( + &self, + actuation_changes: Vec, + ) -> Result<(), (ActuationError, String)> { + let read_subscription_guard = self.broker.subscriptions.read().await; + let actuation_subscriptions = &read_subscription_guard.actuation_subscriptions; + + for actuation_change in &actuation_changes { + let vss_id = actuation_change.id; + self.can_write_actuator_target(&vss_id).await?; + } + + let actuation_changes_per_vss_id = &self + .map_actuation_changes_by_vss_id(actuation_changes) + .await; + for actuation_change_per_vss_id in actuation_changes_per_vss_id { + let vss_id = *actuation_change_per_vss_id.0; + let actuation_changes = actuation_change_per_vss_id.1.clone(); + + let opt_actuation_subscription = actuation_subscriptions + .iter() + .find(|subscription| subscription.vss_ids.contains(&vss_id)); + match opt_actuation_subscription { + Some(actuation_subscription) => { + let is_expired = actuation_subscription.permissions.is_expired(); + if is_expired { + let message = format!( + "Permission for vss_ids {:?} expired", + actuation_subscription.vss_ids + ); + return Err((ActuationError::PermissionExpired, message)); + } + + if !actuation_subscription.actuation_provider.is_available() { + let message = format!("Provider for vss_id {} does not exist", vss_id); + return Err((ActuationError::ProviderNotAvailable, message)); + } + + actuation_subscription + .actuation_provider + .actuate(actuation_changes) + .await? + } + None => { + let message = format!("Provider for vss_id {} not available", vss_id); + return Err((ActuationError::ProviderNotAvailable, message)); + } + } + } + + Ok(()) + } + + pub async fn actuate( + &self, + vss_id: &i32, + data_value: &DataValue, + ) -> Result<(), (ActuationError, String)> { + let vss_id = *vss_id; + + self.can_write_actuator_target(&vss_id).await?; + + let read_subscription_guard = self.broker.subscriptions.read().await; + let opt_actuation_subscription = &read_subscription_guard + .actuation_subscriptions + .iter() + .find(|subscription| subscription.vss_ids.contains(&vss_id)); + match opt_actuation_subscription { + Some(actuation_subscription) => { + let is_expired = actuation_subscription.permissions.is_expired(); + if is_expired { + let message = format!( + "Permission for vss_ids {:?} expired", + actuation_subscription.vss_ids + ); + return Err((ActuationError::PermissionExpired, message)); + } + + if !actuation_subscription.actuation_provider.is_available() { + let message = format!("Provider for vss_id {} does not exist", vss_id); + return Err((ActuationError::ProviderNotAvailable, message)); + } + + actuation_subscription + .actuation_provider + .actuate(vec![ActuationChange { + id: vss_id, + data_value: data_value.clone(), + }]) + .await + } + None => { + let message = format!("Provider for vss_id {} does not exist", vss_id); + Err((ActuationError::ProviderNotAvailable, message)) + } + } + } + + async fn can_write_actuator_target( + &self, + vss_id: &i32, + ) -> Result<(), (ActuationError, String)> { + let result_entry = self.get_entry_by_id(*vss_id).await; + match result_entry { + Ok(entry) => { + let vss_path = entry.metadata.path; + let result_can_write_actuator = + self.permissions.can_write_actuator_target(&vss_path); + match result_can_write_actuator { + Ok(_) => Ok(()), + Err(PermissionError::Denied) => { + let message = format!("Permission denied for vss_path {}", vss_path); + Err((ActuationError::PermissionDenied, message)) + } + Err(PermissionError::Expired) => Err(( + ActuationError::PermissionExpired, + "Permission expired".to_string(), + )), + } + } + Err(ReadError::NotFound) => { + let message = format!("Could not resolve vss_path of vss_id {}", vss_id); + Err((ActuationError::NotFound, message)) + } + Err(ReadError::PermissionDenied) => { + let message = format!("Permission denied for vss_id {}", vss_id); + Err((ActuationError::PermissionDenied, message)) + } + Err(ReadError::PermissionExpired) => Err(( + ActuationError::PermissionExpired, + "Permission expired".to_string(), + )), + } + } } impl DataBroker { @@ -1557,13 +1804,14 @@ impl DataBroker { pub fn start_housekeeping_task(&self) { info!("Starting housekeeping task"); let subscriptions = self.subscriptions.clone(); + tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); loop { interval.tick().await; - // Cleanup dropped subscriptions - subscriptions.write().await.cleanup(); + + subscriptions.write().await.cleanup(); // Cleanup dropped subscriptions } }); } diff --git a/databroker/src/grpc/kuksa_val_v2/conversions.rs b/databroker/src/grpc/kuksa_val_v2/conversions.rs index e6660a4f..1178493e 100644 --- a/databroker/src/grpc/kuksa_val_v2/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v2/conversions.rs @@ -10,9 +10,12 @@ // * // * SPDX-License-Identifier: Apache-2.0 // ********************************************************************************/ -use databroker_proto::kuksa::val::v2 as proto; - use crate::broker; +use databroker_proto::kuksa::val::v2 as proto; +use kuksa::proto::v2::{ + BoolArray, DoubleArray, FloatArray, Int32Array, Int64Array, StringArray, Uint32Array, + Uint64Array, +}; use std::time::SystemTime; @@ -405,3 +408,149 @@ impl broker::UpdateError { } } } + +impl From for broker::DataValue { + fn from(value: proto::Value) -> Self { + match &value.typed_value { + Some(proto::value::TypedValue::String(value)) => { + broker::DataValue::String(value.to_owned()) + } + Some(proto::value::TypedValue::Bool(value)) => broker::DataValue::Bool(*value), + Some(proto::value::TypedValue::Int32(value)) => broker::DataValue::Int32(*value), + Some(proto::value::TypedValue::Int64(value)) => broker::DataValue::Int64(*value), + Some(proto::value::TypedValue::Uint32(value)) => broker::DataValue::Uint32(*value), + Some(proto::value::TypedValue::Uint64(value)) => broker::DataValue::Uint64(*value), + Some(proto::value::TypedValue::Float(value)) => broker::DataValue::Float(*value), + Some(proto::value::TypedValue::Double(value)) => broker::DataValue::Double(*value), + Some(proto::value::TypedValue::StringArray(array)) => { + broker::DataValue::StringArray(array.values.clone()) + } + Some(proto::value::TypedValue::BoolArray(array)) => { + broker::DataValue::BoolArray(array.values.clone()) + } + Some(proto::value::TypedValue::Int32Array(array)) => { + broker::DataValue::Int32Array(array.values.clone()) + } + Some(proto::value::TypedValue::Int64Array(array)) => { + broker::DataValue::Int64Array(array.values.clone()) + } + Some(proto::value::TypedValue::Uint32Array(array)) => { + broker::DataValue::Uint32Array(array.values.clone()) + } + Some(proto::value::TypedValue::Uint64Array(array)) => { + broker::DataValue::Uint64Array(array.values.clone()) + } + Some(proto::value::TypedValue::FloatArray(array)) => { + broker::DataValue::FloatArray(array.values.clone()) + } + Some(proto::value::TypedValue::DoubleArray(array)) => { + broker::DataValue::DoubleArray(array.values.clone()) + } + None => todo!(), + } + } +} + +impl From for proto::Value { + fn from(value: broker::DataValue) -> Self { + match &value { + broker::DataValue::String(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::String(value.to_owned())), + }, + + broker::DataValue::Bool(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(*value)), + }, + + broker::DataValue::Int32(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(*value)), + }, + + broker::DataValue::Int64(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Int64(*value)), + }, + + broker::DataValue::Uint32(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32(*value)), + }, + + broker::DataValue::Uint64(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64(*value)), + }, + + broker::DataValue::Float(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Float(*value)), + }, + + broker::DataValue::Double(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Double(*value)), + }, + + broker::DataValue::StringArray(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::StringArray(StringArray { + values: array.clone(), + })), + }, + + broker::DataValue::BoolArray(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::BoolArray(BoolArray { + values: array.clone(), + })), + }, + + broker::DataValue::Int32Array(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::Int32Array(Int32Array { + values: array.clone(), + })), + }, + + broker::DataValue::Int64Array(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::Int64Array(Int64Array { + values: array.clone(), + })), + }, + + broker::DataValue::Uint32Array(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32Array(Uint32Array { + values: array.clone(), + })), + }, + + broker::DataValue::Uint64Array(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64Array(Uint64Array { + values: array.clone(), + })), + }, + + broker::DataValue::FloatArray(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::FloatArray(FloatArray { + values: array.clone(), + })), + }, + + broker::DataValue::DoubleArray(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::DoubleArray(DoubleArray { + values: array.clone(), + })), + }, + + broker::DataValue::NotAvailable => proto::Value { typed_value: None }, + } + } +} + +impl broker::ActuationError { + pub fn to_tonic_status(&self, message: String) -> tonic::Status { + match self { + broker::ActuationError::NotFound => tonic::Status::not_found(message), + broker::ActuationError::WrongType => tonic::Status::invalid_argument(message), + broker::ActuationError::OutOfBounds => tonic::Status::out_of_range(message), + broker::ActuationError::UnsupportedType => tonic::Status::invalid_argument(message), + broker::ActuationError::PermissionDenied => tonic::Status::permission_denied(message), + broker::ActuationError::PermissionExpired => tonic::Status::unauthenticated(message), + broker::ActuationError::ProviderNotAvailable => tonic::Status::unavailable(message), + broker::ActuationError::ProviderAlreadyExists => tonic::Status::already_exists(message), + broker::ActuationError::TransmissionFailure => tonic::Status::data_loss(message), + } + } +} diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index 36a973b9..eea58a0d 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -14,9 +14,12 @@ use std::{collections::HashMap, pin::Pin}; use crate::{ - broker::{self, AuthorizedAccess, ReadError, SubscriptionError}, + broker::{ + self, ActuationChange, ActuationProvider, AuthorizedAccess, ReadError, SubscriptionError, + }, glob::Matcher, permissions::Permissions, + types::DataValue, }; use databroker_proto::kuksa::val::v2::{ @@ -27,7 +30,10 @@ use databroker_proto::kuksa::val::v2::{ open_provider_stream_response, OpenProviderStreamResponse, PublishValuesResponse, }; -use kuksa::proto::v2::{ListMetadataResponse, Metadata}; +use kuksa::proto::v2::{ + signal_id, ActuateRequest, ActuateResponse, BatchActuateStreamRequest, ListMetadataResponse, + Metadata, ProvideActuationResponse, +}; use std::collections::HashSet; use tokio::{select, sync::mpsc}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; @@ -35,6 +41,53 @@ use tracing::debug; const MAX_REQUEST_PATH_LENGTH: usize = 1000; +pub struct Provider { + sender: mpsc::Sender>, +} + +#[async_trait::async_trait] +impl ActuationProvider for Provider { + async fn actuate( + &self, + actuation_changes: Vec, + ) -> Result<(), (broker::ActuationError, String)> { + let mut actuation_requests: Vec = vec![]; + for actuation_change in actuation_changes { + let data_value = actuation_change.data_value; + actuation_requests.push(ActuateRequest { + signal_id: Some(proto::SignalId { + signal: Some(signal_id::Signal::Id(actuation_change.id)), + }), + value: Some(proto::Value::from(data_value)), + }); + } + + let batch_actuate_stream_request = + open_provider_stream_response::Action::BatchActuateStreamRequest( + BatchActuateStreamRequest { + actuate_requests: actuation_requests, + }, + ); + + let response = OpenProviderStreamResponse { + action: Some(batch_actuate_stream_request), + }; + + let result = self.sender.send(Ok(response)).await; + if result.is_err() { + return Err(( + broker::ActuationError::TransmissionFailure, + "An error occured while sending the data".to_string(), + )); + } + return Ok(()); + } + + fn is_available(&self) -> bool { + !self.sender.is_closed() + } +} + #[tonic::async_trait] impl proto::val_server::Val for broker::DataBroker { async fn get_value( @@ -135,10 +188,7 @@ impl proto::val_server::Val for broker::DataBroker { &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + Err(tonic::Status::unimplemented("Unimplemented")) } type SubscribeStream = Pin< @@ -194,16 +244,11 @@ impl proto::val_server::Val for broker::DataBroker { let stream = convert_to_proto_stream(stream, size); Ok(tonic::Response::new(Box::pin(stream))) } - Err(SubscriptionError::NotFound) => { - Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")) - } - Err(SubscriptionError::InvalidInput) => Err(tonic::Status::new( - tonic::Code::InvalidArgument, - "Invalid Argument", - )), - Err(SubscriptionError::InternalError) => { - Err(tonic::Status::new(tonic::Code::Internal, "Internal Error")) + Err(SubscriptionError::NotFound) => Err(tonic::Status::not_found("Path not found")), + Err(SubscriptionError::InvalidInput) => { + Err(tonic::Status::invalid_argument("Invalid Argument")) } + Err(SubscriptionError::InternalError) => Err(tonic::Status::internal("Internal Error")), } } @@ -220,30 +265,133 @@ impl proto::val_server::Val for broker::DataBroker { &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + Err(tonic::Status::unimplemented("Unimplemented")) } + // Actuate a single actuator + // + // Returns (GRPC error code): + // NOT_FOUND if the actuator does not exist. + // PERMISSION_DENIED if access is denied for the actuator. + // UNAVAILABLE if there is no provider currently providing the actuator + // INVALID_ARGUMENT + // - if the data type used in the request does not match + // the data type of the addressed signal + // - if the requested value is not accepted, + // e.g. if sending an unsupported enum value async fn actuate( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + debug!(?request); + let permissions = request + .extensions() + .get::() + .ok_or(tonic::Status::unauthenticated("Unauthenticated"))? + .clone(); + let broker = self.authorized_access(&permissions); + + let actuator_request = request.into_inner(); + let value = actuator_request + .value + .ok_or(tonic::Status::invalid_argument("No value provided"))?; + + let signal = actuator_request + .signal_id + .ok_or(tonic::Status::invalid_argument("No signal_id provided"))? + .signal; + + match &signal { + Some(proto::signal_id::Signal::Path(path)) => { + let id = broker + .get_id_by_path(path) + .await + .ok_or(tonic::Status::not_found(format!( + "Invalid path in signal_id provided {}", + path + )))?; + + match broker.actuate(&id, &DataValue::from(value)).await { + Ok(()) => Ok(tonic::Response::new(ActuateResponse {})), + Err(error) => Err(error.0.to_tonic_status(error.1)), + } + } + Some(proto::signal_id::Signal::Id(id)) => { + match broker.actuate(id, &DataValue::from(value)).await { + Ok(()) => Ok(tonic::Response::new(ActuateResponse {})), + Err(error) => Err(error.0.to_tonic_status(error.1)), + } + } + None => Err(tonic::Status::invalid_argument( + "SignalID contains neither path or id", + )), + } } + // Actuate simultaneously multiple actuators. + // If any error occurs, the entire operation will be aborted + // and no single actuator value will be forwarded to the provider. + // + // Returns (GRPC error code): + // NOT_FOUND if any of the actuators are non-existant. + // PERMISSION_DENIED if access is denied for any of the actuators. + // UNAVAILABLE if there is no provider currently providing an actuator + // INVALID_ARGUMENT + // - if the data type used in the request does not match + // the data type of the addressed signal + // - if the requested value is not accepted, + // e.g. if sending an unsupported enum value async fn batch_actuate( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + let broker = self.authorized_access(&permissions); + let actuate_requests = request.into_inner().actuate_requests; + + let mut actuation_changes: Vec = vec![]; + for actuate_request in actuate_requests { + let vss_id = match actuate_request.signal_id { + Some(signal_id) => match signal_id.signal { + Some(proto::signal_id::Signal::Id(vss_id)) => vss_id, + Some(proto::signal_id::Signal::Path(vss_path)) => { + let result = broker.get_id_by_path(&vss_path).await; + match result { + Some(vss_id) => vss_id, + None => { + let message = + format!("Could not resolve vss_id for path: {}", vss_path); + return Err(tonic::Status::not_found(message)); + } + } + } + None => return Err(tonic::Status::invalid_argument("Signal not provided")), + }, + None => return Err(tonic::Status::invalid_argument("Signal_Id not provided")), + }; + let data_value = match actuate_request.value { + Some(data_value) => DataValue::from(data_value), + None => return Err(tonic::Status::invalid_argument("")), + }; + let actuation_change = ActuationChange { + id: vss_id, + data_value, + }; + actuation_changes.push(actuation_change); + } + + let result = broker.batch_actuate(actuation_changes).await; + match result { + Ok(_) => Ok(tonic::Response::new(proto::BatchActuateResponse {})), + Err(error) => return Err(error.0.to_tonic_status(error.1)), + } } /// List metadata of signals matching the wildcard branch request. @@ -419,8 +567,7 @@ impl proto::val_server::Val for broker::DataBroker { }) .await; if metadata_response.is_empty() { - Err(tonic::Status::new( - tonic::Code::NotFound, + Err(tonic::Status::not_found( "Specified root branch does not exist", )) } else { @@ -429,10 +576,7 @@ impl proto::val_server::Val for broker::DataBroker { })) } } - Err(_) => Err(tonic::Status::new( - tonic::Code::InvalidArgument, - "Invalid Pattern Argument", - )), + Err(_) => Err(tonic::Status::invalid_argument("Invalid Pattern Argument")), } } @@ -646,7 +790,6 @@ impl proto::val_server::Val for broker::DataBroker { // Copy (to move into task below) let broker = self.clone(); - // Create stream (to be returned) let (response_stream_sender, response_stream_receiver) = mpsc::channel(10); @@ -662,11 +805,12 @@ impl proto::val_server::Val for broker::DataBroker { match request { Some(req) => { match req.action { - Some(ProvideActuationRequest(_provide_actuation_request)) => { - if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { - debug!("Failed to send error response: {}", err); + Some(ProvideActuationRequest(provided_actuation)) => { + let response = provide_actuation(&broker, &provided_actuation, response_stream_sender.clone()).await; + if let Err(err) = response_stream_sender.send(response).await + { + debug!("Failed to send response: {}", err) } - break; }, Some(PublishValuesRequest(publish_values_request)) => { let response = publish_values(&broker, &publish_values_request).await; @@ -676,10 +820,7 @@ impl proto::val_server::Val for broker::DataBroker { } }, Some(BatchActuateStreamResponse(_batch_actuate_stream_response)) => { - if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { - debug!("Failed to send error response: {}", err); - } - break; + // TODO discuss and implement }, None => { @@ -715,10 +856,75 @@ impl proto::val_server::Val for broker::DataBroker { &self, _request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + Err(tonic::Status::unimplemented("Unimplemented")) + } +} + +async fn provide_actuation( + broker: &AuthorizedAccess<'_, '_>, + request: &databroker_proto::kuksa::val::v2::ProvideActuationRequest, + sender: mpsc::Sender>, +) -> Result { + let vss_paths: Vec<_> = request + .actuator_identifiers + .iter() + .filter_map(|signal_id| match &signal_id.signal { + Some(proto::signal_id::Signal::Path(path)) => Some(path.clone()), + _ => None, + }) + .collect(); + + let future_vss_ids = vss_paths + .iter() + .map(|vss_path| broker.get_id_by_path(vss_path)); + let resolved_opt_vss_ids = futures::future::join_all(future_vss_ids).await; + + for (index, opt_vss_id) in resolved_opt_vss_ids.iter().enumerate() { + if opt_vss_id.is_none() { + let message = format!( + "Could not resolve id of vss_path: {}", + vss_paths.get(index).unwrap() + ); + return Err(tonic::Status::not_found(message)); + } + } + + let resolved_vss_ids: Vec = resolved_opt_vss_ids.iter().filter_map(|&opt| opt).collect(); + + let vss_ids: Vec<_> = request + .actuator_identifiers + .iter() + .filter_map(|signal_id| match &signal_id.signal { + Some(proto::signal_id::Signal::Id(id)) => Some(*id), + _ => None, + }) + .collect(); + + let mut all_vss_ids = vec![]; + all_vss_ids.extend(vss_ids); + all_vss_ids.extend(resolved_vss_ids); + + let provider = Provider { sender }; + + match broker + .provide_actuation(all_vss_ids, Box::new(provider)) + .await + { + Ok(_) => { + let provide_actuation_response = ProvideActuationResponse {}; + + let response = OpenProviderStreamResponse { + action: Some( + open_provider_stream_response::Action::ProvideActuationResponse( + provide_actuation_response, + ), + ), + }; + + Ok(response) + } + + Err(error) => Err(error.0.to_tonic_status(error.1)), } } @@ -746,6 +952,7 @@ async fn publish_values( }) .collect(); + // TODO check if provider is allowed to update the entries for the provided signals? match broker.update_entries(ids).await { Ok(_) => OpenProviderStreamResponse { action: Some( @@ -781,26 +988,22 @@ async fn get_signal( match signal { proto::signal_id::Signal::Path(path) => { if path.len() > MAX_REQUEST_PATH_LENGTH { - return Err(tonic::Status::new( - tonic::Code::InvalidArgument, + return Err(tonic::Status::invalid_argument( "The provided path is too long", )); } match broker.get_id_by_path(&path).await { Some(id) => Ok(id), - None => Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")), + None => Err(tonic::Status::not_found("Path not found")), } } proto::signal_id::Signal::Id(id) => match broker.get_metadata(id).await { Some(_metadata) => Ok(id), - None => Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")), + None => Err(tonic::Status::not_found("Path not found")), }, } } else { - Err(tonic::Status::new( - tonic::Code::InvalidArgument, - "No SignalId provided", - )) + Err(tonic::Status::invalid_argument("No SignalId provided")) } } @@ -838,7 +1041,10 @@ mod tests { use proto::open_provider_stream_response::Action::{ BatchActuateStreamRequest, ProvideActuationResponse, PublishValuesResponse, }; - use proto::{open_provider_stream_request, OpenProviderStreamRequest, PublishValuesRequest}; + use proto::{ + open_provider_stream_request, BatchActuateRequest, OpenProviderStreamRequest, + PublishValuesRequest, SignalId, Value, + }; // Helper for adding an int32 signal and adding value async fn helper_add_int32( @@ -2072,4 +2278,435 @@ mod tests { } } } + + #[tokio::test] + async fn test_actuate_signal_not_found() { + let broker = DataBroker::default(); + + let mut request = tonic::Request::new(ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.Cabin.Non.Existing".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::actuate(&broker, request).await; + assert!(result_response.is_err()); + assert_eq!(result_response.unwrap_err().code(), tonic::Code::NotFound) + } + + #[tokio::test] + async fn test_actuate_can_provider_unavailable() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let mut request = tonic::Request::new(ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::actuate(&broker, request).await; + assert!(result_response.is_err()); + assert_eq!( + result_response.unwrap_err().code(), + tonic::Code::Unavailable + ) + } + + #[tokio::test] + async fn test_actuate_success() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let vss_id = authorized_access + .get_id_by_path("Vehicle.ADAS.ABS.IsEnabled") + .await + .expect("Resolving the id of Vehicle.ADAS.ABS.IsEnabled should succeed"); + let vss_ids = vec![vss_id]; + + let (sender, mut receiver) = mpsc::channel(10); + let actuation_provider = Provider { sender }; + authorized_access + .provide_actuation(vss_ids, Box::new(actuation_provider)) + .await + .expect("Registering a new Actuation Provider should succeed"); + + let mut request = tonic::Request::new(ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::actuate(&broker, request).await; + assert!(result_response.is_ok()); + + let result_response = receiver.recv().await.expect("Option should be Some"); + result_response.expect("Result should be Ok"); + } + + #[tokio::test] + async fn test_batch_actuate_signal_not_found() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let mut request = tonic::Request::new(BatchActuateRequest { + actuate_requests: vec![ + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.Cabin.Non.Existing".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::batch_actuate(&broker, request).await; + assert!(result_response.is_err()); + assert_eq!(result_response.unwrap_err().code(), tonic::Code::NotFound) + } + + #[tokio::test] + async fn test_batch_actuate_provider_unavailable() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + authorized_access + .add_entry( + "Vehicle.ADAS.CruiseControl.IsActive".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let vss_id_abs = authorized_access + .get_id_by_path("Vehicle.ADAS.ABS.IsEnabled") + .await + .expect("Resolving the id of Vehicle.ADAS.ABS.IsEnabled should succeed"); + + let vss_ids = vec![vss_id_abs]; + + let (sender, _receiver) = mpsc::channel(10); + let actuation_provider = Provider { sender }; + authorized_access + .provide_actuation(vss_ids, Box::new(actuation_provider)) + .await + .expect("Registering a new Actuation Provider should succeed"); + + let mut request = tonic::Request::new(BatchActuateRequest { + actuate_requests: vec![ + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.CruiseControl.IsActive".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::batch_actuate(&broker, request).await; + assert!(result_response.is_err()); + assert_eq!( + result_response.unwrap_err().code(), + tonic::Code::Unavailable + ) + } + + #[tokio::test] + async fn test_batch_actuate_success() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + authorized_access + .add_entry( + "Vehicle.ADAS.CruiseControl.IsActive".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let vss_id_abs = authorized_access + .get_id_by_path("Vehicle.ADAS.ABS.IsEnabled") + .await + .expect("Resolving the id of Vehicle.ADAS.ABS.IsEnabled should succeed"); + let vss_id_cruise_control = authorized_access + .get_id_by_path("Vehicle.ADAS.CruiseControl.IsActive") + .await + .expect("Resolving the id of Vehicle.ADAS.CruiseControl.IsActive should succeed"); + + let vss_ids = vec![vss_id_abs, vss_id_cruise_control]; + + let (sender, mut receiver) = mpsc::channel(10); + let actuation_provider = Provider { sender }; + authorized_access + .provide_actuation(vss_ids, Box::new(actuation_provider)) + .await + .expect("Registering a new Actuation Provider should succeed"); + + let mut request = tonic::Request::new(BatchActuateRequest { + actuate_requests: vec![ + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.CruiseControl.IsActive".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::batch_actuate(&broker, request).await; + assert!(result_response.is_ok()); + + let result_response = receiver.recv().await.expect("Option should be Some"); + result_response.expect("Result should be Ok"); + } + + #[tokio::test] + async fn test_provide_actuation_signal_not_found() { + let broker = DataBroker::default(); + + let request = OpenProviderStreamRequest { + action: Some( + open_provider_stream_request::Action::ProvideActuationRequest( + proto::ProvideActuationRequest { + actuator_identifiers: vec![SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.Cabin.Non.Existing".to_string(), + )), + }], + }, + ), + ), + }; + + let mut streaming_request = tonic_mock::streaming_request(vec![request]); + streaming_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match proto::val_server::Val::open_provider_stream(&broker, streaming_request).await { + Ok(response) => { + let stream = response.into_inner(); + let mut receiver = stream.into_inner(); + let result_response = receiver + .recv() + .await + .expect("result_response should be Some"); + assert!(result_response.is_err()); + assert_eq!(result_response.unwrap_err().code(), tonic::Code::NotFound) + } + Err(_) => { + panic!("Should not happen") + } + } + } + + #[tokio::test] + async fn test_provide_actuation_success() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let request = OpenProviderStreamRequest { + action: Some( + open_provider_stream_request::Action::ProvideActuationRequest( + proto::ProvideActuationRequest { + actuator_identifiers: vec![SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }], + }, + ), + ), + }; + + let mut streaming_request = tonic_mock::streaming_request(vec![request]); + streaming_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match proto::val_server::Val::open_provider_stream(&broker, streaming_request).await { + Ok(response) => { + let stream = response.into_inner(); + let mut receiver = stream.into_inner(); + let result_response = receiver + .recv() + .await + .expect("result_response should be Some"); + + assert!(result_response.is_ok()) + } + Err(_) => { + panic!("Should not happen") + } + } + } } diff --git a/databroker/src/permissions.rs b/databroker/src/permissions.rs index 7da1eae1..8157b811 100644 --- a/databroker/src/permissions.rs +++ b/databroker/src/permissions.rs @@ -165,7 +165,9 @@ impl Permissions { } pub fn can_read(&self, path: &str) -> Result<(), PermissionError> { - self.expired()?; + if self.is_expired() { + return Err(PermissionError::Expired); + } if self.read.is_match(path) { return Ok(()); @@ -187,7 +189,9 @@ impl Permissions { } pub fn can_write_actuator_target(&self, path: &str) -> Result<(), PermissionError> { - self.expired()?; + if self.is_expired() { + return Err(PermissionError::Expired); + } if self.actuate.is_match(path) { return Ok(()); @@ -196,7 +200,9 @@ impl Permissions { } pub fn can_write_datapoint(&self, path: &str) -> Result<(), PermissionError> { - self.expired()?; + if self.is_expired() { + return Err(PermissionError::Expired); + } if self.provide.is_match(path) { return Ok(()); @@ -205,7 +211,9 @@ impl Permissions { } pub fn can_create(&self, path: &str) -> Result<(), PermissionError> { - self.expired()?; + if self.is_expired() { + return Err(PermissionError::Expired); + } if self.create.is_match(path) { return Ok(()); @@ -214,13 +222,13 @@ impl Permissions { } #[inline] - pub fn expired(&self) -> Result<(), PermissionError> { + pub fn is_expired(&self) -> bool { if let Some(expires_at) = self.expires_at { if expires_at < SystemTime::now() { - return Err(PermissionError::Expired); + return true; } } - Ok(()) + false } } diff --git a/proto/kuksa/val/v2/val.proto b/proto/kuksa/val/v2/val.proto index 5b9a65d1..7663e4d8 100644 --- a/proto/kuksa/val/v2/val.proto +++ b/proto/kuksa/val/v2/val.proto @@ -77,7 +77,7 @@ service VAL { // // Returns (GRPC error code): // NOT_FOUND if the actuator does not exist. - // PERMISSION_DENIED if access is denied for of the actuator. + // PERMISSION_DENIED if access is denied for the actuator. // UNAVAILABLE if there is no provider currently providing the actuator // INVALID_ARGUMENT // - if the data type used in the request does not match