Skip to content

Commit

Permalink
Merge pull request #87 from boschglobal/fix/doc_open_provider_stream
Browse files Browse the repository at this point in the history
Update incomplete information on errors
  • Loading branch information
rafaeling authored Oct 29, 2024
2 parents 50f2ba6 + 59265ab commit 95e3041
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 176 deletions.
6 changes: 3 additions & 3 deletions databroker/src/grpc/kuksa_val_v2/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,11 @@ impl broker::UpdateError {
format!("Wrong type provided (id: {})", id),
),
broker::UpdateError::OutOfBounds => tonic::Status::new(
tonic::Code::OutOfRange,
tonic::Code::InvalidArgument,
format!("Value out of bounds (id: {})", id),
),
broker::UpdateError::UnsupportedType => tonic::Status::new(
tonic::Code::Unimplemented,
tonic::Code::InvalidArgument,
format!("Unsupported type (id: {})", id),
),
broker::UpdateError::PermissionDenied => tonic::Status::new(
Expand Down Expand Up @@ -610,7 +610,7 @@ impl broker::ActuationError {
match self {
broker::ActuationError::NotFound => tonic::Status::not_found(message),
broker::ActuationError::WrongType => tonic::Status::invalid_argument(message),
broker::ActuationError::OutOfBounds => tonic::Status::out_of_range(message),
broker::ActuationError::OutOfBounds => tonic::Status::invalid_argument(message),
broker::ActuationError::UnsupportedType => tonic::Status::invalid_argument(message),
broker::ActuationError::PermissionDenied => tonic::Status::permission_denied(message),
broker::ActuationError::PermissionExpired => tonic::Status::unauthenticated(message),
Expand Down
240 changes: 72 additions & 168 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ impl ActuationProvider for Provider {

#[tonic::async_trait]
impl proto::val_server::Val for broker::DataBroker {
// Returns (GRPC error code):
// NOT_FOUND if the requested signal doesn't exist
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied
//
async fn get_value(
&self,
request: tonic::Request<proto::GetValueRequest>,
Expand Down Expand Up @@ -128,6 +133,11 @@ impl proto::val_server::Val for broker::DataBroker {
}))
}

// Returns (GRPC error code):
// NOT_FOUND if any of the requested signals doesn't exist.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the requested signals.
//
async fn get_values(
&self,
request: tonic::Request<proto::GetValuesRequest>,
Expand Down Expand Up @@ -192,7 +202,12 @@ impl proto::val_server::Val for broker::DataBroker {
+ 'static,
>,
>;

// Returns (GRPC error code):
// NOT_FOUND if any of the signals are non-existant.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the signals.
// INVALID_ARGUMENT if the request is empty or provided path is too long
//
async fn subscribe(
&self,
request: tonic::Request<proto::SubscribeRequest>,
Expand Down Expand Up @@ -253,7 +268,12 @@ impl proto::val_server::Val for broker::DataBroker {
+ 'static,
>,
>;

// Returns (GRPC error code):
// NOT_FOUND if any of the signals are non-existant.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the signals.
// INVALID_ARGUMENT if the request is empty
//
async fn subscribe_by_id(
&self,
request: tonic::Request<proto::SubscribeByIdRequest>,
Expand Down Expand Up @@ -311,17 +331,19 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

// Actuate a single actuator
//
// Returns (GRPC error code):
// NOT_FOUND if the actuator does not exist.
// PERMISSION_DENIED if access is denied for the actuator.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// UNAVAILABLE if there is no provider currently providing the actuator
// DATA_LOSS is there is a internal TransmissionFailure
// INVALID_ARGUMENT
// - if the data type used in the request does not match
// the data type of the addressed signal
// - if the requested value is not accepted,
// e.g. if sending an unsupported enum value
// - if the provided value is out of the min/max range specified
//
async fn actuate(
&self,
request: tonic::Request<proto::ActuateRequest>,
Expand Down Expand Up @@ -378,12 +400,16 @@ impl proto::val_server::Val for broker::DataBroker {
// Returns (GRPC error code):
// NOT_FOUND if any of the actuators are non-existant.
// PERMISSION_DENIED if access is denied for any of the actuators.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// UNAVAILABLE if there is no provider currently providing an actuator
// DATA_LOSS is there is a internal TransmissionFailure
// INVALID_ARGUMENT
// - if the data type used in the request does not match
// the data type of the addressed signal
// - if the requested value is not accepted,
// e.g. if sending an unsupported enum value
// - if any of the provided actuators values are out of the min/max range specified
//
async fn batch_actuate(
&self,
request: tonic::Request<proto::BatchActuateRequest>,
Expand Down Expand Up @@ -437,33 +463,11 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

/// List metadata of signals matching the wildcard branch request.
///
/// # Arguments
///
/// ```
/// `request`:
/// ListMetadataRequest {
/// root: String
/// filter: String
/// }
///
/// # Response
/// `response`:
/// ListMetadataResponse {
/// metadata: Vec<Metadata>
/// }
/// ```
///
/// # Errors
///
/// Returns (GRPC error code):
/// NOT_FOUND if the specified root branch does not exist
/// INVALID_ARGUMENT if the request pattern is invalid
///
/// # Examples
/// For details, please refer to
/// [Wildcard Matching](https://github.com/eclipse-kuksa/kuksa-databroker/blob/main/doc/wildcard_matching.md#examples)
// Returns (GRPC error code):
// NOT_FOUND if the specified root branch does not exist.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// INVALID_ARGUMENT if the provided path or wildcard is wrong.
//
async fn list_metadata(
&self,
request: tonic::Request<proto::ListMetadataRequest>,
Expand Down Expand Up @@ -505,34 +509,18 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

// Publish a signal value. Used for low frequency signals (e.g. attributes).
/// # Arguments
///
/// ```
/// `request`:
/// PublishValueRequest {
/// signal_id: <String or i32>
/// datapoint: Datapoint
/// }
///
/// # Response
/// `response`:
/// PublishValueResponse {
/// error: Error
/// }
/// ```
///
/// # Errors
/// Returns (GRPC error code):
/// NOT_FOUND if any of the signals are non-existant.
/// PERMISSION_DENIED
/// - if access is denied for any of the signals.
/// - if the signal is already provided by another provider.
/// INVALID_ARGUMENT
/// - if the data type used in the request does not match
/// the data type of the addressed signal
/// - if the published value is not accepted,
/// e.g. if sending an unsupported enum value
// Returns (GRPC error code):
// NOT_FOUND if any of the signals are non-existant.
// PERMISSION_DENIED
// - if access is denied for any of the signals.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// INVALID_ARGUMENT
// - if the data type used in the request does not match
// the data type of the addressed signal
// - if the published value is not accepted,
// e.g. if sending an unsupported enum value
// - if the published value is out of the min/max range specified
//
async fn publish_value(
&self,
request: tonic::Request<proto::PublishValueRequest>,
Expand Down Expand Up @@ -587,113 +575,33 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

// type OpenProviderStreamStream = Pin<
// Box<
// dyn Stream<Item = Result<proto::OpenProviderStreamResponse, tonic::Status>>
// + Send
// + Sync
// + 'static,
// >,
// >;

type OpenProviderStreamStream =
ReceiverStream<Result<proto::OpenProviderStreamResponse, tonic::Status>>;

/// Opens a bidirectional stream with the databroker to perform various actions such as
/// providing actuators, publishing sensor and actuator values, and receiving actuations from the databroker.
///
/// # Actions
///
/// The function handles the following actions:
///
/// 1. **Provide an Actuator**:
/// - The provider claims ownership of the actuator with the specified signal ID.
///
/// ```
/// `request`:
/// OpenProviderStreamRequest {
/// action: ProvideActuationRequest {
/// {
/// signal: id: 30,
/// },
/// {
/// signal: id: 45,
/// },
/// ...
/// }
/// }
///
/// `response`:
/// OpenProviderStreamStream {
/// action: ProvideActuattionResponse { }
/// }
/// ```
///
/// 2. **Publish Values**:
/// - The provider publishes a request ID along with a map of sensor and actuator values.
///
/// ```
/// `request`:
/// OpenProviderStreamRequest {
/// action: PublishValuesRequest {
/// request_id: 1,
/// datapoints: {
/// (30, Datapoint),
/// (45, Datapoint),
/// ...
/// }
/// }
/// }
///
/// `response`:
/// OpenProviderStreamStream {
/// action: PublishValuesResponse {
/// request_id: 1,
/// status: {
/// (If errors) {
/// (30, Error),
/// (45, Error),
/// ...
/// }
/// }
/// }
/// }
/// ```
///
/// 3. **Receive Actuations**:
/// - The provider receives actuation requests from the databroker.
///
/// ```
/// `request`:
/// OpenProviderStreamRequest {
/// action: BatchActuateStreamResponse { }
/// }
///
/// `response`:
/// OpenProviderStreamStream {
/// action: BatchActuateStreamRequest {
/// actuate_requests: {
/// (30, Value),
/// (45, Value),
/// ...
/// }
/// }
/// }
/// ```
///
/// # Arguments
///
/// * `request` - The request should contain the necessary permissions if the databroker is started with secure mode.
/// The request `OpenProviderStreamRequest` can contain messages according to the action:
/// - Action 1: `ProvideActuationRequest`
/// - Action 2: `PublishValuesRequest`
/// - Action 3: `BatchActuateStreamResponse`
///
/// # Errors
///
/// The open stream is used for request / response type communication between the
/// provider and server (where the initiator of a request can vary).
/// Errors are communicated as messages in the stream.
// Errors:
// - Provider sends ProvideActuationRequest -> Databroker returns ProvideActuationResponse
// Returns (GRPC error code) and closes the stream call (strict case).
// NOT_FOUND if any of the signals are non-existant.
// PERMISSION_DENIED if access is denied for any of the signals.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// ALREADY_EXISTS if a provider already claimed the ownership of an actuator
//
// - Provider sends PublishValuesRequest -> Databroker returns PublishValuesResponse
// GRPC errors are returned as messages in the stream
// response with the signal id `map<int32, Error> status = 2;` (permissive case)
// NOT_FOUND if a signal is non-existant.
// PERMISSION_DENIED
// - if access is denied for a signal.
// INVALID_ARGUMENT
// - if the data type used in the request does not match
// the data type of the addressed signal
// - if the published value is not accepted,
// e.g. if sending an unsupported enum value
// - if the published value is out of the min/max range specified
//
// - Provider returns BatchActuateStreamResponse <- Databroker sends BatchActuateStreamRequest
// No error definition, a BatchActuateStreamResponse is expected from provider.
//
async fn open_provider_stream(
&self,
request: tonic::Request<tonic::Streaming<proto::OpenProviderStreamRequest>>,
Expand All @@ -707,10 +615,6 @@ impl proto::val_server::Val for broker::DataBroker {
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
};

// Should databroker register internally here new opened streams????
// The provided actuation will take ownership over the actuators but what happens
// if a provider is publishing sensor values and the stream is closed?
// How will the application know that there is no provider and should stop the subscription?
let mut stream = request.into_inner();

let mut shutdown_trigger = self.get_shutdown_trigger();
Expand Down Expand Up @@ -1806,7 +1710,7 @@ mod tests {
}
Err(status) => {
// Handle the error from the publish_value function
assert_eq!(status.code(), tonic::Code::OutOfRange);
assert_eq!(status.code(), tonic::Code::InvalidArgument);
// As of the today the first added datapoint get value 0 by default.
assert_eq!(status.message(), "Value out of bounds (id: 0)");
}
Expand Down
Loading

0 comments on commit 95e3041

Please sign in to comment.