From 65f041a991f78e15b0037938271a15e9cdfa623d Mon Sep 17 00:00:00 2001 From: Raul Victor Trombin Date: Thu, 1 Aug 2024 00:34:14 -0300 Subject: [PATCH] src: device: Change request pattern --- src/device/manager/manager.rs | 576 +++++++++++++++++++++++++++ src/main.rs | 7 + src/server/protocols/v1/websocket.rs | 54 +-- 3 files changed, 613 insertions(+), 24 deletions(-) create mode 100644 src/device/manager/manager.rs diff --git a/src/device/manager/manager.rs b/src/device/manager/manager.rs new file mode 100644 index 00000000..1ea0bcfd --- /dev/null +++ b/src/device/manager/manager.rs @@ -0,0 +1,576 @@ +use paperclip::actix::Apiv2Schema; +use serde::{Deserialize, Serialize}; +use std::{ + collections::{hash_map::DefaultHasher, HashMap}, + hash::{Hash, Hasher}, + net::{Ipv4Addr, SocketAddrV4}, +}; +use tokio::sync::{mpsc, oneshot}; +use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream}; +use tracing::{error, info, trace, warn}; +use udp_stream::UdpStream; +use uuid::Uuid; + +use super::devices::{DeviceActor, DeviceActorHandler}; +use bluerobotics_ping::device::{Ping1D, Ping360}; + +pub struct Device { + pub id: Uuid, + pub source: SourceSelection, + pub handler: super::devices::DeviceActorHandler, + pub actor: tokio::task::JoinHandle, + pub broadcast: Option>, + pub status: DeviceStatus, + pub device_type: DeviceSelection, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DeviceInfo { + pub id: Uuid, + pub source: SourceSelection, + pub status: DeviceStatus, + pub device_type: DeviceSelection, +} +impl Device { + pub fn info(&self) -> DeviceInfo { + DeviceInfo { + id: self.id, + source: self.source.clone(), + status: self.status.clone(), + device_type: self.device_type.clone(), + } + } +} + +impl Drop for Device { + fn drop(&mut self) { + trace!( + "Removing Device from DeviceManager, details: {:?}", + self.info() + ); + self.actor.abort(); + if let Some(broadcast_handle) = &self.broadcast { + trace!("Device broadcast handle closed for: {:?}", self.info().id); + broadcast_handle.abort(); + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum DeviceSelection { + Common, + Ping1D, + Ping360, + Auto, +} + +#[derive(Debug, Clone, Deserialize, Serialize, Hash)] +pub enum SourceSelection { + UdpStream(SourceUdpStruct), + SerialStream(SourceSerialStruct), +} + +enum SourceType { + Udp(UdpStream), + Serial(SerialStream), +} + +#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema)] +pub struct SourceUdpStruct { + pub ip: Ipv4Addr, + pub port: u16, +} + +#[derive(Clone, Debug, Deserialize, Serialize, Hash, Apiv2Schema)] +pub struct SourceSerialStruct { + pub path: String, + pub baudrate: u32, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum DeviceStatus { + Running, + Stopped, + ContinuousMode, +} + +pub struct DeviceManager { + receiver: mpsc::Receiver, + pub device: HashMap, +} + +#[derive(Debug)] +pub struct ManagerActorRequest { + pub request: Request, + pub respond_to: oneshot::Sender>, +} +#[derive(Clone)] +pub struct ManagerActorHandler { + pub sender: mpsc::Sender, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Apiv2Schema)] +pub enum Answer { + DeviceMessage(DeviceAnswer), + #[serde(skip)] + InnerDeviceHandler(DeviceActorHandler), + DeviceInfo(Vec), +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum ManagerError { + DeviceNotExist(Uuid), + DeviceAlreadyExist(Uuid), + DeviceStatus(DeviceStatus, Uuid), + DeviceError(super::devices::DeviceError), + DeviceSourceError(String), + NoDevices, + TokioMpsc(String), + NotImplemented(Request), + Other(String), +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DeviceAnswer { + #[serde(flatten)] + pub answer: crate::device::devices::PingAnswer, + pub device_id: Uuid, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Apiv2Schema)] +#[serde(tag = "type", content = "payload")] +pub enum Request { + Create(CreateStruct), + Delete(UuidWrapper), + List, + Info(UuidWrapper), + Search, + Ping(DeviceRequestStruct), + GetDeviceHandler(UuidWrapper), + EnableContinuousMode(UuidWrapper), + DisableContinuousMode(UuidWrapper), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UuidWrapper { + pub uuid: Uuid, +} + +impl UuidWrapper { + pub fn uuid(&self) -> Uuid { + self.uuid + } +} + +impl From for UuidWrapper { + fn from(uuid: Uuid) -> Self { + UuidWrapper { uuid } + } +} + +impl From for Uuid { + fn from(wrapper: UuidWrapper) -> Self { + wrapper.uuid + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateStruct { + pub source: SourceSelection, + pub device_selection: DeviceSelection, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeviceRequestStruct { + pub uuid: Uuid, + pub device_request: crate::device::devices::PingRequest, +} + +impl DeviceManager { + async fn handle_message(&mut self, actor_request: ManagerActorRequest) { + trace!("DeviceManager: Received a request, details: {actor_request:?}"); + match actor_request.request { + Request::Create(request) => { + let result = self.create(request.source, request.device_selection).await; + if let Err(e) = actor_request.respond_to.send(result) { + error!("DeviceManager: Failed to return Create response: {e:?}"); + } + } + Request::Delete(uuid) => { + let result = self.delete(uuid.into()).await; + if let Err(e) = actor_request.respond_to.send(result) { + error!("DeviceManager: Failed to return Delete response: {e:?}"); + } + } + Request::List => { + let result = self.list().await; + if let Err(e) = actor_request.respond_to.send(result) { + error!("DeviceManager: Failed to return List response: {e:?}"); + } + } + Request::Info(device_id) => { + let result = self.info(device_id.into()).await; + if let Err(e) = actor_request.respond_to.send(result) { + error!("DeviceManager: Failed to return Info response: {:?}", e); + } + } + Request::EnableContinuousMode(uuid) => { + let result = self.continuous_mode(uuid.into()).await; + if let Err(e) = actor_request.respond_to.send(result) { + error!( + "DeviceManager: Failed to return EnableContinuousMode response: {:?}", + e + ); + } + } + Request::DisableContinuousMode(uuid) => { + let result = self.continuous_mode_off(uuid.into()).await; + if let Err(e) = actor_request.respond_to.send(result) { + error!( + "DeviceManager: Failed to return DisableContinuousMode response: {:?}", + e + ); + } + } + Request::GetDeviceHandler(id) => { + let answer = self.get_device_handler(id.into()).await; + if let Err(e) = actor_request.respond_to.send(answer) { + error!("DeviceManager: Failed to return GetDeviceHandler response: {e:?}"); + } + } + _ => { + if let Err(e) = actor_request + .respond_to + .send(Err(ManagerError::NotImplemented(actor_request.request))) + { + warn!("DeviceManager: Failed to return response: {e:?}"); + } + } + } + } + + pub fn new(size: usize) -> (Self, ManagerActorHandler) { + let (sender, receiver) = mpsc::channel(size); + let actor = DeviceManager { + receiver, + device: HashMap::new(), + }; + let actor_handler = ManagerActorHandler { sender }; + + trace!("DeviceManager and handler successfully created: Success"); + (actor, actor_handler) + } + + pub async fn run(mut self) { + while let Some(msg) = self.receiver.recv().await { + self.update_devices_status().await; // Todo: move to an outer process + self.handle_message(msg).await; + } + error!("DeviceManager has stopped please check your application"); + } + + pub async fn update_devices_status(&mut self) { + if let Ok(Answer::DeviceInfo(answer)) = self.list().await { + for device in answer { + if let Some(device_entry) = self.device.get_mut(&device.id) { + if device_entry.status == DeviceStatus::Stopped { + break; + } + if device_entry.actor.is_finished() { + info!("Device stopped, device id: {device:?}"); + device_entry.status = DeviceStatus::Stopped; + } + } + } + } + } + + pub async fn create( + &mut self, + source: SourceSelection, + mut device_selection: DeviceSelection, + ) -> Result { + let mut hasher = DefaultHasher::new(); + source.hash(&mut hasher); + let hash = Uuid::from_u128(hasher.finish().into()); + + if self.device.contains_key(&hash) { + trace!("Device creation error: Device already exist for provided SourceSelection, details: {source:?}"); + return Err(ManagerError::DeviceAlreadyExist(hash)); + } + + let port = match &source { + SourceSelection::UdpStream(source_udp_struct) => { + let socket_addr = SocketAddrV4::new(source_udp_struct.ip, source_udp_struct.port); + + let udp_stream = UdpStream::connect(socket_addr.into()) + .await + .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?; + SourceType::Udp(udp_stream) + } + SourceSelection::SerialStream(source_serial_struct) => { + let serial_stream = tokio_serial::new( + source_serial_struct.path.clone(), + source_serial_struct.baudrate, + ) + .open_native_async() + .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?; + + serial_stream + .clear(tokio_serial::ClearBuffer::All) + .map_err(|err| ManagerError::DeviceSourceError(err.to_string()))?; + + SourceType::Serial(serial_stream) + } + }; + + let device = match port { + SourceType::Udp(udp_port) => match device_selection { + DeviceSelection::Common | DeviceSelection::Auto => { + crate::device::devices::DeviceType::Common( + bluerobotics_ping::common::Device::new(udp_port), + ) + } + DeviceSelection::Ping1D => { + crate::device::devices::DeviceType::Ping1D(Ping1D::new(udp_port)) + } + DeviceSelection::Ping360 => { + crate::device::devices::DeviceType::Ping360(Ping360::new(udp_port)) + } + }, + SourceType::Serial(serial_port) => match device_selection { + DeviceSelection::Common | DeviceSelection::Auto => { + crate::device::devices::DeviceType::Common( + bluerobotics_ping::common::Device::new(serial_port), + ) + } + DeviceSelection::Ping1D => { + crate::device::devices::DeviceType::Ping1D(Ping1D::new(serial_port)) + } + DeviceSelection::Ping360 => { + crate::device::devices::DeviceType::Ping360(Ping360::new(serial_port)) + } + }, + }; + + let (mut device, handler) = super::devices::DeviceActor::new(device, 10); + + if device_selection == DeviceSelection::Auto { + match device.try_upgrade().await { + Ok(super::devices::PingAnswer::UpgradeResult(result)) => match result { + super::devices::UpgradeResult::Unknown => { + device_selection = DeviceSelection::Common; + } + super::devices::UpgradeResult::Ping1D => { + device_selection = DeviceSelection::Ping1D; + } + super::devices::UpgradeResult::Ping360 => { + device_selection = DeviceSelection::Ping360; + } + }, + Err(err) => { + error!( + "Device creation error: Can't auto upgrade the DeviceType, details: {err:?}" + ); + return Err(ManagerError::DeviceError(err)); + } + _ => todo!(), + } + } + + let actor = tokio::spawn(async move { device.run().await }); + + let device = Device { + id: hash, + source, + handler, + actor, + status: DeviceStatus::Running, + broadcast: None, + device_type: device_selection, + }; + + self.device.insert(hash, device); + + trace!("Device broadcast enable by default for: {hash:?}"); + let device_info = self.continuous_mode(hash).await?; + + info!("New device created and available, details: {device_info:?}"); + Ok(device_info) + } + + pub async fn list(&self) -> Result { + if self.device.is_empty() { + trace!("No devices available for list generation request"); + return Err(ManagerError::NoDevices); + }; + let mut list = Vec::new(); + for device in self.device.values() { + list.push(device.info()) + } + Ok(Answer::DeviceInfo(list)) + } + + pub async fn info(&self, device_id: Uuid) -> Result { + self.check_device_uuid(device_id)?; + Ok(Answer::DeviceInfo(vec![self.get_device(device_id)?.info()])) + } + + pub async fn delete(&mut self, device_id: Uuid) -> Result { + match self.device.remove(&device_id) { + Some(device) => { + let device_info = device.info(); + drop(device); + trace!("Device delete id {:?}: Success", device_id); + Ok(Answer::DeviceInfo(vec![device_info])) + } + None => { + error!("Device delete id {device_id:?} : Error, device doesn't exist"); + Err(ManagerError::DeviceNotExist(device_id)) + } + } + } + + pub async fn continuous_mode(&mut self, device_id: Uuid) -> Result { + self.check_device_status(device_id, &[DeviceStatus::Running])?; + let device_type = self.get_device_type(device_id)?; + + // Get an inner subscriber for device's stream + let subscriber = self.get_subscriber(device_id).await?; + + let broadcast_handle = self + .continuous_mode_start(subscriber, device_id, device_type.clone()) + .await; + if let Some(handle) = &broadcast_handle { + if !handle.is_finished() { + trace!("Success start_continuous_mode for {device_id:?}"); + } else { + return Err(ManagerError::Other( + "Error while start_continuous_mode".to_string(), + )); + } + } else { + return Err(ManagerError::Other( + "Error while start_continuous_mode".to_string(), + )); + }; + + self.continuous_mode_startup_routine(device_id, device_type) + .await?; + + let device = self.get_mut_device(device_id)?; + device.broadcast = broadcast_handle; + device.status = DeviceStatus::ContinuousMode; + + let updated_device_info = self.get_device(device_id)?.info(); + + Ok(Answer::DeviceInfo(vec![updated_device_info])) + } + + pub async fn continuous_mode_off(&mut self, device_id: Uuid) -> Result { + self.check_device_status(device_id, &[DeviceStatus::ContinuousMode])?; + let device_type = self.get_device_type(device_id)?; + + let device = self.get_mut_device(device_id)?; + if let Some(broadcast) = device.broadcast.take() { + broadcast.abort_handle().abort(); + } + + device.status = DeviceStatus::Running; + + let updated_device_info = device.info(); + + self.continuous_mode_shutdown_routine(device_id, device_type) + .await?; + + Ok(Answer::DeviceInfo(vec![updated_device_info])) + } +} + +impl ManagerActorHandler { + pub async fn send(&self, request: Request) -> Result { + let (result_sender, result_receiver) = oneshot::channel(); + + match &request { + // Devices requests are forwarded directly to device and let manager handle other incoming request. + Request::Ping(request) => { + trace!("Handling Ping request: {request:?}: Forwarding request to device handler"); + let get_handler_target = request.uuid; + let handler_request = + Request::GetDeviceHandler(crate::device::manager::UuidWrapper { + uuid: get_handler_target, + }); + let manager_request = ManagerActorRequest { + request: handler_request, + respond_to: result_sender, + }; + self.sender + .send(manager_request) + .await + .map_err(|err| ManagerError::TokioMpsc(err.to_string()))?; + let result = match result_receiver + .await + .map_err(|err| ManagerError::TokioMpsc(err.to_string())) + { + Ok(ans) => ans, + Err(err) => { + error!("DeviceManagerHandler: Failed to receive handler from Manager, details: {err:?}"); + return Err(err); + } + }; + + match result? { + Answer::InnerDeviceHandler(handler) => { + trace!( + "Handling Ping request: {request:?}: Successfully received the handler" + ); + let result = handler.send(request.device_request.clone()).await; + match result { + Ok(result) => { + info!("Handling Ping request: {request:?}: Success"); + Ok(Answer::DeviceMessage(DeviceAnswer { + answer: result, + device_id: request.uuid, + })) + } + Err(err) => { + error!( + "Handling Ping request: {request:?}: Error ocurred on device: {err:?}" ); + Err(ManagerError::DeviceError(err)) + } + } + } + answer => Ok(answer), //should be unreachable + } + } + _ => { + trace!("Handling DeviceManager request: {request:?}: Forwarding request."); + let device_request = ManagerActorRequest { + request: request.clone(), + respond_to: result_sender, + }; + + self.sender + .send(device_request) + .await + .map_err(|err| ManagerError::TokioMpsc(err.to_string()))?; + + match result_receiver + .await + .map_err(|err| ManagerError::TokioMpsc(err.to_string()))? + { + Ok(ans) => { + trace!("Handling DeviceManager request: {request:?}: Success"); + Ok(ans) + } + Err(err) => { + error!( + "Handling DeviceManager request: {request:?}: Error ocurred on manager: {err:?}", + ); + Err(err) + } + } + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 07a6a750..04c288bf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use serde::{Deserialize, Serialize}; use tracing::info; #[macro_use] @@ -9,6 +10,12 @@ mod device; mod logger; mod server; +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "module")] +pub enum ModuleType { + DeviceManager(device::manager::Request), +} + #[tokio::main] async fn main() { // CLI should be started before logger to allow control over verbosity diff --git a/src/server/protocols/v1/websocket.rs b/src/server/protocols/v1/websocket.rs index 1e638154..da7fae50 100644 --- a/src/server/protocols/v1/websocket.rs +++ b/src/server/protocols/v1/websocket.rs @@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex}; use tracing::info; use uuid::Uuid; -use crate::device::manager::{ManagerActorHandler, Request}; +use crate::device::manager::ManagerActorHandler; pub struct StringMessage(String); @@ -134,7 +134,7 @@ impl StreamHandler> for WebsocketActor { match msg { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Text(text)) => { - let manager_requests: Vec = match serde_json::from_str(&text) { + let manager_requests: Vec = match serde_json::from_str(&text) { Ok(requests) => requests, Err(err) => match serde_json::from_str(&text) { Ok(request) => vec![request], @@ -147,27 +147,31 @@ impl StreamHandler> for WebsocketActor { }; for request in manager_requests { - let manager_handler = self.manager_handler.clone(); - - let future = - async move { manager_handler.send(request).await }.into_actor(self); - - future - .then(|res, _, ctx| { - match &res { - Ok(result) => { - crate::server::protocols::v1::websocket::send_to_websockets( - json!(result), - None, - ); - } - Err(err) => { - ctx.text(serde_json::to_string_pretty(err).unwrap()); - } - } - fut::ready(()) - }) - .wait(ctx); + match request { + crate::ModuleType::DeviceManager(request) => { + let manager_handler = self.manager_handler.clone(); + + let future = + async move { manager_handler.send(request).await }.into_actor(self); + + future + .then(|res, _, ctx| { + match &res { + Ok(result) => { + crate::server::protocols::v1::websocket::send_to_websockets( + json!(result), + None, + ); + } + Err(err) => { + ctx.text(serde_json::to_string_pretty(err).unwrap()); + } + } + fut::ready(()) + }) + .wait(ctx); + } + } } } Ok(ws::Message::Close(msg)) => ctx.close(msg), @@ -191,7 +195,9 @@ pub async fn websocket( let device_number = query.into_inner().device_number; if let Some(device_number) = device_number { - let request = crate::device::manager::Request::Info(device_number); + let request = crate::device::manager::Request::Info(crate::device::manager::UuidWrapper { + uuid: device_number, + }); match manager_handler.send(request).await { Ok(response) => { info!(