Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make Execute::query async by default. add ExecuteBlocking trait for blocking APIs #1139

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions postgres/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# unreleased 0.2.0
## Remove
- remove `prepare`, `query`, `execute`, `query_raw`, `execute_raw`, `query_simple` and `execute_simple` methods from all types. Leave only `Execute` trait as sole query API
- remove `prepare`, `query`, `execute`, `query_raw`, `execute_raw`, `query_simple` and `execute_simple` methods from all types. Leave only `Execute` trait family as sole API
```rust
use xitca_postgres::{Client, Execute, RowSimpleStream, RowStream, Statement};
// create a named statement and execute it. on success returns a prepared statement
let stmt: StatementGuarded<'_, Client> = Statement::named("SELECT 1").execute(&client).await?;
// query with the prepared statement. on success returns an async row stream.
let stream: RowStream<'_> = stmt.query(&client)?;
let stream: RowStream<'_> = stmt.query(&client).await?;
// query with raw string sql.
let stream: RowSimpleStream<'_> = "SELECT 1; SELECT 1".query(&client)?;
let stream: RowSimpleStream<'_> = "SELECT 1; SELECT 1".query(&client).await?;
// execute raw string sql.
let row_affected: u64 = "SELECT 1; SELECT 1".execute(&client).await?;
// execute sql file.
let row_affected: u64 = std::path::Path::new("./foo.sql").execute(&client).await?;
```
- remove `Client::pipeline` and `Pool::pipeline`. `pipeline::Pipeline` type can be execute with `Execute::query` method

Expand All @@ -23,8 +25,8 @@

// use ExecuteMut trait to add query to pipeline
use xitca_postgres::ExecuteMut;
stmt.query(&mut pipe)?;
stmt.query(&mut pipe)?;
stmt.query_mut(&mut pipe)?;
stmt.query_mut(&mut pipe)?;

// use Execute trait to start pipeline query
let pipe_stream = pipe.query(&client)?;
Expand All @@ -38,30 +40,30 @@
// prepare a statement.
let stmt = Statement::named("SELECT * FROM users WHERE id = $1 AND age = $2", &[Type::INT4, Type::INT4]).execute(&client).await?;
// bind statement to typed value and start query
let stream = stmt.bind([9527, 42]).query(&client)?;
let stream = stmt.bind([9527, 42]).query(&client).await?;
```
- query without parameter value can be queried with `Statement` alone.
```rust
use xitca_postgres::Execute;
// prepare a statement.
let stmt = Statement::named("SELECT * FROM users", &[]).execute(&client).await?;
// statement have no value params and can be used for query.
let stream = stmt.query(&client)?;
let stream = stmt.query(&client).await?;
```
- `AsyncLendingIterator` is no longer exported from crate's root path. use `iter::AsyncLendingIterator` instead
- `query::RowStreamOwned` and `row::RowOwned` are no longer behind `compat` crate feature anymore
- `statement::Statement::unnamed` must bind to value parameters with `bind` or `bind_dyn` before calling `Execute` methods.
```rust
let stmt = Statement::unnamed("SELECT * FROM users WHERE id = $1", &[Type::INT4]);
let row_stream = stmt.bind([9527]).query(&client)?;
let row_stream = stmt.bind([9527]).query(&client).await?;
```
- `Query::_send_encode_query` method's return type is changed to `Result<(<S as Encode>::Output, Response), Error>`. Enabling further simplify of the surface level API at the cost of more internal complexity
- `Encode` trait implementation detail change
- `IntoStream` trait is renamed to `IntoResponse` with implementation detail change

## Add
- add `Execute` and `ExecuteMut` traits for extending query customization
- add `Prepare::{_prepare_blocking, _get_type_blocking}`
- add `Execute`, `ExecuteMut`, `ExecuteBlocking` traits for extending query customization
- add `Prepare::_get_type_blocking`
- add `iter::AsyncLendingIteratorExt` for extending async iterator APIs
- add `statement::Statement::{bind, bind_dyn}` methods for binding value parameters to a prepared statement for query
- add `query::RowSimpleStreamOwned`
Expand Down
1 change: 0 additions & 1 deletion postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ rustls-pemfile = { version = "2", optional = true }
[dev-dependencies]
xitca-postgres = { version = "0.2", features = ["compat"] }
xitca-postgres-codegen = "0.1"
async-stream = "0.3"
bb8 = "0.8.5"
futures = { version = "0.3", default-features = false }
rcgen = "0.13"
Expand Down
8 changes: 4 additions & 4 deletions postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// bind the prepared statement to parameter values it declared.
// when parameters are different Rust types it's suggested to use dynamic binding as following
// query with the bind and get an async streaming for database rows on success
let mut stream = stmt.bind_dyn(&[&1i32, &"alice"]).query(&cli)?;
let mut stream = stmt.bind_dyn(&[&1i32, &"alice"]).query(&cli).await?;

// use async iterator to visit rows
let row = stream.try_next().await?.ok_or("no row found")?;
Expand All @@ -88,7 +88,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
assert!(stream.try_next().await?.is_none());

// like execute method. query can be used with raw sql string.
let mut stream = "SELECT id, name FROM foo WHERE name = 'david'".query(&cli)?;
let mut stream = "SELECT id, name FROM foo WHERE name = 'david'".query(&cli).await?;
let row = stream.try_next().await?.ok_or("no row found")?;

// unlike query with prepared statement. raw sql query would return rows that can only be parsed to Rust string types.
Expand All @@ -104,13 +104,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
## Synchronous API
`xitca_postgres::Client` can run outside of tokio async runtime and using blocking API to interact with database
```rust
use xitca_postgres::{Client, Error, Execute};
use xitca_postgres::{Client, Error, ExecuteBlocking};

fn query(client: &Client) -> Result<(), Error> {
// execute sql query with blocking api
"SELECT 1".execute_blocking(client)?;

let stream = "SELECT 1".query(client)?;
let stream = "SELECT 1".query_blocking(client)?;

// use iterator to visit streaming rows
for item in stream {
Expand Down
39 changes: 12 additions & 27 deletions postgres/examples/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::{
pin::Pin,
};

use futures::stream::{Stream, TryStreamExt};
use xitca_postgres::{row::RowOwned, types::Type, Client, Error, Execute, Postgres, RowStreamOwned, Statement};
use xitca_postgres::{
iter::AsyncLendingIteratorExt, types::Type, Client, Error, Execute, Postgres, RowStreamOwned, Statement,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Expand Down Expand Up @@ -45,7 +46,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
type ExecuteOutput = Pin<Box<dyn Future<Output = Result<u64, Error>> + Send + 'c>>;

// like the execute but output an async stream iterator that produces database rows.
type QueryOutput = Pin<Box<dyn Stream<Item = Result<RowOwned, Error>> + Send + 'c>>;
type QueryOutput = Pin<Box<dyn Future<Output = Result<RowStreamOwned, Error>> + Send + 'c>>;

fn execute(self, cli: &'c Client) -> Self::ExecuteOutput {
// move PrepareAndExecute<'p> and &'c Client into an async block.
Expand All @@ -60,32 +61,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

fn query(self, cli: &'c Client) -> Self::QueryOutput {
// async stream macro is used to move prepare statement and query into a single
// streaming type.
Box::pin(async_stream::try_stream! {
Box::pin(async {
// prepare statement and query for async iterator of rows
let stmt = Statement::named(self.stmt, self.types)
.execute(cli)
.await?;
let stream = stmt.query(cli)?;

// async stream macro does not support lending iterator types and we convert
// row stream to an owned version where it does not contain references.
let mut stream = RowStreamOwned::from(stream);

// futures::stream::TryStreamExt trait is utilized here to produce database rows.
while let Some(item) = stream.try_next().await? {
yield item;
}
let stmt = Statement::named(self.stmt, self.types).execute(cli).await?;
let stream = stmt.query(cli).await?;
// convert borrowed stream to owned stream. as borrowed stream reference the statement this function
// just produced.
Ok(RowStreamOwned::from(stream))
})
}

// blocking version execute method. it's much simpler to implement than it's async variant.
fn execute_blocking(self, cli: &Client) -> Result<u64, Error> {
Statement::named(self.stmt, self.types)
.execute_blocking(cli)?
.execute_blocking(cli)
}
}

// use the new type to prepare and execute a statement.
Expand All @@ -103,7 +87,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
types: &[],
}
.query(&cli)
// use TryStreamExt trait methods to visit rows and collect column index 0 to integers.
.await?
// use async iterator methods to visit rows and collect column index 0 to integers.
.map_ok(|row| row.get::<i32>(0))
.try_collect::<Vec<_>>()
.await?;
Expand Down
11 changes: 4 additions & 7 deletions postgres/examples/macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.await?;
tokio::spawn(drv.into_future());

"CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT)"
.execute(&cli)
.await?;
"INSERT INTO foo (name) VALUES ('alice'), ('bob'), ('charlie');"
.execute(&cli)
.await?;
std::path::Path::new("./samples/test.sql").execute(&cli).await?;

// this macro is expand into xitca_postgres::statement::Statement::unnamed
// it's also possible to utilize xitca-postgres's Execute traits for more customizable macro usage
let mut stream = sql!("SELECT * FROM foo WHERE id = $1 AND name = $2", &1i32, &"alice").query(&cli)?;
let mut stream = sql!("SELECT * FROM foo WHERE id = $1 AND name = $2", &1i32, &"alice")
.query(&cli)
.await?;

// the macro also have basic function for sql validation check. try uncomment below to see compile error.
// let _ = sql!("SELECT * FRO foo WHERR id = $1 AN name = $2", &1i32, &"alice");
Expand Down
4 changes: 2 additions & 2 deletions postgres/examples/pooling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

// you can forward query to xitca-postgres's client completely.
let transaction = conn.conn.transaction().await?;
let mut res = "SELECT 1".query(&transaction)?;
let mut res = "SELECT 1".query(&transaction).await?;
let row = res.try_next().await?.ok_or("row not found")?;
assert_eq!(Some("1"), row.get(0));
transaction.rollback().await?;

// or use the new type definition of your pool connection for additional state and functionalities your
// connection type could offer
let transaction = conn.transaction().await?;
let mut res = "SELECT 1".query(&transaction)?;
let mut res = "SELECT 1".query(&transaction).await?;
let row = res.try_next().await?.ok_or("row not found")?;
assert_eq!(Some("1"), row.get(0));
transaction.commit().await?;
Expand Down
4 changes: 2 additions & 2 deletions postgres/examples/sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! example of running client in non async environment.

use std::future::IntoFuture;
use xitca_postgres::{types::Type, Execute, Postgres, Statement};
use xitca_postgres::{types::Type, ExecuteBlocking, Postgres, Statement};

fn main() -> Result<(), Box<dyn std::error::Error>> {
// prepare a tokio runtime for client's Driver.
Expand Down Expand Up @@ -32,7 +32,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
)
.execute_blocking(&cli)?;
// query api shares the same convention no matter the context.
let stream = stmt.bind_dyn(&[&4i32, &"david"]).query(&cli)?;
let stream = stmt.bind_dyn(&[&4i32, &"david"]).query_blocking(&cli)?;

// async row stream implement IntoIterator trait to convert stream into a sync iterator.
for item in stream {
Expand Down
2 changes: 2 additions & 0 deletions postgres/samples/test.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE TEMPORARY TABLE foo (id SERIAL, name TEXT);
INSERT INTO foo (name) VALUES ('alice'), ('bob'), ('charlie');
2 changes: 1 addition & 1 deletion postgres/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async fn dns_resolve<'p>(host: &'p str, ports: &'p [u16]) -> Result<impl Iterato
/// drop(drv);
///
/// // client will always return error when it's driver is gone.
/// let e = "SELECT 1".query(&cli).unwrap_err();
/// let e = "SELECT 1".query(&cli).await.unwrap_err();
/// // a shortcut method can be used to determine if the error is caused by a shutdown driver.
/// assert!(e.is_driver_down());
///
Expand Down
2 changes: 1 addition & 1 deletion postgres/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ from_impl!(Completed);
/// let handle = tokio::spawn(drv.into_future());
///
/// // when query returns error immediately we check if the driver is gone.
/// if let Err(e) = "".query(&cli) {
/// if let Err(e) = "".query(&cli).await {
/// if e.is_driver_down() {
/// // driver is gone and we want to know detail reason in this case.
/// // await on the join handle will return the output of Driver task.
Expand Down
Loading
Loading