From 994e7a40b410a29c0cacd508c5a18ed104d32a9a Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Wed, 24 Jul 2024 15:21:07 -0300 Subject: [PATCH 1/6] src: device: manager: Add new device status and check_status helper --- src/device/manager.rs | 84 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 2 deletions(-) diff --git a/src/device/manager.rs b/src/device/manager.rs index 4fc28552..9bdf087b 100644 --- a/src/device/manager.rs +++ b/src/device/manager.rs @@ -76,6 +76,7 @@ pub struct SourceSerialStruct { pub enum DeviceStatus { Running, Stopped, + Broadcasting, } pub struct DeviceManager { @@ -105,7 +106,7 @@ pub enum Answer { pub enum ManagerError { DeviceNotExist(Uuid), DeviceAlreadyExist(Uuid), - DeviceIsStopped(Uuid), + DeviceStatus(DeviceStatus, Uuid), DeviceError(super::devices::DeviceError), DeviceSourceError(String), NoDevices, @@ -129,6 +130,8 @@ pub enum Request { Search, Ping(DeviceRequestStruct), GetDeviceHandler(Uuid), + EnableBroadcasting(Uuid), + DisableBroadcasting(Uuid), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -391,6 +394,12 @@ impl DeviceManager { pub async fn get_device_handler(&self, device_id: Uuid) -> Result { if self.device.contains_key(&device_id) { trace!("Getting device handler for device: {device_id:?} : Success"); + self.check_device_uuid(device_id)?; + + trace!( + "Getting device handler for device: {:?} : Success", + device_id + ); if self.device.get(&device_id).unwrap().status == DeviceStatus::Running { let handler: DeviceActorHandler = @@ -399,9 +408,80 @@ impl DeviceManager { }; return Err(ManagerError::DeviceIsStopped(device_id)); } - error!("Getting device handler for device: {device_id:?} : Error, device doesn't exist"); + error!( + "Getting device handler for device: {:?} : Error, device doesn't exist", + device_id + ); Err(ManagerError::DeviceNotExist(device_id)) } + + fn check_device_status( + &self, + device_id: Uuid, + valid_statuses: &[DeviceStatus], + ) -> Result<(), ManagerError> { + let status = &self.get_device(device_id)?.status; + if !valid_statuses.contains(status) { + return Err(ManagerError::DeviceStatus(status.clone(), device_id)); + } + Ok(()) + } + + fn get_device(&self, device_id: Uuid) -> Result<&Device, ManagerError> { + let device = self + .device + .get(&device_id) + .ok_or(ManagerError::DeviceNotExist(device_id))?; + Ok(device) + } + + fn get_mut_device(&mut self, device_id: Uuid) -> Result<&mut Device, ManagerError> { + let device = self + .device + .get_mut(&device_id) + .ok_or(ManagerError::DeviceNotExist(device_id))?; + Ok(device) + } + + fn get_device_type(&self, device_id: Uuid) -> Result { + let device_type = self.device.get(&device_id).unwrap().device_type.clone(); + Ok(device_type) + } + + fn extract_handler(&self, device_handler: Answer) -> Result { + match device_handler { + Answer::InnerDeviceHandler(handler) => Ok(handler), + answer => Err(ManagerError::Other(format!( + "Unreachable: extract_handler helper, detail: {answer:?}" + ))), + } + } + + async fn get_subscriber( + &self, + device_id: Uuid, + ) -> Result< + tokio::sync::broadcast::Receiver, + ManagerError, + > { + let handler_request = self.get_device_handler(device_id).await?; + let handler = self.extract_handler(handler_request)?; + + let subscriber = handler + .send(super::devices::PingRequest::GetSubscriber) + .await + .map_err(|err| { + trace!("Something went wrong while executing get_subscriber, details: {err:?}"); + ManagerError::DeviceError(err) + })?; + + match subscriber { + super::devices::PingAnswer::Subscriber(subscriber) => Ok(subscriber), + _ => Err(ManagerError::Other( + "Unreachable: get_subscriber helper".to_string(), + )), + } + } } impl ManagerActorHandler { From 915523f42f89a110677de28b7a39ebe5f40912b3 Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Wed, 24 Jul 2024 21:42:57 -0300 Subject: [PATCH 2/6] src: device: manager: Add broadcasting option(continuous mode) for devices --- src/device/manager.rs | 201 +++++++++++++++++------------------------- 1 file changed, 82 insertions(+), 119 deletions(-) diff --git a/src/device/manager.rs b/src/device/manager.rs index 9bdf087b..091048c1 100644 --- a/src/device/manager.rs +++ b/src/device/manager.rs @@ -14,13 +14,14 @@ use uuid::Uuid; use super::devices::{DeviceActor, DeviceActorHandler}; use bluerobotics_ping::device::{Ping1D, Ping360}; -struct Device { - id: Uuid, - source: SourceSelection, - handler: super::devices::DeviceActorHandler, - actor: tokio::task::JoinHandle, - status: DeviceStatus, - device_type: DeviceSelection, +pub struct Device { + pub id: Uuid, + pub source: SourceSelection, + pub handler: super::devices::DeviceActorHandler, + pub actor: tokio::task::JoinHandle, + pub broadcast: Option>, + pub status: DeviceStatus, + pub device_type: DeviceSelection, } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -76,12 +77,12 @@ pub struct SourceSerialStruct { pub enum DeviceStatus { Running, Stopped, - Broadcasting, + ContinuousMode, } pub struct DeviceManager { receiver: mpsc::Receiver, - device: HashMap, + pub device: HashMap, } #[derive(Debug)] @@ -112,6 +113,7 @@ pub enum ManagerError { NoDevices, TokioMpsc(String), NotImplemented(Request), + Other(String), } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -130,8 +132,8 @@ pub enum Request { Search, Ping(DeviceRequestStruct), GetDeviceHandler(Uuid), - EnableBroadcasting(Uuid), - DisableBroadcasting(Uuid), + EnableContinuousMode(Uuid), + DisableContinuousMode(Uuid), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -174,6 +176,24 @@ impl DeviceManager { error!("DeviceManager: Failed to return Info response: {:?}", e); } } + Request::EnableContinuousMode(uuid) => { + let result = self.continuous_mode(uuid).await; + if let Err(e) = actor_request.respond_to.send(result) { + error!( + "DeviceManager: Failed to return EnableContinuousMode response: {:?}", + e + ); + } + } + Request::DisableContinuousMode(uuid) => { + let result = self.continuous_mode_off(uuid).await; + if let Err(e) = actor_request.respond_to.send(result) { + error!( + "DeviceManager: Failed to return DisableContinuousMode response: {:?}", + e + ); + } + } Request::GetDeviceHandler(id) => { let answer = self.get_device_handler(id).await; if let Err(e) = actor_request.respond_to.send(answer) { @@ -328,18 +348,17 @@ impl DeviceManager { handler, actor, status: DeviceStatus::Running, + broadcast: None, device_type: device_selection, }; - let device_info = device.info(); - self.device.insert(hash, device); - info!( - "New device created and available, details: {:?}", - device_info - ); - Ok(Answer::DeviceInfo(vec![device_info])) + trace!("Device broadcast enable by default for: {hash:?}"); + let device_info = self.continuous_mode(hash).await?; + + info!("New device created and available, details: {device_info:?}"); + Ok(device_info) } pub async fn list(&self) -> Result { @@ -359,25 +378,6 @@ impl DeviceManager { Ok(Answer::DeviceInfo(vec![self.get_device(device_id)?.info()])) } - fn check_device_uuid(&self, device_id: Uuid) -> Result<(), ManagerError> { - if self.device.contains_key(&device_id) { - return Ok(()); - } - error!( - "Getting device handler for device: {:?} : Error, device doesn't exist", - device_id - ); - Err(ManagerError::DeviceNotExist(device_id)) - } - - fn get_device(&self, device_id: Uuid) -> Result<&Device, ManagerError> { - let device = self - .device - .get(&device_id) - .ok_or(ManagerError::DeviceNotExist(device_id))?; - Ok(device) - } - pub async fn delete(&mut self, device_id: Uuid) -> Result { match self.device.remove(&device_id) { Some(device) => { @@ -391,96 +391,59 @@ impl DeviceManager { } } - pub async fn get_device_handler(&self, device_id: Uuid) -> Result { - if self.device.contains_key(&device_id) { - trace!("Getting device handler for device: {device_id:?} : Success"); - self.check_device_uuid(device_id)?; + pub async fn continuous_mode(&mut self, device_id: Uuid) -> Result { + self.check_device_status(device_id, &[DeviceStatus::Running])?; + let device_type = self.get_device_type(device_id)?; + + // Get an inner subscriber for device's stream + let subscriber = self.get_subscriber(device_id).await?; + + let broadcast_handle = self + .continuous_mode_start(subscriber, device_id, device_type.clone()) + .await; + if let Some(handle) = &broadcast_handle { + if !handle.is_finished() { + trace!("Success start_continuous_mode for {device_id:?}"); + } else { + return Err(ManagerError::Other( + "Error while start_continuous_mode".to_string(), + )); + } + } else { + return Err(ManagerError::Other( + "Error while start_continuous_mode".to_string(), + )); + }; - trace!( - "Getting device handler for device: {:?} : Success", - device_id - ); - - if self.device.get(&device_id).unwrap().status == DeviceStatus::Running { - let handler: DeviceActorHandler = - self.device.get(&device_id).unwrap().handler.clone(); - return Ok(Answer::InnerDeviceHandler(handler)); - }; - return Err(ManagerError::DeviceIsStopped(device_id)); - } - error!( - "Getting device handler for device: {:?} : Error, device doesn't exist", - device_id - ); - Err(ManagerError::DeviceNotExist(device_id)) - } + self.continuous_mode_startup_routine(device_id, device_type) + .await?; - fn check_device_status( - &self, - device_id: Uuid, - valid_statuses: &[DeviceStatus], - ) -> Result<(), ManagerError> { - let status = &self.get_device(device_id)?.status; - if !valid_statuses.contains(status) { - return Err(ManagerError::DeviceStatus(status.clone(), device_id)); - } - Ok(()) - } + let device = self.get_mut_device(device_id)?; + device.broadcast = broadcast_handle; + device.status = DeviceStatus::ContinuousMode; - fn get_device(&self, device_id: Uuid) -> Result<&Device, ManagerError> { - let device = self - .device - .get(&device_id) - .ok_or(ManagerError::DeviceNotExist(device_id))?; - Ok(device) - } + let updated_device_info = self.get_device(device_id)?.info(); - fn get_mut_device(&mut self, device_id: Uuid) -> Result<&mut Device, ManagerError> { - let device = self - .device - .get_mut(&device_id) - .ok_or(ManagerError::DeviceNotExist(device_id))?; - Ok(device) + Ok(Answer::DeviceInfo(vec![updated_device_info])) } - fn get_device_type(&self, device_id: Uuid) -> Result { - let device_type = self.device.get(&device_id).unwrap().device_type.clone(); - Ok(device_type) - } + pub async fn continuous_mode_off(&mut self, device_id: Uuid) -> Result { + self.check_device_status(device_id, &[DeviceStatus::ContinuousMode])?; + let device_type = self.get_device_type(device_id)?; - fn extract_handler(&self, device_handler: Answer) -> Result { - match device_handler { - Answer::InnerDeviceHandler(handler) => Ok(handler), - answer => Err(ManagerError::Other(format!( - "Unreachable: extract_handler helper, detail: {answer:?}" - ))), + let device = self.get_mut_device(device_id)?; + if let Some(broadcast) = device.broadcast.take() { + broadcast.abort_handle().abort(); } - } - async fn get_subscriber( - &self, - device_id: Uuid, - ) -> Result< - tokio::sync::broadcast::Receiver, - ManagerError, - > { - let handler_request = self.get_device_handler(device_id).await?; - let handler = self.extract_handler(handler_request)?; - - let subscriber = handler - .send(super::devices::PingRequest::GetSubscriber) - .await - .map_err(|err| { - trace!("Something went wrong while executing get_subscriber, details: {err:?}"); - ManagerError::DeviceError(err) - })?; - - match subscriber { - super::devices::PingAnswer::Subscriber(subscriber) => Ok(subscriber), - _ => Err(ManagerError::Other( - "Unreachable: get_subscriber helper".to_string(), - )), - } + device.status = DeviceStatus::Running; + + let updated_device_info = device.info(); + + self.continuous_mode_shutdown_routine(device_id, device_type) + .await?; + + Ok(Answer::DeviceInfo(vec![updated_device_info])) } } @@ -554,7 +517,7 @@ impl ManagerActorHandler { .map_err(|err| ManagerError::TokioMpsc(err.to_string()))? { Ok(ans) => { - info!("Handling DeviceManager request: {request:?}: Success"); + trace!("Handling DeviceManager request: {request:?}: Success"); Ok(ans) } Err(err) => { From bfc4e346fa1c2b30633c8e1c98714b605d1221f1 Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Wed, 24 Jul 2024 21:49:37 -0300 Subject: [PATCH 3/6] src: device: manager: Fix: Remove device method --- src/device/manager.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/device/manager.rs b/src/device/manager.rs index 091048c1..3584563a 100644 --- a/src/device/manager.rs +++ b/src/device/manager.rs @@ -42,6 +42,20 @@ impl Device { } } +impl Drop for Device { + fn drop(&mut self) { + trace!( + "Removing Device from DeviceManager, details: {:?}", + self.info() + ); + self.actor.abort(); + if let Some(broadcast_handle) = &self.broadcast { + trace!("Device broadcast handle closed for: {:?}", self.info().id); + broadcast_handle.abort(); + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum DeviceSelection { Common, @@ -381,8 +395,10 @@ impl DeviceManager { pub async fn delete(&mut self, device_id: Uuid) -> Result { match self.device.remove(&device_id) { Some(device) => { - info!("Device delete id {:?}: Success", device_id); - Ok(Answer::DeviceInfo(vec![device.info()])) + let device_info = device.info(); + drop(device); + trace!("Device delete id {:?}: Success", device_id); + Ok(Answer::DeviceInfo(vec![device_info])) } None => { error!("Device delete id {device_id:?} : Error, device doesn't exist"); From 12d3f3a02b3d54188282674b8780c82824300697 Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Wed, 31 Jul 2024 22:52:56 -0300 Subject: [PATCH 4/6] src: device: Major rework on module structure --- src/device/helpers/continuous_mode.rs | 210 ++++++++++++++++++++++++++ src/device/helpers/device_handle.rs | 110 ++++++++++++++ src/device/helpers/mod.rs | 4 + src/device/mod.rs | 28 ++-- src/main.rs | 1 + 5 files changed, 341 insertions(+), 12 deletions(-) create mode 100644 src/device/helpers/continuous_mode.rs create mode 100644 src/device/helpers/device_handle.rs create mode 100644 src/device/helpers/mod.rs diff --git a/src/device/helpers/continuous_mode.rs b/src/device/helpers/continuous_mode.rs new file mode 100644 index 00000000..9a1ec1ac --- /dev/null +++ b/src/device/helpers/continuous_mode.rs @@ -0,0 +1,210 @@ +use serde_json::json; +use tracing::{error, trace, warn}; +use uuid::Uuid; + +use crate::device::{ + manager::{Answer, DeviceAnswer, ManagerError}, + manager::{DeviceManager, DeviceSelection}, +}; + +impl DeviceManager { + // Call the helpers specifically for each device type + pub async fn continuous_mode_start( + &mut self, + mut subscriber: tokio::sync::broadcast::Receiver< + bluerobotics_ping::message::ProtocolMessage, + >, + device_id: Uuid, + device_type: DeviceSelection, + ) -> Option> { + let raw_handler = match self.get_device_handler(device_id).await { + Ok(handler) => handler.clone(), + Err(err) => { + trace!("Error during start_continuous_mode: Failed to get device handler: {err:?}"); + return None; + } + }; + + let handler = match self.extract_handler(raw_handler) { + Ok(handler) => handler, + Err(err) => { + trace!("Error during start_continuous_mode: Failed to extract handler: {err:?}"); + return None; + } + }; + + match device_type { + DeviceSelection::Ping1D => Some(tokio::spawn(async move { + loop { + match subscriber.recv().await { + Ok(msg) => { + Self::ping1d_continuous_mode_helper(msg, device_id); + } + Err(err) => { + Self::handle_error_continuous_mode(err, device_id); + break; + } + } + } + })), + DeviceSelection::Ping360 => { + Some(tokio::spawn(async move { + let handler = handler.clone(); + + // Attempt to send the Ping360 request and handle the result + let device_data = match handler + .send(crate::device::devices::PingRequest::Ping360( + crate::device::devices::Ping360Request::DeviceData, + )) + .await + { + Ok(response) => match response { + crate::device::devices::PingAnswer::PingMessage( + bluerobotics_ping::Messages::Ping360( + bluerobotics_ping::ping360::Messages::DeviceData(msg), + ), + ) => msg, + msg => { + error!("Error during start_continuous_mode: unexpected message: {msg:?}"); + return; + } + }, + Err(err) => { + error!("Error during start_continuous_mode: Device Error: {err:?}"); + return; + } + }; + + loop { + for n in 0..399 { + // Handle timeout and errors + let result = tokio::time::timeout( + std::time::Duration::from_millis(1000), + handler.send(crate::device::devices::PingRequest::Ping360( + crate::device::devices::Ping360Request::Transducer( + bluerobotics_ping::ping360::TransducerStruct { + mode: device_data.mode, + gain_setting: device_data.gain_setting, + transmit_duration: device_data.transmit_duration, + sample_period: device_data.sample_period, + transmit_frequency: device_data.transmit_frequency, + number_of_samples: device_data.number_of_samples, + angle: n, + transmit: 1, + reserved: 0, + }, + ), + )), + ) + .await; + + match result { + Ok(Ok(answer)) => match answer { + crate::device::devices::PingAnswer::PingMessage(msg) => { + Self::ping360_continuous_mode_helper(msg, device_id) + } + msg => { + error!("Error during continuous_mode: Unexpected Message: {msg:?}"); + return; + } + }, + Ok(Err(err)) => { + error!("Error during continuous_mode: Device Error: {err:?}"); + return; + } + Err(_err) => { + warn!("Error during continuous_mode: Answer delayed more than 1 s"); + } + } + } + } + })) + } + DeviceSelection::Common | DeviceSelection::Auto => None, + } + } + + // Execute some especial commands required for device enter in auto_send mode + pub async fn continuous_mode_startup_routine( + &self, + device_id: Uuid, + device_type: DeviceSelection, + ) -> Result<(), ManagerError> { + if device_type == DeviceSelection::Ping1D { + let handler_request = self.get_device_handler(device_id).await?; + let handler = self.extract_handler(handler_request)?; + + let id = ::id(); + let _ = handler + .send(crate::device::devices::PingRequest::Ping1D( + crate::device::devices::Ping1DRequest::ContinuousStart( + bluerobotics_ping::ping1d::ContinuousStartStruct { id }, + ), + )) + .await + .map_err(|err| {trace!("Something went wrong while executing continuous_mode_startup, details: {err:?}"); ManagerError::DeviceError(err)})?; + } + Ok(()) + } + + // Execute some especial commands required for device stop auto_send mode + pub async fn continuous_mode_shutdown_routine( + &self, + device_id: Uuid, + device_type: DeviceSelection, + ) -> Result<(), ManagerError> { + let handler_request = self.get_device_handler(device_id).await?; + let handler = self.extract_handler(handler_request)?; + + if device_type == DeviceSelection::Ping1D { + let id = ::id(); + let _ = handler + .send(crate::device::devices::PingRequest::Ping1D( + crate::device::devices::Ping1DRequest::ContinuousStop( + bluerobotics_ping::ping1d::ContinuousStopStruct { id }, + ), + )) + .await + .map_err(|err| {trace!("Something went wrong while executing broadcast_startup_routine, details: {err:?}"); ManagerError::DeviceError(err)})?; + } + Ok(()) + } + + // An inner helper focused on Ping1D, which uses Profile message to plot graphs + pub fn ping1d_continuous_mode_helper( + msg: bluerobotics_ping::message::ProtocolMessage, + device_id: Uuid, + ) { + if msg.message_id == ::id() { + if let Ok(bluerobotics_ping::Messages::Ping1D(bluerobotics_ping::ping1d::Messages::Profile(_answer))) = bluerobotics_ping::Messages::try_from(&msg) { + let answer = Answer::DeviceMessage(DeviceAnswer { + answer: crate::device::devices::PingAnswer::PingMessage( + bluerobotics_ping::Messages::try_from(&msg).unwrap(), + ), + device_id, + }); + crate::server::protocols::v1::websocket::send_to_websockets(json!(answer), Some(device_id)); + } + } + } + + // An inner helper focused on Ping360, which uses DeviceData message to plot graphs + pub fn ping360_continuous_mode_helper(msg: bluerobotics_ping::Messages, device_id: Uuid) { + let answer = Answer::DeviceMessage(DeviceAnswer { + answer: crate::device::devices::PingAnswer::PingMessage(msg), + device_id, + }); + crate::server::protocols::v1::websocket::send_to_websockets(json!(answer), Some(device_id)); + } + + // An inner helper that returns error to requester + pub fn handle_error_continuous_mode( + error: tokio::sync::broadcast::error::RecvError, + device_id: Uuid, + ) { + let error = ManagerError::DeviceError(crate::device::devices::DeviceError::PingError( + bluerobotics_ping::error::PingError::TokioBroadcastError(error.to_string()), + )); + crate::server::protocols::v1::websocket::send_to_websockets(json!(error), Some(device_id)); + } +} diff --git a/src/device/helpers/device_handle.rs b/src/device/helpers/device_handle.rs new file mode 100644 index 00000000..2df17742 --- /dev/null +++ b/src/device/helpers/device_handle.rs @@ -0,0 +1,110 @@ +use tracing::{error, trace, warn}; +use uuid::Uuid; + +use crate::device::{ + devices::{self, DeviceActorHandler}, + manager::{Answer, Device, DeviceManager, DeviceSelection, DeviceStatus, ManagerError}, +}; + +impl DeviceManager { + pub fn check_device_uuid(&self, device_id: Uuid) -> Result<(), ManagerError> { + if self.device.contains_key(&device_id) { + return Ok(()); + } + error!( + "Getting device handler for device: {:?} : Error, device doesn't exist", + device_id + ); + Err(ManagerError::DeviceNotExist(device_id)) + } + + pub fn get_device(&self, device_id: Uuid) -> Result<&Device, ManagerError> { + let device = self + .device + .get(&device_id) + .ok_or(ManagerError::DeviceNotExist(device_id))?; + Ok(device) + } + + pub async fn get_device_handler(&self, device_id: Uuid) -> Result { + self.check_device_uuid(device_id)?; + + trace!( + "Getting device handler for device: {:?} : Success", + device_id + ); + + // Fail-fast if device is stopped + self.check_device_status( + device_id, + &[DeviceStatus::ContinuousMode, DeviceStatus::Running], + )?; + + let handler: DeviceActorHandler = self.get_device(device_id)?.handler.clone(); + + Ok(Answer::InnerDeviceHandler(handler)) + } + + pub fn check_device_status( + &self, + device_id: Uuid, + valid_statuses: &[DeviceStatus], + ) -> Result<(), ManagerError> { + let status = &self.get_device(device_id)?.status; + if !valid_statuses.contains(status) { + return Err(ManagerError::DeviceStatus(status.clone(), device_id)); + } + Ok(()) + } + + pub fn get_mut_device(&mut self, device_id: Uuid) -> Result<&mut Device, ManagerError> { + let device = self + .device + .get_mut(&device_id) + .ok_or(ManagerError::DeviceNotExist(device_id))?; + Ok(device) + } + + pub fn get_device_type(&self, device_id: Uuid) -> Result { + let device_type = self.device.get(&device_id).unwrap().device_type.clone(); + Ok(device_type) + } + + pub fn extract_handler( + &self, + device_handler: Answer, + ) -> Result { + match device_handler { + Answer::InnerDeviceHandler(handler) => Ok(handler), + answer => Err(ManagerError::Other(format!( + "Unreachable: extract_handler helper, detail: {answer:?}" + ))), + } + } + + pub async fn get_subscriber( + &self, + device_id: Uuid, + ) -> Result< + tokio::sync::broadcast::Receiver, + ManagerError, + > { + let handler_request = self.get_device_handler(device_id).await?; + let handler = self.extract_handler(handler_request)?; + + let subscriber = handler + .send(devices::PingRequest::GetSubscriber) + .await + .map_err(|err| { + warn!("Something went wrong while executing get_subscriber, details: {err:?}"); + ManagerError::DeviceError(err) + })?; + + match subscriber { + devices::PingAnswer::Subscriber(subscriber) => Ok(subscriber), + _ => Err(ManagerError::Other( + "Unreachable: get_subscriber helper".to_string(), + )), + } + } +} diff --git a/src/device/helpers/mod.rs b/src/device/helpers/mod.rs new file mode 100644 index 00000000..da344b6b --- /dev/null +++ b/src/device/helpers/mod.rs @@ -0,0 +1,4 @@ +/// Specially for DeviceManager to retrieve checks and structures from Devices stored in it's hashmap collection +pub mod continuous_mode; +/// Specially for continuous_mode methods, startup, shutdown, handle and errors routines for each device type +pub mod device_handle; diff --git a/src/device/mod.rs b/src/device/mod.rs index dc2afc8f..fd417070 100644 --- a/src/device/mod.rs +++ b/src/device/mod.rs @@ -1,14 +1,18 @@ +/// The `devices` module defines the pattern for managing devices and their handlers. +/// It includes the `Device` and `DeviceHandler` structures. +/// +/// The `DeviceHandler` can forward requests defined in the `PingRequest` enum. pub mod devices; -pub mod manager; -// The Device module consists of two main modules: devices and manager. -// -// Manager: -// The Manager module includes two primary structures: Manager and its Handler. -// This design allows the Manager to receive and process requests from multiple, distinct threads. -// The ManagerHandler can forward requests defined in the Request enum, creating a Device if necessary. -// If a device is stopped or encounters an error during execution, the user can recover the device and make it available again. -// -// Device: -// Each device follows the same pattern, consisting of a Device and its Handler. -// The DeviceHandler can forward requests defined in the PingRequest enum. +/// The `helpers` module contains utility functions and types for use within the `devices` module. +pub mod helpers; + +/// The `manager` module provides the `Manager` and `ManagerHandler` structures. +/// +/// The `Manager` can handle requests from multiple threads. The `ManagerHandler` +/// is capable of forwarding requests defined in the `Request` enum and can create +/// devices as needed. +/// +/// If a device is stopped or encounters an error during execution, it can be recovered +/// and made available again. +pub mod manager; diff --git a/src/main.rs b/src/main.rs index a92bcbe5..57c0e0d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use tracing::info; extern crate lazy_static; mod cli; +/// The Device module consists of two main modules: devices and manager. mod device; mod logger; mod server; From 7e938aa5f5d43ff5117a337c80c75c9e225c506f Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Thu, 1 Aug 2024 00:34:14 -0300 Subject: [PATCH 5/6] src: device: Change request pattern --- src/device/manager.rs | 59 ++++++++++++++++++++-------- src/main.rs | 10 +++++ src/server/protocols/v1/websocket.rs | 54 ++++++++++++++----------- 3 files changed, 83 insertions(+), 40 deletions(-) diff --git a/src/device/manager.rs b/src/device/manager.rs index 3584563a..1ea0bcfd 100644 --- a/src/device/manager.rs +++ b/src/device/manager.rs @@ -138,16 +138,40 @@ pub struct DeviceAnswer { } #[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)] +#[serde(tag = "type", content = "payload")] pub enum Request { Create(CreateStruct), - Delete(Uuid), + Delete(UuidWrapper), List, - Info(Uuid), + Info(UuidWrapper), Search, Ping(DeviceRequestStruct), - GetDeviceHandler(Uuid), - EnableContinuousMode(Uuid), - DisableContinuousMode(Uuid), + GetDeviceHandler(UuidWrapper), + EnableContinuousMode(UuidWrapper), + DisableContinuousMode(UuidWrapper), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UuidWrapper { + pub uuid: Uuid, +} + +impl UuidWrapper { + pub fn uuid(&self) -> Uuid { + self.uuid + } +} + +impl From for UuidWrapper { + fn from(uuid: Uuid) -> Self { + UuidWrapper { uuid } + } +} + +impl From for Uuid { + fn from(wrapper: UuidWrapper) -> Self { + wrapper.uuid + } } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -158,8 +182,8 @@ pub struct CreateStruct { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeviceRequestStruct { - pub target: Uuid, - pub request: crate::device::devices::PingRequest, + pub uuid: Uuid, + pub device_request: crate::device::devices::PingRequest, } impl DeviceManager { @@ -173,7 +197,7 @@ impl DeviceManager { } } Request::Delete(uuid) => { - let result = self.delete(uuid).await; + let result = self.delete(uuid.into()).await; if let Err(e) = actor_request.respond_to.send(result) { error!("DeviceManager: Failed to return Delete response: {e:?}"); } @@ -185,13 +209,13 @@ impl DeviceManager { } } Request::Info(device_id) => { - let result = self.info(device_id).await; + let result = self.info(device_id.into()).await; if let Err(e) = actor_request.respond_to.send(result) { error!("DeviceManager: Failed to return Info response: {:?}", e); } } Request::EnableContinuousMode(uuid) => { - let result = self.continuous_mode(uuid).await; + let result = self.continuous_mode(uuid.into()).await; if let Err(e) = actor_request.respond_to.send(result) { error!( "DeviceManager: Failed to return EnableContinuousMode response: {:?}", @@ -200,7 +224,7 @@ impl DeviceManager { } } Request::DisableContinuousMode(uuid) => { - let result = self.continuous_mode_off(uuid).await; + let result = self.continuous_mode_off(uuid.into()).await; if let Err(e) = actor_request.respond_to.send(result) { error!( "DeviceManager: Failed to return DisableContinuousMode response: {:?}", @@ -209,7 +233,7 @@ impl DeviceManager { } } Request::GetDeviceHandler(id) => { - let answer = self.get_device_handler(id).await; + let answer = self.get_device_handler(id.into()).await; if let Err(e) = actor_request.respond_to.send(answer) { error!("DeviceManager: Failed to return GetDeviceHandler response: {e:?}"); } @@ -471,8 +495,11 @@ impl ManagerActorHandler { // Devices requests are forwarded directly to device and let manager handle other incoming request. Request::Ping(request) => { trace!("Handling Ping request: {request:?}: Forwarding request to device handler"); - let get_handler_target = request.target; - let handler_request = Request::GetDeviceHandler(get_handler_target); + let get_handler_target = request.uuid; + let handler_request = + Request::GetDeviceHandler(crate::device::manager::UuidWrapper { + uuid: get_handler_target, + }); let manager_request = ManagerActorRequest { request: handler_request, respond_to: result_sender, @@ -497,13 +524,13 @@ impl ManagerActorHandler { trace!( "Handling Ping request: {request:?}: Successfully received the handler" ); - let result = handler.send(request.request.clone()).await; + let result = handler.send(request.device_request.clone()).await; match result { Ok(result) => { info!("Handling Ping request: {request:?}: Success"); Ok(Answer::DeviceMessage(DeviceAnswer { answer: result, - device_id: request.target, + device_id: request.uuid, })) } Err(err) => { diff --git a/src/main.rs b/src/main.rs index 57c0e0d9..4c7dd828 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,16 @@ mod device; mod logger; mod server; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Command { + pub command: CommandType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum CommandType { + DeviceManager(device::manager::Request), +} + #[tokio::main] async fn main() { // CLI should be started before logger to allow control over verbosity diff --git a/src/server/protocols/v1/websocket.rs b/src/server/protocols/v1/websocket.rs index 1e638154..bc8d4e70 100644 --- a/src/server/protocols/v1/websocket.rs +++ b/src/server/protocols/v1/websocket.rs @@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex}; use tracing::info; use uuid::Uuid; -use crate::device::manager::{ManagerActorHandler, Request}; +use crate::device::manager::ManagerActorHandler; pub struct StringMessage(String); @@ -134,7 +134,7 @@ impl StreamHandler> for WebsocketActor { match msg { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Text(text)) => { - let manager_requests: Vec = match serde_json::from_str(&text) { + let manager_requests: Vec = match serde_json::from_str(&text) { Ok(requests) => requests, Err(err) => match serde_json::from_str(&text) { Ok(request) => vec![request], @@ -147,27 +147,31 @@ impl StreamHandler> for WebsocketActor { }; for request in manager_requests { - let manager_handler = self.manager_handler.clone(); - - let future = - async move { manager_handler.send(request).await }.into_actor(self); - - future - .then(|res, _, ctx| { - match &res { - Ok(result) => { - crate::server::protocols::v1::websocket::send_to_websockets( - json!(result), - None, - ); - } - Err(err) => { - ctx.text(serde_json::to_string_pretty(err).unwrap()); - } - } - fut::ready(()) - }) - .wait(ctx); + match request.command { + crate::CommandType::DeviceManager(request) => { + let manager_handler = self.manager_handler.clone(); + + let future = + async move { manager_handler.send(request).await }.into_actor(self); + + future + .then(|res, _, ctx| { + match &res { + Ok(result) => { + crate::server::protocols::v1::websocket::send_to_websockets( + json!(result), + None, + ); + } + Err(err) => { + ctx.text(serde_json::to_string_pretty(err).unwrap()); + } + } + fut::ready(()) + }) + .wait(ctx); + } + } } } Ok(ws::Message::Close(msg)) => ctx.close(msg), @@ -191,7 +195,9 @@ pub async fn websocket( let device_number = query.into_inner().device_number; if let Some(device_number) = device_number { - let request = crate::device::manager::Request::Info(device_number); + let request = crate::device::manager::Request::Info(crate::device::manager::UuidWrapper { + uuid: device_number, + }); match manager_handler.send(request).await { Ok(response) => { info!( From 1a185310bf81f7663ea3917921b77fe18af350ca Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Thu, 1 Aug 2024 17:12:25 -0300 Subject: [PATCH 6/6] backup --- Cargo.toml | 1 + src/device/devices.rs | 23 +++++- src/device/helpers/device_handling.rs | 110 ++++++++++++++++++++++++++ src/device/manager.rs | 17 ++-- src/main.rs | 70 +++++++++++++++- 5 files changed, 207 insertions(+), 14 deletions(-) create mode 100644 src/device/helpers/device_handling.rs diff --git a/Cargo.toml b/Cargo.toml index 74502323..a5eccf2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ udp-stream = "0.0.12" uuid = { version = "1.8", features = ["serde"] } validator = "0.18.1" thiserror = "1.0.61" +ts-rs = { version = "9.0.1" , features = ["serde-compat", "uuid-impl"] } [build-dependencies] vergen-gix = { version = "1.0.0-beta.2", default-features = false, features = ["build", "cargo"] } diff --git a/src/device/devices.rs b/src/device/devices.rs index d0217811..0fac866f 100644 --- a/src/device/devices.rs +++ b/src/device/devices.rs @@ -2,6 +2,7 @@ use bluerobotics_ping::device::PingDevice; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; use tracing::{error, trace, warn}; +use ts_rs::TS; pub struct DeviceActor { pub receiver: mpsc::Receiver, @@ -267,7 +268,7 @@ pub enum UpgradeResult { Ping360, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] pub enum PingRequest { Ping1D(Ping1DRequest), Ping360(Ping360Request), @@ -277,7 +278,7 @@ pub enum PingRequest { Stop, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] pub enum Ping1DRequest { DeviceID, ModeAuto, @@ -296,33 +297,47 @@ pub enum Ping1DRequest { GainSetting, PingEnable, DistanceSimple, + #[ts(skip)] SetDeviceId(bluerobotics_ping::ping1d::SetDeviceIdStruct), + #[ts(skip)] SetModeAuto(bluerobotics_ping::ping1d::SetModeAutoStruct), + #[ts(skip)] SetPingInterval(bluerobotics_ping::ping1d::SetPingIntervalStruct), + #[ts(skip)] SetPingEnable(bluerobotics_ping::ping1d::SetPingEnableStruct), + #[ts(skip)] SetSpeedOfSound(bluerobotics_ping::ping1d::SetSpeedOfSoundStruct), + #[ts(skip)] SetRange(bluerobotics_ping::ping1d::SetRangeStruct), + #[ts(skip)] SetGainSetting(bluerobotics_ping::ping1d::SetGainSettingStruct), + #[ts(skip)] ContinuousStart(bluerobotics_ping::ping1d::ContinuousStartStruct), + #[ts(skip)] ContinuousStop(bluerobotics_ping::ping1d::ContinuousStopStruct), GotoBootloader, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] pub enum Ping360Request { MotorOff, DeviceData, AutoDeviceData, + #[ts(skip)] SetDeviceId(bluerobotics_ping::ping360::SetDeviceIdStruct), + #[ts(skip)] Transducer(bluerobotics_ping::ping360::TransducerStruct), + #[ts(skip)] Reset(bluerobotics_ping::ping360::ResetStruct), + #[ts(skip)] AutoTransmit(bluerobotics_ping::ping360::AutoTransmitStruct), } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] pub enum PingCommonRequest { DeviceInformation, ProtocolVersion, + #[ts(skip)] SetDeviceId(bluerobotics_ping::common::SetDeviceIdStruct), } diff --git a/src/device/helpers/device_handling.rs b/src/device/helpers/device_handling.rs new file mode 100644 index 00000000..2df17742 --- /dev/null +++ b/src/device/helpers/device_handling.rs @@ -0,0 +1,110 @@ +use tracing::{error, trace, warn}; +use uuid::Uuid; + +use crate::device::{ + devices::{self, DeviceActorHandler}, + manager::{Answer, Device, DeviceManager, DeviceSelection, DeviceStatus, ManagerError}, +}; + +impl DeviceManager { + pub fn check_device_uuid(&self, device_id: Uuid) -> Result<(), ManagerError> { + if self.device.contains_key(&device_id) { + return Ok(()); + } + error!( + "Getting device handler for device: {:?} : Error, device doesn't exist", + device_id + ); + Err(ManagerError::DeviceNotExist(device_id)) + } + + pub fn get_device(&self, device_id: Uuid) -> Result<&Device, ManagerError> { + let device = self + .device + .get(&device_id) + .ok_or(ManagerError::DeviceNotExist(device_id))?; + Ok(device) + } + + pub async fn get_device_handler(&self, device_id: Uuid) -> Result { + self.check_device_uuid(device_id)?; + + trace!( + "Getting device handler for device: {:?} : Success", + device_id + ); + + // Fail-fast if device is stopped + self.check_device_status( + device_id, + &[DeviceStatus::ContinuousMode, DeviceStatus::Running], + )?; + + let handler: DeviceActorHandler = self.get_device(device_id)?.handler.clone(); + + Ok(Answer::InnerDeviceHandler(handler)) + } + + pub fn check_device_status( + &self, + device_id: Uuid, + valid_statuses: &[DeviceStatus], + ) -> Result<(), ManagerError> { + let status = &self.get_device(device_id)?.status; + if !valid_statuses.contains(status) { + return Err(ManagerError::DeviceStatus(status.clone(), device_id)); + } + Ok(()) + } + + pub fn get_mut_device(&mut self, device_id: Uuid) -> Result<&mut Device, ManagerError> { + let device = self + .device + .get_mut(&device_id) + .ok_or(ManagerError::DeviceNotExist(device_id))?; + Ok(device) + } + + pub fn get_device_type(&self, device_id: Uuid) -> Result { + let device_type = self.device.get(&device_id).unwrap().device_type.clone(); + Ok(device_type) + } + + pub fn extract_handler( + &self, + device_handler: Answer, + ) -> Result { + match device_handler { + Answer::InnerDeviceHandler(handler) => Ok(handler), + answer => Err(ManagerError::Other(format!( + "Unreachable: extract_handler helper, detail: {answer:?}" + ))), + } + } + + pub async fn get_subscriber( + &self, + device_id: Uuid, + ) -> Result< + tokio::sync::broadcast::Receiver, + ManagerError, + > { + let handler_request = self.get_device_handler(device_id).await?; + let handler = self.extract_handler(handler_request)?; + + let subscriber = handler + .send(devices::PingRequest::GetSubscriber) + .await + .map_err(|err| { + warn!("Something went wrong while executing get_subscriber, details: {err:?}"); + ManagerError::DeviceError(err) + })?; + + match subscriber { + devices::PingAnswer::Subscriber(subscriber) => Ok(subscriber), + _ => Err(ManagerError::Other( + "Unreachable: get_subscriber helper".to_string(), + )), + } + } +} diff --git a/src/device/manager.rs b/src/device/manager.rs index 1ea0bcfd..e20a47e5 100644 --- a/src/device/manager.rs +++ b/src/device/manager.rs @@ -8,6 +8,7 @@ use std::{ use tokio::sync::{mpsc, oneshot}; use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream}; use tracing::{error, info, trace, warn}; +use ts_rs::TS; use udp_stream::UdpStream; use uuid::Uuid; @@ -56,7 +57,7 @@ impl Drop for Device { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TS)] pub enum DeviceSelection { Common, Ping1D, @@ -64,7 +65,7 @@ pub enum DeviceSelection { Auto, } -#[derive(Debug, Clone, Deserialize, Serialize, Hash)] +#[derive(Debug, Clone, Deserialize, Serialize, Hash, TS)] pub enum SourceSelection { UdpStream(SourceUdpStruct), SerialStream(SourceSerialStruct), @@ -75,13 +76,13 @@ enum SourceType { Serial(SerialStream), } -#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema)] +#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema, TS)] pub struct SourceUdpStruct { pub ip: Ipv4Addr, pub port: u16, } -#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema)] +#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema, TS)] pub struct SourceSerialStruct { pub path: String, pub baudrate: u32, @@ -137,7 +138,7 @@ pub struct DeviceAnswer { pub device_id: Uuid, } -#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)] +#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema, TS)] #[serde(tag = "type", content = "payload")] pub enum Request { Create(CreateStruct), @@ -151,7 +152,7 @@ pub enum Request { DisableContinuousMode(UuidWrapper), } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] pub struct UuidWrapper { pub uuid: Uuid, } @@ -174,13 +175,13 @@ impl From for Uuid { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] pub struct CreateStruct { pub source: SourceSelection, pub device_selection: DeviceSelection, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] pub struct DeviceRequestStruct { pub uuid: Uuid, pub device_request: crate::device::devices::PingRequest, diff --git a/src/main.rs b/src/main.rs index 4c7dd828..de6cb23e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,9 @@ + + +use device::manager::{CreateStruct, DeviceRequestStruct, UuidWrapper}; +use serde::{Deserialize, Serialize}; use tracing::info; +use ts_rs::TS; #[macro_use] extern crate lazy_static; @@ -9,16 +14,23 @@ mod device; mod logger; mod server; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[ts(export, export_to = "RequestModels.ts")] pub struct Command { pub command: CommandType, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(tag = "module")] pub enum CommandType { DeviceManager(device::manager::Request), } +// #[derive(Debug, Clone, Serialize, Deserialize)] +// pub enum Command { +// DeviceManager(device::manager::Request), +// } + #[tokio::main] async fn main() { // CLI should be started before logger to allow control over verbosity @@ -34,6 +46,60 @@ async fn main() { "DeviceManager initialized with following devices: {:?}", handler.send(crate::device::manager::Request::List).await ); + use serde_json::json; + use uuid::Uuid; + + // Define your requests + let requests = vec![ + crate::device::manager::Request::Ping(DeviceRequestStruct { + uuid: Uuid::parse_str("00000000-0000-0000-001e-10da679f8cee").unwrap(), + device_request: crate::device::devices::PingRequest::Ping360( + crate::device::devices::Ping360Request::Transducer( + bluerobotics_ping::ping360::TransducerStruct { + mode: 1, + gain_setting: 2, + angle: 0, + transmit_duration: 500, + sample_period: 80, + transmit_frequency: 700, + number_of_samples: 1200, + transmit: 1, + reserved: 1, + }, + ), + ), + }), + crate::device::manager::Request::EnableContinuousMode(UuidWrapper { + uuid: Uuid::parse_str("00000000-0000-0000-001e-10da679f8cee").unwrap(), + }), + crate::device::manager::Request::List, + crate::device::manager::Request::Create(CreateStruct { + source: device::manager::SourceSelection::SerialStream( + device::manager::SourceSerialStruct { + path: "/dev/ttyUSB0".to_string(), + baudrate: 115200, + }, + ), + device_selection: device::manager::DeviceSelection::Auto, + }), + crate::device::manager::Request::Create(CreateStruct { + source: device::manager::SourceSelection::UdpStream(device::manager::SourceUdpStruct { + ip: "192.168.0.1".parse().unwrap(), + port: 9092, + }), + device_selection: device::manager::DeviceSelection::Auto, + }), + ]; + + // Print each request as JSON + for request in requests { + println!( + "{}", + json!(Command { + command: CommandType::DeviceManager(request) + }) + ); + } server::manager::run(&cli::manager::server_address(), handler) .await