Skip to content

Commit

Permalink
grpc codec simplification - get rid of NoData message
Browse files Browse the repository at this point in the history
  • Loading branch information
zehiko committed Dec 19, 2024
1 parent 662e404 commit af6c0dd
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 153 deletions.
10 changes: 3 additions & 7 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ async fn stream_recording_async(
.await
.map_err(TonicStatusError)?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
})
.collect::<Result<Vec<_>, tonic::Status>>()
.await
Expand Down Expand Up @@ -225,12 +224,11 @@ async fn stream_recording_async(
.await
.map_err(TonicStatusError)?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
});

drop(client);
Expand Down Expand Up @@ -340,20 +338,18 @@ async fn stream_catalog_async(
re_log::debug!("Fetching catalog…");

let mut resp = client
// TODO(zehiko) add support for fetching specific columns and rows
.query_catalog(QueryCatalogRequest {
column_projection: None, // fetch all columns
filter: None, // fetch all rows
})
.await
.map_err(TonicStatusError)?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
});

drop(client);
Expand Down
52 changes: 10 additions & 42 deletions crates/store/re_log_encoding/src/codec/wire/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,23 @@
use super::MessageHeader;
use super::TransportMessageV0;
use crate::codec::arrow::read_arrow_from_bytes;
use crate::codec::CodecError;
use re_chunk::TransportChunk;

impl MessageHeader {
pub(crate) fn decode(read: &mut impl std::io::Read) -> Result<Self, CodecError> {
let mut buffer = [0_u8; Self::SIZE_BYTES];
read.read_exact(&mut buffer)
.map_err(CodecError::HeaderDecoding)?;

let header = u8::from_le(buffer[0]);

Ok(Self(header))
}
}

impl TransportMessageV0 {
pub(crate) fn from_bytes(data: &[u8]) -> Result<Self, CodecError> {
let mut reader = std::io::Cursor::new(data);
let header = MessageHeader::decode(&mut reader)?;

match header {
MessageHeader::NO_DATA => Ok(Self::NoData),
MessageHeader::RECORD_BATCH => {
let (schema, data) = read_arrow_from_bytes(&mut reader)?;

let tc = TransportChunk {
schema: schema.clone(),
data,
};

Ok(Self::RecordBatch(tc))
}
_ => Err(CodecError::UnknownMessageHeader),
}
}
}

/// Decode transport data from a byte stream into a `TransportChunk`; returns a `CodecError` if the bytes cannot be read as Arrow data.
pub fn decode(
version: re_protos::common::v0::EncoderVersion,
data: &[u8],
) -> Result<Option<TransportChunk>, CodecError> {
) -> Result<TransportChunk, CodecError> {
match version {
re_protos::common::v0::EncoderVersion::V0 => {
let msg = TransportMessageV0::from_bytes(data)?;
match msg {
TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)),
TransportMessageV0::NoData => Ok(None),
}
let mut reader = std::io::Cursor::new(data);
let (schema, data) = read_arrow_from_bytes(&mut reader)?;

let tc = TransportChunk {
schema: schema.clone(),
data,
};

Ok(tc)
}
}
}
48 changes: 4 additions & 44 deletions crates/store/re_log_encoding/src/codec/wire/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,18 @@
use super::MessageHeader;
use super::TransportMessageV0;
use crate::codec::arrow::write_arrow_to_bytes;
use crate::codec::CodecError;
use re_chunk::TransportChunk;

impl MessageHeader {
pub(crate) fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> {
write
.write_all(&[self.0])
.map_err(CodecError::HeaderEncoding)?;

Ok(())
}
}

impl TransportMessageV0 {
pub(crate) fn to_bytes(&self) -> Result<Vec<u8>, CodecError> {
match self {
Self::NoData => {
let mut data: Vec<u8> = Vec::new();
MessageHeader::NO_DATA.encode(&mut data)?;
Ok(data)
}
Self::RecordBatch(chunk) => {
let mut data: Vec<u8> = Vec::new();
MessageHeader::RECORD_BATCH.encode(&mut data)?;

write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?;

Ok(data)
}
}
}
}

/// Encode a `NoData` message into a byte stream. This can be used by the remote store
/// (i.e. data producer) to signal back to the client that there's no data available.
pub fn no_data(version: re_protos::common::v0::EncoderVersion) -> Result<Vec<u8>, CodecError> {
match version {
re_protos::common::v0::EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(),
}
}

// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead
// of sending schema in each transport message for the same stream of batches. This will require codec
// to become stateful and keep track if schema was sent / received.
/// Encode a transport chunk into a byte stream.
pub fn encode(
version: re_protos::common::v0::EncoderVersion,
chunk: TransportChunk,
) -> Result<Vec<u8>, CodecError> {
match version {
re_protos::common::v0::EncoderVersion::V0 => {
TransportMessageV0::RecordBatch(chunk).to_bytes()
let mut data: Vec<u8> = Vec::new();
write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?;

Ok(data)
}
}
}
58 changes: 1 addition & 57 deletions crates/store/re_log_encoding/src/codec/wire/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,6 @@ pub mod encoder;
pub use decoder::decode;
pub use encoder::encode;

use re_chunk::TransportChunk;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct MessageHeader(pub u8);

impl MessageHeader {
pub const NO_DATA: Self = Self(1);
pub const RECORD_BATCH: Self = Self(2);

pub const SIZE_BYTES: usize = 1;
}

#[derive(Debug)]
pub enum TransportMessageV0 {
NoData,
RecordBatch(TransportChunk),
}

#[cfg(test)]
mod tests {
use crate::{
Expand Down Expand Up @@ -55,32 +37,6 @@ mod tests {
.unwrap()
}

#[test]
fn test_message_v0_no_data() {
let msg = TransportMessageV0::NoData;
let data = msg.to_bytes().unwrap();
let decoded = TransportMessageV0::from_bytes(&data).unwrap();
assert!(matches!(decoded, TransportMessageV0::NoData));
}

#[test]
fn test_message_v0_record_batch() {
let expected_chunk = get_test_chunk();

let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap());
let data = msg.to_bytes().unwrap();
let decoded = TransportMessageV0::from_bytes(&data).unwrap();

#[allow(clippy::match_wildcard_for_single_variants)]
match decoded {
TransportMessageV0::RecordBatch(transport) => {
let decoded_chunk = Chunk::from_transport(&transport).unwrap();
assert_eq!(expected_chunk, decoded_chunk);
}
_ => panic!("unexpected message type"),
}
}

#[test]
fn test_invalid_batch_data() {
let data = vec![2, 3, 4]; // arbitrary bytes that do not form a valid encoded record batch
Expand All @@ -92,18 +48,6 @@ mod tests {
));
}

#[test]
fn test_unknown_header() {
let data = vec![3];
let decoded = TransportMessageV0::from_bytes(&data);
assert!(decoded.is_err());

assert!(matches!(
decoded.err().unwrap(),
CodecError::UnknownMessageHeader
));
}

#[test]
fn test_v0_codec() {
let expected_chunk = get_test_chunk();
Expand All @@ -113,7 +57,7 @@ mod tests {
expected_chunk.clone().to_transport().unwrap(),
)
.unwrap();
let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap();
let decoded = decode(EncoderVersion::V0, &encoded).unwrap();
let decoded_chunk = Chunk::from_transport(&decoded).unwrap();

assert_eq!(expected_chunk, decoded_chunk);
Expand Down
3 changes: 0 additions & 3 deletions rerun_py/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl PyStorageNodeClient {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
})
.collect::<Result<Vec<_>, _>>()
.await
Expand Down Expand Up @@ -187,8 +186,6 @@ impl PyStorageNodeClient {
.into_inner();
let metadata = decode(resp.encoder_version(), &resp.payload)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?
// TODO(zehiko) this is going away soon
.ok_or(PyRuntimeError::new_err("No metadata"))?;

let recording_id = metadata
.all_columns()
Expand Down

0 comments on commit af6c0dd

Please sign in to comment.