From a89dc6e94aa4e16df98be8540bce254f66c58f91 Mon Sep 17 00:00:00 2001 From: Rafael RL Date: Tue, 5 Nov 2024 15:32:14 +0100 Subject: [PATCH] Use tokio broadcast::channel --- databroker/src/broker.rs | 73 +++++++++++++++++-------- databroker/src/grpc/kuksa_val_v1/val.rs | 6 +- databroker/src/grpc/kuksa_val_v2/val.rs | 40 ++++++++++++-- databroker/src/viss/v2/server.rs | 3 +- proto/kuksa/val/v2/val.proto | 27 ++++++++- 5 files changed, 116 insertions(+), 33 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 219b0464..b4d02cd3 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -18,8 +18,9 @@ use crate::query; pub use crate::types::{ChangeType, DataType, DataValue, EntryType}; use tokio::sync::{broadcast, mpsc, RwLock}; +use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::ReceiverStream; -use tokio_stream::Stream; +use tokio_stream::{Stream, StreamExt}; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; @@ -33,6 +34,8 @@ use tracing::{debug, info, warn}; use crate::glob; +const MAX_SUBSCRIBE_BUFFER_SIZE: usize = 1000; + #[derive(Debug)] pub enum ActuationError { NotFound, @@ -135,14 +138,14 @@ pub struct QueryField { pub value: DataValue, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ChangeNotification { pub id: i32, pub update: EntryUpdate, pub fields: HashSet, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct EntryUpdates { pub updates: Vec, } @@ -157,6 +160,7 @@ pub enum QueryError { pub enum SubscriptionError { NotFound, InvalidInput, + InvalidBufferSize, InternalError, } @@ -198,7 +202,7 @@ pub struct QuerySubscription { pub struct ChangeSubscription { entries: HashMap>, - sender: mpsc::Sender, + sender: broadcast::Sender, permissions: Permissions, } @@ -805,7 +809,7 @@ impl Subscriptions { } }); self.change_subscriptions.retain(|sub| { - if sub.sender.is_closed() { + if sub.sender.receiver_count() == 0 { info!("Subscriber gone: removing subscription"); false } else if sub.permissions.is_expired() { @@ -898,9 +902,12 @@ impl ChangeSubscription { if notifications.updates.is_empty() { Ok(()) } else { - match self.sender.send(notifications).await { - Ok(()) => Ok(()), - Err(_) => Err(NotificationError {}), + match self.sender.send(notifications) { + Ok(_number_of_receivers) => Ok(()), + Err(err) => { + debug!("Send error for entry{}: ", err); + Err(NotificationError {}) + } } } } else { @@ -939,9 +946,12 @@ impl ChangeSubscription { } notifications }; - match self.sender.send(notifications).await { - Ok(()) => Ok(()), - Err(_) => Err(NotificationError {}), + match self.sender.send(notifications) { + Ok(_number_of_receivers) => Ok(()), + Err(err) => { + debug!("Send error for entry{}: ", err); + Err(NotificationError {}) + } } } } @@ -1622,12 +1632,22 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn subscribe( &self, valid_entries: HashMap>, + buffer_size: Option, ) -> Result, SubscriptionError> { if valid_entries.is_empty() { return Err(SubscriptionError::InvalidInput); } - let (sender, receiver) = mpsc::channel(10); + let channel_capacity = if let Some(cap) = buffer_size { + if cap > MAX_SUBSCRIBE_BUFFER_SIZE { + return Err(SubscriptionError::InvalidBufferSize); + } + cap + } else { + 1 + }; + + let (sender, receiver) = broadcast::channel(channel_capacity); let subscription = ChangeSubscription { entries: valid_entries, sender, @@ -1648,7 +1668,13 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { .await .add_change_subscription(subscription); - let stream = ReceiverStream::new(receiver); + let stream = BroadcastStream::new(receiver).filter_map(|result| match result { + Ok(message) => Some(message), + Err(err) => { + debug!("Lagged entries: {}", err); + None + } + }); Ok(stream) } @@ -4234,20 +4260,23 @@ pub mod tests { .expect("Register datapoint should succeed"); let mut stream = broker - .subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) + .subscribe( + HashMap::from([(id1, HashSet::from([Field::Datapoint]))]), + None, + ) .await .expect("subscription should succeed"); // Stream should yield initial notification with current values i.e. NotAvailable match stream.next().await { - Some(next) => { - assert_eq!(next.updates.len(), 1); + Some(entry) => { + assert_eq!(entry.updates.len(), 1); assert_eq!( - next.updates[0].update.path, + entry.updates[0].update.path, Some("test.datapoint1".to_string()) ); assert_eq!( - next.updates[0].update.datapoint.as_ref().unwrap().value, + entry.updates[0].update.datapoint.as_ref().unwrap().value, DataValue::NotAvailable ); } @@ -4281,14 +4310,14 @@ pub mod tests { // Value has been set, expect the next item in stream to match. match stream.next().await { - Some(next) => { - assert_eq!(next.updates.len(), 1); + Some(entry) => { + assert_eq!(entry.updates.len(), 1); assert_eq!( - next.updates[0].update.path, + entry.updates[0].update.path, Some("test.datapoint1".to_string()) ); assert_eq!( - next.updates[0].update.datapoint.as_ref().unwrap().value, + entry.updates[0].update.datapoint.as_ref().unwrap().value, DataValue::Int32(101) ); } diff --git a/databroker/src/grpc/kuksa_val_v1/val.rs b/databroker/src/grpc/kuksa_val_v1/val.rs index 02946357..90bd12ae 100644 --- a/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/databroker/src/grpc/kuksa_val_v1/val.rs @@ -600,7 +600,7 @@ impl proto::val_server::Val for broker::DataBroker { } } - match broker.subscribe(entries).await { + match broker.subscribe(entries, Some(1)).await { Ok(stream) => { let stream = convert_to_proto_stream(stream); Ok(tonic::Response::new(Box::pin(stream))) @@ -615,6 +615,10 @@ impl proto::val_server::Val for broker::DataBroker { Err(SubscriptionError::InternalError) => { Err(tonic::Status::new(tonic::Code::Internal, "Internal Error")) } + Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Subscription buffer_size max allowed value is 1000", + )), } } diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index ffe1c6a4..e86f5dd8 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -221,9 +221,15 @@ impl proto::val_server::Val for broker::DataBroker { None => return Err(tonic::Status::unauthenticated("Unauthenticated")), }; - let broker = self.authorized_access(&permissions); - let request = request.into_inner(); + if request.buffer_size == 0 { + return Err(tonic::Status::invalid_argument(format!( + "Provided buffer_size {} should be greater than zero.", + request.buffer_size + ))); + } + + let broker = self.authorized_access(&permissions); let signal_paths = request.signal_paths; let size = signal_paths.len(); @@ -247,7 +253,10 @@ impl proto::val_server::Val for broker::DataBroker { ); } - match broker.subscribe(valid_requests).await { + match broker + .subscribe(valid_requests, Some(request.buffer_size as usize)) + .await + { Ok(stream) => { let stream = convert_to_proto_stream(stream, size); Ok(tonic::Response::new(Box::pin(stream))) @@ -257,6 +266,10 @@ impl proto::val_server::Val for broker::DataBroker { Err(tonic::Status::invalid_argument("Invalid Argument")) } Err(SubscriptionError::InternalError) => Err(tonic::Status::internal("Internal Error")), + Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Subscription buffer_size max allowed value is 1000", + )), } } @@ -287,9 +300,15 @@ impl proto::val_server::Val for broker::DataBroker { None => return Err(tonic::Status::unauthenticated("Unauthenticated")), }; - let broker = self.authorized_access(&permissions); - let request = request.into_inner(); + if request.buffer_size == 0 { + return Err(tonic::Status::invalid_argument(format!( + "Provided lag_buffer_capacity {} should be greater than zero.", + request.buffer_size + ))); + } + + let broker = self.authorized_access(&permissions); let signal_ids = request.signal_ids; let size = signal_ids.len(); @@ -313,7 +332,10 @@ impl proto::val_server::Val for broker::DataBroker { ); } - match broker.subscribe(valid_requests).await { + match broker + .subscribe(valid_requests, Some(request.buffer_size as usize)) + .await + { Ok(stream) => { let stream = convert_to_proto_stream_id(stream, size); Ok(tonic::Response::new(Box::pin(stream))) @@ -328,6 +350,10 @@ impl proto::val_server::Val for broker::DataBroker { Err(SubscriptionError::InternalError) => { Err(tonic::Status::new(tonic::Code::Internal, "Internal Error")) } + Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Subscription buffer_size max allowed value is 1000", + )), } } @@ -1836,6 +1862,7 @@ mod tests { let mut request = tonic::Request::new(proto::SubscribeRequest { signal_paths: vec!["test.datapoint1".to_string()], + buffer_size: 5, }); request @@ -1982,6 +2009,7 @@ mod tests { let mut request = tonic::Request::new(proto::SubscribeByIdRequest { signal_ids: vec![entry_id], + buffer_size: 5, }); request diff --git a/databroker/src/viss/v2/server.rs b/databroker/src/viss/v2/server.rs index e22923b4..5a168ff6 100644 --- a/databroker/src/viss/v2/server.rs +++ b/databroker/src/viss/v2/server.rs @@ -271,7 +271,7 @@ impl Viss for Server { }); }; - match broker.subscribe(entries).await { + match broker.subscribe(entries, Some(1)).await { Ok(stream) => { let subscription_id = SubscriptionId::new(); @@ -303,6 +303,7 @@ impl Viss for Server { broker::SubscriptionError::NotFound => Error::NotFoundInvalidPath, broker::SubscriptionError::InvalidInput => Error::NotFoundInvalidPath, broker::SubscriptionError::InternalError => Error::InternalServerError, + broker::SubscriptionError::InvalidBufferSize => Error::InternalServerError, }, ts: SystemTime::now().into(), }), diff --git a/proto/kuksa/val/v2/val.proto b/proto/kuksa/val/v2/val.proto index b880b847..6f8ce824 100644 --- a/proto/kuksa/val/v2/val.proto +++ b/proto/kuksa/val/v2/val.proto @@ -52,13 +52,19 @@ service VAL { // NOT_FOUND if any of the signals are non-existant. // UNAUTHENTICATED if no credentials provided or credentials has expired // PERMISSION_DENIED if access is denied for any of the signals. - // INVALID_ARGUMENT if the request is empty or provided path is too long - // - MAX_REQUEST_PATH_LENGTH: usize = 1000; + // INVALID_ARGUMENT + // - if the request is empty or provided path is too long + // MAX_REQUEST_PATH_LENGTH: usize = 1000; + // - if buffer_size exceeds the maximum permitted + // MAX_BUFFER_SIZE: usize = 1000; // // When subscribing, Databroker shall immediately return the value for all // subscribed entries. // If a value isn't available when subscribing to a it, it should return None // + // If a subscriber is slow to consume signals, messages will be buffered up + // to the specified buffer_size before the oldest messages are dropped. + // rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse); // Subscribe to a set of signals using i32 id parameters @@ -66,12 +72,19 @@ service VAL { // NOT_FOUND if any of the signals are non-existant. // UNAUTHENTICATED if no credentials provided or credentials has expired // PERMISSION_DENIED if access is denied for any of the signals. - // INVALID_ARGUMENT if the request is empty or provided path is too long + // INVALID_ARGUMENT + // - if the request is empty or provided path is too long + // MAX_REQUEST_PATH_LENGTH: usize = 1000; + // - if buffer_size exceeds the maximum permitted + // MAX_BUFFER_SIZE: usize = 1000; // // When subscribing, Databroker shall immediately return the value for all // subscribed entries. // If a value isn't available when subscribing to a it, it should return None // + // If a subscriber is slow to consume signals, messages will be buffered up + // to the specified buffer_size before the oldest messages are dropped. + // rpc SubscribeById(SubscribeByIdRequest) returns (stream SubscribeByIdResponse); // Actuate a single actuator @@ -192,6 +205,10 @@ message GetValuesResponse { message SubscribeRequest { repeated string signal_paths = 1; + + // Specifies the number of messages that can be buffered for + // slow subscribers before the oldest messages are dropped. + uint32 buffer_size = 2; } message SubscribeResponse { @@ -200,6 +217,10 @@ message SubscribeResponse { message SubscribeByIdRequest { repeated int32 signal_ids = 1; + + // Specifies the number of messages that can be buffered for + // slow subscribers before the oldest messages are dropped. + uint32 buffer_size = 2; } message SubscribeByIdResponse {