Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement SequencedRecord::as_command_record #96

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,9 @@ pub enum CommandRecord {
}

impl CommandRecord {
const FENCE: &[u8] = b"fence";
const TRIM: &[u8] = b"trim";

pub fn fence(fencing_token: Option<FencingToken>) -> Self {
Self::Fence {
fencing_token: fencing_token.unwrap_or_default(),
Expand Down Expand Up @@ -1052,15 +1055,20 @@ impl From<AppendRecord> for api::AppendRecord {
}
}

impl TryFrom<CommandRecord> for AppendRecord {
type Error = ConvertError;

fn try_from(value: CommandRecord) -> Result<Self, Self::Error> {
impl From<CommandRecord> for AppendRecord {
fn from(value: CommandRecord) -> Self {
let (header_value, body) = match value {
CommandRecord::Fence { fencing_token } => ("fence", fencing_token.into()),
CommandRecord::Trim { seq_num } => ("trim", seq_num.to_be_bytes().to_vec()),
CommandRecord::Fence { fencing_token } => (CommandRecord::FENCE, fencing_token.into()),
shikhar marked this conversation as resolved.
Show resolved Hide resolved
CommandRecord::Trim { seq_num } => {
(CommandRecord::TRIM, seq_num.to_be_bytes().to_vec())
}
};
Self::new(body)?.with_headers(vec![Header::from_value(header_value)])
AppendRecordParts {
headers: vec![Header::from_value(header_value)],
body: body.into(),
}
.try_into()
.expect("command record is a valid append record")
}
}

Expand Down Expand Up @@ -1436,6 +1444,33 @@ impl From<api::SequencedRecord> for SequencedRecord {
}
}

impl SequencedRecord {
pub fn as_command_record(&self) -> Option<CommandRecord> {
if self.headers.len() != 1 {
return None;
}

let header = self.headers.first().expect("pre-validated length");

if !header.name.is_empty() {
return None;
}

match header.value.as_ref() {
CommandRecord::FENCE => {
let fencing_token = FencingToken::new(self.body.clone()).ok()?;
Some(CommandRecord::Fence { fencing_token })
}
CommandRecord::TRIM => {
let body: &[u8] = &self.body;
let seq_num = u64::from_be_bytes(body.try_into().ok()?);
Some(CommandRecord::Trim { seq_num })
}
_ => None,
}
}
}

#[derive(Debug, Clone)]
pub struct SequencedRecordBatch {
pub records: Vec<SequencedRecord>,
Expand Down
Loading