refactor: DRY serializer
Devdutt Shenoi committed Oct 9, 2024
1 parent 8528898 commit 2498e28
Showing 1 changed file with 68 additions and 73 deletions.
141 changes: 68 additions & 73 deletions uplink/src/base/serializer/mod.rs
@@ -381,23 +381,7 @@ impl<C: MqttClient> Serializer<C> {
select! {
data = self.collector_rx.recv_async() => {
let data = data?;
let stream = data.stream_config();
let publish = construct_publish(data, &mut self.stream_metrics)?;
let storage = self.storage_handler.select(&stream);
match write_to_storage(publish, storage) {
Ok(Some(deleted)) => {
debug!("Lost segment = {deleted}");
self.metrics.increment_lost_segments();
}
Ok(_) => {},
Err(e) => {
error!("Storage write error = {e}");
self.metrics.increment_errors();
}
};

// Update metrics
self.metrics.add_batch();
store_received_data(data, &mut self.storage_handler, &mut self.stream_metrics, &mut self.metrics)?;
}
o = &mut publish => match o {
Ok(_) => break Ok(Status::EventLoopReady),
@@ -438,82 +422,38 @@ impl<C: MqttClient> Serializer<C> {
self.metrics.set_mode("catchup");

let max_packet_size = self.config.mqtt.max_packet_size;
let client = self.client.clone();

let Some((stream, storage)) = self.storage_handler.next(&mut self.metrics) else {
let Some((mut last_publish_stream, publish)) =
next_publish(&mut self.storage_handler, &mut self.metrics, max_packet_size)?
else {
return Ok(Status::Normal);
};

// TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
// This leads to force switching to normal mode. Increasing max_payload_size to bypass this
let publish = match Packet::read(storage.reader(), max_packet_size) {
Ok(Packet::Publish(publish)) => publish,
Ok(packet) => unreachable!("Unexpected packet: {:?}", packet),
Err(e) => {
self.metrics.increment_errors();
error!("Failed to read from storage. Forcing into Normal mode. Error = {e}");
save_and_prepare_next_metrics(
&mut self.pending_metrics,
&mut self.metrics,
&mut self.stream_metrics,
&self.storage_handler,
);
return Ok(Status::Normal);
}
};

let mut last_publish_payload_size = publish.payload.len();
let mut last_publish_stream = stream.clone();
let send = send_publish(client, publish.topic, publish.payload);
let send = send_publish(self.client.clone(), publish.topic, publish.payload);
tokio::pin!(send);

let v: Result<Status, Error> = loop {
select! {
data = self.collector_rx.recv_async() => {
let data = data?;
let stream = data.stream_config();
let publish = construct_publish(data, &mut self.stream_metrics)?;
let storage = self.storage_handler.select(&stream);
match write_to_storage(publish, storage) {
Ok(Some(deleted)) => {
debug!("Lost segment = {deleted}");
self.metrics.increment_lost_segments();
}
Ok(_) => {},
Err(e) => {
error!("Storage write error = {e}");
self.metrics.increment_errors();
}
};

// Update metrics
self.metrics.add_batch();
store_received_data(data, &mut self.storage_handler, &mut self.stream_metrics, &mut self.metrics)?;
}
o = &mut send => {
self.metrics.add_sent_size(last_publish_payload_size);
// Send failure implies eventloop crash. Switch state to
// indefinitely write to disk to not lose data
let client = match o {
Ok(c) => c,
Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(publish, last_publish_stream.clone())),
Err(e) => unreachable!("Unexpected error: {e}"),
};

let Some((stream, storage)) = self.storage_handler.next(&mut self.metrics) else {
return Ok(Status::Normal);
};

let publish = match Packet::read(storage.reader(), max_packet_size) {
Ok(Packet::Publish(publish)) => publish,
Ok(packet) => unreachable!("Unexpected packet: {:?}", packet),
Err(e) => {
error!("Failed to read from storage. Forcing into Normal mode. Error = {e}");
break Ok(Status::Normal)
Err(MqttError::Send(Request::Publish(publish))) => {
break Ok(Status::EventLoopCrash(publish, last_publish_stream.clone()))
}
Err(e) => unreachable!("Unexpected error: {e}"),
};

self.metrics.add_batch();

let Some((stream, publish)) = next_publish(&mut self.storage_handler, &mut self.metrics, max_packet_size)?
else {
return Ok(Status::Normal);
};
let payload = publish.payload;
last_publish_payload_size = payload.len();
last_publish_stream = stream.clone();
@@ -606,6 +546,61 @@ impl<C: MqttClient> Serializer<C> {
}
}

// Selects the right read buffer for storage and serializes received data as a Publish packet into it.
// Updates metrics regarding the serializer as well.
fn store_received_data(
    data: Box<dyn Package>,
    storage_handler: &mut StorageHandler,
    stream_metrics: &mut HashMap<String, StreamMetrics>,
    metrics: &mut Metrics,
) -> Result<(), Error> {
    let stream = data.stream_config();
    let publish = construct_publish(data, stream_metrics)?;
    let storage = storage_handler.select(&stream);
    match write_to_storage(publish, storage) {
        Ok(Some(deleted)) => {
            debug!("Lost segment = {deleted}");
            metrics.increment_lost_segments();
        }
        Ok(_) => {}
        Err(e) => {
            error!("Storage write error = {e}");
            metrics.increment_errors();
        }
    };

    // Update metrics
    metrics.add_batch();

    Ok(())
}

// Deserializes a Publish packet from the storage read buffer and updates metrics regarding the serializer.
fn next_publish(
    storage_handler: &mut StorageHandler,
    metrics: &mut Metrics,
    max_packet_size: usize,
) -> Result<Option<(Arc<StreamConfig>, Publish)>, Error> {
    let Some((stream, storage)) = storage_handler.next(metrics) else {
        return Ok(None);
    };

    // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk.
    // This leads to force switching to normal mode. Increasing max_payload_size to bypass this
    let publish = match Packet::read(storage.reader(), max_packet_size) {
        Ok(Packet::Publish(publish)) => publish,
        Ok(packet) => unreachable!("Unexpected packet: {:?}", packet),
        Err(e) => {
            error!("Failed to read from storage. Forcing into Normal mode. Error = {e}");
            return Ok(None);
        }
    };

    metrics.add_batch();

    Ok(Some((stream.clone(), publish)))
}

async fn send_publish<C: MqttClient>(
client: C,
topic: String,
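
The shape of this refactor, stripped of uplink's actual types, fits in a short standalone sketch: the two blocks that the normal and catchup loops previously repeated inline (persist the incoming batch, then read the next publish back out of storage) become free functions that borrow only what they touch. Everything in the sketch below (Storage, Metrics, the Vec<u8> payloads, the CAPACITY limit) is a hypothetical, simplified stand-in for illustration, not uplink's real StorageHandler/Metrics API.

use std::collections::VecDeque;

type Error = String;

#[derive(Default)]
struct Storage {
    // Stand-in for the on-disk segment queue behind StorageHandler (hypothetical).
    queue: VecDeque<Vec<u8>>,
}

#[derive(Default)]
struct Metrics {
    batches: usize,
    lost_segments: usize,
}

// Analogue of store_received_data: persist one serialized batch and update metrics.
fn store_received_data(
    payload: &[u8],
    storage: &mut Storage,
    metrics: &mut Metrics,
) -> Result<(), Error> {
    const CAPACITY: usize = 4;
    if storage.queue.len() == CAPACITY {
        // Oldest segment is dropped, mirroring the "Lost segment" branch above.
        let _lost = storage.queue.pop_front();
        metrics.lost_segments += 1;
    }
    storage.queue.push_back(payload.to_vec());
    metrics.batches += 1; // mirrors metrics.add_batch()
    Ok(())
}

// Analogue of next_publish: Ok(None) means storage is drained, go back to Normal mode.
fn next_publish(storage: &mut Storage, metrics: &mut Metrics) -> Result<Option<Vec<u8>>, Error> {
    let Some(publish) = storage.queue.pop_front() else {
        return Ok(None);
    };
    metrics.batches += 1;
    Ok(Some(publish))
}

fn main() -> Result<(), Error> {
    let mut storage = Storage::default();
    let mut metrics = Metrics::default();

    // Receive path: every incoming batch goes through the same helper,
    // whichever loop state the serializer is in.
    for batch in ["a", "bb", "ccc", "dddd", "eeeee"] {
        store_received_data(batch.as_bytes(), &mut storage, &mut metrics)?;
    }

    // Catchup drain path, mirroring
    // `let Some((stream, publish)) = next_publish(..)? else { return Ok(Status::Normal) };`
    while let Some(publish) = next_publish(&mut storage, &mut metrics)? {
        println!("publishing {} bytes", publish.len());
    }
    println!("batches = {}, lost segments = {}", metrics.batches, metrics.lost_segments);
    Ok(())
}

As in the commit, the helpers are free functions over individual &mut fields rather than methods on Serializer, so each select! arm shrinks to a single call without borrowing all of self, and both modes now share one code path for this logic.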
