Skip to content

Commit

Permalink
make Execute::query async by default. add ExecuteBlocking trait for b…
Browse files Browse the repository at this point in the history
…locking APIs (#1139)

* make Execute::query async by default

* reduce dev deps

* opt out of async for pipeline execution

* add test for execute sql file
  • Loading branch information
fakeshadow authored Oct 1, 2024
1 parent 3ea804a commit da46ef0
Show file tree
Hide file tree
Showing 21 changed files with 426 additions and 294 deletions.
22 changes: 12 additions & 10 deletions postgres/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# unreleased 0.2.0
## Remove
- remove `prepare`, `query`, `execute`, `query_raw`, `execute_raw`, `query_simple` and `execute_simple` methods from all types. Leave only `Execute` trait as sole query API
- remove `prepare`, `query`, `execute`, `query_raw`, `execute_raw`, `query_simple` and `execute_simple` methods from all types. Leave only `Execute` trait family as sole API
```rust
use xitca_postgres::{Client, Execute, RowSimpleStream, RowStream, Statement};
// create a named statement and execute it. on success returns a prepared statement
let stmt: StatementGuarded<'_, Client> = Statement::named("SELECT 1").execute(&client).await?;
// query with the prepared statement. on success returns an async row stream.
let stream: RowStream<'_> = stmt.query(&client)?;
let stream: RowStream<'_> = stmt.query(&client).await?;
// query with raw string sql.
let stream: RowSimpleStream<'_> = "SELECT 1; SELECT 1".query(&client)?;
let stream: RowSimpleStream<'_> = "SELECT 1; SELECT 1".query(&client).await?;
// execute raw string sql.
let row_affected: u64 = "SELECT 1; SELECT 1".execute(&client).await?;
// execute sql file.
let row_affected: u64 = std::path::Path::new("./foo.sql").execute(&client).await?;
```
- remove `Client::pipeline` and `Pool::pipeline`. `pipeline::Pipeline` type can be execute with `Execute::query` method

Expand All @@ -23,8 +25,8 @@

// use ExecuteMut trait to add query to pipeline
use xitca_postgres::ExecuteMut;
stmt.query(&mut pipe)?;
stmt.query(&mut pipe)?;
stmt.query_mut(&mut pipe)?;
stmt.query_mut(&mut pipe)?;

// use Execute trait to start pipeline query
let pipe_stream = pipe.query(&client)?;
Expand All @@ -38,30 +40,30 @@
// prepare a statement.
let stmt = Statement::named("SELECT * FROM users WHERE id = $1 AND age = $2", &[Type::INT4, Type::INT4]).execute(&client).await?;
// bind statement to typed value and start query
let stream = stmt.bind([9527, 42]).query(&client)?;
let stream = stmt.bind([9527, 42]).query(&client).await?;
```
- query without parameter value can be queried with `Statement` alone.
```rust
use xitca_postgres::Execute;
// prepare a statement.
let stmt = Statement::named("SELECT * FROM users", &[]).execute(&client).await?;
// statement have no value params and can be used for query.
let stream = stmt.query(&client)?;
let stream = stmt.query(&client).await?;
```
- `AsyncLendingIterator` is no longer exported from crate's root path. use `iter::AsyncLendingIterator` instead
- `query::RowStreamOwned` and `row::RowOwned` are no longer behind `compat` crate feature anymore
- `statement::Statement::unnamed` must bind to value parameters with `bind` or `bind_dyn` before calling `Execute` methods.
```rust
let stmt = Statement::unnamed("SELECT * FROM users WHERE id = $1", &[Type::INT4]);
let row_stream = stmt.bind([9527]).query(&client)?;
let row_stream = stmt.bind([9527]).query(&client).await?;
```
- `Query::_send_encode_query` method's return type is changed to `Result<(<S as Encode>::Output, Response), Error>`. Enabling further simplify of the surface level API at the cost of more internal complexity
- `Encode` trait implementation detail change
- `IntoStream` trait is renamed to `IntoResponse` with implementation detail change

## Add
- add `Execute` and `ExecuteMut` traits for extending query customization
- add `Prepare::{_prepare_blocking, _get_type_blocking}`
- add `Execute`, `ExecuteMut`, `ExecuteBlocking` traits for extending query customization
- add `Prepare::_get_type_blocking`
- add `iter::AsyncLendingIteratorExt` for extending async iterator APIs
- add `statement::Statement::{bind, bind_dyn}` methods for binding value parameters to a prepared statement for query
- add `query::RowSimpleStreamOwned`
Expand Down
1 change: 0 additions & 1 deletion postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ rustls-pemfile = { version = "2", optional = true }
[dev-dependencies]
xitca-postgres = { version = "0.2", features = ["compat"] }
xitca-postgres-codegen = "0.1"
async-stream = "0.3"
bb8 = "0.8.5"
futures = { version = "0.3", default-features = false }
rcgen = "0.13"
Expand Down
8 changes: 4 additions & 4 deletions postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// bind the prepared statement to parameter values it declared.
// when parameters are different Rust types it's suggested to use dynamic binding as following
// query with the bind and get an async streaming for database rows on success
let mut stream = stmt.bind_dyn(&[&1i32, &"alice"]).query(&cli)?;
let mut stream = stmt.bind_dyn(&[&1i32, &"alice"]).query(&cli).await?;

// use async iterator to visit rows
let row = stream.try_next().await?.ok_or("no row found")?;
Expand All @@ -88,7 +88,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
assert!(stream.try_next().await?.is_none());

// like execute method. query can be used with raw sql string.
let mut stream = "SELECT id, name FROM foo WHERE name = 'david'".query(&cli)?;
let mut stream = "SELECT id, name FROM foo WHERE name = 'david'".query(&cli).await?;
let row = stream.try_next().await?.ok_or("no row found")?;

// unlike query with prepared statement. raw sql query would return rows that can only be parsed to Rust string types.
Expand All @@ -104,13 +104,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
## Synchronous API
`xitca_postgres::Client` can run outside of tokio async runtime and using blocking API to interact with database
```rust
use xitca_postgres::{Client, Error, Execute};
use xitca_postgres::{Client, Error, ExecuteBlocking};

fn query(client: &Client) -> Result<(), Error> {
// execute sql query with blocking api
"SELECT 1".execute_blocking(client)?;

let stream = "SELECT 1".query(client)?;
let stream = "SELECT 1".query_blocking(client)?;

// use iterator to visit streaming rows
for item in stream {
Expand Down
39 changes: 12 additions & 27 deletions postgres/examples/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::{
pin::Pin,
};

use futures::stream::{Stream, TryStreamExt};
use xitca_postgres::{row::RowOwned, types::Type, Client, Error, Execute, Postgres, RowStreamOwned, Statement};
use xitca_postgres::{
iter::AsyncLendingIteratorExt, types::Type, Client, Error, Execute, Postgres, RowStreamOwned, Statement,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Expand Down Expand Up @@ -45,7 +46,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
type ExecuteOutput = Pin<Box<dyn Future<Output = Result<u64, Error>> + Send + 'c>>;

// like the execute but output an async stream iterator that produces database rows.
type QueryOutput = Pin<Box<dyn Stream<Item = Result<RowOwned, Error>> + Send + 'c>>;
type QueryOutput = Pin<Box<dyn Future<Output = Result<RowStreamOwned, Error>> + Send + 'c>>;

fn execute(self, cli: &'c Client) -> Self::ExecuteOutput {
// move PrepareAndExecute<'p> and &'c Client into an async block.
Expand All @@ -60,32 +61,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

fn query(self, cli: &'c Client) -> Self::QueryOutput {
// async stream macro is used to move prepare statement and query into a single
// streaming type.
Box::pin(async_stream::try_stream! {
Box::pin(async {
// prepare statement and query for async iterator of rows
let stmt = Statement::named(self.stmt, self.types)
.execute(cli)
.await?;
let stream = stmt.query(cli)?;

// async stream macro does not support lending iterator types and we convert
// row stream to an owned version where it does not contain references.
let mut stream = RowStreamOwned::from(stream);

// futures::stream::TryStreamExt trait is utilized here to produce database rows.
while let Some(item) = stream.try_next().await? {
yield item;
}
let stmt = Statement::named(self.stmt, self.types).execute(cli).await?;
let stream = stmt.query(cli).await?;
// convert borrowed stream to owned stream. as borrowed stream reference the statement this function
// just produced.
Ok(RowStreamOwned::from(stream))
})
}

// blocking version execute method. it's much simpler to implement than it's async variant.
fn execute_blocking(self, cli: &Client) -> Result<u64, Error> {
Statement::named(self.stmt, self.types)
.execute_blocking(cli)?
.execute_blocking(cli)
}
}

// use the new type to prepare and execute a statement.
Expand All @@ -103,7 +87,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
types: &[],
}
.query(&cli)
// use TryStreamExt trait methods to visit rows and collect column index 0 to integers.
.await?
// use async iterator methods to visit rows and collect column index 0 to integers.
.map_ok(|row| row.get::<i32>(0))
.try_collect::<Vec<_>>()
.await?;
Expand Down
11 changes: 4 additions & 7 deletions postgres/examples/macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.await?;
tokio::spawn(drv.into_future());

"CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT)"
.execute(&cli)
.await?;
"INSERT INTO foo (name) VALUES ('alice'), ('bob'), ('charlie');"
.execute(&cli)
.await?;
std::path::Path::new("./samples/test.sql").execute(&cli).await?;

// this macro is expand into xitca_postgres::statement::Statement::unnamed
// it's also possible to utilize xitca-postgres's Execute traits for more customizable macro usage
let mut stream = sql!("SELECT * FROM foo WHERE id = $1 AND name = $2", &1i32, &"alice").query(&cli)?;
let mut stream = sql!("SELECT * FROM foo WHERE id = $1 AND name = $2", &1i32, &"alice")
.query(&cli)
.await?;

// the macro also have basic function for sql validation check. try uncomment below to see compile error.
// let _ = sql!("SELECT * FRO foo WHERR id = $1 AN name = $2", &1i32, &"alice");
Expand Down
4 changes: 2 additions & 2 deletions postgres/examples/pooling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

// you can forward query to xitca-postgres's client completely.
let transaction = conn.conn.transaction().await?;
let mut res = "SELECT 1".query(&transaction)?;
let mut res = "SELECT 1".query(&transaction).await?;
let row = res.try_next().await?.ok_or("row not found")?;
assert_eq!(Some("1"), row.get(0));
transaction.rollback().await?;

// or use the new type definition of your pool connection for additional state and functionalities your
// connection type could offer
let transaction = conn.transaction().await?;
let mut res = "SELECT 1".query(&transaction)?;
let mut res = "SELECT 1".query(&transaction).await?;
let row = res.try_next().await?.ok_or("row not found")?;
assert_eq!(Some("1"), row.get(0));
transaction.commit().await?;
Expand Down
4 changes: 2 additions & 2 deletions postgres/examples/sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! example of running client in non async environment.
use std::future::IntoFuture;
use xitca_postgres::{types::Type, Execute, Postgres, Statement};
use xitca_postgres::{types::Type, ExecuteBlocking, Postgres, Statement};

fn main() -> Result<(), Box<dyn std::error::Error>> {
// prepare a tokio runtime for client's Driver.
Expand Down Expand Up @@ -32,7 +32,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
)
.execute_blocking(&cli)?;
// query api shares the same convention no matter the context.
let stream = stmt.bind_dyn(&[&4i32, &"david"]).query(&cli)?;
let stream = stmt.bind_dyn(&[&4i32, &"david"]).query_blocking(&cli)?;

// async row stream implement IntoIterator trait to convert stream into a sync iterator.
for item in stream {
Expand Down
2 changes: 2 additions & 0 deletions postgres/samples/test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT);
INSERT INTO foo (name) VALUES ('alice'), ('bob'), ('charlie');
2 changes: 1 addition & 1 deletion postgres/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async fn dns_resolve<'p>(host: &'p str, ports: &'p [u16]) -> Result<impl Iterato
/// drop(drv);
///
/// // client will always return error when it's driver is gone.
/// let e = "SELECT 1".query(&cli).unwrap_err();
/// let e = "SELECT 1".query(&cli).await.unwrap_err();
/// // a shortcut method can be used to determine if the error is caused by a shutdown driver.
/// assert!(e.is_driver_down());
///
Expand Down
2 changes: 1 addition & 1 deletion postgres/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ from_impl!(Completed);
/// let handle = tokio::spawn(drv.into_future());
///
/// // when query returns error immediately we check if the driver is gone.
/// if let Err(e) = "".query(&cli) {
/// if let Err(e) = "".query(&cli).await {
/// if e.is_driver_down() {
/// // driver is gone and we want to know detail reason in this case.
/// // await on the join handle will return the output of Driver task.
Expand Down
Loading

0 comments on commit da46ef0

Please sign in to comment.