Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use tokio broadcast::channel #95

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 51 additions & 22 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use crate::query;
pub use crate::types::{ChangeType, DataType, DataValue, EntryType};

use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tokio_stream::{Stream, StreamExt};

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
Expand All @@ -33,6 +34,8 @@ use tracing::{debug, info, warn};

use crate::glob;

const MAX_SUBSCRIBE_BUFFER_SIZE: usize = 1000;

#[derive(Debug)]
pub enum ActuationError {
NotFound,
Expand Down Expand Up @@ -135,14 +138,14 @@ pub struct QueryField {
pub value: DataValue,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ChangeNotification {
pub id: i32,
pub update: EntryUpdate,
pub fields: HashSet<Field>,
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct EntryUpdates {
pub updates: Vec<ChangeNotification>,
}
Expand All @@ -157,6 +160,7 @@ pub enum QueryError {
pub enum SubscriptionError {
NotFound,
InvalidInput,
InvalidBufferSize,
InternalError,
}

Expand Down Expand Up @@ -198,7 +202,7 @@ pub struct QuerySubscription {

pub struct ChangeSubscription {
entries: HashMap<i32, HashSet<Field>>,
sender: mpsc::Sender<EntryUpdates>,
sender: broadcast::Sender<EntryUpdates>,
permissions: Permissions,
}

Expand Down Expand Up @@ -805,7 +809,7 @@ impl Subscriptions {
}
});
self.change_subscriptions.retain(|sub| {
if sub.sender.is_closed() {
if sub.sender.receiver_count() == 0 {
info!("Subscriber gone: removing subscription");
false
} else if sub.permissions.is_expired() {
Expand Down Expand Up @@ -898,9 +902,12 @@ impl ChangeSubscription {
if notifications.updates.is_empty() {
Ok(())
} else {
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
match self.sender.send(notifications) {
Ok(_number_of_receivers) => Ok(()),
Err(err) => {
debug!("Send error for entry{}: ", err);
Err(NotificationError {})
}
}
}
} else {
Expand Down Expand Up @@ -939,9 +946,12 @@ impl ChangeSubscription {
}
notifications
};
match self.sender.send(notifications).await {
Ok(()) => Ok(()),
Err(_) => Err(NotificationError {}),
match self.sender.send(notifications) {
Ok(_number_of_receivers) => Ok(()),
Err(err) => {
debug!("Send error for entry{}: ", err);
Err(NotificationError {})
}
}
}
}
Expand Down Expand Up @@ -1622,12 +1632,22 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
pub async fn subscribe(
&self,
valid_entries: HashMap<i32, HashSet<Field>>,
buffer_size: Option<usize>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
}

let (sender, receiver) = mpsc::channel(10);
let channel_capacity = if let Some(cap) = buffer_size {
if cap > MAX_SUBSCRIBE_BUFFER_SIZE {
return Err(SubscriptionError::InvalidBufferSize);
}
cap
} else {
1
};

let (sender, receiver) = broadcast::channel(channel_capacity);
let subscription = ChangeSubscription {
entries: valid_entries,
sender,
Expand All @@ -1648,7 +1668,13 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
.await
.add_change_subscription(subscription);

let stream = ReceiverStream::new(receiver);
let stream = BroadcastStream::new(receiver).filter_map(|result| match result {
Ok(message) => Some(message),
Err(err) => {
debug!("Lagged entries: {}", err);
None
}
});
rafaeling marked this conversation as resolved.
Show resolved Hide resolved
Ok(stream)
}

Expand Down Expand Up @@ -4234,20 +4260,23 @@ pub mod tests {
.expect("Register datapoint should succeed");

let mut stream = broker
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
None,
)
.await
.expect("subscription should succeed");

// Stream should yield initial notification with current values i.e. NotAvailable
match stream.next().await {
Some(next) => {
assert_eq!(next.updates.len(), 1);
Some(entry) => {
assert_eq!(entry.updates.len(), 1);
assert_eq!(
next.updates[0].update.path,
entry.updates[0].update.path,
Some("test.datapoint1".to_string())
);
assert_eq!(
next.updates[0].update.datapoint.as_ref().unwrap().value,
entry.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::NotAvailable
);
}
Expand Down Expand Up @@ -4281,14 +4310,14 @@ pub mod tests {

// Value has been set, expect the next item in stream to match.
match stream.next().await {
Some(next) => {
assert_eq!(next.updates.len(), 1);
Some(entry) => {
assert_eq!(entry.updates.len(), 1);
assert_eq!(
next.updates[0].update.path,
entry.updates[0].update.path,
Some("test.datapoint1".to_string())
);
assert_eq!(
next.updates[0].update.datapoint.as_ref().unwrap().value,
entry.updates[0].update.datapoint.as_ref().unwrap().value,
DataValue::Int32(101)
);
}
Expand Down
6 changes: 5 additions & 1 deletion databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

match broker.subscribe(entries).await {
match broker.subscribe(entries, Some(1)).await {
Ok(stream) => {
let stream = convert_to_proto_stream(stream);
Ok(tonic::Response::new(Box::pin(stream)))
Expand All @@ -615,6 +615,10 @@ impl proto::val_server::Val for broker::DataBroker {
Err(SubscriptionError::InternalError) => {
Err(tonic::Status::new(tonic::Code::Internal, "Internal Error"))
}
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription buffer_size max allowed value is 1000",
)),
}
}

Expand Down
40 changes: 34 additions & 6 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,15 @@ impl proto::val_server::Val for broker::DataBroker {
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
};

let broker = self.authorized_access(&permissions);

let request = request.into_inner();
if request.buffer_size == 0 {
return Err(tonic::Status::invalid_argument(format!(
"Provided buffer_size {} should be greater than zero.",
request.buffer_size
)));
}

let broker = self.authorized_access(&permissions);

let signal_paths = request.signal_paths;
let size = signal_paths.len();
Expand All @@ -247,7 +253,10 @@ impl proto::val_server::Val for broker::DataBroker {
);
}

match broker.subscribe(valid_requests).await {
match broker
.subscribe(valid_requests, Some(request.buffer_size as usize))
.await
{
Ok(stream) => {
let stream = convert_to_proto_stream(stream, size);
Ok(tonic::Response::new(Box::pin(stream)))
Expand All @@ -257,6 +266,10 @@ impl proto::val_server::Val for broker::DataBroker {
Err(tonic::Status::invalid_argument("Invalid Argument"))
}
Err(SubscriptionError::InternalError) => Err(tonic::Status::internal("Internal Error")),
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription buffer_size max allowed value is 1000",
)),
}
}

Expand Down Expand Up @@ -287,9 +300,15 @@ impl proto::val_server::Val for broker::DataBroker {
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
};

let broker = self.authorized_access(&permissions);

let request = request.into_inner();
if request.buffer_size == 0 {
return Err(tonic::Status::invalid_argument(format!(
"Provided lag_buffer_capacity {} should be greater than zero.",
request.buffer_size
)));
}

let broker = self.authorized_access(&permissions);

let signal_ids = request.signal_ids;
let size = signal_ids.len();
Expand All @@ -313,7 +332,10 @@ impl proto::val_server::Val for broker::DataBroker {
);
}

match broker.subscribe(valid_requests).await {
match broker
.subscribe(valid_requests, Some(request.buffer_size as usize))
.await
{
Ok(stream) => {
let stream = convert_to_proto_stream_id(stream, size);
Ok(tonic::Response::new(Box::pin(stream)))
Expand All @@ -328,6 +350,10 @@ impl proto::val_server::Val for broker::DataBroker {
Err(SubscriptionError::InternalError) => {
Err(tonic::Status::new(tonic::Code::Internal, "Internal Error"))
}
Err(SubscriptionError::InvalidBufferSize) => Err(tonic::Status::new(
tonic::Code::InvalidArgument,
"Subscription buffer_size max allowed value is 1000",
)),
}
}

Expand Down Expand Up @@ -1836,6 +1862,7 @@ mod tests {

let mut request = tonic::Request::new(proto::SubscribeRequest {
signal_paths: vec!["test.datapoint1".to_string()],
buffer_size: 5,
});

request
Expand Down Expand Up @@ -1982,6 +2009,7 @@ mod tests {

let mut request = tonic::Request::new(proto::SubscribeByIdRequest {
signal_ids: vec![entry_id],
buffer_size: 5,
});

request
Expand Down
3 changes: 2 additions & 1 deletion databroker/src/viss/v2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl Viss for Server {
});
};

match broker.subscribe(entries).await {
match broker.subscribe(entries, Some(1)).await {
Ok(stream) => {
let subscription_id = SubscriptionId::new();

Expand Down Expand Up @@ -303,6 +303,7 @@ impl Viss for Server {
broker::SubscriptionError::NotFound => Error::NotFoundInvalidPath,
broker::SubscriptionError::InvalidInput => Error::NotFoundInvalidPath,
broker::SubscriptionError::InternalError => Error::InternalServerError,
broker::SubscriptionError::InvalidBufferSize => Error::InternalServerError,
},
ts: SystemTime::now().into(),
}),
Expand Down
27 changes: 24 additions & 3 deletions proto/kuksa/val/v2/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,39 @@ service VAL {
// NOT_FOUND if any of the signals are non-existant.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the signals.
// INVALID_ARGUMENT if the request is empty or provided path is too long
// - MAX_REQUEST_PATH_LENGTH: usize = 1000;
// INVALID_ARGUMENT
// - if the request is empty or provided path is too long
// MAX_REQUEST_PATH_LENGTH: usize = 1000;
// - if buffer_size exceeds the maximum permitted
// MAX_BUFFER_SIZE: usize = 1000;
//
// When subscribing, Databroker shall immediately return the value for all
// subscribed entries.
// If a value isn't available when subscribing to a it, it should return None
//
// If a subscriber is slow to consume signals, messages will be buffered up
// to the specified buffer_size before the oldest messages are dropped.
//
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);

// Subscribe to a set of signals using i32 id parameters
// Returns (GRPC error code):
// NOT_FOUND if any of the signals are non-existant.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the signals.
// INVALID_ARGUMENT if the request is empty or provided path is too long
// INVALID_ARGUMENT
// - if the request is empty or provided path is too long
// MAX_REQUEST_PATH_LENGTH: usize = 1000;
// - if buffer_size exceeds the maximum permitted
// MAX_BUFFER_SIZE: usize = 1000;
//
// When subscribing, Databroker shall immediately return the value for all
// subscribed entries.
// If a value isn't available when subscribing to a it, it should return None
//
// If a subscriber is slow to consume signals, messages will be buffered up
// to the specified buffer_size before the oldest messages are dropped.
//
rpc SubscribeById(SubscribeByIdRequest) returns (stream SubscribeByIdResponse);

// Actuate a single actuator
Expand Down Expand Up @@ -192,6 +205,10 @@ message GetValuesResponse {

message SubscribeRequest {
repeated string signal_paths = 1;

// Specifies the number of messages that can be buffered for
// slow subscribers before the oldest messages are dropped.
uint32 buffer_size = 2;
}

message SubscribeResponse {
Expand All @@ -200,6 +217,10 @@ message SubscribeResponse {

message SubscribeByIdRequest {
repeated int32 signal_ids = 1;

// Specifies the number of messages that can be buffered for
// slow subscribers before the oldest messages are dropped.
uint32 buffer_size = 2;
}

message SubscribeByIdResponse {
Expand Down
Loading