Skip to content

Commit

Permalink
chore: Add examples for API (#113)
Browse files Browse the repository at this point in the history
These are the examples I am thinking of having for the API docs:

- [x] Create basin and get basin config
- [x] Reconfigure basin
- [x] Delete basin
- [x] List all basins: Fetch all basins taking `has_more` in
consideration.
- [x] Create stream and get stream config
- [x] Reconfigure stream
- [x] Delete stream
- [x] List streams with prefix
- [x] Explicit trim: Delete data till latest sequence number
- [x] Producer (with Fencing token)
- [x] Get latest record
- [x] Consumer

---------

Signed-off-by: Vaibhav Rabber <[email protected]>
  • Loading branch information
vrongmeal authored Dec 16, 2024
1 parent d9a09b7 commit 723dfcd
Show file tree
Hide file tree
Showing 15 changed files with 371 additions and 182 deletions.
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ keywords = ["wal", "grpc", "s2", "log", "stream"]
repository = "https://github.com/s2-streamstore/s2-sdk-rust"
homepage = "https://github.com/s2-streamstore/s2-sdk-rust"

[package.metadata.docs.rs]
features = ["connector"]
cargo-args = ["-Zunstable-options", "-Zrustdoc-scrape-examples"]

[[example]]
# `doc-scrape-examples` requires *any* one example to specify the option.
name = "create_basin"
doc-scrape-examples = true

[dependencies]
async-stream = "0.3.6"
backon = "1.2.0"
Expand Down
168 changes: 0 additions & 168 deletions examples/basic.rs

This file was deleted.

32 changes: 32 additions & 0 deletions examples/consumer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use futures::StreamExt;
use streamstore::{
client::{ClientConfig, StreamClient},
types::{BasinName, ReadSessionRequest},
};
use tokio::select;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = std::env::var("S2_AUTH_TOKEN")?;
let config = ClientConfig::new(token);
let basin: BasinName = "my-basin".parse()?;
let stream = "my-stream";
let stream_client = StreamClient::new(config, basin, stream);

let start_seq_num = 0;
let read_session_request = ReadSessionRequest::new(start_seq_num);
let mut read_stream = stream_client.read_session(read_session_request).await?;

loop {
select! {
next_batch = read_stream.next() => {
let Some(next_batch) = next_batch else { break };
let next_batch = next_batch?;
println!("{next_batch:?}");
}
_ = tokio::signal::ctrl_c() => break,
}
}

Ok(())
}
34 changes: 34 additions & 0 deletions examples/create_basin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::time::Duration;

use streamstore::{
client::{Client, ClientConfig},
types::{BasinConfig, BasinName, CreateBasinRequest, RetentionPolicy, StreamConfig},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = std::env::var("S2_AUTH_TOKEN")?;
let config = ClientConfig::new(token);
let client = Client::new(config);

let basin: BasinName = "my-basin".parse()?;

let default_stream_config = StreamConfig::new().with_retention_policy(RetentionPolicy::Age(
// Set the default retention age to 10 days.
Duration::from_secs(10 * 24 * 60 * 60),
));

let basin_config = BasinConfig {
default_stream_config: Some(default_stream_config),
};

let create_basin_request = CreateBasinRequest::new(basin.clone()).with_config(basin_config);

let created_basin = client.create_basin(create_basin_request).await?;
println!("{created_basin:#?}");

let basin_config = client.get_basin_config(basin).await?;
println!("{basin_config:#?}");

Ok(())
}
28 changes: 28 additions & 0 deletions examples/create_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use streamstore::{
client::{Client, ClientConfig},
types::{BasinName, CreateStreamRequest, StorageClass, StreamConfig},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = std::env::var("S2_AUTH_TOKEN")?;
let config = ClientConfig::new(token);
let client = Client::new(config);

let basin: BasinName = "my-basin".parse()?;
let basin_client = client.basin_client(basin);

let stream = "my-stream";

let stream_config = StreamConfig::new().with_storage_class(StorageClass::Express);

let create_stream_request = CreateStreamRequest::new(stream).with_config(stream_config);

let created_stream = basin_client.create_stream(create_stream_request).await?;
println!("{created_stream:#?}");

let stream_config = basin_client.get_stream_config(stream).await?;
println!("{stream_config:#?}");

Ok(())
}
21 changes: 21 additions & 0 deletions examples/delete_basin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use streamstore::{
client::{Client, ClientConfig},
types::{BasinName, DeleteBasinRequest},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = std::env::var("S2_AUTH_TOKEN")?;
let config = ClientConfig::new(token);
let client = Client::new(config);

let basin: BasinName = "my-basin".parse()?;

let delete_basin_request = DeleteBasinRequest::new(basin)
// Don't error if the basin doesn't exist.
.with_if_exists(true);

client.delete_basin(delete_basin_request).await?;

Ok(())
}
20 changes: 20 additions & 0 deletions examples/delete_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use streamstore::{
client::{BasinClient, ClientConfig},
types::{BasinName, DeleteStreamRequest},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = std::env::var("S2_AUTH_TOKEN")?;
let config = ClientConfig::new(token);
let basin: BasinName = "my-basin".parse()?;
let basin_client = BasinClient::new(config, basin);

let stream = "my-stream";

let delete_stream_request = DeleteStreamRequest::new(stream);

basin_client.delete_stream(delete_stream_request).await?;

Ok(())
}
31 changes: 31 additions & 0 deletions examples/explicit_trim.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use streamstore::{
client::{ClientConfig, StreamClient},
types::{AppendInput, AppendRecordBatch, BasinName, CommandRecord},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = std::env::var("S2_AUTH_TOKEN")?;
let config = ClientConfig::new(token);
let basin: BasinName = "my-basin".parse()?;
let stream = "my-stream";
let stream_client = StreamClient::new(config, basin, stream);

let tail = stream_client.check_tail().await?;
if tail == 0 {
println!("Empty stream");
return Ok(());
}

let latest_seq_num = tail - 1;
let trim_request = CommandRecord::trim(latest_seq_num);

let append_record_batch = AppendRecordBatch::try_from_iter([trim_request])
.expect("valid batch with 1 command record");
let append_input = AppendInput::new(append_record_batch);
let _ = stream_client.append(append_input).await?;

println!("Trim requested");

Ok(())
}
29 changes: 29 additions & 0 deletions examples/get_latest_record.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use streamstore::{
client::{ClientConfig, StreamClient},
types::{BasinName, ReadLimit, ReadRequest},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let token = std::env::var("S2_AUTH_TOKEN")?;
let config = ClientConfig::new(token);
let basin: BasinName = "my-basin".parse()?;
let stream = "my-stream";
let stream_client = StreamClient::new(config, basin, stream);

let tail = stream_client.check_tail().await?;
if tail == 0 {
println!("Empty stream");
return Ok(());
}

let latest_seq_num = tail - 1;

let read_limit = ReadLimit { count: 1, bytes: 0 };
let read_request = ReadRequest::new(latest_seq_num).with_limit(read_limit);
let latest_record = stream_client.read(read_request).await?;

println!("{latest_record:#?}");

Ok(())
}
Loading

0 comments on commit 723dfcd

Please sign in to comment.