diff --git a/README.md b/README.md index 080808a..47207c5 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ use sunspec::{ models::{model1::Model1, model103::Model103}, }; use tokio::time::sleep; -use tokio_modbus::{client::tcp::connect_slave, Slave}; +use tokio_modbus::client::tcp::connect; #[derive(Parser)] struct Args { @@ -62,12 +62,10 @@ struct Args { async fn main() -> Result<(), Box> { let args = Args::parse(); - let mut client = AsyncClient::new( - connect_slave(args.addr, Slave(args.device_id)).await?, - Config::default(), - ).await?; + let client = AsyncClient::new(connect(args.addr).await?, Config::default()); + let device = client.device(args.device_id).await?; - let m1: Model1 = client.read_model().await?; + let m1: Model1 = device.read_model().await?; println!("Manufacturer: {}", m1.mn); println!("Model: {}", m1.md); @@ -76,7 +74,7 @@ async fn main() -> Result<(), Box> { println!( "Supported models: {}", - client + device .models .supported_model_ids() .iter() @@ -85,7 +83,7 @@ async fn main() -> Result<(), Box> { ); loop { - let m103: Model103 = client.read_model().await?; + let m103: Model103 = device.read_model().await?; let w = m103.w as f32 * 10f32.powf(m103.w_sf.into()); let wh = m103.wh as f32 * 10f32.powf(m103.wh_sf.into()); println!("{:12.3} kWh {:9.3} kW", wh / 1000.0, w / 1000.0,); diff --git a/examples/model103/src/main.rs b/examples/model103/src/main.rs index a7ff184..830404f 100644 --- a/examples/model103/src/main.rs +++ b/examples/model103/src/main.rs @@ -9,7 +9,7 @@ use sunspec::{ models::model1::Model1, }; use tokio::time::sleep; -use tokio_modbus::{client::tcp::connect_slave, Slave}; +use tokio_modbus::client::tcp::connect; #[derive(Parser)] struct Args { @@ -37,18 +37,18 @@ struct Args { async fn main() -> Result<(), Box> { let args = Args::parse(); - let mut client = AsyncClient::new( - connect_slave(args.addr, Slave(args.device_id)).await?, + let client = AsyncClient::new( + connect(args.addr).await?, Config { discovery_addresses: args.discovery_addresses, read_timeout: (args.read_timeout != 0.0) .then(|| Duration::from_secs_f32(args.read_timeout)), ..Default::default() }, - ) - .await?; + ); - let m1: Model1 = client.read_model().await?; + let device = client.device(args.device_id).await?; + let m1: Model1 = device.read_model().await?; println!("Manufacturer: {}", m1.mn); println!("Model: {}", m1.md); @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box> { println!( "Supported models: {}", - client + device .models .supported_model_ids() .iter() @@ -66,7 +66,7 @@ async fn main() -> Result<(), Box> { ); loop { - let m103: Model103 = client.read_model().await?; + let m103: Model103 = device.read_model().await?; let w = m103.w as f32 * 10f32.powf(m103.w_sf.into()); let wh = m103.wh as f32 * 10f32.powf(m103.wh_sf.into()); println!("{:12.3} kWh {:9.3} kW", wh / 1000.0, w / 1000.0,); diff --git a/examples/readme/src/main.rs b/examples/readme/src/main.rs index 872d853..11a6bb9 100644 --- a/examples/readme/src/main.rs +++ b/examples/readme/src/main.rs @@ -7,7 +7,7 @@ use sunspec::{ models::{model1::Model1, model103::Model103}, }; use tokio::time::sleep; -use tokio_modbus::{client::tcp::connect_slave, Slave}; +use tokio_modbus::client::tcp::connect; #[derive(Parser)] struct Args { @@ -19,13 +19,10 @@ struct Args { async fn main() -> Result<(), Box> { let args = Args::parse(); - let mut client = AsyncClient::new( - connect_slave(args.addr, Slave(args.device_id)).await?, - Config::default(), - ) - .await?; + let client = AsyncClient::new(connect(args.addr).await?, Config::default()); + let device = client.device(args.device_id).await?; - let m1: Model1 = client.read_model().await?; + let m1: Model1 = device.read_model().await?; println!("Manufacturer: {}", m1.mn); println!("Model: {}", m1.md); @@ -34,7 +31,7 @@ async fn main() -> Result<(), Box> { println!( "Supported models: {}", - client + device .models .supported_model_ids() .iter() @@ -43,7 +40,7 @@ async fn main() -> Result<(), Box> { ); loop { - let m103: Model103 = client.read_model().await?; + let m103: Model103 = device.read_model().await?; let w = m103.w as f32 * 10f32.powf(m103.w_sf.into()); let wh = m103.wh as f32 * 10f32.powf(m103.wh_sf.into()); println!("{:12.3} kWh {:9.3} kW", wh / 1000.0, w / 1000.0,); diff --git a/src/client/async.rs b/src/client/async.rs index 22572d0..b4ae422 100644 --- a/src/client/async.rs +++ b/src/client/async.rs @@ -12,40 +12,77 @@ use super::{ pub struct AsyncClient { /// This is the actual modbus client which implements the `AsyncModbusClient` trait. pub client: C, - /// Model configuration + /// Client configuration pub config: Config, - /// Discovered models - pub models: Models, - /// Unknown models - pub unknown_models: Vec, } impl AsyncClient { /// Create new AsyncClient using a `AsyncModbusClient` and a `Config` - /// and perform "Device Information Model Discovery" as explained in + pub fn new(client: impl IntoAsyncModbusClient, config: Config) -> Self { + Self { + client: client.into_async_modbus_client(), + config, + } + } + /// Perform "Device Information Model Discovery" as explained in + /// [SunSpec Device Information Specification V1.1](https://sunspec.org/wp-content/uploads/2022/05/SunSpec-Device-Information-Model-Specificiation-V1-1-final.pdf) + /// for all slave IDs (0..=255) and return a vector of discovered + /// devices. + pub async fn devices(&self) -> Vec> { + let mut devices = Vec::new(); + for slave_id in 0..=255 { + if let Ok(device) = self.device(slave_id).await { + devices.push(device); + } + } + devices + } + /// Perform "Device Information Model Discovery" as explained in /// [SunSpec Device Information Specification V1.1](https://sunspec.org/wp-content/uploads/2022/05/SunSpec-Device-Information-Model-Specificiation-V1-1-final.pdf) - pub async fn new(mut client: C, config: Config) -> Result { + /// for a single slave ID and return the discovered device. + pub async fn device(&self, slave_id: u8) -> Result, DiscoveryError> { let discovery_result = discover_models( - &mut client, - &config.discovery_addresses, - config.read_timeout, + &self.client, + slave_id, + &self.config.discovery_addresses, + self.config.read_timeout, ) .await?; - Ok(Self { - client, - config, + Ok(AsyncDevice { + client: self.client.clone(), + config: self.config.clone(), + slave_id, models: discovery_result.models, unknown_models: discovery_result.unknown_models, }) } +} + +/// Client structure for a discovered device +#[derive(Debug)] +pub struct AsyncDevice { + /// This is the actual modbus client which implements the `AsyncModbusClient` trait. + pub client: C, + /// Client configuration + pub config: Config, + /// The Slave ID + pub slave_id: u8, + /// Discovered models + pub models: Models, + /// Unknown models + pub unknown_models: Vec, +} + +impl AsyncDevice { /// Read model data from modbus /// /// Note: Some models are too big to be fetched in a single request /// and multiple read_holding_registers calls will be issued. - pub async fn read_model(&mut self) -> Result { + pub async fn read_model(&self) -> Result { let addr = M::addr(&self.models); read_model( - &mut self.client, + &self.client, + self.slave_id, addr, self.config.max_read_length, self.config.read_timeout, @@ -56,12 +93,13 @@ impl AsyncClient { /// `read_model` is more efficient when loading multiple /// points from a single model. pub async fn read_point( - &mut self, + &self, point: Point, ) -> Result { let model_addr = M::addr(&self.models); read_point( - &mut self.client, + &self.client, + self.slave_id, model_addr, point, self.config.read_timeout, @@ -70,13 +108,14 @@ impl AsyncClient { } /// Write data for a single point pub async fn write_point( - &mut self, + &self, point: Point, value: T, ) -> Result<(), WritePointError> { let model_addr = M::addr(&self.models); write_point( - &mut self.client, + &self.client, + self.slave_id, model_addr, point, value, @@ -87,38 +126,55 @@ impl AsyncClient { } /// Async Modbus client -pub trait AsyncModbusClient { +pub trait AsyncModbusClient: Sync + Clone { /// Read registers from Modbus device fn read_registers( - &mut self, + &self, + slave_id: u8, addr: u16, len: u16, ) -> impl Future, ModbusError>> + Send; /// Write registers to Modbus device fn write_registers( - &mut self, + &self, + slave_id: u8, addr: u16, data: &[u16], ) -> impl Future> + Send; } +pub trait IntoAsyncModbusClient { + fn into_async_modbus_client(self) -> C; +} + +impl IntoAsyncModbusClient for C { + fn into_async_modbus_client(self) -> C { + self + } +} + async fn read_holding_registers_array( - client: &mut impl AsyncModbusClient, + client: &impl AsyncModbusClient, + slave_id: u8, addr: u16, ) -> Result<[u16; CNT], ModbusError> { // Unwrap is fine here as read_holding_registers is guaranteed to // return the right amount of words. - client.read_registers(addr, CNT as u16).await.map(|words| { - words - .try_into() - .expect("read_holding_registers returned the wrong amount of words") - }) + client + .read_registers(slave_id, addr, CNT as u16) + .await + .map(|words| { + words + .try_into() + .expect("read_holding_registers returned the wrong amount of words") + }) } /// This function implements the "Device Information Model Discovery" /// as explained in [SunSpec Device Information Specification V1.1](https://sunspec.org/wp-content/uploads/2022/05/SunSpec-Device-Information-Model-Specificiation-V1-1-final.pdf) async fn discover_models( - client: &mut impl AsyncModbusClient, + client: &impl AsyncModbusClient, + slave_id: u8, discovery_addresses: &[u16], read_timeout: Option, ) -> Result { @@ -127,7 +183,7 @@ async fn discover_models( for &addr in discovery_addresses.iter() { // TODO add timeout match apply_timeout( - read_holding_registers_array::<2>(client, addr), + read_holding_registers_array::<2>(client, slave_id, addr), read_timeout, ) .await @@ -153,7 +209,7 @@ async fn discover_models( loop { let [model_id, len] = apply_timeout( - read_holding_registers_array::<2>(client, addr), + read_holding_registers_array::<2>(client, slave_id, addr), read_timeout, ) .await?; @@ -184,13 +240,18 @@ async fn discover_models( /// Note: Some models are too big to be fetched in a single request /// and multiple read_holding_registers calls will be issued. async fn read_model( - client: &mut impl AsyncModbusClient, + client: &impl AsyncModbusClient, + slave_id: u8, addr: ModelAddr, max_read_length: u16, read_timeout: Option, ) -> Result { let data = if addr.len <= max_read_length { - apply_timeout(client.read_registers(addr.addr, addr.len), read_timeout).await? + apply_timeout( + client.read_registers(slave_id, addr.addr, addr.len), + read_timeout, + ) + .await? } else { let mut data: Vec = Vec::with_capacity(addr.len.into()); let begin = addr.addr; @@ -201,6 +262,7 @@ async fn read_model( for range in ranges { let chunk = apply_timeout( client.read_registers( + slave_id, range.start, range .len() @@ -221,13 +283,14 @@ async fn read_model( /// `read_model` is more efficient when loading multiple /// points from a single model. async fn read_point( - client: &mut impl AsyncModbusClient, + client: &impl AsyncModbusClient, + slave_id: u8, model_addr: ModelAddr, point: Point, read_timeout: Option, ) -> Result { let data = apply_timeout( - client.read_registers(model_addr.addr + point.offset, point.length), + client.read_registers(slave_id, model_addr.addr + point.offset, point.length), read_timeout, ) .await?; @@ -236,7 +299,8 @@ async fn read_point( /// Write data for a single point async fn write_point( - client: &mut impl AsyncModbusClient, + client: &impl AsyncModbusClient, + slave_id: u8, model_addr: ModelAddr, point: Point, value: T, @@ -247,7 +311,7 @@ async fn write_point( return Err(WritePointError::ValueTooLarge); } apply_timeout( - client.write_registers(model_addr.addr + point.offset, &data), + client.write_registers(slave_id, model_addr.addr + point.offset, &data), write_timeout, ) .await?; diff --git a/src/client/mod.rs b/src/client/mod.rs index 77d4564..faa2c73 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -11,4 +11,4 @@ pub use config::{ }; pub use discovery::{DiscoveryError, DiscoveryResult, UnknownModel}; pub use error::{ModbusError, ReadModelError, ReadPointError, WritePointError}; -pub use r#async::{AsyncClient, AsyncModbusClient}; +pub use r#async::{AsyncClient, AsyncDevice, AsyncModbusClient}; diff --git a/src/client/tokio_modbus.rs b/src/client/tokio_modbus.rs index fd0ff1d..948840f 100644 --- a/src/client/tokio_modbus.rs +++ b/src/client/tokio_modbus.rs @@ -1,16 +1,43 @@ //! This module contains the actual communication methods via //! `tokio-modbus`. -use tokio_modbus::client::{Context, Reader, Writer}; +use std::sync::Arc; -use super::{AsyncModbusClient, ModbusError}; +use tokio::sync::Mutex; +use tokio_modbus::{ + client::{Context, Reader, Writer}, + slave::SlaveContext, + Slave, +}; -impl AsyncModbusClient for Context { - async fn read_registers(&mut self, addr: u16, len: u16) -> Result, ModbusError> { - Ok(self.read_holding_registers(addr, len).await??) +use super::{r#async::IntoAsyncModbusClient, AsyncModbusClient, ModbusError}; + +impl AsyncModbusClient for Arc> { + async fn read_registers( + &self, + slave_id: u8, + addr: u16, + len: u16, + ) -> Result, ModbusError> { + let mut ctx = self.lock().await; + ctx.set_slave(Slave(slave_id)); + Ok(ctx.read_holding_registers(addr, len).await??) + } + async fn write_registers( + &self, + slave_id: u8, + addr: u16, + data: &[u16], + ) -> Result<(), ModbusError> { + let mut ctx = self.lock().await; + ctx.set_slave(Slave(slave_id)); + Ok(ctx.write_multiple_registers(addr, data).await??) } - async fn write_registers(&mut self, addr: u16, data: &[u16]) -> Result<(), ModbusError> { - Ok(self.write_multiple_registers(addr, data).await??) +} + +impl IntoAsyncModbusClient>> for Context { + fn into_async_modbus_client(self) -> Arc> { + Arc::new(Mutex::new(self)) } }