Skip to content

Commit

Permalink
refactor: separate out DeviceConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Aug 21, 2024
1 parent 0bb9346 commit 235b52a
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 55 deletions.
10 changes: 6 additions & 4 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use std::fs;
use std::path::PathBuf;
use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration};

use crate::base::actions::Cancellation;
use crate::config::{ActionRoute, Config, DeviceConfig};
use crate::{Action, ActionResponse};

use super::streams::Streams;
use super::{ActionBridgeShutdown, Package, StreamMetrics};
use crate::base::actions::Cancellation;
use crate::config::ActionRoute;
use crate::{Action, ActionResponse, Config};

const TUNSHELL_ACTION: &str = "launch_shell";

Expand Down Expand Up @@ -75,6 +76,7 @@ pub struct ActionsBridge {
impl ActionsBridge {
pub fn new(
config: Arc<Config>,
device_config: Arc<DeviceConfig>,
package_tx: Sender<Box<dyn Package>>,
actions_rx: Receiver<Action>,
shutdown_handle: Sender<()>,
Expand All @@ -94,7 +96,7 @@ impl ActionsBridge {
action_status.batch_size = 1;

streams_config.insert("action_status".to_owned(), action_status);
let mut streams = Streams::new(config.clone(), package_tx, metrics_tx);
let mut streams = Streams::new(config.clone(), device_config, package_tx, metrics_tx);
streams.config_streams(streams_config);

Self {
Expand Down
5 changes: 3 additions & 2 deletions uplink/src/base/bridge/data_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use flume::{bounded, Receiver, RecvError, Sender};
use log::{debug, error};
use tokio::{select, time::interval};

use crate::Config;
use crate::config::{Config, DeviceConfig};

use super::{streams::Streams, DataBridgeShutdown, Package, Payload, StreamMetrics};

Expand All @@ -30,13 +30,14 @@ pub struct DataBridge {
impl DataBridge {
pub fn new(
config: Arc<Config>,
device_config: Arc<DeviceConfig>,
package_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
) -> Self {
let (data_tx, data_rx) = bounded(10);
let (ctrl_tx, ctrl_rx) = bounded(1);

let mut streams = Streams::new(config.clone(), package_tx, metrics_tx);
let mut streams = Streams::new(config.clone(), device_config, package_tx, metrics_tx);
streams.config_streams(config.streams.clone());

Self { data_tx, data_rx, config, streams, ctrl_rx, ctrl_tx }
Expand Down
24 changes: 18 additions & 6 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use flume::{bounded, Receiver, Sender};
pub use metrics::StreamMetrics;
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand All @@ -16,9 +17,8 @@ pub use actions_lane::{CtrlTx as ActionsLaneCtrlTx, StatusTx};
use data_lane::DataBridge;
pub use data_lane::{CtrlTx as DataLaneCtrlTx, DataTx};

use crate::config::{ActionRoute, StreamConfig};
use crate::{Action, ActionResponse, Config};
pub use metrics::StreamMetrics;
use crate::config::{ActionRoute, Config, DeviceConfig, StreamConfig};
use crate::{Action, ActionResponse};

pub trait Point: Send + Debug + Serialize + 'static {
fn stream_name(&self) -> &str;
Expand Down Expand Up @@ -80,14 +80,26 @@ pub struct Bridge {
impl Bridge {
pub fn new(
config: Arc<Config>,
device_config: Arc<DeviceConfig>,
package_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
actions_rx: Receiver<Action>,
shutdown_handle: Sender<()>,
) -> Self {
let data = DataBridge::new(config.clone(), package_tx.clone(), metrics_tx.clone());
let actions =
ActionsBridge::new(config, package_tx, actions_rx, shutdown_handle, metrics_tx);
let data = DataBridge::new(
config.clone(),
device_config.clone(),
package_tx.clone(),
metrics_tx.clone(),
);
let actions = ActionsBridge::new(
config,
device_config,
package_tx,
actions_rx,
shutdown_handle,
metrics_tx,
);
Self { data, actions }
}

Expand Down
26 changes: 18 additions & 8 deletions uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ use std::sync::Arc;
use flume::Sender;
use log::{error, info, trace};

use super::stream::{self, StreamStatus};
use super::{Point, StreamMetrics};
use crate::config::StreamConfig;
use crate::{Config, Package, Stream};
use crate::config::{Config, DeviceConfig, StreamConfig};

use super::delaymap::DelayMap;
use super::{
delaymap::DelayMap,
stream::{self, Stream, StreamStatus},
Package, Point, StreamMetrics,
};

pub struct Streams<T> {
config: Arc<Config>,
device_config: Arc<DeviceConfig>,
data_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
map: HashMap<String, Stream<T>>,
Expand All @@ -22,10 +24,18 @@ pub struct Streams<T> {
impl<T: Point> Streams<T> {
pub fn new(
config: Arc<Config>,
device_config: Arc<DeviceConfig>,
data_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
) -> Self {
Self { config, data_tx, metrics_tx, map: HashMap::new(), stream_timeouts: DelayMap::new() }
Self {
config,
device_config,
data_tx,
metrics_tx,
map: HashMap::new(),
stream_timeouts: DelayMap::new(),
}
}

pub fn config_streams(&mut self, streams_config: HashMap<String, StreamConfig>) {
Expand All @@ -47,8 +57,8 @@ impl<T: Point> Streams<T> {

let stream = Stream::dynamic(
&stream_name,
&self.config.project_id,
&self.config.device_id,
&self.device_config.project_id,
&self.device_config.device_id,
self.data_tx.clone(),
);

Expand Down
14 changes: 9 additions & 5 deletions uplink/src/base/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ use std::io::Read;
use std::path::Path;
use std::sync::Mutex;

use crate::{Action, Config};
use rumqttc::{
AsyncClient, ConnectionError, Event, EventLoop, Incoming, MqttOptions, Packet, Publish, QoS,
Request, TlsConfiguration, Transport,
};
use std::sync::Arc;

use crate::config::{Config, DeviceConfig};
use crate::Action;

pub use self::metrics::MqttMetrics;

mod metrics;
Expand Down Expand Up @@ -66,12 +68,13 @@ pub struct Mqtt {
impl Mqtt {
pub fn new(
config: Arc<Config>,
device_config: &DeviceConfig,
actions_tx: Sender<Action>,
metrics_tx: Sender<MqttMetrics>,
network_up: Arc<Mutex<bool>>,
) -> Mqtt {
// create a new eventloop and reuse it during every reconnection
let options = mqttoptions(&config);
let options = mqttoptions(&config, device_config);
let (client, mut eventloop) = AsyncClient::new(options, 0);
eventloop.network_options.set_connection_timeout(config.mqtt.network_timeout);
let (ctrl_tx, ctrl_rx) = bounded(1);
Expand Down Expand Up @@ -287,14 +290,15 @@ impl Mqtt {
}
}

fn mqttoptions(config: &Config) -> MqttOptions {
fn mqttoptions(config: &Config, device_config: &DeviceConfig) -> MqttOptions {
// let (rsa_private, ca) = get_certs(&config.key.unwrap(), &config.ca.unwrap());
let mut mqttoptions = MqttOptions::new(&config.device_id, &config.broker, config.port);
let mut mqttoptions =
MqttOptions::new(&device_config.device_id, &device_config.broker, device_config.port);
mqttoptions.set_max_packet_size(config.mqtt.max_packet_size, config.mqtt.max_packet_size);
mqttoptions.set_keep_alive(Duration::from_secs(config.mqtt.keep_alive));
mqttoptions.set_inflight(config.mqtt.max_inflight);

if let Some(auth) = config.authentication.clone() {
if let Some(auth) = device_config.authentication.clone() {
let ca = auth.ca_certificate.into_bytes();
let device_certificate = auth.device_certificate.into_bytes();
let device_private_key = auth.device_private_key.into_bytes();
Expand Down
6 changes: 4 additions & 2 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ use std::{
use std::{io::Write, path::PathBuf};

use crate::base::actions::Cancellation;
use crate::{base::bridge::BridgeTx, config::DownloaderConfig, Action, ActionResponse, Config};
use crate::config::{Authentication, Config, DownloaderConfig};
use crate::{base::bridge::BridgeTx, Action, ActionResponse};

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -116,14 +117,15 @@ impl FileDownloader {
/// Creates a handler for download actions within uplink and uses HTTP to download files.
pub fn new(
config: Arc<Config>,
authentication: &Option<Authentication>,
actions_rx: Receiver<Action>,
bridge_tx: BridgeTx,
shutdown_rx: Receiver<DownloaderShutdown>,
disabled: Arc<Mutex<bool>>,
) -> Result<Self, Error> {
// Authenticate with TLS certs from config
let client_builder = ClientBuilder::new();
let client = match &config.authentication {
let client = match authentication {
Some(certs) => {
let ca = Certificate::from_pem(certs.ca_certificate.as_bytes())?;
let mut buf = BytesMut::from(certs.device_private_key.as_bytes());
Expand Down
8 changes: 6 additions & 2 deletions uplink/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,18 @@ pub struct PreconditionCheckerConfig {
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct Config {
pub struct DeviceConfig {
pub project_id: String,
pub device_id: String,
pub broker: String,
pub port: u16,
pub authentication: Option<Authentication>,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct Config {
#[serde(default)]
pub console: ConsoleConfig,
pub authentication: Option<Authentication>,
#[serde(default = "default_tcpapps")]
pub tcpapps: HashMap<String, AppConfig>,
pub mqtt: MqttConfig,
Expand Down
18 changes: 14 additions & 4 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub mod collector;
pub mod config;
pub mod mock;

use self::config::{ActionRoute, Config};
use self::config::{ActionRoute, Config, DeviceConfig};
pub use base::actions::{Action, ActionResponse};
use base::bridge::{stream::Stream, Bridge, Package, Payload, Point, StreamMetrics};
use base::monitor::Monitor;
Expand Down Expand Up @@ -85,6 +85,7 @@ where

pub struct Uplink {
config: Arc<Config>,
device_config: Arc<DeviceConfig>,
action_rx: Receiver<Action>,
action_tx: Sender<Action>,
data_rx: Receiver<Box<dyn Package>>,
Expand All @@ -98,7 +99,7 @@ pub struct Uplink {
}

impl Uplink {
pub fn new(config: Arc<Config>) -> Result<Uplink, Error> {
pub fn new(config: Arc<Config>, device_config: Arc<DeviceConfig>) -> Result<Uplink, Error> {
let (action_tx, action_rx) = bounded(10);
let (data_tx, data_rx) = bounded(10);
let (stream_metrics_tx, stream_metrics_rx) = bounded(10);
Expand All @@ -107,6 +108,7 @@ impl Uplink {

Ok(Uplink {
config,
device_config,
action_rx,
action_tx,
data_rx,
Expand All @@ -123,6 +125,7 @@ impl Uplink {
pub fn configure_bridge(&mut self) -> Bridge {
Bridge::new(
self.config.clone(),
self.device_config.clone(),
self.data_tx.clone(),
self.stream_metrics_tx(),
self.action_rx.clone(),
Expand All @@ -132,15 +135,21 @@ impl Uplink {

pub fn spawn(
&mut self,
device_config: &DeviceConfig,
mut bridge: Bridge,
downloader_disable: Arc<Mutex<bool>>,
network_up: Arc<Mutex<bool>>,
) -> Result<CtrlTx, Error> {
let (mqtt_metrics_tx, mqtt_metrics_rx) = bounded(10);
let (ctrl_actions_lane, ctrl_data_lane) = bridge.ctrl_tx();

let mut mqtt =
Mqtt::new(self.config.clone(), self.action_tx.clone(), mqtt_metrics_tx, network_up);
let mut mqtt = Mqtt::new(
self.config.clone(),
device_config,
self.action_tx.clone(),
mqtt_metrics_tx,
network_up,
);
let mqtt_client = mqtt.client();
let ctrl_mqtt = mqtt.ctrl_tx();

Expand All @@ -160,6 +169,7 @@ impl Uplink {
let actions_rx = bridge.register_action_routes(&self.config.downloader.actions)?;
let file_downloader = FileDownloader::new(
self.config.clone(),
&device_config.authentication,
actions_rx,
bridge.bridge_tx(),
ctrl_rx,
Expand Down
Loading

0 comments on commit 235b52a

Please sign in to comment.