diff --git a/data/vss-core/README.md b/data/vss-core/README.md index dbba2cff..73289905 100644 --- a/data/vss-core/README.md +++ b/data/vss-core/README.md @@ -60,7 +60,7 @@ use the full name. When official release is created replace the copied *.json-fi Build and run kuksa_databroker using the new VSS file according to [documentation](../../README.md), e.g. ```sh -$cargo run --bin databroker -- --metadata ../data/vss-core/vss_release_4.0.json +$cargo run --bin databroker -- --metadata ./data/vss-core/vss_release_4.0.json ``` Use the client to verify that changes in VSS are reflected, by doing e.g. set/get on some new or renamed signals. diff --git a/databroker/src/grpc/kuksa_val_v2/conversions.rs b/databroker/src/grpc/kuksa_val_v2/conversions.rs index 2c66c6c7..e6660a4f 100644 --- a/databroker/src/grpc/kuksa_val_v2/conversions.rs +++ b/databroker/src/grpc/kuksa_val_v2/conversions.rs @@ -45,7 +45,10 @@ impl From<&proto::Datapoint> for broker::Datapoint { impl From for Option { fn from(from: broker::Datapoint) -> Self { match from.value { - broker::DataValue::NotAvailable => None, + broker::DataValue::NotAvailable => Some(proto::Datapoint { + value: None, + timestamp: Some(from.ts.into()), + }), broker::DataValue::Bool(value) => Some(proto::Datapoint { timestamp: Some(from.ts.into()), value: Some(proto::Value { diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index dcc213e2..36a973b9 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -14,7 +14,7 @@ use std::{collections::HashMap, pin::Pin}; use crate::{ - broker::{self, AuthorizedAccess, SubscriptionError}, + broker::{self, AuthorizedAccess, ReadError, SubscriptionError}, glob::Matcher, permissions::Permissions, }; @@ -39,22 +39,96 @@ const MAX_REQUEST_PATH_LENGTH: usize = 1000; impl proto::val_server::Val for broker::DataBroker { async fn get_value( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let request = request.into_inner(); + + let signal_id = match get_signal(request.signal_id, &broker).await { + Ok(signal_id) => signal_id, + Err(err) => return Err(err), + }; + + let datapoint = match broker.get_datapoint(signal_id).await { + Ok(datapoint) => datapoint, + Err(ReadError::NotFound) => return Err(tonic::Status::not_found("Path not found")), + Err(ReadError::PermissionDenied) => { + return Err(tonic::Status::permission_denied("Permission denied")) + } + Err(ReadError::PermissionExpired) => { + return Err(tonic::Status::unauthenticated("Permission expired")) + } + }; + + Ok(tonic::Response::new(proto::GetValueResponse { + data_point: datapoint.into(), + })) } async fn get_values( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Unimplemented", - )) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let requested = request.into_inner().signal_ids; + let mut response_datapoints = Vec::new(); + + for request in requested { + let signal_id = match get_signal(Some(request), &broker).await { + Ok(signal_id) => signal_id, + Err(err) => return Err(err), + }; + + match broker.get_datapoint(signal_id).await { + Ok(datapoint) => { + let proto_datapoint_opt: Option = datapoint.into(); + //let proto_datapoint: proto::Datapoint = proto_datapoint_opt.into(); + response_datapoints.push(proto_datapoint_opt.unwrap()); + } + Err(ReadError::NotFound) => { + return Err(tonic::Status::not_found(format!( + "Path not found (id: {})", + signal_id + ))); + } + Err(ReadError::PermissionDenied) => { + return Err(tonic::Status::permission_denied(format!( + "Permission denied(id: {})", + signal_id + ))) + } + Err(ReadError::PermissionExpired) => { + return Err(tonic::Status::unauthenticated(format!( + "Permission expired (id: {})", + signal_id + ))) + } + }; + } + + Ok(tonic::Response::new(proto::GetValuesResponse { + data_points: response_datapoints, + })) } async fn list_values( @@ -766,43 +840,678 @@ mod tests { }; use proto::{open_provider_stream_request, OpenProviderStreamRequest, PublishValuesRequest}; - async fn check_stream_next( - item: &Result, - expected_response: HashMap, - ) { - let f = false; - match item { - Ok(subscribe_response) => { - // Process the SubscribeResponse - let response = &subscribe_response.entries; - assert_eq!(response.len(), expected_response.len()); - for key in response - .keys() - .chain(expected_response.keys()) - .collect::>() - { - match (response.get(key), expected_response.get(key)) { - (Some(entry1), Some(entry2)) => { - assert_eq!(entry1.value, entry2.value); - } - (Some(entry1), None) => { - assert!(f, "Key '{}' is only in response: {:?}", key, entry1) - } - (None, Some(entry2)) => assert!( - f, - "Key '{}' is only in expected_response: {:?}", - key, entry2 - ), - (None, None) => unreachable!(), + // Helper for adding an int32 signal and adding value + async fn helper_add_int32( + broker: &DataBroker, + name: &str, + value: i32, + timestamp: std::time::SystemTime, + ) -> i32 { + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let entry_id = authorized_access + .add_entry( + name.to_owned(), + broker::DataType::Int32, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Some Description That Does Not Matter".to_owned(), + None, + None, + ) + .await + .unwrap(); + + let _ = authorized_access + .update_entries([( + entry_id, + broker::EntryUpdate { + path: None, + datapoint: Some(broker::Datapoint { + //ts: std::time::SystemTime::now(), + ts: timestamp, + source_ts: None, + value: broker::types::DataValue::Int32(value), + }), + actuator_target: None, + entry_type: None, + data_type: None, + description: None, + allowed: None, + unit: None, + }, + )]) + .await; + + entry_id + } + + #[tokio::test] + async fn test_get_value_id_ok() { + let broker = DataBroker::default(); + + let timestamp = std::time::SystemTime::now(); + + let entry_id = helper_add_int32(&broker, "test.datapoint1", -64, timestamp).await; + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(response) => { + // Handle the successful response + let get_response = response.into_inner(); + + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(-64)), + }; + assert_eq!( + get_response, + proto::GetValueResponse { + data_point: { + Some(proto::Datapoint { + timestamp: Some(timestamp.into()), + value: Some(value), + }) + }, } + ); + } + Err(status) => { + panic!("Get failed with status: {:?}", status); + } + } + } + + #[tokio::test] + async fn test_get_value_name_ok() { + let broker = DataBroker::default(); + + let timestamp = std::time::SystemTime::now(); + + let _entry_id = helper_add_int32(&broker, "test.datapoint1", -64, timestamp).await; + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Path( + "test.datapoint1".to_string(), + )), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(response) => { + // Handle the successful response + let get_response = response.into_inner(); + + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(-64)), + }; + assert_eq!( + get_response, + proto::GetValueResponse { + data_point: { + Some(proto::Datapoint { + timestamp: Some(timestamp.into()), + value: Some(value), + }) + }, + } + ); + } + Err(status) => { + panic!("Get failed with status: {:?}", status); + } + } + } + + #[tokio::test] + async fn test_get_value_id_not_authorized() { + let broker = DataBroker::default(); + + let timestamp = std::time::SystemTime::now(); + + let entry_id = helper_add_int32(&broker, "test.datapoint1", -64, timestamp).await; + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + }; + + // Do not insert permissions + let get_value_request = tonic::Request::new(request); + + match broker.get_value(get_value_request).await { + Ok(_response) => { + panic!("Did not expect success"); + } + Err(status) => { + assert_eq!(status.code(), tonic::Code::Unauthenticated) + } + } + } + + #[tokio::test] + async fn test_get_value_id_no_value() { + // Define signal but do not assign any value + + let broker = DataBroker::default(); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + + let entry_id = authorized_access + .add_entry( + "test.datapoint1".to_string(), + broker::DataType::Int32, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Some Description hat Does Not Matter".to_owned(), + None, + None, + ) + .await + .unwrap(); + + // Now try to get it + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(response) => { + // Handle the successful response + let get_response = response.into_inner(); + + // As of today Databroker assigns "Now" when registering a Datapoint so if there is no value + // we do not know exact time. For now just checking that it is not None + assert_eq!(get_response.data_point.clone().unwrap().value, None); + assert_ne!(get_response.data_point.unwrap().timestamp, None); + } + Err(status) => { + // Handle the error from the publish_value function + panic!("Get failed with status: {:?}", status); + } + } + } + + #[tokio::test] + async fn test_get_value_id_not_defined() { + let broker = DataBroker::default(); + // Just use some arbitrary number + let entry_id: i32 = 12345; + + // Now try to get it + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(_response) => { + panic!("Did not expect success"); + } + Err(status) => { + assert_eq!(status.code(), tonic::Code::NotFound) + } + } + } + + #[tokio::test] + async fn test_get_value_name_not_defined() { + let broker = DataBroker::default(); + + // Now try to get it + + let request = proto::GetValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Path( + "test.datapoint1".to_string(), + )), + }), + }; + + // Manually insert permissions + let mut get_value_request = tonic::Request::new(request); + get_value_request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + match broker.get_value(get_value_request).await { + Ok(_response) => { + panic!("Did not expect success"); + } + Err(status) => { + assert_eq!(status.code(), tonic::Code::NotFound) + } + } + } + + struct GetValuesConfig { + send_auth: bool, + request_first: bool, + use_name_for_first: bool, + first_exist: bool, + auth_first: bool, + request_second: bool, + use_name_for_second: bool, + second_exist: bool, + auth_second: bool, + } + + struct GetValuesConfigBuilder { + send_auth: bool, + request_first: bool, + use_name_for_first: bool, + first_exist: bool, + auth_first: bool, + request_second: bool, + use_name_for_second: bool, + second_exist: bool, + auth_second: bool, + } + + impl GetValuesConfigBuilder { + fn new() -> GetValuesConfigBuilder { + GetValuesConfigBuilder { + send_auth: false, + request_first: false, + use_name_for_first: false, + first_exist: false, + auth_first: false, + request_second: false, + use_name_for_second: false, + second_exist: false, + auth_second: false, + } + } + + // Request credentials to be sent. + // Do not need to be explcitly requested if auth_first/auth_second is used + fn send_auth(&mut self) -> &mut Self { + self.send_auth = true; + self + } + + fn request_first(&mut self) -> &mut Self { + self.request_first = true; + self + } + + fn use_name_for_first(&mut self) -> &mut Self { + self.use_name_for_first = true; + self + } + + fn first_exist(&mut self) -> &mut Self { + self.first_exist = true; + self + } + + // Request credentials and include credentials for signal 1 + fn auth_first(&mut self) -> &mut Self { + self.auth_first = true; + self.send_auth = true; + self + } + + fn request_second(&mut self) -> &mut Self { + self.request_second = true; + self + } + + fn use_name_for_second(&mut self) -> &mut Self { + self.use_name_for_second = true; + self + } + + fn second_exist(&mut self) -> &mut Self { + self.second_exist = true; + self + } + + // Request credentials and include credentials for signal 2 + fn auth_second(&mut self) -> &mut Self { + self.send_auth = true; + self.auth_second = true; + self + } + + fn build(&self) -> GetValuesConfig { + GetValuesConfig { + send_auth: self.send_auth, + request_first: self.request_first, + use_name_for_first: self.use_name_for_first, + first_exist: self.first_exist, + auth_first: self.auth_first, + request_second: self.request_second, + use_name_for_second: self.use_name_for_second, + second_exist: self.second_exist, + auth_second: self.auth_second, + } + } + } + + async fn test_get_values_combo(config: GetValuesConfig) { + static SIGNAL1: &str = "test.datapoint1"; + static SIGNAL2: &str = "test.datapoint2"; + + let broker = DataBroker::default(); + + let timestamp = std::time::SystemTime::now(); + + let mut entry_id = -1; + if config.first_exist { + entry_id = helper_add_int32(&broker, SIGNAL1, -64, timestamp).await; + } + + let mut entry_id2 = -1; + if config.second_exist { + entry_id2 = helper_add_int32(&broker, SIGNAL2, -13, timestamp).await; + } + + let mut permission_builder = permissions::PermissionBuilder::new(); + + if config.auth_first { + permission_builder = permission_builder + .add_read_permission(permissions::Permission::Glob(SIGNAL1.to_string())); + } + if config.auth_second { + permission_builder = permission_builder + .add_read_permission(permissions::Permission::Glob(SIGNAL2.to_string())); + } + let permissions = permission_builder.build().expect("Oops!"); + + // Build the request + + let mut request_signals = Vec::new(); + if config.request_first { + if !config.use_name_for_first { + request_signals.push(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }); + } else { + request_signals.push(proto::SignalId { + signal: Some(proto::signal_id::Signal::Path(SIGNAL1.to_string())), + }); + } + } + if config.request_second { + if !config.use_name_for_second { + request_signals.push(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id2)), + }); + } else { + request_signals.push(proto::SignalId { + signal: Some(proto::signal_id::Signal::Path(SIGNAL2.to_string())), + }); + } + } + + let request = proto::GetValuesRequest { + signal_ids: request_signals, + }; + + let mut tonic_request = tonic::Request::new(request); + + if config.send_auth { + tonic_request.extensions_mut().insert(permissions); + } + + match broker.get_values(tonic_request).await { + Ok(response) => { + // Check that we actually expect an Ok answer + + if config.request_first & !config.first_exist { + panic!("Should not get Ok as signal test.datapoint1 should not exist") + } + if config.request_first & !config.auth_first { + panic!("Should not get Ok as we do not have permission for signal test.datapoint2 ") + } + if config.request_second & !config.second_exist { + panic!("Should not get Ok as signal test.datapoint1 should not exist") + } + if config.request_second & !config.auth_second { + panic!("Should not get Ok as we do not have permission for signal test.datapoint2 ") + } + + let get_response = response.into_inner(); + + let mut response_signals = Vec::new(); + + if config.request_first { + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(-64)), + }; + let datapoint = proto::Datapoint { + timestamp: Some(timestamp.into()), + value: Some(value), + }; + response_signals.push(datapoint); + } + if config.request_second { + let value = proto::Value { + typed_value: Some(proto::value::TypedValue::Int32(-13)), + }; + let datapoint = proto::Datapoint { + timestamp: Some(timestamp.into()), + value: Some(value), + }; + response_signals.push(datapoint); } + + assert_eq!( + get_response, + proto::GetValuesResponse { + data_points: response_signals, + } + ); } - Err(err) => { - assert!(f, "Error {:?}", err) + Err(status) => { + // It can be discussed what has precendce NotFound or Unauthenticated, does not really matter + // For now assuming that NotFound has precedence, at least if we have a valid token + if !config.send_auth { + assert_eq!(status.code(), tonic::Code::Unauthenticated) + } else if config.request_first & !config.first_exist { + assert_eq!(status.code(), tonic::Code::NotFound) + } else if config.request_first & !config.auth_first { + assert_eq!(status.code(), tonic::Code::PermissionDenied) + } else if config.request_second & !config.second_exist { + assert_eq!(status.code(), tonic::Code::NotFound) + } else if config.request_second & !config.auth_second { + assert_eq!(status.code(), tonic::Code::PermissionDenied) + } else { + panic!("GetValues failed with status: {:?}", status); + } } } } + #[tokio::test] + async fn test_get_values_id_one_signal_ok() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .request_first() + .auth_first() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_ok() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .second_exist() + .request_first() + .request_second() + .auth_first() + .auth_second() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_path_one_signal_ok() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .request_first() + .use_name_for_first() + .auth_first() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_path_two_signals_ok() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .second_exist() + .request_first() + .use_name_for_first() + .request_second() + .use_name_for_second() + .auth_first() + .auth_second() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_no_signals_ok() { + // Expecting an empty list back + + let config = GetValuesConfigBuilder::new() + .first_exist() + .second_exist() + .auth_first() + .auth_second() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_first_missing() { + let config = GetValuesConfigBuilder::new() + .second_exist() + .request_first() + .request_second() + .auth_first() + .auth_second() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_second_missing() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .request_first() + .request_second() + .auth_first() + .auth_second() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_first_unauthorized() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .second_exist() + .request_first() + .request_second() + .auth_second() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_second_unauthorized() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .second_exist() + .request_first() + .request_second() + .auth_first() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_both_unauthorized() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .second_exist() + .request_first() + .request_second() + .send_auth() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_first_missing_unauthorized() { + let config = GetValuesConfigBuilder::new() + .second_exist() + .request_first() + .request_second() + .auth_second() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_second_missing_unauthorized() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .request_first() + .request_second() + .auth_first() + .build(); + test_get_values_combo(config).await; + } + + #[tokio::test] + async fn test_get_values_id_two_signals_not_send_auth() { + let config = GetValuesConfigBuilder::new() + .first_exist() + .second_exist() + .request_first() + .request_second() + .build(); + test_get_values_combo(config).await; + } + #[tokio::test] async fn test_publish_value() { let broker = DataBroker::default(); @@ -918,25 +1627,129 @@ mod tests { /* Test subscribe service method */ - #[tokio::test(flavor = "multi_thread")] - async fn test_subscribe() { + async fn test_subscribe_case(has_value: bool) { + async fn check_stream_next( + item: &Result, + input_value: Option, + ) { + // Create Datapoint + let mut expected_response: HashMap = HashMap::new(); + // We expect to get an empty response first + expected_response.insert( + "test.datapoint1".to_string(), + proto::Datapoint { + timestamp: None, + value: match input_value { + Some(true) => Some(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + Some(false) => Some(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(false)), + }), + None => None, + }, + }, + ); + + let f = false; + match item { + Ok(subscribe_response) => { + // Process the SubscribeResponse + let response = &subscribe_response.entries; + assert_eq!(response.len(), expected_response.len()); + for key in response + .keys() + .chain(expected_response.keys()) + .collect::>() + { + match (response.get(key), expected_response.get(key)) { + (Some(entry1), Some(entry2)) => { + assert_eq!(entry1.value, entry2.value); + } + (Some(entry1), None) => { + assert!(f, "Key '{}' is only in response: {:?}", key, entry1) + } + (None, Some(entry2)) => assert!( + f, + "Key '{}' is only in expected_response: {:?}", + key, entry2 + ), + (None, None) => unreachable!(), + } + } + } + Err(err) => { + assert!(f, "Error {:?}", err) + } + } + } + + async fn publish_value( + broker: &DataBroker, + entry_id: i32, + input_value: Option, + input_timestamp: Option, + ) { + let timestamp = input_timestamp.map(|input_timestamp| input_timestamp.into()); + + let mut request = tonic::Request::new(proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + data_point: Some(proto::Datapoint { + timestamp, + + value: match input_value { + Some(true) => Some(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + Some(false) => Some(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(false)), + }), + None => None, + }, + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + match broker.publish_value(request).await { + Ok(response) => { + // Handle the successful response + let publish_response = response.into_inner(); + + // Check if there is an error in the response + assert_eq!(publish_response, proto::PublishValueResponse {}); + } + Err(status) => { + // Handle the error from the publish_value function + panic!("Publish failed with status: {:?}", status); + } + } + } + let f = false; let broker = DataBroker::default(); - let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); let entry_id = authorized_access .add_entry( - "test.datapoint1".to_owned(), + "test.datapoint1".to_string(), broker::DataType::Bool, broker::ChangeType::OnChange, broker::EntryType::Sensor, - "Test datapoint 1".to_owned(), + "Some Description that Does Not Matter".to_owned(), None, None, ) .await .unwrap(); + if has_value { + publish_value(&broker, entry_id, Some(false), None).await + } + let mut request = tonic::Request::new(proto::SubscribeRequest { signal_paths: vec!["test.datapoint1".to_string()], }); @@ -952,98 +1765,47 @@ mod tests { rt.block_on(broker.subscribe(request)) }); - let mut request = tonic::Request::new(proto::PublishValueRequest { - signal_id: Some(proto::SignalId { - signal: Some(proto::signal_id::Signal::Id(entry_id)), - }), - data_point: Some(proto::Datapoint { - timestamp: None, - value: Some(proto::Value { - typed_value: Some(proto::value::TypedValue::Bool(true)), - }), - }), - }); - request - .extensions_mut() - .insert(permissions::ALLOW_ALL.clone()); - match broker.publish_value(request).await { - Ok(response) => { - // Handle the successful response - let publish_response = response.into_inner(); + // Publish "true" as value + publish_value(&broker, entry_id, Some(true), None).await; - // Check if there is an error in the response - assert_eq!(publish_response, proto::PublishValueResponse {}); - } - Err(status) => { - // Handle the error from the publish_value function - assert!(f, "Publish failed with status: {:?}", status); - } - } + // Publish "false" as value + publish_value(&broker, entry_id, Some(false), None).await; - let mut request_false = tonic::Request::new(proto::PublishValueRequest { - signal_id: Some(proto::SignalId { - signal: Some(proto::signal_id::Signal::Id(entry_id)), - }), - data_point: Some(proto::Datapoint { - timestamp: None, - value: Some(proto::Value { - typed_value: Some(proto::value::TypedValue::Bool(false)), - }), - }), - }); - request_false - .extensions_mut() - .insert(permissions::ALLOW_ALL.clone()); - match broker.publish_value(request_false).await { - Ok(response) => { - // Handle the successful response - let publish_response = response.into_inner(); + // Publish "false" again but with new timestamp - as it is not an update we shall not get anything - // Check if there is an error in the response - assert_eq!(publish_response, proto::PublishValueResponse {}); - } - Err(status) => { - // Handle the error from the publish_value function - assert!(f, "Publish failed with status: {:?}", status); - } - } + let timestamp = std::time::SystemTime::now(); + publish_value(&broker, entry_id, Some(false), timestamp.into()).await; + + // Publish None as value, equals reset + publish_value(&broker, entry_id, None, None).await; + + // Publish "true" as value + + publish_value(&broker, entry_id, Some(true), None).await; if let Ok(stream) = result { // Process the stream by iterating over the items let mut stream = stream.into_inner(); - let mut expected_entries: HashMap = HashMap::new(); - let mut item_count = 0; while let Some(item) = stream.next().await { match item_count { 0 => { - check_stream_next(&item, expected_entries.clone()).await; - expected_entries.insert( - "test.datapoint1".to_string(), - proto::Datapoint { - timestamp: None, - value: Some(proto::Value { - typed_value: Some(proto::value::TypedValue::Bool(true)), - }), - }, - ); + check_stream_next(&item, if has_value { Some(false) } else { None }).await; } 1 => { - check_stream_next(&item, expected_entries.clone()).await; - expected_entries.clear(); - expected_entries.insert( - "test.datapoint1".to_string(), - proto::Datapoint { - timestamp: None, - value: Some(proto::Value { - typed_value: Some(proto::value::TypedValue::Bool(false)), - }), - }, - ); + check_stream_next(&item, Some(true)).await; } 2 => { - check_stream_next(&item, expected_entries.clone()).await; + // As long as value stays as false we do not get anything new, so prepare for None + check_stream_next(&item, Some(false)).await; + } + 3 => { + check_stream_next(&item, None).await; + } + 4 => { + check_stream_next(&item, Some(true)).await; + // And we do not expect more break; } _ => assert!( @@ -1053,11 +1815,19 @@ mod tests { } item_count += 1; } + // Make sure stream is not closed in advance + assert_eq!(item_count, 4); } else { assert!(f, "Something went wrong while getting the stream.") } } + #[tokio::test(flavor = "multi_thread")] + async fn test_subscribe() { + test_subscribe_case(false).await; + test_subscribe_case(true).await; + } + /* Test open_provider_stream service method */ diff --git a/proto/kuksa/val/v2/types.proto b/proto/kuksa/val/v2/types.proto index ff87d13a..45fd4e4f 100644 --- a/proto/kuksa/val/v2/types.proto +++ b/proto/kuksa/val/v2/types.proto @@ -48,7 +48,14 @@ message Value { message SignalID { oneof signal { + // Numeric identifier to the signal + // As of today Databroker assigns arbitrary unique numbers to each registered signal + // at startup, meaning that identifiers may change after restarting Databroker. + // A mechanism for static identifiers may be introduced in the future. int32 id = 1; + // Full VSS-style path to a specific signal, like "Vehicle.Speed" + // Wildcards and paths to branches are not supported. + // The given path must be known by the Databroker. string path = 2; } } diff --git a/proto/kuksa/val/v2/val.proto b/proto/kuksa/val/v2/val.proto index fc1c5b3c..5b9a65d1 100644 --- a/proto/kuksa/val/v2/val.proto +++ b/proto/kuksa/val/v2/val.proto @@ -21,33 +21,50 @@ import "kuksa/val/v2/types.proto"; service VAL { // Get the latest value of a signal + // If the signal exist but does not have a valid value + // a DataPoint where value is None shall be returned. // // Returns (GRPC error code): // NOT_FOUND if the requested signal doesn't exist + // UNAUTHENTICATED if no credentials provided or credentials has expired // PERMISSION_DENIED if access is denied + // rpc GetValue(GetValueRequest) returns (GetValueResponse); // Get the latest values of a set of signals. // The returned list of data points has the same order as the list of the request. + // If a requested signal has no value a DataPoint where value is None will be returned. // // Returns (GRPC error code): // NOT_FOUND if any of the requested signals doesn't exist. + // UNAUTHENTICATED if no credentials provided or credentials has expired // PERMISSION_DENIED if access is denied for any of the requested signals. + // rpc GetValues(GetValuesRequest) returns (GetValuesResponse); // List values of signals matching the request. // // Returns a list of signal values. Only values of signals that the user // is allowed to read are included (everything else is ignored). + // If a requested signal has no value a DataPoint where value is None will be returned. // // Returns (GRPC error code): - // NOT_FOUND if the specified root branch does not exist. + // NOT_FOUND if any of the requested signals doesn't exist. + // + // TODO: Needs to be redesigned, returning a list with only DataPoints (without names/id) + // is not sufficient for an rpc were you may get back a subset of the values you requested. + // Like if you request A, B, C and only have access to one of them, then you do not + // know which of them that is returned. rpc ListValues(ListValuesRequest) returns (ListValuesResponse); // Subscribe to a set of signals using string path parameters // Returns (GRPC error code): // NOT_FOUND if any of the signals are non-existant. // PERMISSION_DENIED if access is denied for any of the signals. + // + // When subscribing the Broker shall immediately return the value for all + // subscribed entries. If no value is available when subscribing a DataPoint + // with value None shall be returned. rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse); // Subscribe to a set of signals using i32 id parameters @@ -130,7 +147,7 @@ message GetValuesRequest { } message GetValuesResponse { - repeated Datapoint datapoints = 1; + repeated Datapoint data_points = 1; } message ListValuesRequest { @@ -138,7 +155,7 @@ message ListValuesRequest { } message ListValuesResponse { - repeated Datapoint datapoints = 1; + repeated Datapoint data_points = 1; } message SubscribeRequest {