From af6c0ddaa1a90f87e7ef883526e98e4d002afca3 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:47:23 +0100 Subject: [PATCH] grpc codec simplification - get rid of NoData message --- crates/store/re_grpc_client/src/lib.rs | 10 +--- .../re_log_encoding/src/codec/wire/decoder.rs | 52 ++++------------- .../re_log_encoding/src/codec/wire/encoder.rs | 48 ++------------- .../re_log_encoding/src/codec/wire/mod.rs | 58 +------------------ rerun_py/src/remote.rs | 3 - 5 files changed, 18 insertions(+), 153 deletions(-) diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index f324862dc3a8..9e6fa3bf7680 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -191,12 +191,11 @@ async fn stream_recording_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -225,12 +224,11 @@ async fn stream_recording_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }); drop(client); @@ -340,7 +338,6 @@ async fn stream_catalog_async( re_log::debug!("Fetching catalog…"); let mut resp = client - // TODO(zehiko) add support for fetching specific columns and rows .query_catalog(QueryCatalogRequest { column_projection: None, // fetch all columns filter: None, // fetch all rows @@ -348,12 +345,11 @@ async fn stream_catalog_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }); drop(client); diff --git a/crates/store/re_log_encoding/src/codec/wire/decoder.rs b/crates/store/re_log_encoding/src/codec/wire/decoder.rs index 06b9bccdbc23..db0db4613cd1 100644 --- a/crates/store/re_log_encoding/src/codec/wire/decoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/decoder.rs @@ -1,55 +1,23 @@ -use super::MessageHeader; -use super::TransportMessageV0; use crate::codec::arrow::read_arrow_from_bytes; use crate::codec::CodecError; use re_chunk::TransportChunk; -impl MessageHeader { - pub(crate) fn decode(read: &mut impl std::io::Read) -> Result { - let mut buffer = [0_u8; Self::SIZE_BYTES]; - read.read_exact(&mut buffer) - .map_err(CodecError::HeaderDecoding)?; - - let header = u8::from_le(buffer[0]); - - Ok(Self(header)) - } -} - -impl TransportMessageV0 { - pub(crate) fn from_bytes(data: &[u8]) -> Result { - let mut reader = std::io::Cursor::new(data); - let header = MessageHeader::decode(&mut reader)?; - - match header { - MessageHeader::NO_DATA => Ok(Self::NoData), - MessageHeader::RECORD_BATCH => { - let (schema, data) = read_arrow_from_bytes(&mut reader)?; - - let tc = TransportChunk { - schema: schema.clone(), - data, - }; - - Ok(Self::RecordBatch(tc)) - } - _ => Err(CodecError::UnknownMessageHeader), - } - } -} - /// Decode transport data from a byte stream - if there's a record batch present, return it, otherwise return `None`. pub fn decode( version: re_protos::common::v0::EncoderVersion, data: &[u8], -) -> Result, CodecError> { +) -> Result { match version { re_protos::common::v0::EncoderVersion::V0 => { - let msg = TransportMessageV0::from_bytes(data)?; - match msg { - TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)), - TransportMessageV0::NoData => Ok(None), - } + let mut reader = std::io::Cursor::new(data); + let (schema, data) = read_arrow_from_bytes(&mut reader)?; + + let tc = TransportChunk { + schema: schema.clone(), + data, + }; + + Ok(tc) } } } diff --git a/crates/store/re_log_encoding/src/codec/wire/encoder.rs b/crates/store/re_log_encoding/src/codec/wire/encoder.rs index e6ae62c1e1b7..c2e8312762fd 100644 --- a/crates/store/re_log_encoding/src/codec/wire/encoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/encoder.rs @@ -1,50 +1,7 @@ -use super::MessageHeader; -use super::TransportMessageV0; use crate::codec::arrow::write_arrow_to_bytes; use crate::codec::CodecError; use re_chunk::TransportChunk; -impl MessageHeader { - pub(crate) fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> { - write - .write_all(&[self.0]) - .map_err(CodecError::HeaderEncoding)?; - - Ok(()) - } -} - -impl TransportMessageV0 { - pub(crate) fn to_bytes(&self) -> Result, CodecError> { - match self { - Self::NoData => { - let mut data: Vec = Vec::new(); - MessageHeader::NO_DATA.encode(&mut data)?; - Ok(data) - } - Self::RecordBatch(chunk) => { - let mut data: Vec = Vec::new(); - MessageHeader::RECORD_BATCH.encode(&mut data)?; - - write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; - - Ok(data) - } - } - } -} - -/// Encode a `NoData` message into a byte stream. This can be used by the remote store -/// (i.e. data producer) to signal back to the client that there's no data available. -pub fn no_data(version: re_protos::common::v0::EncoderVersion) -> Result, CodecError> { - match version { - re_protos::common::v0::EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(), - } -} - -// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead -// of sending schema in each transport message for the same stream of batches. This will require codec -// to become stateful and keep track if schema was sent / received. /// Encode a transport chunk into a byte stream. pub fn encode( version: re_protos::common::v0::EncoderVersion, @@ -52,7 +9,10 @@ pub fn encode( ) -> Result, CodecError> { match version { re_protos::common::v0::EncoderVersion::V0 => { - TransportMessageV0::RecordBatch(chunk).to_bytes() + let mut data: Vec = Vec::new(); + write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; + + Ok(data) } } } diff --git a/crates/store/re_log_encoding/src/codec/wire/mod.rs b/crates/store/re_log_encoding/src/codec/wire/mod.rs index 587e9e31e2ce..19ab51d39d54 100644 --- a/crates/store/re_log_encoding/src/codec/wire/mod.rs +++ b/crates/store/re_log_encoding/src/codec/wire/mod.rs @@ -4,24 +4,6 @@ pub mod encoder; pub use decoder::decode; pub use encoder::encode; -use re_chunk::TransportChunk; - -#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)] -pub struct MessageHeader(pub u8); - -impl MessageHeader { - pub const NO_DATA: Self = Self(1); - pub const RECORD_BATCH: Self = Self(2); - - pub const SIZE_BYTES: usize = 1; -} - -#[derive(Debug)] -pub enum TransportMessageV0 { - NoData, - RecordBatch(TransportChunk), -} - #[cfg(test)] mod tests { use crate::{ @@ -55,32 +37,6 @@ mod tests { .unwrap() } - #[test] - fn test_message_v0_no_data() { - let msg = TransportMessageV0::NoData; - let data = msg.to_bytes().unwrap(); - let decoded = TransportMessageV0::from_bytes(&data).unwrap(); - assert!(matches!(decoded, TransportMessageV0::NoData)); - } - - #[test] - fn test_message_v0_record_batch() { - let expected_chunk = get_test_chunk(); - - let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap()); - let data = msg.to_bytes().unwrap(); - let decoded = TransportMessageV0::from_bytes(&data).unwrap(); - - #[allow(clippy::match_wildcard_for_single_variants)] - match decoded { - TransportMessageV0::RecordBatch(transport) => { - let decoded_chunk = Chunk::from_transport(&transport).unwrap(); - assert_eq!(expected_chunk, decoded_chunk); - } - _ => panic!("unexpected message type"), - } - } - #[test] fn test_invalid_batch_data() { let data = vec![2, 3, 4]; // '1' is NO_DATA message header @@ -92,18 +48,6 @@ mod tests { )); } - #[test] - fn test_unknown_header() { - let data = vec![3]; - let decoded = TransportMessageV0::from_bytes(&data); - assert!(decoded.is_err()); - - assert!(matches!( - decoded.err().unwrap(), - CodecError::UnknownMessageHeader - )); - } - #[test] fn test_v0_codec() { let expected_chunk = get_test_chunk(); @@ -113,7 +57,7 @@ mod tests { expected_chunk.clone().to_transport().unwrap(), ) .unwrap(); - let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap(); + let decoded = decode(EncoderVersion::V0, &encoded).unwrap(); let decoded_chunk = Chunk::from_transport(&decoded).unwrap(); assert_eq!(expected_chunk, decoded_chunk); diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 2619d3990088..2c5557747295 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -102,7 +102,6 @@ impl PyStorageNodeClient { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, _>>() .await @@ -187,8 +186,6 @@ impl PyStorageNodeClient { .into_inner(); let metadata = decode(resp.encoder_version(), &resp.payload) .map_err(|err| PyRuntimeError::new_err(err.to_string()))? - // TODO(zehiko) this is going away soon - .ok_or(PyRuntimeError::new_err("No metadata"))?; let recording_id = metadata .all_columns()