Skip to content

Commit

Permalink
feat: Appends with fencing_token and match_seq_num (#38)
Browse files Browse the repository at this point in the history
Resolves: #34, #35

Depends on s2-streamstore/s2-sdk-rust#87 for
parsing fencing token.

---------

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Nov 29, 2024
1 parent 2d711b8 commit 217c1f5
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
signal-hook = "0.3.17"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
streamstore = { git = "https://github.com/s2-streamstore/s2-sdk-rust.git", rev = "35cf0314dbc230f8f96c860a2ca8f5217f68151e" }
thiserror = "1.0.67"
tokio = { version = "*", features = ["full"] }
tokio = { version = "1.41.1", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["io-util"] }
toml = "0.8.19"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[dependencies.streamstore]
git = "https://github.com/s2-streamstore/s2-sdk-rust.git"
rev = "88d6539cbec2165a79f07de4d3a13a9539b547d6"
19 changes: 16 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use stream::{RecordStream, StreamService};
use streamstore::{
bytesize::ByteSize,
client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
types::{BasinInfo, BasinName, MeteredSize as _, ReadOutput, StreamInfo},
types::{BasinInfo, BasinName, FencingToken, MeteredSize as _, ReadOutput, StreamInfo},
HeaderValue,
};
use tokio::{
Expand Down Expand Up @@ -210,6 +210,15 @@ enum StreamActions {

/// Append records to a stream. Currently, only newline delimited records are supported.
Append {
/// Enforce a fencing token which must have been previously set by a
/// `fence` command record.
#[arg(short = 'f', long)]
fencing_token: Option<FencingToken>,

/// Enforce that the sequence number issued to the first record matches.
#[arg(short = 'm', long)]
match_seq_num: Option<u64>,

/// Input newline delimited records to append from a file or stdin.
/// All records are treated as plain text.
/// Use "-" to read from stdin.
Expand Down Expand Up @@ -515,7 +524,11 @@ async fn run() -> Result<(), S2CliError> {
let next_seq_num = StreamService::new(stream_client).check_tail().await?;
println!("{}", next_seq_num);
}
StreamActions::Append { input } => {
StreamActions::Append {
input,
fencing_token,
match_seq_num,
} => {
let stream_client = StreamClient::new(client_config, basin, stream);
let append_input_stream = RecordStream::new(
input
Expand All @@ -529,7 +542,7 @@ async fn run() -> Result<(), S2CliError> {
Signals::new([SIGTSTP, SIGINT, SIGTERM]).expect("valid signals");

let mut append_output_stream = StreamService::new(stream_client)
.append_session(append_input_stream)
.append_session(append_input_stream, fencing_token, match_seq_num)
.await?;
loop {
select! {
Expand Down
13 changes: 9 additions & 4 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use streamstore::{
batching::AppendRecordsBatchingStream,
batching::{AppendRecordsBatchingOpts, AppendRecordsBatchingStream},
client::StreamClient,
types::{AppendOutput, ReadLimit, ReadOutput, ReadSessionRequest},
types::{AppendOutput, FencingToken, ReadLimit, ReadOutput, ReadSessionRequest},
Streaming,
};
use tokio::io::AsyncBufRead;
Expand Down Expand Up @@ -72,9 +72,14 @@ impl StreamService {
pub async fn append_session(
&self,
append_input_stream: RecordStream<Box<dyn AsyncBufRead + Send + Unpin>>,
fencing_token: Option<FencingToken>,
match_seq_num: Option<u64>,
) -> Result<Streaming<AppendOutput>, ServiceError> {
let append_record_stream =
AppendRecordsBatchingStream::new(append_input_stream, Default::default());
let opts = AppendRecordsBatchingOpts::default()
.with_fencing_token(fencing_token)
.with_match_seq_num(match_seq_num);

let append_record_stream = AppendRecordsBatchingStream::new(append_input_stream, opts);

self.client
.append_session(append_record_stream)
Expand Down

0 comments on commit 217c1f5

Please sign in to comment.