Skip to content

Commit

Permalink
examples: adjust to use the new interface
Browse files Browse the repository at this point in the history
This commit goes over all unadjusted examples and changes them to use
the new deserialization framework. Again, it contains a lot of changes,
but they are quite simple.

Co-authored-by: Wojciech Przytuła <[email protected]>
  • Loading branch information
piodul and wprzytula committed Nov 6, 2024
1 parent b630d0d commit 560c6b3
Show file tree
Hide file tree
Showing 24 changed files with 193 additions and 154 deletions.
10 changes: 4 additions & 6 deletions examples/allocations.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use scylla::{statement::prepared_statement::PreparedStatement, LegacySession, SessionBuilder};
use scylla::transport::session::Session;
use scylla::{statement::prepared_statement::PreparedStatement, SessionBuilder};
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -65,7 +66,7 @@ fn print_stats(stats: &stats_alloc::Stats, reqs: f64) {
}

async fn measure(
session: Arc<LegacySession>,
session: Arc<Session>,
prepared: Arc<PreparedStatement>,
reqs: usize,
parallelism: usize,
Expand Down Expand Up @@ -128,10 +129,7 @@ async fn main() -> Result<()> {

println!("Connecting to {} ...", args.node);

let session: LegacySession = SessionBuilder::new()
.known_node(args.node)
.build_legacy()
.await?;
let session: Session = SessionBuilder::new().known_node(args.node).build().await?;
let session = Arc::new(session);

session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;
Expand Down
2 changes: 1 addition & 1 deletion examples/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async fn main() -> Result<()> {
let session = SessionBuilder::new()
.known_node(uri)
.user("cassandra", "cassandra")
.build_legacy()
.build()
.await
.unwrap();

Expand Down
33 changes: 17 additions & 16 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::Result;
use futures::TryStreamExt;
use scylla::macros::FromRow;
use scylla::transport::session::LegacySession;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::DeserializeRow;
use scylla::SessionBuilder;
use std::env;

Expand All @@ -11,7 +13,7 @@ async fn main() -> Result<()> {

println!("Connecting to {} ...", uri);

let session: LegacySession = SessionBuilder::new().known_node(uri).build_legacy().await?;
let session: Session = SessionBuilder::new().known_node(uri).build().await?;

session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;

Expand Down Expand Up @@ -53,39 +55,38 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<(i32, i32, String)>();
.rows_stream::<(i32, i32, String)>()?;
while let Some((a, b, c)) = iter.try_next().await? {
println!("a, b, c: {}, {}, {}", a, b, c);
}

// Or as custom structs that derive FromRow
#[derive(Debug, FromRow)]
// Or as custom structs that derive DeserializeRow
#[allow(unused)]
#[derive(Debug, DeserializeRow)]
struct RowData {
_a: i32,
_b: Option<i32>,
_c: String,
a: i32,
b: Option<i32>,
c: String,
}

let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<RowData>();
.rows_stream::<RowData>()?;
while let Some(row_data) = iter.try_next().await? {
println!("row_data: {:?}", row_data);
}

// Or simply as untyped rows
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?;
while let Some(row) = iter.try_next().await? {
.await?
.rows_stream::<Row>()?;
while let Some(row) = iter.next().await.transpose()? {
let a = row.columns[0].as_ref().unwrap().as_int().unwrap();
let b = row.columns[1].as_ref().unwrap().as_int().unwrap();
let c = row.columns[2].as_ref().unwrap().as_text().unwrap();
println!("a, b, c: {}, {}, {}", a, b, c);

// Alternatively each row can be parsed individually
// let (a2, b2, c2) = row.into_typed::<(i32, i32, String)>() ?;
}

let metrics = session.get_metrics();
Expand Down
2 changes: 1 addition & 1 deletion examples/cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async fn main() -> Result<()> {
.unwrap_or("examples/config_data.yaml".to_owned());
let session = CloudSessionBuilder::new(Path::new(&config_path))
.unwrap()
.build_legacy()
.build()
.await
.unwrap();

Expand Down
8 changes: 5 additions & 3 deletions examples/compare-tokens.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use scylla::routing::Token;
use scylla::transport::NodeAddr;
use scylla::{LegacySession, SessionBuilder};
use scylla::{Session, SessionBuilder};
use std::env;

#[tokio::main]
Expand All @@ -10,7 +10,7 @@ async fn main() -> Result<()> {

println!("Connecting to {} ...", uri);

let session: LegacySession = SessionBuilder::new().known_node(uri).build_legacy().await?;
let session: Session = SessionBuilder::new().known_node(uri).build().await?;

session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;

Expand Down Expand Up @@ -51,7 +51,9 @@ async fn main() -> Result<()> {
(pk,),
)
.await?
.single_row_typed::<(i64,)>()?;
.into_rows_result()?
.expect("Got not Rows result")
.single_row()?;
assert_eq!(t, qt);
println!("token for {}: {}", pk, t);
}
Expand Down
24 changes: 12 additions & 12 deletions examples/cql-time-types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

use anyhow::Result;
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use futures::{StreamExt, TryStreamExt};
use futures::{StreamExt as _, TryStreamExt as _};
use scylla::frame::response::result::CqlValue;
use scylla::frame::value::{CqlDate, CqlTime, CqlTimestamp};
use scylla::transport::session::LegacySession;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use std::env;

Expand All @@ -16,7 +16,7 @@ async fn main() -> Result<()> {

println!("Connecting to {} ...", uri);

let session: LegacySession = SessionBuilder::new().known_node(uri).build_legacy().await?;
let session: Session = SessionBuilder::new().known_node(uri).build().await?;

session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;

Expand Down Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(NaiveDate,)>();
.rows_stream::<(NaiveDate,)>()?;
while let Some(row_result) = iter.next().await {
let (read_date,): (NaiveDate,) = match row_result {
Ok(read_date) => read_date,
Expand All @@ -66,7 +66,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(time::Date,)>();
.rows_stream::<(time::Date,)>()?;
while let Some(row_result) = iter.next().await {
let (read_date,): (time::Date,) = match row_result {
Ok(read_date) => read_date,
Expand All @@ -88,7 +88,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(CqlValue,)>();
.rows_stream::<(CqlValue,)>()?;
while let Some(row_result) = iter.next().await {
let read_days: u32 = match row_result {
Ok((CqlValue::Date(CqlDate(days)),)) => days,
Expand Down Expand Up @@ -124,7 +124,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(NaiveTime,)>();
.rows_stream::<(NaiveTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into chrono::NaiveTime: {:?}", read_time);
}
Expand All @@ -139,7 +139,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(time::Time,)>();
.rows_stream::<(time::Time,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into time::Time: {:?}", read_time);
}
Expand All @@ -154,7 +154,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(CqlTime,)>();
.rows_stream::<(CqlTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a time as raw nanos: {:?}", read_time);
}
Expand Down Expand Up @@ -185,7 +185,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(DateTime<Utc>,)>();
.rows_stream::<(DateTime<Utc>,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into chrono::DateTime<chrono::Utc>: {:?}",
Expand All @@ -206,7 +206,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(time::OffsetDateTime,)>();
.rows_stream::<(time::OffsetDateTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into time::OffsetDateTime: {:?}",
Expand All @@ -227,7 +227,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(CqlTimestamp,)>();
.rows_stream::<(CqlTimestamp,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a timestamp as raw millis: {:?}", read_time);
}
Expand Down
47 changes: 27 additions & 20 deletions examples/cqlsh-rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use rustyline::completion::{Completer, Pair};
use rustyline::error::ReadlineError;
use rustyline::{CompletionType, Config, Context, Editor};
use rustyline_derive::{Helper, Highlighter, Hinter, Validator};
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::transport::Compression;
use scylla::{LegacyQueryResult, LegacySession, SessionBuilder};
use scylla::QueryRowsResult;
use scylla::SessionBuilder;
use std::env;

#[derive(Helper, Highlighter, Validator, Hinter)]
Expand Down Expand Up @@ -173,23 +176,24 @@ impl Completer for CqlHelper {
}
}

fn print_result(result: &LegacyQueryResult) {
if result.rows.is_none() {
println!("OK");
return;
}
for row in result.rows.as_ref().unwrap() {
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
fn print_result(result: Option<&QueryRowsResult>) {
if let Some(rows_result) = result {
for row in rows_result.rows::<Row>().unwrap() {
let row = row.unwrap();
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
}
println!("|")
}
println!("|")
} else {
println!("OK");
}
}

Expand All @@ -199,10 +203,10 @@ async fn main() -> Result<()> {

println!("Connecting to {} ...", uri);

let session: LegacySession = SessionBuilder::new()
let session: Session = SessionBuilder::new()
.known_node(uri)
.compression(Some(Compression::Lz4))
.build_legacy()
.build()
.await?;

let config = Config::builder()
Expand All @@ -222,7 +226,10 @@ async fn main() -> Result<()> {
let maybe_res = session.query_unpaged(line, &[]).await;
match maybe_res {
Err(err) => println!("Error: {}", err),
Ok(res) => print_result(&res),
Ok(res) => {
let rows_res = res.into_rows_result()?;
print_result(rows_res.as_ref())
}
}
}
Err(ReadlineError::Interrupted) => continue,
Expand Down
7 changes: 5 additions & 2 deletions examples/custom_deserialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use anyhow::Result;
use scylla::cql_to_rust::{FromCqlVal, FromCqlValError};
use scylla::frame::response::result::CqlValue;
use scylla::macros::impl_from_cql_value_from_method;
use scylla::{LegacySession, SessionBuilder};
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use std::env;

#[tokio::main]
Expand All @@ -11,7 +12,7 @@ async fn main() -> Result<()> {

println!("Connecting to {} ...", uri);

let session: LegacySession = SessionBuilder::new().known_node(uri).build_legacy().await?;
let session: Session = SessionBuilder::new().known_node(uri).build().await?;

session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?;
session
Expand Down Expand Up @@ -46,6 +47,7 @@ async fn main() -> Result<()> {
(),
)
.await?
.into_legacy_result()?
.single_row_typed::<(MyType,)>()?;
assert_eq!(v, MyType("asdf".to_owned()));

Expand Down Expand Up @@ -73,6 +75,7 @@ async fn main() -> Result<()> {
(),
)
.await?
.into_legacy_result()?
.single_row_typed::<(MyOtherType,)>()?;
assert_eq!(v, MyOtherType("asdf".to_owned()));

Expand Down
6 changes: 3 additions & 3 deletions examples/custom_load_balancing_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use scylla::{
load_balancing::{LoadBalancingPolicy, RoutingInfo},
routing::Shard,
transport::{ClusterData, ExecutionProfile},
LegacySession, SessionBuilder,
Session, SessionBuilder,
};
use std::{env, sync::Arc};

Expand Down Expand Up @@ -68,10 +68,10 @@ async fn main() -> Result<()> {
.load_balancing_policy(Arc::new(custom_load_balancing))
.build();

let _session: LegacySession = SessionBuilder::new()
let _session: Session = SessionBuilder::new()
.known_node(uri)
.default_execution_profile_handle(profile.into_handle())
.build_legacy()
.build()
.await?;

Ok(())
Expand Down
Loading

0 comments on commit 560c6b3

Please sign in to comment.