Skip to content

Commit

Permalink
chore: Upgrade SDK (#41)
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Dec 2, 2024
1 parent 217c1f5 commit 0431ef4
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
22 changes: 13 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[dependencies.streamstore]
git = "https://github.com/s2-streamstore/s2-sdk-rust.git"
rev = "88d6539cbec2165a79f07de4d3a13a9539b547d6"
rev = "d40de3df1c76ba71361f9e9f5327aee1619763cb"
15 changes: 7 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ use signal_hook::consts::{SIGINT, SIGTERM, SIGTSTP};
use signal_hook_tokio::Signals;
use stream::{RecordStream, StreamService};
use streamstore::{
bytesize::ByteSize,
client::{BasinClient, Client, ClientConfig, S2Endpoints, StreamClient},
types::{BasinInfo, BasinName, FencingToken, MeteredSize as _, ReadOutput, StreamInfo},
types::{BasinInfo, BasinName, FencingToken, MeteredBytes as _, ReadOutput, StreamInfo},
HeaderValue,
};
use tokio::{
Expand Down Expand Up @@ -245,7 +244,7 @@ enum StreamActions {

/// Limit the number of bytes returned.
#[arg(short = 'b', long)]
limit_bytes: Option<ByteSize>,
limit_bytes: Option<u64>,
},
}

Expand Down Expand Up @@ -600,7 +599,7 @@ async fn run() -> Result<(), S2CliError> {
let mut writer = output.into_writer().await.unwrap();

let mut start = None;
let mut total_data_len = ByteSize::b(0);
let mut total_data_len = 0;

loop {
select! {
Expand All @@ -613,7 +612,7 @@ async fn run() -> Result<(), S2CliError> {
match read_result {
Ok(ReadOutput::Batch(sequenced_record_batch)) => {
let num_records = sequenced_record_batch.records.len();
let mut batch_len = ByteSize::b(0);
let mut batch_len = 0;

let seq_range = match (
sequenced_record_batch.records.first(),
Expand All @@ -624,7 +623,7 @@ async fn run() -> Result<(), S2CliError> {
};
for sequenced_record in sequenced_record_batch.records {
let data = &sequenced_record.body;
batch_len += sequenced_record.metered_size();
batch_len += sequenced_record.metered_bytes();

writer
.write_all(data)
Expand All @@ -637,7 +636,7 @@ async fn run() -> Result<(), S2CliError> {
}
total_data_len += batch_len;

let throughput_mibps = (total_data_len.0 as f64
let throughput_mibps = (total_data_len as f64
/ start.unwrap().elapsed().as_secs_f64())
/ 1024.0
/ 1024.0;
Expand Down Expand Up @@ -683,7 +682,7 @@ async fn run() -> Result<(), S2CliError> {
let total_elapsed_time = start.unwrap().elapsed().as_secs_f64();

let total_throughput_mibps =
(total_data_len.0 as f64 / total_elapsed_time) / 1024.0 / 1024.0;
(total_data_len as f64 / total_elapsed_time) / 1024.0 / 1024.0;

eprintln!(
"{}",
Expand Down
5 changes: 2 additions & 3 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tokio::io::Lines;
use tokio_stream::Stream;

use crate::error::{ServiceError, ServiceErrorContext};
use crate::ByteSize;

pin_project! {
#[derive(Debug)]
Expand Down Expand Up @@ -91,11 +90,11 @@ impl StreamService {
&self,
start_seq_num: u64,
limit_count: Option<u64>,
limit_bytes: Option<ByteSize>,
limit_bytes: Option<u64>,
) -> Result<Streaming<ReadOutput>, ServiceError> {
let read_session_req = ReadSessionRequest {
start_seq_num: Some(start_seq_num),
limit: match (limit_count, limit_bytes.map(|b| b.as_u64())) {
limit: match (limit_count, limit_bytes) {
(Some(count), Some(bytes)) => Some(ReadLimit { count, bytes }),
(Some(count), None) => Some(ReadLimit { count, bytes: 0 }),
(None, Some(bytes)) => Some(ReadLimit { count: 0, bytes }),
Expand Down

0 comments on commit 0431ef4

Please sign in to comment.