From 1db79ba75d0c54d8a32701d93c319726b472cfda Mon Sep 17 00:00:00 2001 From: Andre Weber Date: Mon, 16 Sep 2024 07:29:56 +0200 Subject: [PATCH 1/2] Implement Provider Actuation --- Cargo.lock | 5 +- databroker/Cargo.toml | 3 +- databroker/src/broker.rs | 208 ++++++++++++++- .../src/grpc/kuksa_val_v2/conversions.rs | 168 +++++++++++++ databroker/src/grpc/kuksa_val_v2/val.rs | 238 ++++++++++++++++-- proto/kuksa/val/v2/val.proto | 8 +- 6 files changed, 595 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 762a77c2..9b7c47e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,9 +147,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", @@ -594,6 +594,7 @@ name = "databroker" version = "0.4.6" dependencies = [ "anyhow", + "async-trait", "axum", "chrono", "clap", diff --git a/databroker/Cargo.toml b/databroker/Cargo.toml index 3b152b4d..50b6c0cd 100644 --- a/databroker/Cargo.toml +++ b/databroker/Cargo.toml @@ -65,13 +65,14 @@ axum = { version = "0.6.20", optional = true, features = ["ws"] } futures = { version = "0.3.28", optional = true } chrono = { version = "0.4.31", optional = true, features = ["std"] } uuid = { version = "1.4.1", optional = true, features = ["v4"] } +async-trait = "0.1.82" # systemd related dependency, only relevant on linux systems [target.'cfg(target_os = "linux")'.dependencies] sd-notify = "0.4.1" [features] -default = ["tls"] +default = ["tls", "dep:futures"] tls = ["tonic/tls"] jemalloc = ["dep:jemallocator"] viss = ["dep:axum", "dep:chrono", "dep:futures", "dep:uuid"] diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index cbeb9c8c..0aeab51b 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -132,6 +132,7 @@ pub struct Database { #[derive(Default)] pub struct Subscriptions { + actuation_subscriptions: Vec, query_subscriptions: Vec, change_subscriptions: Vec, } @@ -179,6 +180,23 @@ pub struct DataBroker { shutdown_trigger: broadcast::Sender<()>, } +#[async_trait::async_trait] +pub trait ActuationProvider { + async fn actuate(&self, actuation_changes: Vec) -> Result<(), tonic::Status>; +} + +#[derive(Clone)] +pub struct ActuationChange { + pub id: i32, + pub data_value: DataValue, +} + +pub struct ActuationSubscription { + vss_ids: Vec, + actuation_provider: Box, + permissions: Permissions, +} + pub struct QuerySubscription { query: query::CompiledQuery, sender: mpsc::Sender, @@ -630,6 +648,10 @@ pub enum SuccessfulUpdate { } impl Subscriptions { + pub fn add_actuation_subscription(&mut self, subscription: ActuationSubscription) { + self.actuation_subscriptions.push(subscription); + } + pub fn add_query_subscription(&mut self, subscription: QuerySubscription) { self.query_subscriptions.push(subscription) } @@ -679,11 +701,13 @@ impl Subscriptions { } pub fn clear(&mut self) { + self.actuation_subscriptions.clear(); self.query_subscriptions.clear(); self.change_subscriptions.clear(); } pub fn cleanup(&mut self) { + // TODO how to cleanup actuation_subscriptions? self.query_subscriptions.retain(|sub| { if sub.sender.is_closed() { info!("Subscriber gone: removing subscription"); @@ -1561,6 +1585,185 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { Err(e) => Err(QueryError::CompilationError(format!("{e:?}"))), } } + + pub async fn provide_actuation( + &self, + vss_ids: Vec, + actuation_provider: Box, + ) -> Result<(), tonic::Status> { + for vss_id in vss_ids.clone() { + let result_entry = self.get_entry_by_id(vss_id).await; + match result_entry { + Ok(entry) => { + let vss_path = entry.metadata.path; + let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); + if result_can_write_actuator.is_err() { + let can_write_actuator = result_can_write_actuator.unwrap_err(); + let message = format!("Can not provide actuation for vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator); + return Err(tonic::Status::permission_denied(message)); + } + }, + Err(error) => { + let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error); + return Err(tonic::Status::invalid_argument(message)); + }, + } + } + + let provided_vss_ids: Vec = self.broker.subscriptions.blocking_read().actuation_subscriptions.iter().flat_map(|subscription| subscription.vss_ids.clone()).collect(); + let intersection: Vec<_> = vss_ids.iter().filter(|&x| provided_vss_ids.contains(x)).collect(); + if !intersection.is_empty() { + let message = format!("ActuationProvider(s) for the following vss_ids already registered: {:?}", intersection); + return Err(tonic::Status::invalid_argument(message)); + } + + let actuation_subscription: ActuationSubscription = ActuationSubscription { + vss_ids, + actuation_provider, + permissions: self.permissions.clone(), + }; + self.broker.subscriptions.blocking_write().add_actuation_subscription(actuation_subscription); + + Ok(()) + } + + async fn map_actuation_changes_by_vss_id( + &self, + actuation_changes: Vec + ) -> HashMap> { + let mut actuation_changes_per_vss_id: HashMap> = HashMap::new(); + for ele in actuation_changes { + let vss_id = ele.id; + + let opt_vss_ids = actuation_changes_per_vss_id.get_mut(&vss_id); + match opt_vss_ids { + Some(vss_ids) => { + vss_ids.push(ele.clone()); + }, + None => { + let vec = vec![ele.clone()]; + actuation_changes_per_vss_id.insert(vss_id, vec); + }, + } + } + return actuation_changes_per_vss_id; + } + + pub async fn batch_actuate( + &self, + actuation_changes: Vec + ) -> Result<(), tonic::Status> { + let actuation_subscriptions = &self.broker.subscriptions.blocking_read().actuation_subscriptions; + + for actuation_change in &actuation_changes { + let vss_id = actuation_change.id; + let result_entry = self.get_entry_by_id(vss_id).await; + match result_entry { + Ok(entry) => { + let vss_path = entry.metadata.path; + let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); + if result_can_write_actuator.is_err() { + let can_write_actuator = result_can_write_actuator.unwrap_err(); + let message = format!("Can not actuate vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator); + return Err(tonic::Status::permission_denied(message)); + } + }, + Err(error) => { + let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error); + return Err(tonic::Status::invalid_argument(message)); + }, + } + } + + let actuation_changes_per_vss_id = &self.map_actuation_changes_by_vss_id(actuation_changes).await; + for ele in actuation_changes_per_vss_id { + let vss_id = ele.0.clone(); + let actuation_changes = ele.1.clone(); + + let opt_actuation_subscription = actuation_subscriptions.iter().find(|subscription| subscription.vss_ids.contains(&vss_id)); + match opt_actuation_subscription { + Some(actuation_subscription) => { + actuation_subscription.actuation_provider.actuate(actuation_changes).await? + }, + None => { + let message = format!("No actuation provider available for vss_id: {}", vss_id); + return Err(tonic::Status::unavailable(message)); + }, + } + } + + return Ok(()) + } + + pub async fn actuate( + &self, + vss_id: &i32, + data_value: &DataValue, + ) -> Result<(), tonic::Status> { + + let vss_id = vss_id.clone(); + let result_entry = self.get_entry_by_id(vss_id).await; + match result_entry { + Ok(entry) => { + let vss_path = entry.metadata.path; + let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); + if result_can_write_actuator.is_err() { + let can_write_actuator = result_can_write_actuator.unwrap_err(); + let message = format!("Can not actuate vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator); + return Err(tonic::Status::permission_denied(message)); + } + }, + Err(error) => { + let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error); + return Err(tonic::Status::invalid_argument(message)); + }, + } + + let entry_update = EntryUpdate { + path: None, + datapoint: None, + actuator_target: Some(Some(Datapoint { + ts: SystemTime::now(), + source_ts: None, + value: data_value.clone(), + })), + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }; + + let entry_updates = [(vss_id.clone(), entry_update)]; + let legacy_result = self.update_entries(entry_updates).await; + if legacy_result.is_err() { + let update_errors = legacy_result.unwrap_err(); + let opt_error = update_errors.get(0); + if opt_error.is_some() { + let error = opt_error.unwrap(); + let message = format!("Could not set actuator target for vss_id {}: {:?}", vss_id, error.1); + warn!(message); + } + } + + let guard = self.broker.subscriptions.blocking_read(); + let opt_actuation_subscription = guard.actuation_subscriptions.iter().find(|subscription|subscription.vss_ids.contains(&vss_id)); + match opt_actuation_subscription { + Some(ref actuation_subscription) => { + actuation_subscription.actuation_provider.actuate(vec![ + ActuationChange { + id: vss_id.clone(), + data_value: data_value.clone() + }, + ] + ).await + }, + None => { + let message = format!("No actuation provider found for vss_id {}", vss_id); + return Err(tonic::Status::new(tonic::Code::Unavailable, message)) + } + } + } } impl DataBroker { @@ -1588,13 +1791,14 @@ impl DataBroker { pub fn start_housekeeping_task(&self) { info!("Starting housekeeping task"); let subscriptions = self.subscriptions.clone(); + tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); loop { interval.tick().await; - // Cleanup dropped subscriptions - subscriptions.write().await.cleanup(); + + subscriptions.write().await.cleanup(); // Cleanup dropped subscriptions } }); } diff --git a/databroker/src/grpc/kuksa_val_v2/conversions.rs b/databroker/src/grpc/kuksa_val_v2/conversions.rs index 1d7aae72..70d830e5 100644 --- a/databroker/src/grpc/kuksa_val_v2/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v2/conversions.rs @@ -11,6 +11,7 @@ // * SPDX-License-Identifier: Apache-2.0 // ********************************************************************************/ use databroker_proto::kuksa::val::v2 as proto; +use kuksa::proto::v2::{BoolArray, DoubleArray, FloatArray, Int32Array, Int64Array, StringArray, Uint32Array, Uint64Array}; use proto::datapoint::{ValueState::Failure, ValueState::Value}; use crate::broker; @@ -306,3 +307,170 @@ impl From for proto::EntryType { } } } + +impl From for broker::DataValue { + fn from(value: proto::Value) -> Self { + match &value.typed_value { + Some(proto::value::TypedValue::String(value)) => { + broker::DataValue::String(value.to_owned()) + } + Some(proto::value::TypedValue::Bool(value)) => broker::DataValue::Bool(*value), + Some(proto::value::TypedValue::Int32(value)) => broker::DataValue::Int32(*value), + Some(proto::value::TypedValue::Int64(value)) => broker::DataValue::Int64(*value), + Some(proto::value::TypedValue::Uint32(value)) => broker::DataValue::Uint32(*value), + Some(proto::value::TypedValue::Uint64(value)) => broker::DataValue::Uint64(*value), + Some(proto::value::TypedValue::Float(value)) => broker::DataValue::Float(*value), + Some(proto::value::TypedValue::Double(value)) => broker::DataValue::Double(*value), + Some(proto::value::TypedValue::StringArray(array)) => { + broker::DataValue::StringArray(array.values.clone()) + } + Some(proto::value::TypedValue::BoolArray(array)) => { + broker::DataValue::BoolArray(array.values.clone()) + } + Some(proto::value::TypedValue::Int32Array(array)) => { + broker::DataValue::Int32Array(array.values.clone()) + } + Some(proto::value::TypedValue::Int64Array(array)) => { + broker::DataValue::Int64Array(array.values.clone()) + } + Some(proto::value::TypedValue::Uint32Array(array)) => { + broker::DataValue::Uint32Array(array.values.clone()) + } + Some(proto::value::TypedValue::Uint64Array(array)) => { + broker::DataValue::Uint64Array(array.values.clone()) + } + Some(proto::value::TypedValue::FloatArray(array)) => { + broker::DataValue::FloatArray(array.values.clone()) + } + Some(proto::value::TypedValue::DoubleArray(array)) => { + broker::DataValue::DoubleArray(array.values.clone()) + } + None => todo!(), + } + } +} + +impl From for proto::Value { + fn from(value: broker::DataValue) -> Self { + match &value { + broker::DataValue::String(value) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::String(value.to_owned())) + } + } + + broker::DataValue::Bool(value) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(*value)) + } + } + + broker::DataValue::Int32(value) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(*value)) + } + } + + broker::DataValue::Int64(value) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Int64(*value)) + } + } + + broker::DataValue::Uint32(value) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32(*value)) + } + } + + broker::DataValue::Uint64(value) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64(*value)) + } + } + + broker::DataValue::Float(value) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Float(*value)) + } + } + + broker::DataValue::Double(value) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Double(*value)) + } + } + + broker::DataValue::StringArray(array) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::StringArray(StringArray { + values: array.clone(), + })) + } + } + + broker::DataValue::BoolArray(array) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::BoolArray(BoolArray { + values: array.clone(), + })) + } + } + + broker::DataValue::Int32Array(array) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Int32Array(Int32Array { + values: array.clone(), + })) + } + } + + broker::DataValue::Int64Array(array) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Int64Array(Int64Array { + values: array.clone(), + })) + } + } + + broker::DataValue::Uint32Array(array) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32Array(Uint32Array { + values: array.clone(), + })) + } + } + + broker::DataValue::Uint64Array(array) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64Array(Uint64Array { + values: array.clone(), + })) + } + } + + + broker::DataValue::FloatArray(array) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::FloatArray(FloatArray { + values: array.clone(), + })) + } + } + + broker::DataValue::DoubleArray(array) => { + proto::Value { + typed_value: Some(proto::value::TypedValue::DoubleArray(DoubleArray { + values: array.clone(), + })) + } + } + + broker::DataValue::NotAvailable => { + proto::Value { + typed_value: None + } + }, + } + } +} diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index 95f2061b..7f742c10 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -14,27 +14,65 @@ use std::{collections::HashMap, pin::Pin}; use crate::{ - broker::{self, AuthorizedAccess, SubscriptionError}, - glob::Matcher, - permissions::Permissions, + broker::{self, ActuationChange, ActuationProvider, AuthorizedAccess, SubscriptionError}, glob::Matcher, permissions::Permissions, types::DataValue }; use databroker_proto::kuksa::val::v2::{ self as proto, open_provider_stream_request::Action::{ - BatchActuateStreamResponse, ProvidedActuation, PublishValuesRequest, + BatchActuateStreamResponse, ProvideActuation, PublishValuesRequest, }, open_provider_stream_response, OpenProviderStreamResponse, PublishValuesResponse, }; -use kuksa::proto::v2::{ListMetadataResponse, Metadata}; +use kuksa::proto::v2::{signal_id, ActuateRequest, ActuateResponse, BatchActuateStreamRequest, ListMetadataResponse, Metadata}; use std::collections::HashSet; use tokio::{select, sync::mpsc}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; -use tracing::debug; +use tracing::{debug, warn}; const MAX_REQUEST_PATH_LENGTH: usize = 1000; +pub struct Provider { + sender: mpsc::Sender>, +} + +#[async_trait::async_trait] +impl ActuationProvider for Provider { + async fn actuate(&self, actuation_changes:Vec) -> Result<(), tonic::Status> { + let mut actuation_requests: Vec = vec![]; + for actuation_change in actuation_changes { + let data_value = actuation_change.data_value; + actuation_requests.push(ActuateRequest { + signal_id: + Some(proto::SignalId { + signal: Some(signal_id::Signal::Id(actuation_change.id)), + }), + value: Some(proto::Value::from(data_value)) }); + } + + let batch_actuate_stream_request = open_provider_stream_response::Action::BatchActuateStreamRequest(BatchActuateStreamRequest { + actuate_requests: actuation_requests, + }); + + let response = OpenProviderStreamResponse { + action: Some(batch_actuate_stream_request) + }; + + let result = self.sender.send(Ok(response)).await; + if result.is_err() { + let send_error = result.unwrap_err().0; + if send_error.is_err() { + let status = send_error.unwrap_err(); + return Err(status); + } + return Err(tonic::Status::cancelled("Could not send actuation changes to actuation provider")); + } + return Ok(()); + } +} + + #[tonic::async_trait] impl proto::val_server::Val for broker::DataBroker { async fn get_value( @@ -152,24 +190,130 @@ impl proto::val_server::Val for broker::DataBroker { )) } + // Actuate a single actuator + // + // Returns (GRPC error code): + // NOT_FOUND if the actuator does not exist. + // PERMISSION_DENIED if access is denied for of the actuator. + // UNAVAILABLE if there is no provider currently providing the actuator + // INVALID_ARGUMENT + // - if the data type used in the request does not match + // the data type of the addressed signal + // - if the requested value is not accepted, + // e.g. if sending an unsupported enum value async fn actuate( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + let broker = self.authorized_access(&permissions); + + let actuator_request = request.into_inner(); + if let Some(value) = actuator_request.value { + let opt_signal_id = actuator_request.signal_id.clone(); + match opt_signal_id { + Some(signal_id) => { + match signal_id.signal { + Some(proto::signal_id::Signal::Path(vss_path)) => { + if let Some(id) = broker.get_id_by_path(&vss_path).await { + let result = broker.actuate(&id, &DataValue::from(value)).await; + match result { + Ok(_) => return Ok(tonic::Response::new(ActuateResponse { })), + Err(err) => return Err(err), + }; + } + return Err(tonic::Status::invalid_argument(format!("Invalid vss_path provided {}", vss_path))); + }, + Some(proto::signal_id::Signal::Id(vss_id)) => { + let result = broker.actuate(&vss_id, &DataValue::from(value)).await; + match result { + Ok(_) => return Ok(tonic::Response::new(ActuateResponse { })), + Err(err) => return Err(err), + }; + }, + None => return Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Signal needs to provide Path or Id", + )), + }; + }, + None => return Err(tonic::Status::new(tonic::Code::InvalidArgument, "No Signal_Id provided")), + }; + }; + return Err(tonic::Status::invalid_argument("Invalid Actuator Request provided")); } + // Actuate simultaneously multiple actuators. + // If any error occurs, the entire operation will be aborted + // and no single actuator value will be forwarded to the provider. + // + // Returns (GRPC error code): + // NOT_FOUND if any of the actuators are non-existant. + // PERMISSION_DENIED if access is denied for any of the actuators. + // UNAVAILABLE if there is no provider currently providing an actuator + // INVALID_ARGUMENT + // - if the data type used in the request does not match + // the data type of the addressed signal + // - if the requested value is not accepted, + // e.g. if sending an unsupported enum value async fn batch_actuate( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + let broker = self.authorized_access(&permissions); + let actuate_requests = request.into_inner().actuate_requests; + + let mut actuation_changes: Vec = vec![]; + for ele in actuate_requests { + let vss_id = match ele.signal_id { + Some(signal_id) => match signal_id.signal { + Some(proto::signal_id::Signal::Id(vss_id)) => vss_id, + Some(proto::signal_id::Signal::Path(vss_path)) => { + let result = broker.get_id_by_path(&vss_path).await; + match result { + Some(vss_id) => vss_id, + None => { + let message = format!("Could not resolve vss_id for path: {}", vss_path); + return Err(tonic::Status::invalid_argument(message)); + }, + } + }, + None => return Err(tonic::Status::invalid_argument("Signal not provided")), + }, + None => return Err(tonic::Status::invalid_argument("Signal_Id not provided")), + }; + let data_value = match ele.value { + Some(data_value) => DataValue::from(data_value), + None => return Err(tonic::Status::invalid_argument("")), + }; + let actuation_change = ActuationChange { + id: vss_id, + data_value, + }; + actuation_changes.push(actuation_change); + } + + let result = broker.batch_actuate(actuation_changes).await; + match result { + Ok(_) => Ok(tonic::Response::new(proto::BatchActuateResponse { })), + Err(error) => return Err(error), + } } /// List metadata of signals matching the wildcard branch request. @@ -466,7 +610,6 @@ impl proto::val_server::Val for broker::DataBroker { // Copy (to move into task below) let broker = self.clone(); - // Create stream (to be returned) let (response_stream_sender, response_stream_receiver) = mpsc::channel(10); @@ -482,18 +625,18 @@ impl proto::val_server::Val for broker::DataBroker { match request { Some(req) => { match req.action { - Some(ProvidedActuation(_provided_actuation)) => { - if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { - debug!("Failed to send error response: {}", err); - } - break; - }, - Some(PublishValuesRequest(publish_values_request)) => { + Some(PublishValuesRequest(publish_values_request)) => { let response = publish_values(&broker, &publish_values_request).await; if let Err(err) = response_stream_sender.send(Ok(response)).await { debug!("Failed to send response: {}", err); } + }, + Some(ProvideActuation(provided_actuation)) => { + if let Err(err) = provide_actuation(&broker, &provided_actuation, response_stream_sender).await { + debug!("Failed to provide actuation: {}", err) + } + break; }, Some(BatchActuateStreamResponse(_batch_actuate_stream_response)) => { if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { @@ -542,6 +685,48 @@ impl proto::val_server::Val for broker::DataBroker { } } +async fn provide_actuation( + broker: &AuthorizedAccess<'_, '_>, + request: &databroker_proto::kuksa::val::v2::ProvideActuation, + sender: mpsc::Sender>, +) -> Result<(), tonic::Status> { + let vss_paths: Vec<_> = request.actuator_identifiers.iter() + .filter_map(|signal_id| match &signal_id.signal { + Some(proto::signal_id::Signal::Path(path)) => Some(path.clone()), + _ => None, + }) + .collect(); + + let future_vss_ids = vss_paths.iter().map(|vss_path| broker.get_id_by_path(&vss_path)); + let resolved_opt_vss_ids = futures::future::join_all(future_vss_ids).await; + + for (index, opt_vss_id) in resolved_opt_vss_ids.iter().enumerate() { + if opt_vss_id.is_none() { + let message = format!("could not resolve id of vss_path: {}", vss_paths.get(index).unwrap()); + warn!(message); + } + } + + let resolved_vss_ids: Vec = resolved_opt_vss_ids.iter().filter_map(|&opt| opt).collect(); + + let vss_ids: Vec<_> = request.actuator_identifiers.iter() + .filter_map(|signal_id| match &signal_id.signal { + Some(proto::signal_id::Signal::Id(id)) => Some(id.clone()), + _ => None, + }) + .collect(); + + let mut all_vss_ids = vec![]; + all_vss_ids.extend(vss_ids); + all_vss_ids.extend(resolved_vss_ids); + + let provider = Provider { + sender, + }; + + broker.provide_actuation(all_vss_ids, Box::new(provider)).await +} + async fn publish_values( broker: &AuthorizedAccess<'_, '_>, request: &databroker_proto::kuksa::val::v2::PublishValuesRequest, @@ -566,6 +751,7 @@ async fn publish_values( }) .collect(); + // TODO check if provider is allowed to update the entries for the provided signals? match broker.update_entries(ids).await { Ok(_) => OpenProviderStreamResponse { action: Some( @@ -656,7 +842,7 @@ mod tests { use crate::{broker::DataBroker, permissions}; use databroker_proto::kuksa::val::v2::val_server::Val; use proto::open_provider_stream_response::Action::{ - BatchActuateStreamRequest, ProvideActuatorResponse, PublishValuesResponse, + BatchActuateStreamRequest, ProvideActuationResponse, PublishValuesResponse, }; use proto::{open_provider_stream_request, OpenProviderStreamRequest, PublishValuesRequest}; @@ -1020,7 +1206,7 @@ mod tests { while let Some(value) = receiver.recv().await { match value { Ok(value) => match value.action { - Some(ProvideActuatorResponse(_)) => { + Some(ProvideActuationResponse(_)) => { panic!("Should not happen") } Some(PublishValuesResponse(publish_values_response)) => { diff --git a/proto/kuksa/val/v2/val.proto b/proto/kuksa/val/v2/val.proto index 67d1a86f..9ea46feb 100644 --- a/proto/kuksa/val/v2/val.proto +++ b/proto/kuksa/val/v2/val.proto @@ -199,11 +199,11 @@ message PublishValuesResponse { map status = 2; } -message ProvidedActuation { +message ProvideActuation { repeated SignalID actuator_identifiers = 1; } -message ProvideActuatorResponse { +message ProvideActuationResponse { } message BatchActuateStreamRequest { @@ -216,7 +216,7 @@ message BatchActuateStreamResponse { message OpenProviderStreamRequest { oneof action { // Inform server of an actuator this provider provides. - ProvidedActuation provided_actuation = 1; + ProvideActuation provide_actuation = 1; // Publish a value. PublishValuesRequest publish_values_request = 2; // Sent to acknowledge the acceptance of a batch actuate @@ -228,7 +228,7 @@ message OpenProviderStreamRequest { message OpenProviderStreamResponse { oneof action { // Response to a provide actuator request. - ProvideActuatorResponse provide_actuator_response = 1; + ProvideActuationResponse provide_actuation_response = 1; // Acknowledgement that a published value was received. PublishValuesResponse publish_values_response = 2; // Send a batch actuate request to a provider. From 575991d55ba7965696b7c6a42b87d23f4ec3bd1a Mon Sep 17 00:00:00 2001 From: Andre Weber Date: Fri, 27 Sep 2024 07:20:57 +0200 Subject: [PATCH 2/2] Write Tests --- databroker/src/broker.rs | 143 +++-- .../src/grpc/kuksa_val_v2/conversions.rs | 172 +++--- databroker/src/grpc/kuksa_val_v2/val.rs | 571 ++++++++++++++++-- 3 files changed, 683 insertions(+), 203 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 0aeab51b..e7c11102 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -1596,24 +1596,42 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { match result_entry { Ok(entry) => { let vss_path = entry.metadata.path; - let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); + let result_can_write_actuator = + self.permissions.can_write_actuator_target(&vss_path); if result_can_write_actuator.is_err() { let can_write_actuator = result_can_write_actuator.unwrap_err(); let message = format!("Can not provide actuation for vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator); return Err(tonic::Status::permission_denied(message)); } - }, + } Err(error) => { - let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error); + let message = format!( + "Could not resolve vss_path for vss_id {}: {:?}", + vss_id, error + ); return Err(tonic::Status::invalid_argument(message)); - }, + } } } - let provided_vss_ids: Vec = self.broker.subscriptions.blocking_read().actuation_subscriptions.iter().flat_map(|subscription| subscription.vss_ids.clone()).collect(); - let intersection: Vec<_> = vss_ids.iter().filter(|&x| provided_vss_ids.contains(x)).collect(); + let provided_vss_ids: Vec = self + .broker + .subscriptions + .read() + .await + .actuation_subscriptions + .iter() + .flat_map(|subscription| subscription.vss_ids.clone()) + .collect(); + let intersection: Vec<_> = vss_ids + .iter() + .filter(|&x| provided_vss_ids.contains(x)) + .collect(); if !intersection.is_empty() { - let message = format!("ActuationProvider(s) for the following vss_ids already registered: {:?}", intersection); + let message = format!( + "ActuationProvider(s) for the following vss_ids already registered: {:?}", + intersection + ); return Err(tonic::Status::invalid_argument(message)); } @@ -1622,14 +1640,18 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { actuation_provider, permissions: self.permissions.clone(), }; - self.broker.subscriptions.blocking_write().add_actuation_subscription(actuation_subscription); + self.broker + .subscriptions + .write() + .await + .add_actuation_subscription(actuation_subscription); Ok(()) } async fn map_actuation_changes_by_vss_id( &self, - actuation_changes: Vec + actuation_changes: Vec, ) -> HashMap> { let mut actuation_changes_per_vss_id: HashMap> = HashMap::new(); for ele in actuation_changes { @@ -1639,11 +1661,11 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { match opt_vss_ids { Some(vss_ids) => { vss_ids.push(ele.clone()); - }, + } None => { let vec = vec![ele.clone()]; actuation_changes_per_vss_id.insert(vss_id, vec); - }, + } } } return actuation_changes_per_vss_id; @@ -1651,9 +1673,14 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn batch_actuate( &self, - actuation_changes: Vec + actuation_changes: Vec, ) -> Result<(), tonic::Status> { - let actuation_subscriptions = &self.broker.subscriptions.blocking_read().actuation_subscriptions; + let actuation_subscriptions = &self + .broker + .subscriptions + .read() + .await + .actuation_subscriptions; for actuation_change in &actuation_changes { let vss_id = actuation_change.id; @@ -1661,62 +1688,78 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { match result_entry { Ok(entry) => { let vss_path = entry.metadata.path; - let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); + let result_can_write_actuator = + self.permissions.can_write_actuator_target(&vss_path); if result_can_write_actuator.is_err() { let can_write_actuator = result_can_write_actuator.unwrap_err(); - let message = format!("Can not actuate vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator); + let message = format!( + "Can not actuate vss_path '{}' due to permission error {:?}", + vss_path, can_write_actuator + ); return Err(tonic::Status::permission_denied(message)); } - }, + } Err(error) => { - let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error); + let message = format!( + "Could not resolve vss_path for vss_id {}: {:?}", + vss_id, error + ); return Err(tonic::Status::invalid_argument(message)); - }, + } } } - let actuation_changes_per_vss_id = &self.map_actuation_changes_by_vss_id(actuation_changes).await; + let actuation_changes_per_vss_id = &self + .map_actuation_changes_by_vss_id(actuation_changes) + .await; for ele in actuation_changes_per_vss_id { let vss_id = ele.0.clone(); let actuation_changes = ele.1.clone(); - let opt_actuation_subscription = actuation_subscriptions.iter().find(|subscription| subscription.vss_ids.contains(&vss_id)); + let opt_actuation_subscription = actuation_subscriptions + .iter() + .find(|subscription| subscription.vss_ids.contains(&vss_id)); match opt_actuation_subscription { Some(actuation_subscription) => { - actuation_subscription.actuation_provider.actuate(actuation_changes).await? - }, + actuation_subscription + .actuation_provider + .actuate(actuation_changes) + .await? + } None => { let message = format!("No actuation provider available for vss_id: {}", vss_id); return Err(tonic::Status::unavailable(message)); - }, + } } } - return Ok(()) + return Ok(()); } - pub async fn actuate( - &self, - vss_id: &i32, - data_value: &DataValue, - ) -> Result<(), tonic::Status> { - + pub async fn actuate(&self, vss_id: &i32, data_value: &DataValue) -> Result<(), tonic::Status> { let vss_id = vss_id.clone(); let result_entry = self.get_entry_by_id(vss_id).await; match result_entry { Ok(entry) => { let vss_path = entry.metadata.path; - let result_can_write_actuator = self.permissions.can_write_actuator_target(&vss_path); + let result_can_write_actuator = + self.permissions.can_write_actuator_target(&vss_path); if result_can_write_actuator.is_err() { let can_write_actuator = result_can_write_actuator.unwrap_err(); - let message = format!("Can not actuate vss_path '{}' due to permission error {:?}", vss_path, can_write_actuator); + let message = format!( + "Can not actuate vss_path '{}' due to permission error {:?}", + vss_path, can_write_actuator + ); return Err(tonic::Status::permission_denied(message)); } - }, + } Err(error) => { - let message = format!("Could not resolve vss_path for vss_id {}: {:?}", vss_id, error); + let message = format!( + "Could not resolve vss_path for vss_id {}: {:?}", + vss_id, error + ); return Err(tonic::Status::invalid_argument(message)); - }, + } } let entry_update = EntryUpdate { @@ -1741,26 +1784,32 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let opt_error = update_errors.get(0); if opt_error.is_some() { let error = opt_error.unwrap(); - let message = format!("Could not set actuator target for vss_id {}: {:?}", vss_id, error.1); + let message = format!( + "Could not set actuator target for vss_id {}: {:?}", + vss_id, error.1 + ); warn!(message); } } - let guard = self.broker.subscriptions.blocking_read(); - let opt_actuation_subscription = guard.actuation_subscriptions.iter().find(|subscription|subscription.vss_ids.contains(&vss_id)); + let guard = self.broker.subscriptions.read().await; + let opt_actuation_subscription = guard + .actuation_subscriptions + .iter() + .find(|subscription| subscription.vss_ids.contains(&vss_id)); match opt_actuation_subscription { Some(ref actuation_subscription) => { - actuation_subscription.actuation_provider.actuate(vec![ - ActuationChange { - id: vss_id.clone(), - data_value: data_value.clone() - }, - ] - ).await - }, + actuation_subscription + .actuation_provider + .actuate(vec![ActuationChange { + id: vss_id.clone(), + data_value: data_value.clone(), + }]) + .await + } None => { let message = format!("No actuation provider found for vss_id {}", vss_id); - return Err(tonic::Status::new(tonic::Code::Unavailable, message)) + return Err(tonic::Status::new(tonic::Code::Unavailable, message)); } } } diff --git a/databroker/src/grpc/kuksa_val_v2/conversions.rs b/databroker/src/grpc/kuksa_val_v2/conversions.rs index 70d830e5..e399fb61 100644 --- a/databroker/src/grpc/kuksa_val_v2/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v2/conversions.rs @@ -11,7 +11,10 @@ // * SPDX-License-Identifier: Apache-2.0 // ********************************************************************************/ use databroker_proto::kuksa::val::v2 as proto; -use kuksa::proto::v2::{BoolArray, DoubleArray, FloatArray, Int32Array, Int64Array, StringArray, Uint32Array, Uint64Array}; +use kuksa::proto::v2::{ + BoolArray, DoubleArray, FloatArray, Int32Array, Int64Array, StringArray, Uint32Array, + Uint64Array, +}; use proto::datapoint::{ValueState::Failure, ValueState::Value}; use crate::broker; @@ -353,124 +356,87 @@ impl From for broker::DataValue { impl From for proto::Value { fn from(value: broker::DataValue) -> Self { match &value { - broker::DataValue::String(value) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::String(value.to_owned())) - } - } - - broker::DataValue::Bool(value) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Bool(*value)) - } - } - - broker::DataValue::Int32(value) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Int32(*value)) - } - } + broker::DataValue::String(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::String(value.to_owned())), + }, - broker::DataValue::Int64(value) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Int64(*value)) - } - } + broker::DataValue::Bool(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(*value)), + }, - broker::DataValue::Uint32(value) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Uint32(*value)) - } - } + broker::DataValue::Int32(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(*value)), + }, - broker::DataValue::Uint64(value) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Uint64(*value)) - } - } + broker::DataValue::Int64(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Int64(*value)), + }, - broker::DataValue::Float(value) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Float(*value)) - } - } + broker::DataValue::Uint32(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32(*value)), + }, - broker::DataValue::Double(value) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Double(*value)) - } - } + broker::DataValue::Uint64(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64(*value)), + }, - broker::DataValue::StringArray(array) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::StringArray(StringArray { - values: array.clone(), - })) - } - } + broker::DataValue::Float(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Float(*value)), + }, - broker::DataValue::BoolArray(array) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::BoolArray(BoolArray { - values: array.clone(), - })) - } - } + broker::DataValue::Double(value) => proto::Value { + typed_value: Some(proto::value::TypedValue::Double(*value)), + }, - broker::DataValue::Int32Array(array) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Int32Array(Int32Array { - values: array.clone(), - })) - } - } + broker::DataValue::StringArray(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::StringArray(StringArray { + values: array.clone(), + })), + }, - broker::DataValue::Int64Array(array) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Int64Array(Int64Array { - values: array.clone(), - })) - } - } + broker::DataValue::BoolArray(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::BoolArray(BoolArray { + values: array.clone(), + })), + }, - broker::DataValue::Uint32Array(array) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Uint32Array(Uint32Array { - values: array.clone(), - })) - } - } + broker::DataValue::Int32Array(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::Int32Array(Int32Array { + values: array.clone(), + })), + }, - broker::DataValue::Uint64Array(array) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::Uint64Array(Uint64Array { - values: array.clone(), - })) - } - } + broker::DataValue::Int64Array(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::Int64Array(Int64Array { + values: array.clone(), + })), + }, + broker::DataValue::Uint32Array(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32Array(Uint32Array { + values: array.clone(), + })), + }, - broker::DataValue::FloatArray(array) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::FloatArray(FloatArray { - values: array.clone(), - })) - } - } + broker::DataValue::Uint64Array(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64Array(Uint64Array { + values: array.clone(), + })), + }, - broker::DataValue::DoubleArray(array) => { - proto::Value { - typed_value: Some(proto::value::TypedValue::DoubleArray(DoubleArray { - values: array.clone(), - })) - } - } + broker::DataValue::FloatArray(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::FloatArray(FloatArray { + values: array.clone(), + })), + }, - broker::DataValue::NotAvailable => { - proto::Value { - typed_value: None - } + broker::DataValue::DoubleArray(array) => proto::Value { + typed_value: Some(proto::value::TypedValue::DoubleArray(DoubleArray { + values: array.clone(), + })), }, + + broker::DataValue::NotAvailable => proto::Value { typed_value: None }, } } } diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index 7f742c10..a768b6f5 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -14,7 +14,10 @@ use std::{collections::HashMap, pin::Pin}; use crate::{ - broker::{self, ActuationChange, ActuationProvider, AuthorizedAccess, SubscriptionError}, glob::Matcher, permissions::Permissions, types::DataValue + broker::{self, ActuationChange, ActuationProvider, AuthorizedAccess, SubscriptionError}, + glob::Matcher, + permissions::Permissions, + types::DataValue, }; use databroker_proto::kuksa::val::v2::{ @@ -25,7 +28,10 @@ use databroker_proto::kuksa::val::v2::{ open_provider_stream_response, OpenProviderStreamResponse, PublishValuesResponse, }; -use kuksa::proto::v2::{signal_id, ActuateRequest, ActuateResponse, BatchActuateStreamRequest, ListMetadataResponse, Metadata}; +use kuksa::proto::v2::{ + signal_id, ActuateRequest, ActuateResponse, BatchActuateStreamRequest, ListMetadataResponse, + Metadata, ProvideActuationResponse, +}; use std::collections::HashSet; use tokio::{select, sync::mpsc}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; @@ -39,24 +45,30 @@ pub struct Provider { #[async_trait::async_trait] impl ActuationProvider for Provider { - async fn actuate(&self, actuation_changes:Vec) -> Result<(), tonic::Status> { + async fn actuate( + &self, + actuation_changes: Vec, + ) -> Result<(), tonic::Status> { let mut actuation_requests: Vec = vec![]; for actuation_change in actuation_changes { let data_value = actuation_change.data_value; actuation_requests.push(ActuateRequest { - signal_id: - Some(proto::SignalId { - signal: Some(signal_id::Signal::Id(actuation_change.id)), - }), - value: Some(proto::Value::from(data_value)) }); + signal_id: Some(proto::SignalId { + signal: Some(signal_id::Signal::Id(actuation_change.id)), + }), + value: Some(proto::Value::from(data_value)), + }); } - let batch_actuate_stream_request = open_provider_stream_response::Action::BatchActuateStreamRequest(BatchActuateStreamRequest { - actuate_requests: actuation_requests, - }); + let batch_actuate_stream_request = + open_provider_stream_response::Action::BatchActuateStreamRequest( + BatchActuateStreamRequest { + actuate_requests: actuation_requests, + }, + ); let response = OpenProviderStreamResponse { - action: Some(batch_actuate_stream_request) + action: Some(batch_actuate_stream_request), }; let result = self.sender.send(Ok(response)).await; @@ -66,13 +78,14 @@ impl ActuationProvider for Provider { let status = send_error.unwrap_err(); return Err(status); } - return Err(tonic::Status::cancelled("Could not send actuation changes to actuation provider")); + return Err(tonic::Status::cancelled( + "Could not send actuation changes to actuation provider", + )); } return Ok(()); } } - #[tonic::async_trait] impl proto::val_server::Val for broker::DataBroker { async fn get_value( @@ -225,29 +238,41 @@ impl proto::val_server::Val for broker::DataBroker { if let Some(id) = broker.get_id_by_path(&vss_path).await { let result = broker.actuate(&id, &DataValue::from(value)).await; match result { - Ok(_) => return Ok(tonic::Response::new(ActuateResponse { })), + Ok(_) => return Ok(tonic::Response::new(ActuateResponse {})), Err(err) => return Err(err), }; } - return Err(tonic::Status::invalid_argument(format!("Invalid vss_path provided {}", vss_path))); - }, + return Err(tonic::Status::not_found(format!( + "Invalid vss_path provided {}", + vss_path + ))); + } Some(proto::signal_id::Signal::Id(vss_id)) => { let result = broker.actuate(&vss_id, &DataValue::from(value)).await; match result { - Ok(_) => return Ok(tonic::Response::new(ActuateResponse { })), + Ok(_) => return Ok(tonic::Response::new(ActuateResponse {})), Err(err) => return Err(err), }; - }, - None => return Err(tonic::Status::new( - tonic::Code::InvalidArgument, - "Signal needs to provide Path or Id", - )), + } + None => { + return Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Signal needs to provide Path or Id", + )) + } }; - }, - None => return Err(tonic::Status::new(tonic::Code::InvalidArgument, "No Signal_Id provided")), + } + None => { + return Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "No Signal_Id provided", + )) + } }; }; - return Err(tonic::Status::invalid_argument("Invalid Actuator Request provided")); + return Err(tonic::Status::invalid_argument( + "Invalid Actuator Request provided", + )); } // Actuate simultaneously multiple actuators. @@ -267,7 +292,6 @@ impl proto::val_server::Val for broker::DataBroker { &self, request: tonic::Request, ) -> Result, tonic::Status> { - debug!(?request); let permissions = match request.extensions().get::() { Some(permissions) => { @@ -289,11 +313,12 @@ impl proto::val_server::Val for broker::DataBroker { match result { Some(vss_id) => vss_id, None => { - let message = format!("Could not resolve vss_id for path: {}", vss_path); - return Err(tonic::Status::invalid_argument(message)); - }, + let message = + format!("Could not resolve vss_id for path: {}", vss_path); + return Err(tonic::Status::not_found(message)); + } } - }, + } None => return Err(tonic::Status::invalid_argument("Signal not provided")), }, None => return Err(tonic::Status::invalid_argument("Signal_Id not provided")), @@ -311,7 +336,7 @@ impl proto::val_server::Val for broker::DataBroker { let result = broker.batch_actuate(actuation_changes).await; match result { - Ok(_) => Ok(tonic::Response::new(proto::BatchActuateResponse { })), + Ok(_) => Ok(tonic::Response::new(proto::BatchActuateResponse {})), Err(error) => return Err(error), } } @@ -633,7 +658,9 @@ impl proto::val_server::Val for broker::DataBroker { } }, Some(ProvideActuation(provided_actuation)) => { - if let Err(err) = provide_actuation(&broker, &provided_actuation, response_stream_sender).await { + let response = provide_actuation(&broker, &provided_actuation, response_stream_sender.clone()).await; + if let Err(err) = response_stream_sender.send(response).await + { debug!("Failed to provide actuation: {}", err) } break; @@ -689,42 +716,68 @@ async fn provide_actuation( broker: &AuthorizedAccess<'_, '_>, request: &databroker_proto::kuksa::val::v2::ProvideActuation, sender: mpsc::Sender>, -) -> Result<(), tonic::Status> { - let vss_paths: Vec<_> = request.actuator_identifiers.iter() - .filter_map(|signal_id| match &signal_id.signal { - Some(proto::signal_id::Signal::Path(path)) => Some(path.clone()), - _ => None, - }) - .collect(); +) -> Result { + let vss_paths: Vec<_> = request + .actuator_identifiers + .iter() + .filter_map(|signal_id| match &signal_id.signal { + Some(proto::signal_id::Signal::Path(path)) => Some(path.clone()), + _ => None, + }) + .collect(); - let future_vss_ids = vss_paths.iter().map(|vss_path| broker.get_id_by_path(&vss_path)); + let future_vss_ids = vss_paths + .iter() + .map(|vss_path| broker.get_id_by_path(&vss_path)); let resolved_opt_vss_ids = futures::future::join_all(future_vss_ids).await; for (index, opt_vss_id) in resolved_opt_vss_ids.iter().enumerate() { if opt_vss_id.is_none() { - let message = format!("could not resolve id of vss_path: {}", vss_paths.get(index).unwrap()); + let message = format!( + "could not resolve id of vss_path: {}", + vss_paths.get(index).unwrap() + ); warn!(message); } } let resolved_vss_ids: Vec = resolved_opt_vss_ids.iter().filter_map(|&opt| opt).collect(); - let vss_ids: Vec<_> = request.actuator_identifiers.iter() - .filter_map(|signal_id| match &signal_id.signal { - Some(proto::signal_id::Signal::Id(id)) => Some(id.clone()), - _ => None, - }) - .collect(); + let vss_ids: Vec<_> = request + .actuator_identifiers + .iter() + .filter_map(|signal_id| match &signal_id.signal { + Some(proto::signal_id::Signal::Id(id)) => Some(id.clone()), + _ => None, + }) + .collect(); let mut all_vss_ids = vec![]; all_vss_ids.extend(vss_ids); all_vss_ids.extend(resolved_vss_ids); - let provider = Provider { - sender, - }; + let provider = Provider { sender }; + + match broker + .provide_actuation(all_vss_ids, Box::new(provider)) + .await + { + Ok(_) => { + let provide_actuation_response = ProvideActuationResponse {}; - broker.provide_actuation(all_vss_ids, Box::new(provider)).await + let response = OpenProviderStreamResponse { + action: Some( + open_provider_stream_response::Action::ProvideActuationResponse( + provide_actuation_response, + ), + ), + }; + + return Ok(response); + } + + Err(status) => return Err(status), + } } async fn publish_values( @@ -844,7 +897,10 @@ mod tests { use proto::open_provider_stream_response::Action::{ BatchActuateStreamRequest, ProvideActuationResponse, PublishValuesResponse, }; - use proto::{open_provider_stream_request, OpenProviderStreamRequest, PublishValuesRequest}; + use proto::{ + open_provider_stream_request, BatchActuateRequest, OpenProviderStreamRequest, + PublishValuesRequest, SignalId, Value, + }; async fn check_stream_next( item: &Result, @@ -1386,4 +1442,413 @@ mod tests { } } } + + #[tokio::test] + async fn test_actuate_signal_not_found() { + let broker = DataBroker::default(); + + let mut request = tonic::Request::new(ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.Cabin.Non.Existing".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::actuate(&broker, request).await; + assert!(result_response.is_err()); + assert_eq!(result_response.unwrap_err().code(), tonic::Code::NotFound) + } + + #[tokio::test] + async fn test_actuate_can_provider_unavailable() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let mut request = tonic::Request::new(ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::actuate(&broker, request).await; + assert!(result_response.is_err()); + assert_eq!( + result_response.unwrap_err().code(), + tonic::Code::Unavailable + ) + } + + #[tokio::test] + async fn test_actuate_success() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let vss_id = authorized_access + .get_id_by_path("Vehicle.ADAS.ABS.IsEnabled") + .await + .expect("Resolving the id of Vehicle.ADAS.ABS.IsEnabled should succeed"); + let vss_ids = vec![vss_id]; + + let (sender, mut receiver) = mpsc::channel(10); + let actuation_provider = Provider { sender }; + authorized_access + .provide_actuation(vss_ids, Box::new(actuation_provider)) + .await + .expect("Registering a new Actuation Provider should succeed"); + + let mut request = tonic::Request::new(ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::actuate(&broker, request).await; + assert!(result_response.is_ok()); + + let result_response = receiver.recv().await.expect("Option should be Some"); + result_response.expect("Result should be Ok"); + } + + #[tokio::test] + async fn test_batch_actuate_signal_not_found() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let mut request = tonic::Request::new(BatchActuateRequest { + actuate_requests: vec![ + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.Cabin.Non.Existing".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::batch_actuate(&broker, request).await; + assert!(result_response.is_err()); + assert_eq!(result_response.unwrap_err().code(), tonic::Code::NotFound) + } + + #[tokio::test] + async fn test_batch_actuate_provider_unavailable() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + authorized_access + .add_entry( + "Vehicle.ADAS.CruiseControl.IsActive".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let vss_id_abs = authorized_access + .get_id_by_path("Vehicle.ADAS.ABS.IsEnabled") + .await + .expect("Resolving the id of Vehicle.ADAS.ABS.IsEnabled should succeed"); + + let vss_ids = vec![vss_id_abs]; + + let (sender, mut receiver) = mpsc::channel(10); + let actuation_provider = Provider { sender }; + authorized_access + .provide_actuation(vss_ids, Box::new(actuation_provider)) + .await + .expect("Registering a new Actuation Provider should succeed"); + + let mut request = tonic::Request::new(BatchActuateRequest { + actuate_requests: vec![ + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.CruiseControl.IsActive".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::batch_actuate(&broker, request).await; + assert!(result_response.is_err()); + assert_eq!( + result_response.unwrap_err().code(), + tonic::Code::Unavailable + ) + } + + #[tokio::test] + async fn test_batch_actuate_success() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "Vehicle.ADAS.ABS.IsEnabled".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + authorized_access + .add_entry( + "Vehicle.ADAS.CruiseControl.IsActive".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Actuator, + "Some funny description".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let vss_id_abs = authorized_access + .get_id_by_path("Vehicle.ADAS.ABS.IsEnabled") + .await + .expect("Resolving the id of Vehicle.ADAS.ABS.IsEnabled should succeed"); + let vss_id_cruise_control = authorized_access + .get_id_by_path("Vehicle.ADAS.CruiseControl.IsActive") + .await + .expect("Resolving the id of Vehicle.ADAS.CruiseControl.IsActive should succeed"); + + let vss_ids = vec![vss_id_abs, vss_id_cruise_control]; + + let (sender, mut receiver) = mpsc::channel(10); + let actuation_provider = Provider { sender }; + authorized_access + .provide_actuation(vss_ids, Box::new(actuation_provider)) + .await + .expect("Registering a new Actuation Provider should succeed"); + + let mut request = tonic::Request::new(BatchActuateRequest { + actuate_requests: vec![ + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.ABS.IsEnabled".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ActuateRequest { + signal_id: Some(SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.ADAS.CruiseControl.IsActive".to_string(), + )), + }), + value: Some(Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + }, + ], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result_response = proto::val_server::Val::batch_actuate(&broker, request).await; + assert!(result_response.is_ok()); + + let result_response = receiver.recv().await.expect("Option should be Some"); + result_response.expect("Result should be Ok"); + } + + #[tokio::test] + async fn test_provide_actuation_signal_not_found() { + let broker = DataBroker::default(); + + let request = OpenProviderStreamRequest { + action: Some(open_provider_stream_request::Action::ProvideActuation( + proto::ProvideActuation { + actuator_identifiers: vec![SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.Cabin.Non.Existing".to_string(), + )), + }], + }, + )), + }; + + let mut streaming_request = tonic_mock::streaming_request(vec![request]); + streaming_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match proto::val_server::Val::open_provider_stream(&broker, streaming_request).await { + Ok(response) => { + let stream = response.into_inner(); + let mut receiver = stream.into_inner(); + let result_response = receiver.recv().await.expect("Option should be Some"); + let response = result_response.expect("result_response should be Ok"); + + // TODO how to verify signal not found + } + Err(_) => { + panic!("Should not happen") + } + } + } + + #[tokio::test] + async fn test_provide_actuation_success() { + let broker = DataBroker::default(); + + let request = OpenProviderStreamRequest { + action: Some(open_provider_stream_request::Action::ProvideActuation( + proto::ProvideActuation { + actuator_identifiers: vec![SignalId { + signal: Some(proto::signal_id::Signal::Path( + "Vehicle.Cabin.Non.Existing".to_string(), + )), + }], + }, + )), + }; + + let mut streaming_request = tonic_mock::streaming_request(vec![request]); + streaming_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match proto::val_server::Val::open_provider_stream(&broker, streaming_request).await { + Ok(response) => { + let stream = response.into_inner(); + let mut receiver = stream.into_inner(); + let result_response = receiver.recv().await.expect("Option should be Some"); + let response = result_response.expect("result_response should be Ok"); + + // TODO how to verify success? <-> should be solvable once it's clear how to verify an error case scenario + } + Err(_) => { + panic!("Should not happen") + } + } + } }