diff --git a/databroker-proto/build.rs b/databroker-proto/build.rs index d02a006d..68747c85 100644 --- a/databroker-proto/build.rs +++ b/databroker-proto/build.rs @@ -23,6 +23,8 @@ fn main() -> Result<(), Box> { "proto/sdv/databroker/v1/collector.proto", "proto/kuksa/val/v1/val.proto", "proto/kuksa/val/v1/types.proto", + "proto/kuksa/val/v2/val.proto", + "proto/kuksa/val/v2/types.proto", ], &["proto"], )?; diff --git a/databroker-proto/src/lib.rs b/databroker-proto/src/lib.rs index 83fe2005..ac0d359e 100644 --- a/databroker-proto/src/lib.rs +++ b/databroker-proto/src/lib.rs @@ -143,5 +143,8 @@ pub mod kuksa { } } } + pub mod v2 { + tonic::include_proto!("kuksa.val.v2"); + } } } diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 4e098067..cbeb9c8c 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -15,7 +15,7 @@ use crate::permissions::{PermissionError, Permissions}; pub use crate::types; use crate::query; -pub use crate::types::{ChangeType, DataType, DataValue, EntryType}; +pub use crate::types::{ChangeType, DataType, DataValue, EntryType, ValueFailure}; use tokio::sync::{broadcast, mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; @@ -43,6 +43,37 @@ pub enum UpdateError { PermissionExpired, } +impl UpdateError { + pub fn to_status_with_code(&self, id: &i32) -> tonic::Status { + match self { + UpdateError::NotFound => tonic::Status::new( + tonic::Code::NotFound, + format!("Signal not found (id: {})", id), + ), + UpdateError::WrongType => tonic::Status::new( + tonic::Code::InvalidArgument, + format!("Wrong type provided (id: {})", id), + ), + UpdateError::OutOfBounds => tonic::Status::new( + tonic::Code::OutOfRange, + format!("Index out of bounds (id: {})", id), + ), + UpdateError::UnsupportedType => tonic::Status::new( + tonic::Code::Unimplemented, + format!("Unsupported type (id: {})", id), + ), + UpdateError::PermissionDenied => tonic::Status::new( + tonic::Code::PermissionDenied, + format!("Permission denied (id: {})", id), + ), + UpdateError::PermissionExpired => tonic::Status::new( + tonic::Code::Unauthenticated, + format!("Permission expired (id: {})", id), + ), + } + } +} + #[derive(Debug, Clone)] pub enum ReadError { NotFound, diff --git a/databroker/src/grpc/kuksa_val_v1/conversions.rs b/databroker/src/grpc/kuksa_val_v1/conversions.rs index d9b972d1..a4df19d1 100644 --- a/databroker/src/grpc/kuksa_val_v1/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v1/conversions.rs @@ -143,6 +143,7 @@ impl From for Option { })), timestamp: Some(from.ts.into()), }), + broker::DataValue::ValueFailure(_) => None, } } } @@ -231,6 +232,7 @@ impl From for Option { })), timestamp: None, }), + broker::DataValue::ValueFailure(_) => None, } } } diff --git a/databroker/src/grpc/kuksa_val_v2/conversions.rs b/databroker/src/grpc/kuksa_val_v2/conversions.rs new file mode 100644 index 00000000..44a990cc --- /dev/null +++ b/databroker/src/grpc/kuksa_val_v2/conversions.rs @@ -0,0 +1,339 @@ +// /******************************************************************************** +// * Copyright (c) 2024 Contributors to the Eclipse Foundation +// * +// * See the NOTICE file(s) distributed with this work for additional +// * information regarding copyright ownership. +// * +// * This program and the accompanying materials are made available under the +// * terms of the Apache License 2.0 which is available at +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * SPDX-License-Identifier: Apache-2.0 +// ********************************************************************************/ +use databroker_proto::kuksa::val::v2 as proto; +use proto::datapoint::{ValueState::Failure, ValueState::Value}; + +use crate::broker; + +use std::time::SystemTime; + +impl From<&proto::Datapoint> for broker::Datapoint { + fn from(datapoint: &proto::Datapoint) -> Self { + let value = broker::DataValue::from(datapoint); + let ts = SystemTime::now(); + + match &datapoint.timestamp { + Some(source_timestamp) => { + let source: Option = match source_timestamp.clone().try_into() { + Ok(source) => Some(source), + Err(_) => None, + }; + broker::Datapoint { + ts, + source_ts: source, + value, + } + } + None => broker::Datapoint { + ts, + source_ts: None, + value, + }, + } + } +} + +impl From for Option { + fn from(from: broker::Datapoint) -> Self { + match from.value { + broker::DataValue::NotAvailable => None, + broker::DataValue::Bool(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::String(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::String(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Int32(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Int64(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Int64(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Uint32(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Uint64(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Float(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Float(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Double(value) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Double(value)), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::BoolArray(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::BoolArray(proto::BoolArray { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::StringArray(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::StringArray(proto::StringArray { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Int32Array(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Int32Array(proto::Int32Array { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Int64Array(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Int64Array(proto::Int64Array { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Uint32Array(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Uint32Array(proto::Uint32Array { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::Uint64Array(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Uint64Array(proto::Uint64Array { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::FloatArray(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::FloatArray(proto::FloatArray { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::DoubleArray(values) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::DoubleArray(proto::DoubleArray { + values, + })), + })), + timestamp: Some(from.ts.into()), + }), + broker::DataValue::ValueFailure(failure) => Some(proto::Datapoint { + value_state: Some(proto::datapoint::ValueState::Failure(i32::from(&failure))), + timestamp: Some(from.ts.into()), + }), + } + } +} + +impl From<&broker::ValueFailure> for i32 { + fn from(from: &broker::ValueFailure) -> Self { + match from { + broker::ValueFailure::Unspecified => 0, + broker::ValueFailure::InvalidValue => 1, + broker::ValueFailure::NotProvided => 2, + broker::ValueFailure::UnknownSignal => 3, + broker::ValueFailure::AccessDenied => 4, + broker::ValueFailure::InternalError => 5, + } + } +} + +impl From<&i32> for broker::ValueFailure { + fn from(from: &i32) -> Self { + match from { + 1 => broker::ValueFailure::InvalidValue, + 2 => broker::ValueFailure::NotProvided, + 3 => broker::ValueFailure::UnknownSignal, + 4 => broker::ValueFailure::AccessDenied, + 5 => broker::ValueFailure::InternalError, + _ => broker::ValueFailure::Unspecified, + } + } +} + +fn from_i32(value: i32) -> proto::ValueFailure { + // Use a match statement to convert the i32 to the corresponding enum variant + match value { + 1 => proto::ValueFailure::InvalidValue, + 2 => proto::ValueFailure::NotProvided, + 3 => proto::ValueFailure::UnknownSignal, + 4 => proto::ValueFailure::AccessDenied, + 5 => proto::ValueFailure::InternalError, + _ => proto::ValueFailure::Unspecified, + } +} + +impl From<&proto::ValueFailure> for broker::ValueFailure { + fn from(value_failure: &proto::ValueFailure) -> Self { + match value_failure { + proto::ValueFailure::Unspecified => broker::ValueFailure::Unspecified, + proto::ValueFailure::InvalidValue => broker::ValueFailure::InvalidValue, + proto::ValueFailure::NotProvided => broker::ValueFailure::NotProvided, + proto::ValueFailure::UnknownSignal => broker::ValueFailure::UnknownSignal, + proto::ValueFailure::AccessDenied => broker::ValueFailure::AccessDenied, + proto::ValueFailure::InternalError => broker::ValueFailure::InternalError, + } + } +} + +impl From<&proto::Datapoint> for broker::DataValue { + fn from(datapoint: &proto::Datapoint) -> Self { + match &datapoint.value_state { + Some(Value(value)) => match &value.typed_value { + Some(proto::value::TypedValue::String(value)) => { + broker::DataValue::String(value.to_owned()) + } + Some(proto::value::TypedValue::Bool(value)) => broker::DataValue::Bool(*value), + Some(proto::value::TypedValue::Int32(value)) => broker::DataValue::Int32(*value), + Some(proto::value::TypedValue::Int64(value)) => broker::DataValue::Int64(*value), + Some(proto::value::TypedValue::Uint32(value)) => broker::DataValue::Uint32(*value), + Some(proto::value::TypedValue::Uint64(value)) => broker::DataValue::Uint64(*value), + Some(proto::value::TypedValue::Float(value)) => broker::DataValue::Float(*value), + Some(proto::value::TypedValue::Double(value)) => broker::DataValue::Double(*value), + Some(proto::value::TypedValue::StringArray(array)) => { + broker::DataValue::StringArray(array.values.clone()) + } + Some(proto::value::TypedValue::BoolArray(array)) => { + broker::DataValue::BoolArray(array.values.clone()) + } + Some(proto::value::TypedValue::Int32Array(array)) => { + broker::DataValue::Int32Array(array.values.clone()) + } + Some(proto::value::TypedValue::Int64Array(array)) => { + broker::DataValue::Int64Array(array.values.clone()) + } + Some(proto::value::TypedValue::Uint32Array(array)) => { + broker::DataValue::Uint32Array(array.values.clone()) + } + Some(proto::value::TypedValue::Uint64Array(array)) => { + broker::DataValue::Uint64Array(array.values.clone()) + } + Some(proto::value::TypedValue::FloatArray(array)) => { + broker::DataValue::FloatArray(array.values.clone()) + } + Some(proto::value::TypedValue::DoubleArray(array)) => { + broker::DataValue::DoubleArray(array.values.clone()) + } + None => todo!(), + }, + Some(Failure(value)) => { + broker::DataValue::ValueFailure(broker::ValueFailure::from(&from_i32(*value))) + } + None => broker::DataValue::NotAvailable, + } + } +} + +impl From<&broker::UpdateError> for proto::Error { + fn from(update_error: &broker::UpdateError) -> Self { + match update_error { + broker::UpdateError::NotFound => proto::Error { + code: proto::ErrorCode::NotFound.into(), + message: "Not Found".to_string(), + }, + broker::UpdateError::WrongType => proto::Error { + code: proto::ErrorCode::InvalidArgument.into(), + message: "Wrong Type".to_string(), + }, + broker::UpdateError::OutOfBounds => proto::Error { + code: proto::ErrorCode::InvalidArgument.into(), + message: "Out of Bounds".to_string(), + }, + broker::UpdateError::UnsupportedType => proto::Error { + code: proto::ErrorCode::InvalidArgument.into(), + message: "Unsupported Type".to_string(), + }, + broker::UpdateError::PermissionDenied => proto::Error { + code: proto::ErrorCode::PermissionDenied.into(), + message: "Permission Denied".to_string(), + }, + broker::UpdateError::PermissionExpired => proto::Error { + code: proto::ErrorCode::PermissionDenied.into(), + message: "Permission Expired".to_string(), + }, + } + } +} + +impl From for proto::DataType { + fn from(from: broker::DataType) -> Self { + match from { + broker::DataType::String => proto::DataType::String, + broker::DataType::Bool => proto::DataType::Boolean, + broker::DataType::Int8 => proto::DataType::Int8, + broker::DataType::Int16 => proto::DataType::Int16, + broker::DataType::Int32 => proto::DataType::Int32, + broker::DataType::Int64 => proto::DataType::Int64, + broker::DataType::Uint8 => proto::DataType::Uint8, + broker::DataType::Uint16 => proto::DataType::Uint16, + broker::DataType::Uint32 => proto::DataType::Uint32, + broker::DataType::Uint64 => proto::DataType::Uint64, + broker::DataType::Float => proto::DataType::Float, + broker::DataType::Double => proto::DataType::Double, + broker::DataType::StringArray => proto::DataType::StringArray, + broker::DataType::BoolArray => proto::DataType::BooleanArray, + broker::DataType::Int8Array => proto::DataType::Int8Array, + broker::DataType::Int16Array => proto::DataType::Int16Array, + broker::DataType::Int32Array => proto::DataType::Int32Array, + broker::DataType::Int64Array => proto::DataType::Int64Array, + broker::DataType::Uint8Array => proto::DataType::Uint8Array, + broker::DataType::Uint16Array => proto::DataType::Uint16Array, + broker::DataType::Uint32Array => proto::DataType::Uint32Array, + broker::DataType::Uint64Array => proto::DataType::Uint64Array, + broker::DataType::FloatArray => proto::DataType::FloatArray, + broker::DataType::DoubleArray => proto::DataType::DoubleArray, + } + } +} + +impl From for proto::EntryType { + fn from(from: broker::EntryType) -> Self { + match from { + broker::EntryType::Sensor => proto::EntryType::Sensor, + broker::EntryType::Attribute => proto::EntryType::Attribute, + broker::EntryType::Actuator => proto::EntryType::Actuator, + } + } +} diff --git a/databroker/src/grpc/kuksa_val_v2/mod.rs b/databroker/src/grpc/kuksa_val_v2/mod.rs new file mode 100644 index 00000000..88302b19 --- /dev/null +++ b/databroker/src/grpc/kuksa_val_v2/mod.rs @@ -0,0 +1,15 @@ +/******************************************************************************** +* Copyright (c) 2024 Contributors to the Eclipse Foundation +* +* See the NOTICE file(s) distributed with this work for additional +* information regarding copyright ownership. +* +* This program and the accompanying materials are made available under the +* terms of the Apache License 2.0 which is available at +* http://www.apache.org/licenses/LICENSE-2.0 +* +* SPDX-License-Identifier: Apache-2.0 +********************************************************************************/ + +mod conversions; +mod val; diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs new file mode 100644 index 00000000..95f2061b --- /dev/null +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -0,0 +1,1203 @@ +/******************************************************************************** +* Copyright (c) 2024 Contributors to the Eclipse Foundation +* +* See the NOTICE file(s) distributed with this work for additional +* information regarding copyright ownership. +* +* This program and the accompanying materials are made available under the +* terms of the Apache License 2.0 which is available at +* http://www.apache.org/licenses/LICENSE-2.0 +* +* SPDX-License-Identifier: Apache-2.0 +********************************************************************************/ + +use std::{collections::HashMap, pin::Pin}; + +use crate::{ + broker::{self, AuthorizedAccess, SubscriptionError}, + glob::Matcher, + permissions::Permissions, +}; + +use databroker_proto::kuksa::val::v2::{ + self as proto, + open_provider_stream_request::Action::{ + BatchActuateStreamResponse, ProvidedActuation, PublishValuesRequest, + }, + open_provider_stream_response, OpenProviderStreamResponse, PublishValuesResponse, +}; + +use kuksa::proto::v2::{ListMetadataResponse, Metadata}; +use std::collections::HashSet; +use tokio::{select, sync::mpsc}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tracing::debug; + +const MAX_REQUEST_PATH_LENGTH: usize = 1000; + +#[tonic::async_trait] +impl proto::val_server::Val for broker::DataBroker { + async fn get_value( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) + } + + async fn get_values( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) + } + + async fn list_values( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) + } + + type SubscribeStream = Pin< + Box< + dyn Stream> + + Send + + Sync + + 'static, + >, + >; + + async fn subscribe( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let request = request.into_inner(); + + let signal_paths = request.signal_paths; + let size = signal_paths.len(); + + let mut valid_requests: HashMap> = HashMap::with_capacity(size); + + for path in signal_paths { + valid_requests.insert( + match get_signal( + Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Path(path)), + }), + &broker, + ) + .await + { + Ok(signal_id) => signal_id, + Err(err) => return Err(err), + }, + vec![broker::Field::Datapoint].into_iter().collect(), + ); + } + + match broker.subscribe(valid_requests).await { + Ok(stream) => { + let stream = convert_to_proto_stream(stream, size); + Ok(tonic::Response::new(Box::pin(stream))) + } + Err(SubscriptionError::NotFound) => { + Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")) + } + Err(SubscriptionError::InvalidInput) => Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Invalid Argument", + )), + Err(SubscriptionError::InternalError) => { + Err(tonic::Status::new(tonic::Code::Internal, "Internal Error")) + } + } + } + + type SubscribeIdStream = Pin< + Box< + dyn Stream> + + Send + + Sync + + 'static, + >, + >; + + async fn subscribe_id( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) + } + + async fn actuate( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) + } + + async fn batch_actuate( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) + } + + /// List metadata of signals matching the wildcard branch request. + /// + /// # Arguments + /// + /// ``` + /// `request`: + /// ListMetadataRequest { + /// root: String + /// filter: String + /// } + /// + /// # Response + /// `response`: + /// ListMetadataResponse { + /// metadata: Vec + /// } + /// ``` + /// + /// # Errors + /// + /// Returns (GRPC error code): + /// NOT_FOUND if the specified root branch does not exist + /// INVALID_ARGUMENT if the request pattern is invalid + /// + /// # Examples + /// For details, please refer to + /// [Wildcard Matching](https://github.com/eclipse-kuksa/kuksa-databroker/blob/main/doc/wildcard_matching.md#examples) + async fn list_metadata( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + let broker = self.authorized_access(&permissions); + + let metadata_request = request.into_inner(); + + match Matcher::new(&metadata_request.root) { + Ok(matcher) => { + let mut metadata_response = Vec::new(); + broker + .for_each_entry(|entry| { + let entry_metadata = &entry.metadata(); + if matcher.is_match(&entry_metadata.glob_path) { + metadata_response.push(Metadata { + id: entry_metadata.id, + data_type: proto::DataType::from(entry_metadata.data_type.clone()) + as i32, + entry_type: proto::EntryType::from( + entry_metadata.entry_type.clone(), + ) as i32, + description: Some(entry_metadata.description.clone()), + comment: None, + deprecation: None, + unit: entry_metadata.unit.clone(), + value_restriction: None, + }) + } + }) + .await; + if metadata_response.is_empty() { + Err(tonic::Status::new( + tonic::Code::NotFound, + "Specified root branch does not exist", + )) + } else { + Ok(tonic::Response::new(ListMetadataResponse { + metadata: metadata_response, + })) + } + } + Err(_) => Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Invalid Pattern Argument", + )), + } + } + + // Publish a signal value. Used for low frequency signals (e.g. attributes). + /// # Arguments + /// + /// ``` + /// `request`: + /// PublishValueRequest { + /// signal_id: + /// datapoint: Datapoint + /// } + /// + /// # Response + /// `response`: + /// PublishValueResponse { + /// error: Error + /// } + /// ``` + /// + /// # Errors + /// Returns (GRPC error code): + /// NOT_FOUND if any of the signals are non-existant. + /// PERMISSION_DENIED + /// - if access is denied for any of the signals. + /// - if the signal is already provided by another provider. + /// INVALID_ARGUMENT + /// - if the data type used in the request does not match + /// the data type of the addressed signal + /// - if the published value is not accepted, + /// e.g. if sending an unsupported enum value + async fn publish_value( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let request = request.into_inner(); + + let mut updates: HashMap = HashMap::with_capacity(1); + + updates.insert( + match get_signal(request.signal_id, &broker).await { + Ok(signal_id) => signal_id, + Err(err) => return Err(err), + }, + broker::EntryUpdate { + path: None, + datapoint: Some(broker::Datapoint::from(&request.data_point.unwrap())), + actuator_target: None, + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + ); + + match broker.update_entries(updates).await { + Ok(()) => Ok(tonic::Response::new(proto::PublishValueResponse {})), + Err(errors) => { + if errors.is_empty() { + Ok(tonic::Response::new(proto::PublishValueResponse {})) + } else if let Some((id, err)) = errors.first() { + Err(err.to_status_with_code(id)) + } else { + Err(tonic::Status::internal( + "There is no error provided for the entry", + )) + } + } + } + } + + // type OpenProviderStreamStream = Pin< + // Box< + // dyn Stream> + // + Send + // + Sync + // + 'static, + // >, + // >; + + type OpenProviderStreamStream = + ReceiverStream>; + + /// Opens a bidirectional stream with the databroker to perform various actions such as + /// providing actuators, publishing sensor and actuator values, and receiving actuations from the databroker. + /// + /// # Actions + /// + /// The function handles the following actions: + /// + /// 1. **Provide an Actuator**: + /// - The provider claims ownership of the actuator with the specified signal ID. + /// + /// ``` + /// `request`: + /// OpenProviderStreamRequest { + /// action: ProvidedActuation { + /// { + /// signal: id: 30, + /// }, + /// { + /// signal: id: 45, + /// }, + /// ... + /// } + /// } + /// + /// `response`: + /// OpenProviderStreamStream { + /// action: ProvideActuatorResponse { } + /// } + /// ``` + /// + /// 2. **Publish Values**: + /// - The provider publishes a request ID along with a map of sensor and actuator values. + /// + /// ``` + /// `request`: + /// OpenProviderStreamRequest { + /// action: PublishValuesRequest { + /// request_id: 1, + /// datapoints: { + /// (30, Datapoint), + /// (45, Datapoint), + /// ... + /// } + /// } + /// } + /// + /// `response`: + /// OpenProviderStreamStream { + /// action: PublishValuesResponse { + /// request_id: 1, + /// status: { + /// (If errors) { + /// (30, Error), + /// (45, Error), + /// ... + /// } + /// } + /// } + /// } + /// ``` + /// + /// 3. **Receive Actuations**: + /// - The provider receives actuation requests from the databroker. + /// + /// ``` + /// `request`: + /// OpenProviderStreamRequest { + /// action: BatchActuateStreamResponse { } + /// } + /// + /// `response`: + /// OpenProviderStreamStream { + /// action: BatchActuateStreamRequest { + /// actuate_requests: { + /// (30, Value), + /// (45, Value), + /// ... + /// } + /// } + /// } + /// ``` + /// + /// # Arguments + /// + /// * `request` - The request should contain the necessary permissions if the databroker is started with secure mode. + /// The request `OpenProviderStreamRequest` can contain messages according to the action: + /// - Action 1: `ProvidedActuation` + /// - Action 2: `PublishValuesRequest` + /// - Action 3: `BatchActuateStreamResponse` + /// + /// # Errors + /// + /// The open stream is used for request / response type communication between the + /// provider and server (where the initiator of a request can vary). + /// Errors are communicated as messages in the stream. + async fn open_provider_stream( + &self, + request: tonic::Request>, + ) -> Result, tonic::Status> { + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + // Should databroker register internally here new opened streams???? + // The provided actuation will take ownership over the actuators but what happens + // if a provider is publishing sensor values and the stream is closed? + // How will the application know that there is no provider and should stop the subscription? + let mut stream = request.into_inner(); + + let mut shutdown_trigger = self.get_shutdown_trigger(); + + // Copy (to move into task below) + let broker = self.clone(); + + // Create stream (to be returned) + let (response_stream_sender, response_stream_receiver) = mpsc::channel(10); + + // Listening on stream + tokio::spawn(async move { + let permissions = permissions; + let broker = broker.authorized_access(&permissions); + loop { + select! { + message = stream.message() => { + match message { + Ok(request) => { + match request { + Some(req) => { + match req.action { + Some(ProvidedActuation(_provided_actuation)) => { + if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { + debug!("Failed to send error response: {}", err); + } + break; + }, + Some(PublishValuesRequest(publish_values_request)) => { + let response = publish_values(&broker, &publish_values_request).await; + if let Err(err) = response_stream_sender.send(Ok(response)).await + { + debug!("Failed to send response: {}", err); + } + }, + Some(BatchActuateStreamResponse(_batch_actuate_stream_response)) => { + if let Err(err) = response_stream_sender.send(Err(tonic::Status::new(tonic::Code::Unimplemented, "Unimplemented"))).await { + debug!("Failed to send error response: {}", err); + } + break; + }, + None => { + + }, + } + }, + None => { + debug!("provider: no more messages"); + break; + } + } + }, + Err(err) => { + debug!("provider: connection broken: {:?}", err); + break; + }, + } + }, + _ = shutdown_trigger.recv() => { + debug!("provider: shutdown received"); + break; + } + } + } + }); + + Ok(tonic::Response::new(ReceiverStream::new( + response_stream_receiver, + ))) + } + + async fn get_server_info( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::new( + tonic::Code::Unimplemented, + "Unimplemented", + )) + } +} + +async fn publish_values( + broker: &AuthorizedAccess<'_, '_>, + request: &databroker_proto::kuksa::val::v2::PublishValuesRequest, +) -> OpenProviderStreamResponse { + let ids: Vec<(i32, broker::EntryUpdate)> = request + .datapoints + .iter() + .map(|(id, datapoint)| { + ( + *id, + broker::EntryUpdate { + path: None, + datapoint: Some(broker::Datapoint::from(datapoint)), + actuator_target: None, + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + ) + }) + .collect(); + + match broker.update_entries(ids).await { + Ok(_) => OpenProviderStreamResponse { + action: Some( + open_provider_stream_response::Action::PublishValuesResponse( + PublishValuesResponse { + request_id: request.request_id, + status: HashMap::new(), + }, + ), + ), + }, + Err(err) => OpenProviderStreamResponse { + action: Some( + open_provider_stream_response::Action::PublishValuesResponse( + PublishValuesResponse { + request_id: request.request_id, + status: err + .iter() + .map(|(id, error)| (*id, proto::Error::from(error))) + .collect(), + }, + ), + ), + }, + } +} + +async fn get_signal( + signal_id: Option, + broker: &AuthorizedAccess<'_, '_>, +) -> Result { + if let Some(signal) = signal_id.unwrap().signal { + match signal { + proto::signal_id::Signal::Path(path) => { + if path.len() > MAX_REQUEST_PATH_LENGTH { + return Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "The provided path is too long", + )); + } + match broker.get_id_by_path(&path).await { + Some(id) => Ok(id), + None => Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")), + } + } + proto::signal_id::Signal::Id(id) => match broker.get_metadata(id).await { + Some(_metadata) => Ok(id), + None => Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")), + }, + } + } else { + Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "No SignalId provided", + )) + } +} + +fn convert_to_proto_stream( + input: impl Stream, + size: usize, +) -> impl Stream> { + input.map(move |item| { + let mut entries: HashMap = HashMap::with_capacity(size); + for update in item.updates { + let update_datapoint: Option = match update.update.datapoint { + Some(datapoint) => datapoint.into(), + None => None, + }; + if let Some(dp) = update_datapoint { + entries.insert( + update + .update + .path + .expect("Something wrong with update path of subscriptions!"), + dp, + ); + } + } + let response = proto::SubscribeResponse { entries }; + Ok(response) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{broker::DataBroker, permissions}; + use databroker_proto::kuksa::val::v2::val_server::Val; + use proto::open_provider_stream_response::Action::{ + BatchActuateStreamRequest, ProvideActuatorResponse, PublishValuesResponse, + }; + use proto::{open_provider_stream_request, OpenProviderStreamRequest, PublishValuesRequest}; + + async fn check_stream_next( + item: &Result, + expected_response: HashMap, + ) { + let f = false; + match item { + Ok(subscribe_response) => { + // Process the SubscribeResponse + let response = &subscribe_response.entries; + assert_eq!(response.len(), expected_response.len()); + for key in response + .keys() + .chain(expected_response.keys()) + .collect::>() + { + match (response.get(key), expected_response.get(key)) { + (Some(entry1), Some(entry2)) => { + assert_eq!(entry1.value_state, entry2.value_state); + } + (Some(entry1), None) => { + assert!(f, "Key '{}' is only in response: {:?}", key, entry1) + } + (None, Some(entry2)) => assert!( + f, + "Key '{}' is only in expected_response: {:?}", + key, entry2 + ), + (None, None) => unreachable!(), + } + } + } + Err(err) => { + assert!(f, "Error {:?}", err) + } + } + } + + #[tokio::test] + async fn test_publish_value() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let f = false; + + let entry_id = authorized_access + .add_entry( + "test.datapoint1".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let request = proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + data_point: { + let timestamp = Some(std::time::SystemTime::now().into()); + + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }; + + Some(proto::Datapoint { + timestamp, + value_state: Some(proto::datapoint::ValueState::Value(value)), + }) + }, + }; + + // Manually insert permissions + let mut publish_value_request = tonic::Request::new(request); + publish_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.publish_value(publish_value_request).await { + Ok(response) => { + // Handle the successful response + let publish_response = response.into_inner(); + assert_eq!(publish_response, proto::PublishValueResponse {}) + } + Err(status) => { + // Handle the error from the publish_value function + assert!(f, "Publish failed with status: {:?}", status); + } + } + } + + #[tokio::test] + async fn test_publish_value_signal_id_not_found() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let f = false; + + let _entry_id = authorized_access + .add_entry( + "test.datapoint1".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let request = proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(1234)), + }), + data_point: { + let timestamp = Some(std::time::SystemTime::now().into()); + + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }; + + Some(proto::Datapoint { + timestamp, + value_state: Some(proto::datapoint::ValueState::Value(value)), + }) + }, + }; + + // Manually insert permissions + let mut publish_value_request = tonic::Request::new(request); + publish_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.publish_value(publish_value_request).await { + Ok(_) => { + // Handle the successful response + assert!(f, "Should not happen!"); + } + Err(status) => { + // Handle the error from the publish_value function + assert_eq!(status.code(), tonic::Code::NotFound); + assert_eq!(status.message(), "Path not found"); + } + } + } + + /* + Test subscribe service method + */ + #[tokio::test(flavor = "multi_thread")] + async fn test_subscribe() { + let f = false; + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + let entry_id = authorized_access + .add_entry( + "test.datapoint1".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let mut request = tonic::Request::new(proto::SubscribeRequest { + signal_paths: vec!["test.datapoint1".to_string()], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result = tokio::task::block_in_place(|| { + // Blocking operation here + // Since broker.subscribe is async, you need to run it in an executor + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(broker.subscribe(request)) + }); + + let mut request = tonic::Request::new(proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + data_point: Some(proto::Datapoint { + timestamp: None, + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + })), + }), + }); + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + match broker.publish_value(request).await { + Ok(response) => { + // Handle the successful response + let publish_response = response.into_inner(); + + // Check if there is an error in the response + assert_eq!(publish_response, proto::PublishValueResponse {}); + } + Err(status) => { + // Handle the error from the publish_value function + assert!(f, "Publish failed with status: {:?}", status); + } + } + + let mut request_false = tonic::Request::new(proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + data_point: Some(proto::Datapoint { + timestamp: None, + value_state: Some(proto::datapoint::ValueState::Value(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(false)), + })), + }), + }); + request_false + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + match broker.publish_value(request_false).await { + Ok(response) => { + // Handle the successful response + let publish_response = response.into_inner(); + + // Check if there is an error in the response + assert_eq!(publish_response, proto::PublishValueResponse {}); + } + Err(status) => { + // Handle the error from the publish_value function + assert!(f, "Publish failed with status: {:?}", status); + } + } + + if let Ok(stream) = result { + // Process the stream by iterating over the items + let mut stream = stream.into_inner(); + + let mut expected_entries: HashMap = HashMap::new(); + + let mut item_count = 0; + while let Some(item) = stream.next().await { + match item_count { + 0 => { + check_stream_next(&item, expected_entries.clone()).await; + expected_entries.insert( + "test.datapoint1".to_string(), + proto::Datapoint { + timestamp: None, + value_state: Some(proto::datapoint::ValueState::Value( + proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }, + )), + }, + ); + } + 1 => { + check_stream_next(&item, expected_entries.clone()).await; + expected_entries.clear(); + expected_entries.insert( + "test.datapoint1".to_string(), + proto::Datapoint { + timestamp: None, + value_state: Some(proto::datapoint::ValueState::Value( + proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(false)), + }, + )), + }, + ); + } + 2 => { + check_stream_next(&item, expected_entries.clone()).await; + break; + } + _ => assert!( + f, + "You shouldn't land here too many items reported back to the stream." + ), + } + item_count += 1; + } + } else { + assert!(f, "Something went wrong while getting the stream.") + } + } + + /* + Test open_provider_stream service method + */ + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_open_provider_stream() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let request_id = 1; + + let entry_id = authorized_access + .add_entry( + "test.datapoint1".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let request = OpenProviderStreamRequest { + action: Some(open_provider_stream_request::Action::PublishValuesRequest( + PublishValuesRequest { + request_id, + datapoints: { + let timestamp = Some(std::time::SystemTime::now().into()); + + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::String( + "example_value".to_string(), + )), + }; + + let datapoint = proto::Datapoint { + timestamp, + value_state: Some(proto::datapoint::ValueState::Value(value)), + }; + + let mut map = HashMap::new(); + map.insert(entry_id, datapoint); + map + }, + }, + )), + }; + + // Manually insert permissions + let mut streaming_request = tonic_mock::streaming_request(vec![request]); + streaming_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.open_provider_stream(streaming_request).await { + Ok(response) => { + std::thread::sleep(std::time::Duration::from_secs(3)); + tokio::spawn(async move { + std::thread::sleep(std::time::Duration::from_secs(3)); + let stream = response.into_inner(); + let mut receiver = stream.into_inner(); + while let Some(value) = receiver.recv().await { + match value { + Ok(value) => match value.action { + Some(ProvideActuatorResponse(_)) => { + panic!("Should not happen") + } + Some(PublishValuesResponse(publish_values_response)) => { + assert_eq!(publish_values_response.request_id, request_id); + assert_eq!(publish_values_response.status.len(), 1); + match publish_values_response.status.get(&entry_id) { + Some(value) => { + assert_eq!(value.code, 1); + assert_eq!(value.message, "Wrong Type"); + } + None => { + panic!("Should not happen") + } + } + } + Some(BatchActuateStreamRequest(_)) => { + panic!("Should not happen") + } + None => { + panic!("Should not happen") + } + }, + Err(_) => { + panic!("Should not happen") + } + } + } + }); + } + Err(_) => { + panic!("Should not happen") + } + } + } + + #[tokio::test] + async fn test_list_metadata_using_wildcard() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "test.datapoint1".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + authorized_access + .add_entry( + "test.branch.datapoint2".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test branch datapoint 2".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let mut wildcard_req_two_asteriks = tonic::Request::new(proto::ListMetadataRequest { + root: "test.**".to_owned(), + filter: "".to_owned(), + }); + + let mut wildcard_req_one_asterik = tonic::Request::new(proto::ListMetadataRequest { + root: "test.*".to_owned(), + filter: "".to_owned(), + }); + // Manually insert permissions + wildcard_req_two_asteriks + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + wildcard_req_one_asterik + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match proto::val_server::Val::list_metadata(&broker, wildcard_req_two_asteriks) + .await + .map(|res| res.into_inner()) + { + Ok(list_response) => { + let entries_size = list_response.metadata.len(); + assert_eq!(entries_size, 2); + } + Err(_status) => panic!("failed to execute get request"), + } + + match proto::val_server::Val::list_metadata(&broker, wildcard_req_one_asterik) + .await + .map(|res| res.into_inner()) + { + Ok(list_response) => { + let entries_size = list_response.metadata.len(); + assert_eq!(entries_size, 1); + } + Err(_status) => panic!("failed to execute get request"), + } + } + + #[tokio::test] + async fn test_list_metadata_bad_request_pattern_or_not_found() { + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + authorized_access + .add_entry( + "test.datapoint1".to_owned(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Test datapoint 1".to_owned(), + None, + None, + ) + .await + .expect("Register datapoint should succeed"); + + let mut wildcard_req = tonic::Request::new(proto::ListMetadataRequest { + root: "test. **".to_owned(), + filter: "".to_owned(), + }); + + // Manually insert permissions + wildcard_req + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match proto::val_server::Val::list_metadata(&broker, wildcard_req) + .await + .map(|res| res.into_inner()) + { + Ok(_) => {} + Err(error) => { + assert_eq!( + error.code(), + tonic::Code::InvalidArgument, + "unexpected error code" + ); + assert_eq!( + error.message(), + "Invalid Pattern Argument", + "unexpected error reason" + ); + } + } + + let mut not_found_req = tonic::Request::new(proto::ListMetadataRequest { + root: "test.notfound".to_owned(), + filter: "".to_owned(), + }); + + // Manually insert permissions + not_found_req + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match proto::val_server::Val::list_metadata(&broker, not_found_req) + .await + .map(|res| res.into_inner()) + { + Ok(_) => {} + Err(error) => { + assert_eq!(error.code(), tonic::Code::NotFound, "unexpected error code"); + assert_eq!( + error.message(), + "Specified root branch does not exist", + "unexpected error reason" + ); + } + } + } +} diff --git a/databroker/src/grpc/mod.rs b/databroker/src/grpc/mod.rs index c4c86d4a..a7a15a2b 100644 --- a/databroker/src/grpc/mod.rs +++ b/databroker/src/grpc/mod.rs @@ -14,4 +14,5 @@ pub mod server; mod kuksa_val_v1; +mod kuksa_val_v2; mod sdv_databroker_v1; diff --git a/databroker/src/grpc/sdv_databroker_v1/conversions.rs b/databroker/src/grpc/sdv_databroker_v1/conversions.rs index e028c390..0df1578c 100644 --- a/databroker/src/grpc/sdv_databroker_v1/conversions.rs +++ b/databroker/src/grpc/sdv_databroker_v1/conversions.rs @@ -101,6 +101,9 @@ impl From<&broker::Datapoint> for proto::Datapoint { broker::DataValue::NotAvailable => proto::datapoint::Value::FailureValue( proto::datapoint::Failure::NotAvailable as i32, ), + broker::DataValue::ValueFailure(_) => proto::datapoint::Value::FailureValue( + proto::datapoint::Failure::InternalError as i32, + ), }; proto::Datapoint { @@ -166,6 +169,9 @@ impl From<&broker::QueryField> for proto::Datapoint { broker::DataValue::NotAvailable => proto::datapoint::Value::FailureValue( proto::datapoint::Failure::NotAvailable.into(), ), + broker::DataValue::ValueFailure(_) => proto::datapoint::Value::FailureValue( + proto::datapoint::Failure::InternalError.into(), + ), }; proto::Datapoint { diff --git a/databroker/src/grpc/server.rs b/databroker/src/grpc/server.rs index 8bc282ca..9ba338ee 100644 --- a/databroker/src/grpc/server.rs +++ b/databroker/src/grpc/server.rs @@ -37,6 +37,7 @@ pub enum ServerTLS { #[derive(PartialEq)] pub enum Api { KuksaValV1, + KuksaValV2, SdvDatabrokerV1, } @@ -187,6 +188,15 @@ where let mut router = server.add_optional_service(kuksa_val_v1); + if apis.contains(&Api::KuksaValV2) { + router = router.add_optional_service(Some( + kuksa::val::v2::val_server::ValServer::with_interceptor( + broker.clone(), + authorization.clone(), + ), + )); + } + if apis.contains(&Api::SdvDatabrokerV1) { router = router.add_optional_service(Some( sdv::databroker::v1::broker_server::BrokerServer::with_interceptor( diff --git a/databroker/src/main.rs b/databroker/src/main.rs index c576d566..15bc8e60 100644 --- a/databroker/src/main.rs +++ b/databroker/src/main.rs @@ -445,7 +445,7 @@ async fn main() -> Result<(), Box> { } } - let mut apis = vec![grpc::server::Api::KuksaValV1]; + let mut apis = vec![grpc::server::Api::KuksaValV1, grpc::server::Api::KuksaValV2]; if args.get_flag("enable-databroker-v1") { apis.push(grpc::server::Api::SdvDatabrokerV1); diff --git a/databroker/src/types.rs b/databroker/src/types.rs index 6d9241fd..16000c31 100644 --- a/databroker/src/types.rs +++ b/databroker/src/types.rs @@ -55,6 +55,16 @@ pub enum ChangeType { Continuous, } +#[derive(Debug, Clone, PartialEq)] +pub enum ValueFailure { + Unspecified, + InvalidValue, + NotProvided, + UnknownSignal, + AccessDenied, + InternalError, +} + #[derive(Debug, Clone, PartialEq)] pub enum DataValue { NotAvailable, @@ -74,6 +84,7 @@ pub enum DataValue { Uint64Array(Vec), FloatArray(Vec), DoubleArray(Vec), + ValueFailure(ValueFailure), } #[derive(Debug)] diff --git a/databroker/src/viss/v2/conversions.rs b/databroker/src/viss/v2/conversions.rs index a209cb6a..e6fcf06c 100644 --- a/databroker/src/viss/v2/conversions.rs +++ b/databroker/src/viss/v2/conversions.rs @@ -270,6 +270,7 @@ impl From for Value { broker::DataValue::DoubleArray(array) => { Value::Array(array.iter().map(|value| value.to_string()).collect()) } + broker::DataValue::ValueFailure(_) => Value::None, } } } diff --git a/proto/kuksa/val/v2/types.proto b/proto/kuksa/val/v2/types.proto new file mode 100644 index 00000000..9adc6053 --- /dev/null +++ b/proto/kuksa/val/v2/types.proto @@ -0,0 +1,239 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License 2.0 which is available at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +syntax = "proto3"; + +package kuksa.val.v2; +import "google/protobuf/timestamp.proto"; + +option go_package = "kuksa/val/v2"; + +message Datapoint { + google.protobuf.Timestamp timestamp = 1; + + oneof value_state { + ValueFailure failure = 2; + Value value = 3; + } +} + +message Value { + oneof typed_value { + string string = 11; + bool bool = 12; + sint32 int32 = 13; + sint64 int64 = 14; + uint32 uint32 = 15; + uint64 uint64 = 16; + float float = 17; + double double = 18; + StringArray string_array = 21; + BoolArray bool_array = 22; + Int32Array int32_array = 23; + Int64Array int64_array = 24; + Uint32Array uint32_array = 25; + Uint64Array uint64_array = 26; + FloatArray float_array = 27; + DoubleArray double_array = 28; + } +} + +message SignalID { + oneof signal { + int32 id = 1; + string path = 2; + } +} + +message Error { + ErrorCode code = 1; + string message = 2; +} + +enum ErrorCode { + OK = 0; + INVALID_ARGUMENT = 1; + NOT_FOUND = 2; + PERMISSION_DENIED = 3; +} + +message Metadata { + // ID field + int32 id = 10; // Unique identifier for the metadata entry + + // Data type + // The VSS data type of the entry (i.e. the value, min, max etc). + // + // NOTE: protobuf doesn't have int8, int16, uint8 or uint16 which means + // that these values must be serialized as int32 and uint32 respectively. + DataType data_type = 11; // [field: FIELD_METADATA_DATA_TYPE] + + // Entry type + EntryType entry_type = 12; // [field: FIELD_METADATA_ENTRY_TYPE] + + // Description + // Describes the meaning and content of the entry. + optional string description = 13; // [field: FIELD_METADATA_DESCRIPTION] + + // Comment [optional] + // A comment can be used to provide additional informal information + // on a entry. + optional string comment = 14; // [field: FIELD_METADATA_COMMENT] + + // Deprecation [optional] + // Whether this entry is deprecated. Can contain recommendations of what + // to use instead. + optional string deprecation = 15; // [field: FIELD_METADATA_DEPRECATION] + + // Unit [optional] + // The unit of measurement + optional string unit = 16; // [field: FIELD_METADATA_UNIT] + + // Value restrictions [optional] + // Restrict which values are allowed. + // Only restrictions matching the DataType {datatype} above are valid. + ValueRestriction value_restriction = 17; // [field: FIELD_METADATA_VALUE_RESTRICTION] +} + +// Value restriction +// +// One ValueRestriction{type} for each type, since +// they don't make sense unless the types match +// +message ValueRestriction { + oneof type { + ValueRestrictionString string = 21; + // For signed VSS integers + ValueRestrictionInt signed = 22; + // For unsigned VSS integers + ValueRestrictionUint unsigned = 23; + // For floating point VSS values (float and double) + ValueRestrictionFloat floating_point = 24; + } +} + +message ValueRestrictionInt { + optional sint64 min = 1; + optional sint64 max = 2; + repeated sint64 allowed_values = 3; +} + +message ValueRestrictionUint { + optional uint64 min = 1; + optional uint64 max = 2; + repeated uint64 allowed_values = 3; +} + +message ValueRestrictionFloat { + optional double min = 1; + optional double max = 2; + + // allowed for doubles/floats not recommended + repeated double allowed_values = 3; +} + +// min, max doesn't make much sense for a string +message ValueRestrictionString { + repeated string allowed_values = 1; +} + +enum ValueFailure { + // Unspecified value failure, reserved for gRPC backwards compatibility + // (see https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum) + UNSPECIFIED = 0; + // The signal is known and provided, but doesn't have a valid value + INVALID_VALUE = 1; + // The signal is known, but no value is provided currently + NOT_PROVIDED = 2; + // The referred signal is unknown on the system + UNKNOWN_SIGNAL = 3; + // The client does not have the necessary access rights to the signal + ACCESS_DENIED = 4; + // Unexpected internal error + INTERNAL_ERROR = 5; +} + +// VSS Data type of a signal +// +// Protobuf doesn't support int8, int16, uint8 or uint16. +// These are mapped to int32 and uint32 respectively. +// +enum DataType { + DATA_TYPE_UNSPECIFIED = 0; + DATA_TYPE_STRING = 1; + DATA_TYPE_BOOLEAN = 2; + DATA_TYPE_INT8 = 3; + DATA_TYPE_INT16 = 4; + DATA_TYPE_INT32 = 5; + DATA_TYPE_INT64 = 6; + DATA_TYPE_UINT8 = 7; + DATA_TYPE_UINT16 = 8; + DATA_TYPE_UINT32 = 9; + DATA_TYPE_UINT64 = 10; + DATA_TYPE_FLOAT = 11; + DATA_TYPE_DOUBLE = 12; + DATA_TYPE_TIMESTAMP = 13; + DATA_TYPE_STRING_ARRAY = 20; + DATA_TYPE_BOOLEAN_ARRAY = 21; + DATA_TYPE_INT8_ARRAY = 22; + DATA_TYPE_INT16_ARRAY = 23; + DATA_TYPE_INT32_ARRAY = 24; + DATA_TYPE_INT64_ARRAY = 25; + DATA_TYPE_UINT8_ARRAY = 26; + DATA_TYPE_UINT16_ARRAY = 27; + DATA_TYPE_UINT32_ARRAY = 28; + DATA_TYPE_UINT64_ARRAY = 29; + DATA_TYPE_FLOAT_ARRAY = 30; + DATA_TYPE_DOUBLE_ARRAY = 31; + DATA_TYPE_TIMESTAMP_ARRAY = 32; +} + +// Entry type +enum EntryType { + ENTRY_TYPE_UNSPECIFIED = 0; + ENTRY_TYPE_ATTRIBUTE = 1; + ENTRY_TYPE_SENSOR = 2; + ENTRY_TYPE_ACTUATOR = 3; +} + +message StringArray { + repeated string values = 1; +} + +message BoolArray { + repeated bool values = 1; +} + +message Int32Array { + repeated sint32 values = 1; +} + +message Int64Array { + repeated sint64 values = 1; +} + +message Uint32Array { + repeated uint32 values = 1; +} + +message Uint64Array { + repeated uint64 values = 1; +} + +message FloatArray { + repeated float values = 1; +} + +message DoubleArray { + repeated double values = 1; +} diff --git a/proto/kuksa/val/v2/val.proto b/proto/kuksa/val/v2/val.proto new file mode 100644 index 00000000..67d1a86f --- /dev/null +++ b/proto/kuksa/val/v2/val.proto @@ -0,0 +1,246 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License 2.0 which is available at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +syntax = "proto3"; + +package kuksa.val.v2; + +option go_package = "kuksa/val/v2"; + +import "kuksa/val/v2/types.proto"; + +service VAL { + // Get the latest value of a signal + // + // Returns (GRPC error code): + // NOT_FOUND if the requested signal doesn't exist + // PERMISSION_DENIED if access is denied + rpc GetValue(GetValueRequest) returns (GetValueResponse); + + // Get the latest values of a set of signals. + // The returned list of data points has the same order as the list of the request. + // + // Returns (GRPC error code): + // NOT_FOUND if any of the requested signals doesn't exist. + // PERMISSION_DENIED if access is denied for any of the requested signals. + rpc GetValues(GetValuesRequest) returns (GetValuesResponse); + + // List values of signals matching the request. + // + // Returns a list of signal values. Only values of signals that the user + // is allowed to read are included (everything else is ignored). + // + // Returns (GRPC error code): + // NOT_FOUND if the specified root branch does not exist. + rpc ListValues(ListValuesRequest) returns (ListValuesResponse); + + // Subscribe to a set of signals using string path parameters + // Returns (GRPC error code): + // NOT_FOUND if any of the signals are non-existant. + // PERMISSION_DENIED if access is denied for any of the signals. + rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse); + + // Subscribe to a set of signals using i32 id parameters + // Returns (GRPC error code): + // NOT_FOUND if any of the signals are non-existant. + // PERMISSION_DENIED if access is denied for any of the signals. + rpc SubscribeId(SubscribeRequestId) returns (stream SubscribeResponseId); + + // Actuate a single actuator + // + // Returns (GRPC error code): + // NOT_FOUND if the actuator does not exist. + // PERMISSION_DENIED if access is denied for of the actuator. + // UNAVAILABLE if there is no provider currently providing the actuator + // INVALID_ARGUMENT + // - if the data type used in the request does not match + // the data type of the addressed signal + // - if the requested value is not accepted, + // e.g. if sending an unsupported enum value + rpc Actuate(ActuateRequest) returns (ActuateResponse); + + // Actuate simultaneously multiple actuators. + // If any error occurs, the entire operation will be aborted + // and no single actuator value will be forwarded to the provider. + // + // Returns (GRPC error code): + // NOT_FOUND if any of the actuators are non-existant. + // PERMISSION_DENIED if access is denied for any of the actuators. + // UNAVAILABLE if there is no provider currently providing an actuator + // INVALID_ARGUMENT + // - if the data type used in the request does not match + // the data type of the addressed signal + // - if the requested value is not accepted, + // e.g. if sending an unsupported enum value + rpc BatchActuate(BatchActuateRequest) returns (BatchActuateResponse); + + // List metadata of signals matching the request. + // + // Returns (GRPC error code): + // NOT_FOUND if the specified root branch does not exist. + rpc ListMetadata(ListMetadataRequest) returns (ListMetadataResponse); + + // Publish a signal value. Used for low frequency signals (e.g. attributes). + // + // Returns (GRPC error code): + // NOT_FOUND if any of the signals are non-existant. + // PERMISSION_DENIED + // - if access is denied for any of the signals. + // - if the signal is already provided by another provider. + // INVALID_ARGUMENT + // - if the data type used in the request does not match + // the data type of the addressed signal + // - if the published value is not accepted, + // e.g. if sending an unsupported enum value + rpc PublishValue(PublishValueRequest) returns (PublishValueResponse); + + // Open a stream used to provide actuation and/or publishing values using + // a streaming interface. Used to provide actuators and to enable high frequency + // updates of values. + // + // The open stream is used for request / response type communication between the + // provider and server (where the initiator of a request can vary). + // Errors are communicated as messages in the stream. + rpc OpenProviderStream(stream OpenProviderStreamRequest) returns (stream OpenProviderStreamResponse); + + // Get server information + rpc GetServerInfo(GetServerInfoRequest) returns (GetServerInfoResponse); +} + +message GetValueRequest { + SignalID signal_id = 1; +} + +message GetValueResponse { + Datapoint data_point = 1; +} + +message GetValuesRequest { + repeated SignalID signal_ids = 1; +} + +message GetValuesResponse { + repeated Datapoint datapoints = 1; +} + +message ListValuesRequest { + repeated SignalID signal_ids = 1; +} + +message ListValuesResponse { + repeated Datapoint datapoints = 1; +} + +message SubscribeRequest { + repeated string signal_paths = 1; +} + +message SubscribeResponse { + map entries = 1; +} + +message SubscribeRequestId { + repeated int32 signal_ids = 1; +} + +message SubscribeResponseId { + map entries = 1; +} + +message ActuateRequest { + SignalID signal_id = 1; + Value value = 2; +} + +message ActuateResponse { +} + +message BatchActuateRequest { + repeated ActuateRequest actuate_requests = 1; +} + +message BatchActuateResponse { +} + +message ListMetadataRequest { + string root = 1; + string filter = 2; +} + +message ListMetadataResponse { + repeated Metadata metadata = 1; +} + +message PublishValueRequest { + SignalID signal_id = 1; + Datapoint data_point = 2; +} + +message PublishValueResponse { +} + +message PublishValuesRequest { + int32 request_id = 1; /// Unique request id for the stream that can be used to identify the response. + map datapoints = 2; +} + +message PublishValuesResponse { + int32 request_id = 1; + map status = 2; +} + +message ProvidedActuation { + repeated SignalID actuator_identifiers = 1; +} + +message ProvideActuatorResponse { +} + +message BatchActuateStreamRequest { + repeated ActuateRequest actuate_requests = 1; +} + +message BatchActuateStreamResponse { +} + +message OpenProviderStreamRequest { + oneof action { + // Inform server of an actuator this provider provides. + ProvidedActuation provided_actuation = 1; + // Publish a value. + PublishValuesRequest publish_values_request = 2; + // Sent to acknowledge the acceptance of a batch actuate + // request. + BatchActuateStreamResponse batch_actuate_stream_response = 3; + } +} + +message OpenProviderStreamResponse { + oneof action { + // Response to a provide actuator request. + ProvideActuatorResponse provide_actuator_response = 1; + // Acknowledgement that a published value was received. + PublishValuesResponse publish_values_response = 2; + // Send a batch actuate request to a provider. + BatchActuateStreamRequest batch_actuate_stream_request = 3; + } +} + +message GetServerInfoRequest { + // Nothing yet +} + +message GetServerInfoResponse { + string name = 1; + string version = 2; +}