diff --git a/Cargo.lock b/Cargo.lock index 4059ea8..a125dde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1695,10 +1695,11 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "streamstore" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=35cf0314dbc230f8f96c860a2ca8f5217f68151e#35cf0314dbc230f8f96c860a2ca8f5217f68151e" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=88d6539cbec2165a79f07de4d3a13a9539b547d6#88d6539cbec2165a79f07de4d3a13a9539b547d6" dependencies = [ "async-stream", "backon", + "bytes", "bytesize", "futures", "http", @@ -1767,7 +1768,7 @@ dependencies = [ [[package]] name = "sync_docs" version = "0.1.0" -source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=35cf0314dbc230f8f96c860a2ca8f5217f68151e#35cf0314dbc230f8f96c860a2ca8f5217f68151e" +source = "git+https://github.com/s2-streamstore/s2-sdk-rust.git?rev=88d6539cbec2165a79f07de4d3a13a9539b547d6#88d6539cbec2165a79f07de4d3a13a9539b547d6" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index db7e089..064b3a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,10 +19,13 @@ serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" signal-hook = "0.3.17" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } -streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "35cf0314dbc230f8f96c860a2ca8f5217f68151e" } thiserror = "1.0.67" -tokio = { version = "*", features = ["full"] } +tokio = { version = "1.41.1", features = ["full"] } tokio-stream = { version = "0.1.16", features = ["io-util"] } toml = "0.8.19" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } + +[dependencies.streamstore] +git = "https://github.com/s2-streamstore/s2-sdk-rust.git" +rev = "88d6539cbec2165a79f07de4d3a13a9539b547d6" diff --git a/src/main.rs b/src/main.rs index a369a5d..553bee7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use stream::{RecordStream, StreamService}; use streamstore::{ bytesize::ByteSize, client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient}, - types::{BasinInfo, BasinName, MeteredSize as _, ReadOutput, StreamInfo}, + types::{BasinInfo, BasinName, FencingToken, MeteredSize as _, ReadOutput, StreamInfo}, HeaderValue, }; use tokio::{ @@ -210,6 +210,15 @@ enum StreamActions { /// Append records to a stream. Currently, only newline delimited records are supported. Append { + /// Enforce a fencing token which must have been previously set by a + /// `fence` command record. + #[arg(short = 'f', long)] + fencing_token: Option, + + /// Enforce that the sequence number issued to the first record matches. + #[arg(short = 's', long)] + match_seq_num: Option, + /// Input newline delimited records to append from a file or stdin. /// All records are treated as plain text. /// Use "-" to read from stdin. @@ -515,7 +524,11 @@ async fn run() -> Result<(), S2CliError> { let next_seq_num = StreamService::new(stream_client).check_tail().await?; println!("{}", next_seq_num); } - StreamActions::Append { input } => { + StreamActions::Append { + input, + fencing_token, + match_seq_num, + } => { let stream_client = StreamClient::new(client_config, basin, stream); let append_input_stream = RecordStream::new( input @@ -529,7 +542,7 @@ async fn run() -> Result<(), S2CliError> { Signals::new([SIGTSTP, SIGINT, SIGTERM]).expect("valid signals"); let mut append_output_stream = StreamService::new(stream_client) - .append_session(append_input_stream) + .append_session(append_input_stream, fencing_token, match_seq_num) .await?; loop { select! { diff --git a/src/stream.rs b/src/stream.rs index 20b869c..5550c7e 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,7 +1,7 @@ use streamstore::{ - batching::AppendRecordsBatchingStream, + batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream}, client::StreamClient, - types::{AppendOutput, ReadLimit, ReadOutput, ReadSessionRequest}, + types::{AppendOutput, FencingToken, ReadLimit, ReadOutput, ReadSessionRequest}, Streaming, }; use tokio::io::AsyncBufRead; @@ -72,9 +72,14 @@ impl StreamService { pub async fn append_session( &self, append_input_stream: RecordStream>, + fencing_token: Option, + match_seq_num: Option, ) -> Result, ServiceError> { - let append_record_stream = - AppendRecordsBatchingStream::new(append_input_stream, Default::default()); + let opts = AppendRecordsBatchingOpts::default() + .with_fencing_token(fencing_token) + .with_match_seq_num(match_seq_num); + + let append_record_stream = AppendRecordsBatchingStream::new(append_input_stream, opts); self.client .append_session(append_record_stream)