diff --git a/src/types.rs b/src/types.rs index 122ed6b..5e817b6 100644 --- a/src/types.rs +++ b/src/types.rs @@ -955,6 +955,9 @@ pub enum CommandRecord { } impl CommandRecord { + const FENCE: &[u8] = b"fence"; + const TRIM: &[u8] = b"trim"; + pub fn fence(fencing_token: Option) -> Self { Self::Fence { fencing_token: fencing_token.unwrap_or_default(), @@ -1052,15 +1055,20 @@ impl From for api::AppendRecord { } } -impl TryFrom for AppendRecord { - type Error = ConvertError; - - fn try_from(value: CommandRecord) -> Result { +impl From for AppendRecord { + fn from(value: CommandRecord) -> Self { let (header_value, body) = match value { - CommandRecord::Fence { fencing_token } => ("fence", fencing_token.into()), - CommandRecord::Trim { seq_num } => ("trim", seq_num.to_be_bytes().to_vec()), + CommandRecord::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()), + CommandRecord::Trim { seq_num } => { + (CommandRecord::TRIM, seq_num.to_be_bytes().to_vec()) + } }; - Self::new(body)?.with_headers(vec![Header::from_value(header_value)]) + AppendRecordParts { + headers: vec![Header::from_value(header_value)], + body: body.into(), + } + .try_into() + .expect("command record is a valid append record") } } @@ -1436,6 +1444,33 @@ impl From for SequencedRecord { } } +impl SequencedRecord { + pub fn as_command_record(&self) -> Option { + if self.headers.len() != 1 { + return None; + } + + let header = self.headers.first().expect("pre-validated length"); + + if !header.name.is_empty() { + return None; + } + + match header.value.as_ref() { + CommandRecord::FENCE => { + let fencing_token = FencingToken::new(self.body.clone()).ok()?; + Some(CommandRecord::Fence { fencing_token }) + } + CommandRecord::TRIM => { + let body: &[u8] = &self.body; + let seq_num = u64::from_be_bytes(body.try_into().ok()?); + Some(CommandRecord::Trim { seq_num }) + } + _ => None, + } + } +} + #[derive(Debug, Clone)] pub struct SequencedRecordBatch { pub records: Vec,