Handle bincode deserialization errors, report error count
DJAndries committed Jul 23, 2023
1 parent a6be13f commit 2d59825
Showing 5 changed files with 58 additions and 22 deletions.
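In short: per-task recovery now tolerates bincode deserialization failures during message recovery, counts the affected messages instead of aborting the whole aggregation, and logs the total error count alongside the final measurement count. A condensed, self-contained sketch of that pattern is below; the error enum and recovery function are illustrative stand-ins for the repository's `AppSTARError` and `recover_msgs`, and the real change counts every message in a failed chunk rather than one per chunk.

```rust
// Hypothetical stand-ins for the repository's error and recovery types.
#[derive(Debug)]
enum RecoveryError {
    Bincode(String), // deserialization failed, possibly due to a bad recovered key
    Other(String),   // any other failure is still fatal
}

fn recover_chunk(chunk_id: usize) -> Result<u64, RecoveryError> {
    // Pretend every third chunk fails to deserialize.
    if chunk_id % 3 == 0 {
        Err(RecoveryError::Bincode("invalid length".into()))
    } else {
        Ok(10)
    }
}

fn process_chunks(chunk_ids: &[usize]) -> Result<(i64, usize), RecoveryError> {
    let mut measurement_count: i64 = 0;
    let mut error_count: usize = 0;
    for &id in chunk_ids {
        match recover_chunk(id) {
            Ok(count) => measurement_count += count as i64,
            // Tolerate deserialization failures: count them and keep going.
            Err(RecoveryError::Bincode(_)) => error_count += 1,
            // Everything else still aborts processing.
            Err(e) => return Err(e),
        }
    }
    Ok((measurement_count, error_count))
}

fn main() {
    let (count, errors) = process_chunks(&[1, 2, 3, 4, 5, 6]).expect("fatal error");
    println!("Reported {} final measurements", count);
    if errors > 0 {
        eprintln!(
            "Failed to recover {} measurements due to deserialization errors",
            errors
        );
    }
}
```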
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -4,7 +4,7 @@ version = "1.0.0"
edition = "2021"

[dependencies]
-star-constellation = { git = "https://github.com/brave/constellation", rev = "b6b8396abd98cc87a187e051c32a291c9faa43f7" }
+star-constellation = { git = "https://github.com/brave/constellation", rev = "91e69504c0419b01e4fdd47b453fb172a8f5ed2d" }
sta-rs = { git = "https://github.com/brave/sta-rs", rev = "b08701396b4beaeec1b12382adf6bf8303ba9cd5" }
actix-web = "4"
env_logger = "0.10"
9 changes: 8 additions & 1 deletion src/aggregator/mod.rs
@@ -142,7 +142,8 @@ pub async fn start_aggregation(

let measurement_counts = try_join_all(tasks).await?;

-let total_measurement_count = measurement_counts.iter().sum::<i64>();
+let total_measurement_count = measurement_counts.iter().map(|(c, _)| c).sum::<i64>();
+let total_error_count = measurement_counts.iter().map(|(_, e)| e).sum::<usize>();

if let Some(out_stream) = out_stream.as_ref() {
wait_and_commit_producer(out_stream).await?;
@@ -162,6 +163,12 @@ pub async fn start_aggregation(
.await;

info!("Reported {} final measurements", total_measurement_count);
+if total_error_count > 0 {
+error!(
+"Failed to recover {} measurements due to key recovery/deserialization errors",
+total_error_count
+);
+}

info!("Profiler summary:\n{}", profiler.summary().await);
}
55 changes: 41 additions & 14 deletions src/aggregator/processing.rs
@@ -1,4 +1,4 @@
-use super::group::GroupedMessages;
+use super::group::{GroupedMessages, MessageChunk};
use super::recovered::RecoveredMessages;
use super::report::report_measurements;
use super::AggregatorError;
@@ -7,6 +7,7 @@ use crate::models::{DBConnection, DBPool, DBStorageConnections, PendingMessage,
use crate::profiler::{Profiler, ProfilerStat};
use crate::record_stream::{DynRecordStream, RecordStreamArc};
use crate::star::{parse_message, recover_key, recover_msgs, AppSTARError, MsgRecoveryInfo};
+use star_constellation::api::NestedMessage;
use star_constellation::Error as ConstellationError;
use std::sync::{Arc, Mutex};
use std::time::Instant;
@@ -44,13 +45,24 @@ pub async fn process_expired_epochs(
Ok(())
}

+fn store_new_msgs_in_chunk(
+chunk: &mut MessageChunk,
+msgs: &mut Vec<NestedMessage>,
+new_msg_count: usize,
+) {
+for msg in msgs.drain(..new_msg_count) {
+chunk.new_msgs.push(msg);
+}
+}

fn process_one_layer(
grouped_msgs: &mut GroupedMessages,
rec_msgs: &mut RecoveredMessages,
k_threshold: usize,
-) -> Result<(GroupedMessages, Vec<(u8, Vec<u8>)>, bool), AggregatorError> {
+) -> Result<(GroupedMessages, Vec<(u8, Vec<u8>)>, usize, bool), AggregatorError> {
let mut next_grouped_msgs = GroupedMessages::default();
let mut pending_tags_to_remove = Vec::new();
+let mut error_count = 0;
let mut has_processed = false;

for (epoch, epoch_map) in &mut grouped_msgs.msg_chunks {
@@ -81,10 +93,8 @@ fn process_one_layer(
Err(e) => {
match e {
AppSTARError::Recovery(ConstellationError::ShareRecovery) => {
-// Store new messages until we receive more shares in the future
-for msg in msgs.drain(..new_msg_count) {
-chunk.new_msgs.push(msg);
-}
+// Store new messages until we receive more shares in the future.
+store_new_msgs_in_chunk(chunk, &mut msgs, new_msg_count);
continue;
}
_ => return Err(AggregatorError::AppSTAR(e)),
@@ -94,16 +104,26 @@
}
};

-let msgs_len = msgs.len() as i64;
+let msgs_len = msgs.len();

let MsgRecoveryInfo {
measurement,
next_layer_messages,
-} = recover_msgs(msgs, &key)?;
+} = match recover_msgs(msgs, &key) {
+Err(e) => match e {
+AppSTARError::Bincode(_) => {
+// A bincode deserialization error could mean a bad key.
+error_count += msgs_len;
+continue;
+}
+_ => return Err(e.into()),
+},
+Ok(info) => info,
+};

// create or update recovered msg with new count
if let Some(rec_msg) = existing_rec_msg {
-rec_msg.count += msgs_len;
+rec_msg.count += msgs_len as i64;
} else {
rec_msgs.add(RecoveredMessage {
id: 0,
@@ -112,7 +132,7 @@
metric_name: measurement.0,
metric_value: measurement.1,
parent_recovered_msg_tag: chunk.parent_msg_tag.clone(),
-count: msgs_len,
+count: msgs_len as i64,
key: key.to_vec(),
has_children: next_layer_messages.is_some(),
});
@@ -132,7 +152,12 @@
}
}

-Ok((next_grouped_msgs, pending_tags_to_remove, has_processed))
+Ok((
+next_grouped_msgs,
+pending_tags_to_remove,
+error_count,
+has_processed,
+))
}

pub fn start_subtask(
@@ -144,13 +169,14 @@
k_threshold: usize,
epoch_config: Arc<EpochConfig>,
profiler: Arc<Profiler>,
-) -> JoinHandle<i64> {
+) -> JoinHandle<(i64, usize)> {
tokio::spawn(async move {
let mut pending_tags_to_remove = Vec::new();

let mut rec_msgs = RecoveredMessages::default();

let processing_start_instant = Instant::now();
+let mut error_count = 0;

let mut it_count = 1;
loop {
@@ -173,8 +199,9 @@
.unwrap();

debug!("Task {}: Starting actual processing", id);
-let (new_grouped_msgs, pending_tags_to_remove_chunk, has_processed) =
+let (new_grouped_msgs, pending_tags_to_remove_chunk, layer_error_count, has_processed) =
process_one_layer(&mut grouped_msgs, &mut rec_msgs, k_threshold).unwrap();
+error_count += layer_error_count;

pending_tags_to_remove.extend(pending_tags_to_remove_chunk);

@@ -224,6 +251,6 @@
.record_range_time(ProfilerStat::TaskProcessingTime, processing_start_instant)
.await;

-measurements_count
+(measurements_count, error_count)
})
}
12 changes: 7 additions & 5 deletions src/star.rs
@@ -1,4 +1,6 @@
use derive_more::{Display, Error, From};
+use rand::seq::IteratorRandom;
+use rand::thread_rng;
use star_constellation::api::{
key_recover, recover, NestedMessage, PartialMeasurement, SerializableNestedMessage,
};
@@ -52,10 +54,10 @@ pub fn recover_key(
k_threshold: usize,
) -> Result<Vec<u8>, AppSTARError> {
let msgs_to_use = min(k_threshold + (k_threshold / 3), messages.len());
-let unencrypted_layers: Vec<_> = messages[..msgs_to_use]
+let unencrypted_layers: Vec<_> = messages
.iter()
.map(|v| &v.unencrypted_layer)
-.collect();
+.choose_multiple(&mut thread_rng(), msgs_to_use);

Ok(key_recover(&unencrypted_layers, epoch_tag)?)
}
@@ -76,10 +78,10 @@ pub fn recover_msgs(
.filter(|(_, pm)| pm.get_next_layer_key().as_ref().is_some())
.map(|(mut msg, pm)| {
let layer_key = pm.get_next_layer_key().as_ref().unwrap();
-msg.decrypt_next_layer(layer_key);
-msg
+msg.decrypt_next_layer(layer_key)?;
+Ok(msg)
})
-.collect(),
+.collect::<Result<Vec<_>, AppSTARError>>()?,
)
} else {
None
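Beyond error handling, this commit also changes `recover_key` to sample a random subset of messages with `rand`'s `IteratorRandom::choose_multiple` instead of always taking the first `msgs_to_use` entries, so the shares used for key recovery no longer depend on arrival order. A minimal sketch of that sampling pattern, using hypothetical byte-vector shares in place of the crate's `NestedMessage` unencrypted layers:

```rust
use rand::seq::IteratorRandom;
use rand::thread_rng;
use std::cmp::min;

// Pick a random subset of shares instead of always using the first ones.
fn sample_shares(shares: &[Vec<u8>], k_threshold: usize) -> Vec<&Vec<u8>> {
    let msgs_to_use = min(k_threshold + (k_threshold / 3), shares.len());
    shares.iter().choose_multiple(&mut thread_rng(), msgs_to_use)
}

fn main() {
    let shares: Vec<Vec<u8>> = (0u8..10).map(|i| vec![i; 4]).collect();
    let picked = sample_shares(&shares, 6); // min(6 + 6/3, 10) = 8 shares
    println!("sampled {} of {} shares", picked.len(), shares.len());
}
```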
