Skip to content

Commit

Permalink
feat: allow option per-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Oct 14, 2024
1 parent 8252720 commit 534c74b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
10 changes: 8 additions & 2 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ max_stream_count = 10
# All streams will first push the latest packet before pushing historical data in
# FIFO order, defaults to false. This solves the problem of bad networks leading to
# data being pushed so slow that it is practically impossible to track the device.
live_data_first = true
default_live_data_first = true

# MQTT client configuration
#
Expand Down Expand Up @@ -89,13 +89,19 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
# used when there is a network/system failure.
# - priority(optional, u8): Higher prioirity streams get to push their data
# onto the network first.
# - live_data_first(optional, bool): All streams will first push the latest packet
# before pushing historical data in FIFO order, defaults to false. This solves the
# problem of bad networks leading to data being pushed so slow that it is practically
# impossible to track the device.
#
# In the following config for the device_shadow stream we set batch_size to 1 and mark
# it as non-persistent. streams are internally constructed as a map of Name -> Config
# it as non-persistent, also setting up live_data_first to enable quick delivery of stats.
# Streams are internally constructed as a map of Name -> Config
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
flush_period = 5
priority = 75
live_data_first = true

# Example using compression
[streams.imu]
Expand Down
4 changes: 2 additions & 2 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl StorageHandler {
let mut storage = Storage::new(
&stream_config.topic,
stream_config.persistence.max_file_size,
config.live_data_first,
stream_config.live_data_first,
);
if stream_config.persistence.max_file_count > 0 {
let mut path = config.persistence_path.clone();
Expand Down Expand Up @@ -264,7 +264,7 @@ impl StorageHandler {
Storage::new(
&stream.topic,
self.config.default_buf_size,
self.config.live_data_first,
self.config.default_live_data_first,
)
})
.write(publish)
Expand Down
5 changes: 4 additions & 1 deletion uplink/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub struct StreamConfig {
pub persistence: Persistence,
#[serde(default)]
pub priority: u8,
#[serde(default)]
pub live_data_first: bool,
}

impl Default for StreamConfig {
Expand All @@ -104,6 +106,7 @@ impl Default for StreamConfig {
compression: Compression::Disabled,
persistence: Persistence::default(),
priority: 0,
live_data_first: false,
}
}
}
Expand Down Expand Up @@ -537,7 +540,7 @@ pub struct Config {
pub precondition_checks: Option<PreconditionCheckerConfig>,
pub bus: Option<BusConfig>,
#[serde(default)]
pub live_data_first: bool,
pub default_live_data_first: bool,
}

impl Config {
Expand Down
8 changes: 6 additions & 2 deletions uplink/tests/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ async fn prefer_live_data() {
let mut config = Config::default();
config.default_buf_size = 1024 * 1024;
config.mqtt.max_packet_size = 1024 * 1024;
config.live_data_first = true;
let config = Arc::new(config);
let (data_tx, data_rx) = bounded(0);
let (net_tx, req_rx) = bounded(0);
Expand All @@ -237,7 +236,12 @@ async fn prefer_live_data() {
spawn(async {
let mut default = MockCollector::new(
"default",
StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() },
StreamConfig {
topic: "topic/default".to_owned(),
batch_size: 1,
live_data_first: true,
..Default::default()
},
data_tx,
);
for i in 0.. {
Expand Down

0 comments on commit 534c74b

Please sign in to comment.