-
Notifications
You must be signed in to change notification settings - Fork 374
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
grpc codec simplification - get rid of NoData message (#8550)
- Loading branch information
Showing
5 changed files
with
37 additions
and
175 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,55 +1,23 @@ | ||
use super::MessageHeader; | ||
use super::TransportMessageV0; | ||
use crate::codec::arrow::read_arrow_from_bytes; | ||
use crate::codec::CodecError; | ||
use re_chunk::TransportChunk; | ||
|
||
impl MessageHeader { | ||
pub(crate) fn decode(read: &mut impl std::io::Read) -> Result<Self, CodecError> { | ||
let mut buffer = [0_u8; Self::SIZE_BYTES]; | ||
read.read_exact(&mut buffer) | ||
.map_err(CodecError::HeaderDecoding)?; | ||
|
||
let header = u8::from_le(buffer[0]); | ||
|
||
Ok(Self(header)) | ||
} | ||
} | ||
|
||
impl TransportMessageV0 { | ||
pub(crate) fn from_bytes(data: &[u8]) -> Result<Self, CodecError> { | ||
let mut reader = std::io::Cursor::new(data); | ||
let header = MessageHeader::decode(&mut reader)?; | ||
|
||
match header { | ||
MessageHeader::NO_DATA => Ok(Self::NoData), | ||
MessageHeader::RECORD_BATCH => { | ||
let (schema, data) = read_arrow_from_bytes(&mut reader)?; | ||
|
||
let tc = TransportChunk { | ||
schema: schema.clone(), | ||
data, | ||
}; | ||
|
||
Ok(Self::RecordBatch(tc)) | ||
} | ||
_ => Err(CodecError::UnknownMessageHeader), | ||
} | ||
} | ||
} | ||
|
||
/// Decode transport data from a byte stream - if there's a record batch present, return it, otherwise return `None`. | ||
pub fn decode( | ||
version: re_protos::common::v0::EncoderVersion, | ||
data: &[u8], | ||
) -> Result<Option<TransportChunk>, CodecError> { | ||
) -> Result<TransportChunk, CodecError> { | ||
match version { | ||
re_protos::common::v0::EncoderVersion::V0 => { | ||
let msg = TransportMessageV0::from_bytes(data)?; | ||
match msg { | ||
TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)), | ||
TransportMessageV0::NoData => Ok(None), | ||
} | ||
let mut reader = std::io::Cursor::new(data); | ||
let (schema, data) = read_arrow_from_bytes(&mut reader)?; | ||
|
||
let tc = TransportChunk { | ||
schema: schema.clone(), | ||
data, | ||
}; | ||
|
||
Ok(tc) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,58 +1,18 @@ | ||
use super::MessageHeader; | ||
use super::TransportMessageV0; | ||
use crate::codec::arrow::write_arrow_to_bytes; | ||
use crate::codec::CodecError; | ||
use re_chunk::TransportChunk; | ||
|
||
impl MessageHeader { | ||
pub(crate) fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> { | ||
write | ||
.write_all(&[self.0]) | ||
.map_err(CodecError::HeaderEncoding)?; | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl TransportMessageV0 { | ||
pub(crate) fn to_bytes(&self) -> Result<Vec<u8>, CodecError> { | ||
match self { | ||
Self::NoData => { | ||
let mut data: Vec<u8> = Vec::new(); | ||
MessageHeader::NO_DATA.encode(&mut data)?; | ||
Ok(data) | ||
} | ||
Self::RecordBatch(chunk) => { | ||
let mut data: Vec<u8> = Vec::new(); | ||
MessageHeader::RECORD_BATCH.encode(&mut data)?; | ||
|
||
write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; | ||
|
||
Ok(data) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Encode a `NoData` message into a byte stream. This can be used by the remote store | ||
/// (i.e. data producer) to signal back to the client that there's no data available. | ||
pub fn no_data(version: re_protos::common::v0::EncoderVersion) -> Result<Vec<u8>, CodecError> { | ||
match version { | ||
re_protos::common::v0::EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(), | ||
} | ||
} | ||
|
||
// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead | ||
// of sending schema in each transport message for the same stream of batches. This will require codec | ||
// to become stateful and keep track if schema was sent / received. | ||
/// Encode a transport chunk into a byte stream. | ||
pub fn encode( | ||
version: re_protos::common::v0::EncoderVersion, | ||
chunk: TransportChunk, | ||
chunk: &TransportChunk, | ||
) -> Result<Vec<u8>, CodecError> { | ||
match version { | ||
re_protos::common::v0::EncoderVersion::V0 => { | ||
TransportMessageV0::RecordBatch(chunk).to_bytes() | ||
let mut data: Vec<u8> = Vec::new(); | ||
write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; | ||
|
||
Ok(data) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters