From 8f0ab75d8a9ac02087fea76ef612acfe3bdfa518 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Fri, 27 Dec 2024 23:29:15 +0530 Subject: [PATCH 1/6] feat: Support different formats for append Signed-off-by: Vaibhav Rabber --- Cargo.lock | 1 + Cargo.toml | 1 + src/formats.rs | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++----- src/stream.rs | 41 +++++++++++++++++------------- 5 files changed, 155 insertions(+), 23 deletions(-) create mode 100644 src/formats.rs diff --git a/Cargo.lock b/Cargo.lock index b5e9c68..79273e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1658,6 +1658,7 @@ version = "0.7.0" dependencies = [ "async-stream", "base16ct", + "bytes", "clap", "color-print", "colored", diff --git a/Cargo.toml b/Cargo.toml index f877881..2435afd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ path = "src/main.rs" [dependencies] async-stream = "0.3.6" base16ct = { version = "0.2.0", features = ["alloc"] } +bytes = "1.9.0" clap = { version = "4.5.20", features = ["derive"] } color-print = "0.3.6" colored = "2.1.0" diff --git a/src/formats.rs b/src/formats.rs new file mode 100644 index 0000000..0e2b8fe --- /dev/null +++ b/src/formats.rs @@ -0,0 +1,68 @@ +use s2::types::{AppendRecord, ConvertError}; +use std::io; + +use futures::Stream; + +#[derive(Debug, thiserror::Error)] +pub enum RecordParseError { + #[error("Error reading: {0}")] + Io(#[from] io::Error), + #[error("Error parsing: {0}")] + Parse(#[from] ConvertError), +} + +pub trait RecordParser +where + I: Stream> + Send + Unpin, +{ + type RecordStream: Stream> + Send + Unpin; + + fn parse_records(lines: I) -> Self::RecordStream; +} + +pub mod text { + use futures::{Stream, StreamExt}; + use s2::types::AppendRecord; + use std::io; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + use super::RecordParseError; + + pub struct Formatter; + + impl super::RecordParser for Formatter + where + I: Stream> + Send + Unpin, + { + type RecordStream = RecordStream; + + fn parse_records(lines: I) -> Self::RecordStream { + RecordStream(lines) + } + } + + pub struct RecordStream(S); + + impl Stream for RecordStream + where + S: Stream> + Send + Unpin, + { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.0.poll_next_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Some(Ok(s))) => Poll::Ready(Some(Ok(AppendRecord::new(s)?))), + } + } + } +} + +pub mod json { + // TODO: To be implemented. +} diff --git a/src/main.rs b/src/main.rs index 29eb1df..43a87ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,9 @@ use std::{ + fmt::Display, io::BufRead, path::PathBuf, pin::Pin, + str::FromStr, time::{Duration, UNIX_EPOCH}, }; @@ -44,6 +46,7 @@ mod stream; mod config; mod error; +mod formats; mod ping; mod types; @@ -262,6 +265,10 @@ enum Commands { #[arg(short = 'm', long)] match_seq_num: Option, + /// Input format. + #[arg(long)] + format: Format, + /// Input newline delimited records to append from a file or stdin. /// All records are treated as plain text. /// Use "-" to read from stdin. @@ -328,6 +335,45 @@ enum ConfigActions { }, } +#[derive(Debug, Clone, Copy, Default)] +pub enum Format { + #[default] + Text, + Json, +} + +impl Format { + const TEXT: &str = "text"; + const JSON: &str = "json"; + + fn as_str(&self) -> &str { + match self { + Self::Text => Self::TEXT, + Self::Json => Self::JSON, + } + } +} + +impl Display for Format { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +impl FromStr for Format { + type Err = String; + + fn from_str(s: &str) -> Result { + if s.eq_ignore_ascii_case(Self::TEXT) { + Ok(Self::Text) + } else if s.eq_ignore_ascii_case(Self::JSON) { + Ok(Self::Json) + } else { + Err("Unsupported format".to_owned()) + } + } +} + #[derive(Debug, Clone)] pub enum RecordsIn { File(PathBuf), @@ -756,17 +802,26 @@ async fn run() -> Result<(), S2CliError> { input, fencing_token, match_seq_num, + format, } => { let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; let client_config = client_config(cfg.auth_token)?; let stream_client = StreamClient::new(client_config, basin, stream); - let append_input_stream = RecordStream::new( - input - .into_reader() - .await - .map_err(|e| S2CliError::RecordReaderInit(e.to_string()))?, - ); + + let records_in = input + .into_reader() + .await + .map_err(|e| S2CliError::RecordReaderInit(e.to_string()))?; + + let append_input_stream = match format { + Format::Text => { + Box::new(RecordStream::<_, formats::text::Formatter>::new(records_in)) + } + Format::Json => { + todo!() + } + }; let mut append_output_stream = StreamService::new(stream_client) .append_session( diff --git a/src/stream.rs b/src/stream.rs index a34f875..01e9bfc 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,3 +1,4 @@ +use colored::Colorize; use s2::{ batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream}, client::StreamClient, @@ -13,33 +14,39 @@ use s2::types::AppendRecord; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::error::{ServiceError, ServiceErrorContext}; +use crate::{ + error::{ServiceError, ServiceErrorContext}, + formats::RecordParser, +}; #[derive(Debug)] -pub struct RecordStream { - inner: S, -} +pub struct RecordStream(P::RecordStream) +where + S: Stream> + Send + Unpin, + P: RecordParser; -impl RecordStream { - pub fn new(inner: S) -> Self { - Self { inner } +impl RecordStream +where + S: Stream> + Send + Unpin, + P: RecordParser, +{ + pub fn new(s: S) -> Self { + Self(P::parse_records(s)) } } -impl>> Stream for RecordStream { +impl Stream for RecordStream +where + S: Stream> + Send + Unpin, + P: RecordParser, +{ type Item = AppendRecord; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.inner.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(line))) => match AppendRecord::new(line) { - Ok(record) => Poll::Ready(Some(record)), - Err(e) => { - eprintln!("Error parsing line: {}", e); - Poll::Ready(None) - } - }, + match self.0.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(record))) => Poll::Ready(Some(record)), Poll::Ready(Some(Err(e))) => { - eprintln!("Error reading line: {}", e); + eprintln!("{}", e.to_string().red()); Poll::Ready(None) } Poll::Ready(None) => Poll::Ready(None), From 84350e3cf339497691a955c2d7f0b8eccce46c4d Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Sat, 28 Dec 2024 00:50:46 +0530 Subject: [PATCH 2/6] json Signed-off-by: Vaibhav Rabber --- src/formats.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++++--- src/main.rs | 27 ++++++++-------- 2 files changed, 94 insertions(+), 19 deletions(-) diff --git a/src/formats.rs b/src/formats.rs index 0e2b8fe..a8ffe8a 100644 --- a/src/formats.rs +++ b/src/formats.rs @@ -8,7 +8,7 @@ pub enum RecordParseError { #[error("Error reading: {0}")] Io(#[from] io::Error), #[error("Error parsing: {0}")] - Parse(#[from] ConvertError), + Convert(#[from] ConvertError), } pub trait RecordParser @@ -21,14 +21,15 @@ where } pub mod text { - use futures::{Stream, StreamExt}; - use s2::types::AppendRecord; - use std::io; use std::{ + io, pin::Pin, task::{Context, Poll}, }; + use futures::{Stream, StreamExt}; + use s2::types::AppendRecord; + use super::RecordParseError; pub struct Formatter; @@ -64,5 +65,80 @@ pub mod text { } pub mod json { - // TODO: To be implemented. + use std::{ + collections::HashMap, + io, + pin::Pin, + task::{Context, Poll}, + }; + + use futures::{Stream, StreamExt}; + use s2::types::{AppendRecord, AppendRecordParts, ConvertError, Header}; + use serde::Deserialize; + + use super::RecordParseError; + + pub struct Formatter; + + impl super::RecordParser for Formatter + where + I: Stream> + Send + Unpin, + { + type RecordStream = RecordStream; + + fn parse_records(lines: I) -> Self::RecordStream { + RecordStream(lines) + } + } + + #[derive(Debug, Clone, Deserialize)] + struct DeserializableAppendRecord { + #[serde(default)] + headers: HashMap, + #[serde(default)] + body: String, + } + + impl TryFrom for AppendRecord { + type Error = ConvertError; + + fn try_from(value: DeserializableAppendRecord) -> Result { + let DeserializableAppendRecord { headers, body } = value; + + let parts = AppendRecordParts { + headers: headers + .into_iter() + .map(|(k, v)| Header::new(k, v)) + .collect(), + body: body.into(), + }; + + parts.try_into() + } + } + + pub struct RecordStream(S); + + impl Stream for RecordStream + where + S: Stream> + Send + Unpin, + { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn parse_record(s: String) -> Result { + let append_record: DeserializableAppendRecord = serde_json::from_str(&s) + .map_err(|e| RecordParseError::Convert(e.to_string().into()))?; + + Ok(append_record.try_into()?) + } + + match self.0.poll_next_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Some(Ok(s))) => Poll::Ready(Some(parse_record(s))), + } + } + } } diff --git a/src/main.rs b/src/main.rs index 43a87ca..b0f592b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,8 +20,8 @@ use s2::{ batching::AppendRecordsBatchingOpts, client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, types::{ - AppendRecordBatch, BasinInfo, CommandRecord, ConvertError, FencingToken, MeteredBytes as _, - ReadOutput, StreamInfo, + AppendRecord, AppendRecordBatch, BasinInfo, CommandRecord, ConvertError, FencingToken, + MeteredBytes as _, ReadOutput, StreamInfo, }, }; use stream::{RecordStream, StreamService}; @@ -251,8 +251,6 @@ enum Commands { }, /// Append records to a stream. - /// - /// Currently, only newline delimited records are supported. Append { #[arg(value_name = "S2_URI")] uri: S2BasinAndStreamUri, @@ -265,8 +263,8 @@ enum Commands { #[arg(short = 'm', long)] match_seq_num: Option, - /// Input format. - #[arg(long)] + /// Input format. Can be one of "text" or "json". + #[arg(long, default_value_t)] format: Format, /// Input newline delimited records to append from a file or stdin. @@ -814,14 +812,15 @@ async fn run() -> Result<(), S2CliError> { .await .map_err(|e| S2CliError::RecordReaderInit(e.to_string()))?; - let append_input_stream = match format { - Format::Text => { - Box::new(RecordStream::<_, formats::text::Formatter>::new(records_in)) - } - Format::Json => { - todo!() - } - }; + let append_input_stream: Box + Send + Unpin> = + match format { + Format::Text => { + Box::new(RecordStream::<_, formats::text::Formatter>::new(records_in)) + } + Format::Json => { + Box::new(RecordStream::<_, formats::json::Formatter>::new(records_in)) + } + }; let mut append_output_stream = StreamService::new(stream_client) .append_session( From 437d4c9c80d7672906fa3a9d5de4cbea43ee7a6d Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Mon, 30 Dec 2024 21:10:17 +0530 Subject: [PATCH 3/6] read output Signed-off-by: Vaibhav Rabber --- src/formats.rs | 83 +++++++++++++++++++++++++++++++++++++++++++++----- src/main.rs | 20 +++++++++--- 2 files changed, 90 insertions(+), 13 deletions(-) diff --git a/src/formats.rs b/src/formats.rs index a8ffe8a..6aa33fa 100644 --- a/src/formats.rs +++ b/src/formats.rs @@ -1,5 +1,6 @@ -use s2::types::{AppendRecord, ConvertError}; +use s2::types::{AppendRecord, ConvertError, SequencedRecord}; use std::io; +use tokio::io::AsyncWrite; use futures::Stream; @@ -20,6 +21,13 @@ where fn parse_records(lines: I) -> Self::RecordStream; } +pub trait RecordWriter { + async fn write_record( + record: &SequencedRecord, + writer: &mut (impl AsyncWrite + Unpin), + ) -> io::Result<()>; +} + pub mod text { use std::{ io, @@ -28,13 +36,24 @@ pub mod text { }; use futures::{Stream, StreamExt}; - use s2::types::AppendRecord; + use s2::types::{AppendRecord, SequencedRecord}; + use tokio::io::{AsyncWrite, AsyncWriteExt}; - use super::RecordParseError; + use super::{RecordParseError, RecordParser, RecordWriter}; pub struct Formatter; - impl super::RecordParser for Formatter + impl RecordWriter for Formatter { + async fn write_record( + record: &SequencedRecord, + writer: &mut (impl AsyncWrite + Unpin), + ) -> io::Result<()> { + let s = String::from_utf8_lossy(&record.body); + writer.write_all(s.as_ref().as_bytes()).await + } + } + + impl RecordParser for Formatter where I: Stream> + Send + Unpin, { @@ -66,6 +85,7 @@ pub mod text { pub mod json { use std::{ + borrow::Cow, collections::HashMap, io, pin::Pin, @@ -73,14 +93,61 @@ pub mod json { }; use futures::{Stream, StreamExt}; - use s2::types::{AppendRecord, AppendRecordParts, ConvertError, Header}; - use serde::Deserialize; + use s2::types::{AppendRecord, AppendRecordParts, ConvertError, Header, SequencedRecord}; + use serde::{Deserialize, Serialize}; + use tokio::io::{AsyncWrite, AsyncWriteExt}; - use super::RecordParseError; + use super::{RecordParseError, RecordParser, RecordWriter}; pub struct Formatter; - impl super::RecordParser for Formatter + #[derive(Debug, Clone, Serialize)] + struct SerializableSequencedRecord<'a> { + seq_num: u64, + headers: HashMap, Cow<'a, str>>, + body: Cow<'a, str>, + } + + impl<'a> From<&'a SequencedRecord> for SerializableSequencedRecord<'a> { + fn from(value: &'a SequencedRecord) -> Self { + let SequencedRecord { + seq_num, + headers, + body, + } = value; + + let headers = headers + .iter() + .map(|Header { name, value }| { + ( + String::from_utf8_lossy(name), + String::from_utf8_lossy(value), + ) + }) + .collect::>(); + + let body = String::from_utf8_lossy(body); + + SerializableSequencedRecord { + seq_num: *seq_num, + headers, + body, + } + } + } + + impl RecordWriter for Formatter { + async fn write_record( + record: &SequencedRecord, + writer: &mut (impl AsyncWrite + Unpin), + ) -> io::Result<()> { + let record: SerializableSequencedRecord = record.into(); + let s = serde_json::to_string(&record).map_err(io::Error::other)?; + writer.write_all(s.as_bytes()).await + } + } + + impl RecordParser for Formatter where I: Stream> + Send + Unpin, { diff --git a/src/main.rs b/src/main.rs index b0f592b..18f07c0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use clap::{builder::styling, Parser, Subcommand}; use colored::*; use config::{config_path, create_config}; use error::{S2CliError, ServiceError, ServiceErrorContext}; +use formats::RecordWriter; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use ping::{LatencyStats, PingResult, Pinger}; use rand::Rng; @@ -286,6 +287,10 @@ enum Commands { #[arg(short = 's', long, default_value_t = 0)] start_seq_num: u64, + /// Output format. Can be one of "text" or "json". + #[arg(long, default_value_t)] + format: Format, + /// Output records to a file or stdout. /// Use "-" to write to stdout. #[arg(short = 'o', long, value_parser = parse_records_output_source, default_value = "-")] @@ -874,6 +879,7 @@ async fn run() -> Result<(), S2CliError> { output, limit_count, limit_bytes, + format, } => { let S2BasinAndStreamUri { basin, stream } = uri; let cfg = config::load_config(&config_path)?; @@ -923,11 +929,15 @@ async fn run() -> Result<(), S2CliError> { }; eprintln!("{} with {}", cmd.bold(), description.green().bold()); } else { - let data = &sequenced_record.body; - writer - .write_all(data) - .await - .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; + match format { + Format::Text => { + formats::text::Formatter::write_record(&sequenced_record, &mut writer).await + }, + Format::Json => { + formats::json::Formatter::write_record(&sequenced_record, &mut writer).await + }, + } + .map_err(|e| S2CliError::RecordWrite(e.to_string()))?; writer .write_all(b"\n") .await From b2d302fb585b73571c51e55c870e78d47efc2615 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Wed, 1 Jan 2025 03:07:29 +0530 Subject: [PATCH 4/6] binsafe format Signed-off-by: Vaibhav Rabber --- src/main.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main.rs b/src/main.rs index 18f07c0..d8fba73 100644 --- a/src/main.rs +++ b/src/main.rs @@ -343,16 +343,19 @@ pub enum Format { #[default] Text, Json, + JsonBinsafe, } impl Format { const TEXT: &str = "text"; const JSON: &str = "json"; + const JSON_BINSAFE: &str = "json-binsafe"; fn as_str(&self) -> &str { match self { Self::Text => Self::TEXT, Self::Json => Self::JSON, + Self::JsonBinsafe => Self::JSON_BINSAFE, } } } @@ -371,6 +374,8 @@ impl FromStr for Format { Ok(Self::Text) } else if s.eq_ignore_ascii_case(Self::JSON) { Ok(Self::Json) + } else if s.eq_ignore_ascii_case(Self::JSON_BINSAFE) { + Ok(Self::JsonBinsafe) } else { Err("Unsupported format".to_owned()) } From 31fa84dc04357a87e8bd379faf3316aab16515a8 Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Thu, 2 Jan 2025 01:15:16 +0530 Subject: [PATCH 5/6] clippy Signed-off-by: Vaibhav Rabber --- src/formats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/formats.rs b/src/formats.rs index 9651f6a..0a3dd4f 100644 --- a/src/formats.rs +++ b/src/formats.rs @@ -134,7 +134,7 @@ mod json { } } - impl<'a, const BIN_SAFE: bool> Serialize for CowStr<'a, BIN_SAFE> { + impl Serialize for CowStr<'_, BIN_SAFE> { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, From b83a7d037f0940128ea062f1ca82b04ed6a94fda Mon Sep 17 00:00:00 2001 From: Vaibhav Rabber Date: Fri, 3 Jan 2025 20:22:01 +0530 Subject: [PATCH 6/6] address comments Signed-off-by: Vaibhav Rabber --- src/formats.rs | 8 ++++++++ src/main.rs | 16 ++++++++++------ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/formats.rs b/src/formats.rs index 0a3dd4f..98f6d8e 100644 --- a/src/formats.rs +++ b/src/formats.rs @@ -107,6 +107,12 @@ mod json { #[derive(Debug, Clone, Default)] struct CowStr<'a, const BIN_SAFE: bool>(Cow<'a, str>); + impl CowStr<'_, BIN_SAFE> { + fn is_empty(&self) -> bool { + self.0.is_empty() + } + } + type OwnedCowStr = CowStr<'static, BIN_SAFE>; impl<'a, const BIN_SAFE: bool> From<&'a [u8]> for CowStr<'a, BIN_SAFE> { @@ -157,7 +163,9 @@ mod json { #[derive(Debug, Clone, Serialize)] struct SerializableSequencedRecord<'a, const BIN_SAFE: bool> { seq_num: u64, + #[serde(skip_serializing_if = "Vec::is_empty")] headers: Vec<(CowStr<'a, BIN_SAFE>, CowStr<'a, BIN_SAFE>)>, + #[serde(skip_serializing_if = "CowStr::is_empty")] body: CowStr<'a, BIN_SAFE>, } diff --git a/src/main.rs b/src/main.rs index 27d8851..6075392 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use std::{ use account::AccountService; use base64ct::{Base64, Encoding}; use basin::BasinService; -use clap::{builder::styling, Parser, Subcommand}; +use clap::{builder::styling, Parser, Subcommand, ValueEnum}; use colored::*; use config::{config_path, create_config}; use error::{S2CliError, ServiceError, ServiceErrorContext}; @@ -265,8 +265,8 @@ enum Commands { #[arg(short = 'm', long)] match_seq_num: Option, - /// Input format. Can be one of "text" or "json". - #[arg(long, default_value_t)] + /// Input format. + #[arg(long, value_enum, default_value_t)] format: Format, /// Input newline delimited records to append from a file or stdin. @@ -288,8 +288,8 @@ enum Commands { #[arg(short = 's', long, default_value_t = 0)] start_seq_num: u64, - /// Output format. Can be one of "text" or "json". - #[arg(long, default_value_t)] + /// Output format. + #[arg(long, value_enum, default_value_t)] format: Format, /// Output records to a file or stdout. @@ -339,11 +339,15 @@ enum ConfigActions { }, } -#[derive(Debug, Clone, Copy, Default)] +#[derive(Debug, Clone, Copy, Default, ValueEnum)] pub enum Format { + /// Newline delimited records with UTF-8 bodies. #[default] Text, + /// Newline delimited records in JSON format with UTF-8 headers and body. Json, + /// Newline delimited records in JSON format with base64 encoded headers + /// and body. JsonBinsafe, }