Skip to content

Commit

Permalink
doc: improve documentation of uplink base, with some refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Nov 13, 2024
1 parent 912284a commit 115291e
Show file tree
Hide file tree
Showing 13 changed files with 491 additions and 348 deletions.
15 changes: 8 additions & 7 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use flume::{bounded, Receiver, RecvError, Sender, TrySendError};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::time::{self, interval, Instant, Sleep};

use std::collections::HashSet;
use std::fs;
use std::path::PathBuf;
use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration};

use crate::base::actions::Cancellation;
use crate::config::{ActionRoute, Config, DeviceConfig};
use crate::{Action, ActionResponse};
Expand Down Expand Up @@ -96,8 +98,7 @@ impl ActionsBridge {
action_status.batch_size = 1;

streams_config.insert("action_status".to_owned(), action_status);
let mut streams = Streams::new(1, device_config, package_tx, metrics_tx);
streams.config_streams(streams_config);
let streams = Streams::new(1, device_config, streams_config, package_tx, metrics_tx);

Self {
status_tx,
Expand Down
78 changes: 50 additions & 28 deletions uplink/src/base/bridge/data_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,114 +4,136 @@ use flume::{bounded, Receiver, RecvError, Sender};
use log::{debug, error};
use tokio::{select, time::interval};

use crate::config::{Config, DeviceConfig};

use super::{streams::Streams, DataBridgeShutdown, Package, Payload, StreamMetrics};
use crate::config::{Config, DeviceConfig};

/// Custom error type for `DataBridge`
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Receiver error {0}")]
#[error("DataBridge receive error: {0}")]
Recv(#[from] RecvError),
}

/// A bridge to manage data transmission from applications to data streams with metrics and control signals.
///
/// Payloads arrive on an internal bounded channel, are forwarded into `streams`, and the
/// bridge keeps running until a message is received on its control channel.
pub struct DataBridge {
/// Shared configuration; `start` reads the stream-metrics timeout from it.
config: Arc<Config>,
/// Sending half of the data channel; cloned into every `DataTx` handed out by `data_tx()`.
data_tx: Sender<Payload>,
/// Receiving half of the data channel, polled by `start` for payloads sent by applications.
data_rx: Receiver<Payload>,
/// Stream handler that payloads are forwarded to and that is flushed on timeout or shutdown.
streams: Streams<Payload>,
/// Receiving half of the control channel; any message received makes `start` flush all streams and return.
ctrl_rx: Receiver<DataBridgeShutdown>,
/// Sending half of the control channel; cloned into every `CtrlTx` handed out by `ctrl_tx()`.
ctrl_tx: Sender<DataBridgeShutdown>,
}

impl DataBridge {
/// Creates a new `DataBridge` with the specified configuration, device config, and channels for metrics and packages.
pub fn new(
config: Arc<Config>,
device_config: Arc<DeviceConfig>,
package_tx: Sender<Box<dyn Package>>,
metrics_tx: Sender<StreamMetrics>,
) -> Self {
// Channels for data payload and control messages
let (data_tx, data_rx) = bounded(10);
let (ctrl_tx, ctrl_rx) = bounded(1);

let mut streams =
Streams::new(config.max_stream_count, device_config, package_tx, metrics_tx);
streams.config_streams(config.streams.clone());
// Initialize stream handler with configuration
let streams = Streams::new(
config.max_stream_count,
device_config,
config.streams.clone(),
package_tx,
metrics_tx,
);

Self { data_tx, data_rx, config, streams, ctrl_rx, ctrl_tx }
Self { config, data_tx, data_rx, streams, ctrl_rx, ctrl_tx }
}

/// Handle to send data points from source application
/// Returns a handle for sending data payloads.
pub fn data_tx(&self) -> DataTx {
DataTx { inner: self.data_tx.clone() }
}

/// Handle to send data lane control message
/// Returns a handle for sending control messages to the data bridge.
pub fn ctrl_tx(&self) -> CtrlTx {
CtrlTx { inner: self.ctrl_tx.clone() }
}

/// Starts the main loop for the `DataBridge`, handling data reception, stream timeouts, metrics flushing, and shutdown.
pub async fn start(&mut self) -> Result<(), Error> {
let mut metrics_timeout = interval(self.config.stream_metrics.timeout);

loop {
select! {
// Process incoming data payloads
data = self.data_rx.recv_async() => {
let data = data?;
self.streams.forward(data).await;
}
// Flush streams that timeout

// Handle stream timeouts and flush expired streams
Some(timedout_stream) = self.streams.stream_timeouts.next(), if self.streams.stream_timeouts.has_pending() => {
debug!("Flushing stream = {timedout_stream}");
debug!("Flushing timed-out stream: {timedout_stream}");
if let Err(e) = self.streams.flush_stream(&timedout_stream).await {
error!("Failed to flush stream = {timedout_stream}. Error = {e}");
error!("Failed to flush stream: {timedout_stream}. Error: {e}");
}
}
// Flush all metrics when timed out

// Periodically flush metrics for all streams
_ = metrics_timeout.tick() => {
if let Err(e) = self.streams.check_and_flush_metrics() {
debug!("Failed to flush stream metrics. Error = {e}");
debug!("Failed to flush stream metrics. Error: {e}");
}
}
// Handle a shutdown signal

// Handle shutdown signal and flush all remaining data
_ = self.ctrl_rx.recv_async() => {
self.streams.flush_all().await;

return Ok(())
return Ok(());
}
}
}
}
}

/// A handle for applications to send data payloads to the `DataBridge`.
///
/// Cloning the handle clones the underlying channel sender, so every clone feeds the same bridge.
#[derive(Debug, Clone)]
pub struct DataTx {
/// Sending half of the bridge's data channel.
pub inner: Sender<Payload>,
}

impl DataTx {
/// Sends a payload asynchronously.
pub async fn send_payload(&self, payload: Payload) {
self.inner.send_async(payload).await.unwrap()
if let Err(e) = self.inner.send_async(payload).await {
error!("Failed to send payload asynchronously: {e}");
}
}

/// Sends a payload synchronously.
pub fn send_payload_sync(&self, payload: Payload) {
self.inner.send(payload).unwrap()
if let Err(e) = self.inner.send(payload) {
error!("Failed to send payload synchronously: {e}");
}
}
}

/// Handle to send control messages to data lane
/// A handle to control the `DataBridge`, allowing shutdown signals to be sent.
#[derive(Debug, Clone)]
pub struct CtrlTx {
pub(crate) inner: Sender<DataBridgeShutdown>,
inner: Sender<DataBridgeShutdown>,
}

impl CtrlTx {
/// Triggers shutdown of `bridge::data_lane`
/// Triggers an asynchronous shutdown of the `DataBridge`.
pub async fn trigger_shutdown(&self) {
self.inner.send_async(DataBridgeShutdown).await.unwrap()
if let Err(e) = self.inner.send_async(DataBridgeShutdown).await {
error!("Failed to send shutdown signal: {e}");
}
}
}
52 changes: 25 additions & 27 deletions uplink/src/base/bridge/delaymap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,50 @@ use log::warn;
use tokio_stream::StreamExt;
use tokio_util::time::{delay_queue::Key, DelayQueue};

/// A structure to manage delayed items, allowing items to be added with a timeout period,
/// removed before or when they expire, or queried to check if any items remain pending.
///
/// Internally it pairs a `DelayQueue` with a map from each item to its queue key.
#[derive(Debug, Default)]
pub struct DelayMap<T> {
/// A delay queue holding the timed entries.
queue: DelayQueue<T>,
/// Maps each tracked item to its key in `queue`, so an entry can be removed without scanning the queue.
map: HashMap<T, Key>,
}

impl<T: Eq + Hash + Clone + Display> DelayMap<T> {
pub fn new() -> Self {
Self { queue: DelayQueue::new(), map: HashMap::new() }
}

// Removes timeout if it exists, else returns false.
/// Removes a timeout from the `DelayMap` for the specified `item`.
/// Logs a warning if the item was not found in the map.
pub fn remove(&mut self, item: &T) {
let Some(key) = self.map.remove(item) else {
warn!("Timeout couldn't be removed from DelayMap: {item}");
return;
};
self.queue.remove(&key);
if let Some(key) = self.map.remove(item) {
self.queue.remove(&key);
} else {
warn!("Attempted to remove non-existent timeout from DelayMap: {item}");
}
}

// Insert new timeout.
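/// Inserts a new timeout for the specified `item` with a given `period`.
/// If the item is already tracked, logs a warning and tracks the new queue entry in its place.
/// NOTE(review): the earlier queue entry is not removed here, so it may still fire — confirm intended.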
pub fn insert(&mut self, item: &T, period: Duration) {
let key = self.queue.insert(item.clone(), period);
if self.map.insert(item.to_owned(), key).is_some() {
warn!("Timeout might have already been in DelayMap: {item}");
if self.map.insert(item.clone(), key).is_some() {
warn!("Timeout for {item} was already present in DelayMap and has been replaced.");
}
}

// Remove a key from map if it has timedout.
/// Waits for the next item to time out, removes it from the map, and returns it.
/// Returns `None` only when the queue is empty; while items are queued but not yet expired, this keeps waiting.
pub async fn next(&mut self) -> Option<T> {
if let Some(item) = self.queue.next().await {
self.map.remove(item.get_ref());
return Some(item.into_inner());
match self.queue.next().await {
Some(item) => {
self.map.remove(item.get_ref());
Some(item.into_inner())
}
None => None,
}

None
}

// Check if queue is empty.
/// Returns `true` if there are any pending timeouts in the queue.
pub fn has_pending(&self) -> bool {
!self.queue.is_empty()
}
}

impl<T: Eq + Hash + Clone + Display> Default for DelayMap<T> {
fn default() -> Self {
Self::new()
}
}
30 changes: 26 additions & 4 deletions uplink/src/base/bridge/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
use serde::Serialize;
use std::time::Instant;

use serde::Serialize;

use crate::base::clock;

/// Metrics collected for a stream, tracking batch statistics and latency information.
///
/// Counters and latency figures accumulate until `prepare_next` resets them for the next sequence.
#[derive(Debug, Serialize, Clone)]
pub struct StreamMetrics {
/// Timestamp from `clock()`; set when the first point is added and again by `prepare_next`.
pub timestamp: u128,
/// Sequence number of these metrics; incremented by `prepare_next`.
pub sequence: u32,
/// Name of the stream being tracked.
pub stream: String,
/// Points added through `add_point` since the last `prepare_next` reset.
pub points: usize,
/// Batches recorded through `add_batch` since the last `prepare_next` reset.
pub batches: u64,
/// Maximum points per batch, as passed to `new`.
pub max_batch_points: usize,
/// Instant from which `add_batch` measures latency; re-armed by `prepare_next`. Not serialized.
#[serde(skip_serializing)]
pub batch_start_time: Instant,
/// Running sum of batch latencies in milliseconds, used to derive the average. Not serialized.
#[serde(skip_serializing)]
pub total_latency: u64,
/// Minimum batch latency observed, in milliseconds.
pub min_batch_latency: u64,
/// Maximum batch latency observed, in milliseconds.
pub max_batch_latency: u64,
/// Average batch latency in milliseconds (`total_latency / batches`).
pub average_batch_latency: u64,
}

impl StreamMetrics {
/// Creates a new `StreamMetrics` instance with a specified stream name and max points per batch.
pub fn new(name: &str, max_batch_points: usize) -> Self {
StreamMetrics {
stream: name.to_owned(),
Expand All @@ -32,45 +46,53 @@ impl StreamMetrics {
batch_start_time: Instant::now(),
total_latency: 0,
average_batch_latency: 0,
min_batch_latency: 0,
min_batch_latency: u64::MAX, // Initialized to max to track minimum latency.
max_batch_latency: 0,
}
}

/// Returns the stream name.
pub fn stream(&self) -> &String {
&self.stream
}

/// Returns the number of points collected in the current batch.
pub fn points(&self) -> usize {
self.points
}

/// Counts one more point and, when it is the first one counted, stamps the metrics with the current clock.
pub fn add_point(&mut self) {
    // Remember whether the counter was empty before this point is added.
    let was_empty = self.points == 0;
    self.points += 1;

    if was_empty {
        self.timestamp = clock();
    }
}

/// Records one finished batch and refreshes the min, max, total and average latency figures.
///
/// Latency is the time elapsed since `batch_start_time`, in milliseconds.
pub fn add_batch(&mut self) {
    self.batches += 1;

    let elapsed_ms = self.batch_start_time.elapsed().as_millis() as u64;

    // Widen the observed latency range if this batch falls outside it.
    if elapsed_ms > self.max_batch_latency {
        self.max_batch_latency = elapsed_ms;
    }
    if elapsed_ms < self.min_batch_latency {
        self.min_batch_latency = elapsed_ms;
    }

    self.total_latency += elapsed_ms;
    // `batches` was incremented above, so the divisor is at least one here.
    self.average_batch_latency = self.total_latency / self.batches;
}

/// Prepares the metrics for the next batch or sequence by resetting counters.
pub fn prepare_next(&mut self) {
self.timestamp = clock();
self.sequence += 1;
self.batches = 0;
self.points = 0;
self.batches = 0;
self.batch_start_time = Instant::now();
self.total_latency = 0;
self.min_batch_latency = 0;
self.min_batch_latency = u64::MAX;
self.max_batch_latency = 0;
self.average_batch_latency = 0;
}
Expand Down
Loading

0 comments on commit 115291e

Please sign in to comment.