Skip to content

Commit

Permalink
Update incomplete information on errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Oct 24, 2024
1 parent 12c7355 commit f7fe77f
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 174 deletions.
239 changes: 72 additions & 167 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ impl ActuationProvider for Provider {

#[tonic::async_trait]
impl proto::val_server::Val for broker::DataBroker {
// Returns (GRPC error code):
// NOT_FOUND if the requested signal doesn't exist
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied
//
async fn get_value(
&self,
request: tonic::Request<proto::GetValueRequest>,
Expand Down Expand Up @@ -128,6 +133,11 @@ impl proto::val_server::Val for broker::DataBroker {
}))
}

// Returns (GRPC error code):
// NOT_FOUND if any of the requested signals doesn't exist.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the requested signals.
//
async fn get_values(
&self,
request: tonic::Request<proto::GetValuesRequest>,
Expand Down Expand Up @@ -192,7 +202,12 @@ impl proto::val_server::Val for broker::DataBroker {
+ 'static,
>,
>;

// Returns (GRPC error code):
// NOT_FOUND if any of the signals are non-existant.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the signals.
// INVALID_ARGUMENT if the request is empty or provided path is too long
//
async fn subscribe(
&self,
request: tonic::Request<proto::SubscribeRequest>,
Expand Down Expand Up @@ -253,7 +268,12 @@ impl proto::val_server::Val for broker::DataBroker {
+ 'static,
>,
>;

// Returns (GRPC error code):
// NOT_FOUND if any of the signals are non-existant.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// PERMISSION_DENIED if access is denied for any of the signals.
// INVALID_ARGUMENT if the request is empty
//
async fn subscribe_by_id(
&self,
request: tonic::Request<proto::SubscribeByIdRequest>,
Expand Down Expand Up @@ -311,17 +331,19 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

// Actuate a single actuator
//
// Returns (GRPC error code):
// NOT_FOUND if the actuator does not exist.
// PERMISSION_DENIED if access is denied for the actuator.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// UNAVAILABLE if there is no provider currently providing the actuator
// OUT_OF_RANGE is the provided value is out of the min/max range specified
// DATA_LOSS is there is a internal TransmissionFailure
// INVALID_ARGUMENT
// - if the data type used in the request does not match
// the data type of the addressed signal
// - if the requested value is not accepted,
// e.g. if sending an unsupported enum value
//
async fn actuate(
&self,
request: tonic::Request<proto::ActuateRequest>,
Expand Down Expand Up @@ -378,12 +400,16 @@ impl proto::val_server::Val for broker::DataBroker {
// Returns (GRPC error code):
// NOT_FOUND if any of the actuators are non-existant.
// PERMISSION_DENIED if access is denied for any of the actuators.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// UNAVAILABLE if there is no provider currently providing an actuator
// OUT_OF_RANGE is the provided value is out of the min/max range specified
// DATA_LOSS is there is a internal TransmissionFailure
// INVALID_ARGUMENT
// - if the data type used in the request does not match
// the data type of the addressed signal
// - if the requested value is not accepted,
// e.g. if sending an unsupported enum value
//
async fn batch_actuate(
&self,
request: tonic::Request<proto::BatchActuateRequest>,
Expand Down Expand Up @@ -437,33 +463,11 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

/// List metadata of signals matching the wildcard branch request.
///
/// # Arguments
///
/// ```
/// `request`:
/// ListMetadataRequest {
/// root: String
/// filter: String
/// }
///
/// # Response
/// `response`:
/// ListMetadataResponse {
/// metadata: Vec<Metadata>
/// }
/// ```
///
/// # Errors
///
/// Returns (GRPC error code):
/// NOT_FOUND if the specified root branch does not exist
/// INVALID_ARGUMENT if the request pattern is invalid
///
/// # Examples
/// For details, please refer to
/// [Wildcard Matching](https://github.com/eclipse-kuksa/kuksa-databroker/blob/main/doc/wildcard_matching.md#examples)
// Returns (GRPC error code):
// NOT_FOUND if the specified root branch does not exist.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// INVALID_ARGUMENT if the provided path or wildcard is wrong.
//
async fn list_metadata(
&self,
request: tonic::Request<proto::ListMetadataRequest>,
Expand Down Expand Up @@ -505,34 +509,18 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

// Publish a signal value. Used for low frequency signals (e.g. attributes).
/// # Arguments
///
/// ```
/// `request`:
/// PublishValueRequest {
/// signal_id: <String or i32>
/// datapoint: Datapoint
/// }
///
/// # Response
/// `response`:
/// PublishValueResponse {
/// error: Error
/// }
/// ```
///
/// # Errors
/// Returns (GRPC error code):
/// NOT_FOUND if any of the signals are non-existant.
/// PERMISSION_DENIED
/// - if access is denied for any of the signals.
/// - if the signal is already provided by another provider.
/// INVALID_ARGUMENT
/// - if the data type used in the request does not match
/// the data type of the addressed signal
/// - if the published value is not accepted,
/// e.g. if sending an unsupported enum value
// Returns (GRPC error code):
// NOT_FOUND if any of the signals are non-existant.
// PERMISSION_DENIED
// - if access is denied for any of the signals.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// OUT_OF_RANGE is the provider is out of the min/max range specified
// INVALID_ARGUMENT
// - if the data type used in the request does not match
// the data type of the addressed signal
// - if the published value is not accepted,
// e.g. if sending an unsupported enum value
//
async fn publish_value(
&self,
request: tonic::Request<proto::PublishValueRequest>,
Expand Down Expand Up @@ -587,113 +575,34 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

// type OpenProviderStreamStream = Pin<
// Box<
// dyn Stream<Item = Result<proto::OpenProviderStreamResponse, tonic::Status>>
// + Send
// + Sync
// + 'static,
// >,
// >;

type OpenProviderStreamStream =
ReceiverStream<Result<proto::OpenProviderStreamResponse, tonic::Status>>;

/// Opens a bidirectional stream with the databroker to perform various actions such as
/// providing actuators, publishing sensor and actuator values, and receiving actuations from the databroker.
///
/// # Actions
///
/// The function handles the following actions:
///
/// 1. **Provide an Actuator**:
/// - The provider claims ownership of the actuator with the specified signal ID.
///
/// ```
/// `request`:
/// OpenProviderStreamRequest {
/// action: ProvideActuationRequest {
/// {
/// signal: id: 30,
/// },
/// {
/// signal: id: 45,
/// },
/// ...
/// }
/// }
///
/// `response`:
/// OpenProviderStreamStream {
/// action: ProvideActuattionResponse { }
/// }
/// ```
///
/// 2. **Publish Values**:
/// - The provider publishes a request ID along with a map of sensor and actuator values.
///
/// ```
/// `request`:
/// OpenProviderStreamRequest {
/// action: PublishValuesRequest {
/// request_id: 1,
/// datapoints: {
/// (30, Datapoint),
/// (45, Datapoint),
/// ...
/// }
/// }
/// }
///
/// `response`:
/// OpenProviderStreamStream {
/// action: PublishValuesResponse {
/// request_id: 1,
/// status: {
/// (If errors) {
/// (30, Error),
/// (45, Error),
/// ...
/// }
/// }
/// }
/// }
/// ```
///
/// 3. **Receive Actuations**:
/// - The provider receives actuation requests from the databroker.
///
/// ```
/// `request`:
/// OpenProviderStreamRequest {
/// action: BatchActuateStreamResponse { }
/// }
///
/// `response`:
/// OpenProviderStreamStream {
/// action: BatchActuateStreamRequest {
/// actuate_requests: {
/// (30, Value),
/// (45, Value),
/// ...
/// }
/// }
/// }
/// ```
///
/// # Arguments
///
/// * `request` - The request should contain the necessary permissions if the databroker is started with secure mode.
/// The request `OpenProviderStreamRequest` can contain messages according to the action:
/// - Action 1: `ProvideActuationRequest`
/// - Action 2: `PublishValuesRequest`
/// - Action 3: `BatchActuateStreamResponse`
///
/// # Errors
///
/// The open stream is used for request / response type communication between the
/// provider and server (where the initiator of a request can vary).
/// Errors are communicated as messages in the stream.
// Errors:
// - Provider sends ProvideActuationRequest -> Databroker returns ProvideActuationResponse
// Returns (GRPC error code) and closes the stream call (strict case).
// NOT_FOUND if any of the signals are non-existant.
// PERMISSION_DENIED if access is denied for any of the signals.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// ALREADY_EXISTS if a provider already claimed the ownership of an actuator
// UNAUTHENTICATED if permission expired
//
// - Provider sends PublishValuesRequest -> Databroker returns PublishValuesResponse
// GRPC errors are returned as messages in the stream
// response with the signal id `map<int32, Error> status = 2;` (permissive case)
// NOT_FOUND if a signal is non-existant.
// PERMISSION_DENIED
// - if access is denied for a signal.
// OUT_OF_RANGE is the provider is out of the min/max range specified
// INVALID_ARGUMENT
// - if the data type used in the request does not match
// the data type of the addressed signal
// - if the published value is not accepted,
// e.g. if sending an unsupported enum value
//
// - Provider returns BatchActuateStreamResponse <- Databroker sends BatchActuateStreamRequest
// No error definition, a BatchActuateStreamResponse is expected from provider.
//
async fn open_provider_stream(
&self,
request: tonic::Request<tonic::Streaming<proto::OpenProviderStreamRequest>>,
Expand All @@ -707,10 +616,6 @@ impl proto::val_server::Val for broker::DataBroker {
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
};

// Should databroker register internally here new opened streams????
// The provided actuation will take ownership over the actuators but what happens
// if a provider is publishing sensor values and the stream is closed?
// How will the application know that there is no provider and should stop the subscription?
let mut stream = request.into_inner();

let mut shutdown_trigger = self.get_shutdown_trigger();
Expand Down
Loading

0 comments on commit f7fe77f

Please sign in to comment.