From c40616e63aff53fb188abe19d85e2d154018e6f9 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 6 Oct 2024 00:06:24 +0800 Subject: [PATCH] remove ExecuteMut trait (#1146) --- http/src/h1/dispatcher_uring_unreal.rs | 30 ++++++++++---- postgres/CHANGES.md | 7 ++++ postgres/Cargo.toml | 2 +- postgres/examples/execute.rs | 4 +- postgres/src/execute.rs | 25 +++--------- postgres/src/execute/async_impl.rs | 12 +++--- postgres/src/execute/sync_impl.rs | 18 ++++----- postgres/src/lib.rs | 2 +- postgres/src/pipeline.rs | 26 ++++++------- postgres/src/pool.rs | 54 +++++++++++++++----------- postgres/tests/runtime.rs | 6 +-- postgres/tests/sync.rs | 6 +-- 12 files changed, 104 insertions(+), 88 deletions(-) diff --git a/http/src/h1/dispatcher_uring_unreal.rs b/http/src/h1/dispatcher_uring_unreal.rs index 4c8344e4..82a215f3 100644 --- a/http/src/h1/dispatcher_uring_unreal.rs +++ b/http/src/h1/dispatcher_uring_unreal.rs @@ -3,19 +3,23 @@ use core::mem::MaybeUninit; use std::rc::Rc; use http::StatusCode; -use httparse::Status; +use httparse::{Header, Status}; use xitca_io::{ bytes::{Buf, BytesMut}, net::io_uring::TcpStream, }; use xitca_service::{AsyncFn, Service}; -pub use httparse::Request; - use crate::date::{DateTime, DateTimeHandle, DateTimeService}; pub type Error = Box; +pub struct Request<'a> { + pub method: &'a str, + pub path: &'a str, + pub headers: &'a mut [Header<'a>], +} + pub struct Response<'a, const STEP: usize = 1> { buf: &'a mut BytesMut, date: &'a DateTimeHandle, @@ -49,10 +53,14 @@ impl<'a> Response<'a, 2> { self.want_write_date = false; } + let key = key.as_bytes(); + let val = val.as_bytes(); + + self.buf.reserve(key.len() + val.len() + 4); self.buf.extend_from_slice(b"\r\n"); - self.buf.extend_from_slice(key.as_bytes()); + self.buf.extend_from_slice(key); self.buf.extend_from_slice(b": "); - self.buf.extend_from_slice(val.as_bytes()); + self.buf.extend_from_slice(val); self } @@ -114,7 +122,7 @@ impl Dispatcher { impl Service for Dispatcher where - F: for<'h, 'b> AsyncFn<(Request<'h, 'b>, Response<'h>, &'h C), Output = Response<'h, 3>>, + F: for<'h, 'b> AsyncFn<(Request<'h>, Response<'h>, &'h C), Output = Response<'h, 3>>, { type Response = (); type Error = Error; @@ -135,12 +143,18 @@ where read_buf = buf; 'inner: loop { - let mut headers = const { [MaybeUninit::uninit(); 8] }; + let mut headers = [const { MaybeUninit::uninit() }; 8]; - let mut req = Request::new(&mut []); + let mut req = httparse::Request::new(&mut []); match req.parse_with_uninit_headers(&read_buf, &mut headers)? { Status::Complete(len) => { + let req = Request { + path: req.path.unwrap(), + method: req.method.unwrap(), + headers: req.headers, + }; + let res = Response { buf: &mut write_buf, date: self.date.get(), diff --git a/postgres/CHANGES.md b/postgres/CHANGES.md index abf2714e..bfc5ab7b 100644 --- a/postgres/CHANGES.md +++ b/postgres/CHANGES.md @@ -1,3 +1,10 @@ +# unreleased 0.3.0 +## Remove +- remove `ExecuteMut` trait. It's role is replaced by `impl Execute<&mut C>` + +## Change +- change `pool::Pool`'s dead connection detection lifecycle. + # 0.2.1 ## Fix - relax lifetime bound on various query types diff --git a/postgres/Cargo.toml b/postgres/Cargo.toml index 87e12821..fec7d2b1 100644 --- a/postgres/Cargo.toml +++ b/postgres/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "xitca-postgres" -version = "0.2.1" +version = "0.3.0" edition = "2021" license = "Apache-2.0" description = "an async postgres client" diff --git a/postgres/examples/execute.rs b/postgres/examples/execute.rs index 75ab8369..8259e207 100644 --- a/postgres/examples/execute.rs +++ b/postgres/examples/execute.rs @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box> { } // implement Execute trait - impl<'p, 'c> Execute<'c, Client> for PrepareAndExecute<'p> + impl<'p, 'c> Execute<&'c Client> for PrepareAndExecute<'p> where // in execute methods both PrepareAndExecute<'p> and &'c Client are moved into async block // and we use c as the output lifetime in Execute::ExecuteOutput's boxed async block. @@ -61,7 +61,7 @@ async fn main() -> Result<(), Box> { } fn query(self, cli: &'c Client) -> Self::QueryOutput { - Box::pin(async { + Box::pin(async move { // prepare statement and query for async iterator of rows let stmt = Statement::named(self.stmt, self.types).execute(cli).await?; let stream = stmt.query(cli).await?; diff --git a/postgres/src/execute.rs b/postgres/src/execute.rs index 684e68c5..1039f899 100644 --- a/postgres/src/execute.rs +++ b/postgres/src/execute.rs @@ -15,7 +15,7 @@ mod sync_impl; /// /// [`Encode`]: crate::driver::codec::encode::Encode /// [`IntoResponse`]: crate::driver::codec::response::IntoResponse -pub trait Execute<'c, C> +pub trait Execute where Self: Sized, { @@ -36,34 +36,21 @@ where type QueryOutput; /// define how a statement is executed with single time response - fn execute(self, cli: &'c C) -> Self::ExecuteOutput; + fn execute(self, cli: C) -> Self::ExecuteOutput; /// define how a statement is queried with repeated response - fn query(self, cli: &'c C) -> Self::QueryOutput; -} - -/// mutable version of [`Execute`] trait where C type is mutably borrowed -pub trait ExecuteMut<'c, C> -where - Self: Sized, -{ - type ExecuteMutOutput; - type QueryMutOutput; - - fn execute_mut(self, cli: &'c mut C) -> Self::ExecuteMutOutput; - - fn query_mut(self, cli: &'c mut C) -> Self::QueryMutOutput; + fn query(self, cli: C) -> Self::QueryOutput; } /// blocking version of [`Execute`] for synchronous environment -pub trait ExecuteBlocking<'c, C> +pub trait ExecuteBlocking where Self: Sized, { type ExecuteOutput; type QueryOutput; - fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput; + fn execute_blocking(self, cli: C) -> Self::ExecuteOutput; - fn query_blocking(self, cli: &'c C) -> Self::QueryOutput; + fn query_blocking(self, cli: C) -> Self::QueryOutput; } diff --git a/postgres/src/execute/async_impl.rs b/postgres/src/execute/async_impl.rs index 8a671aa8..f42b37cf 100644 --- a/postgres/src/execute/async_impl.rs +++ b/postgres/src/execute/async_impl.rs @@ -18,7 +18,7 @@ use crate::{ use super::Execute; -impl<'s, C> Execute<'_, C> for &'s Statement +impl<'s, C> Execute<&C> for &'s Statement where C: Query, { @@ -36,7 +36,7 @@ where } } -impl Execute<'_, C> for &str +impl Execute<&C> for &str where C: Query, { @@ -76,7 +76,7 @@ where } } -impl<'c, C> Execute<'c, C> for StatementNamed<'_> +impl<'c, C> Execute<&'c C> for StatementNamed<'_> where C: Prepare + 'c, { @@ -96,7 +96,7 @@ where } } -impl<'s, C, P> Execute<'_, C> for StatementQuery<'s, P> +impl<'s, C, P> Execute<&C> for StatementQuery<'s, P> where C: Query, P: AsParams, @@ -115,7 +115,7 @@ where } } -impl<'c, C, P> Execute<'c, C> for StatementUnnamedBind<'_, P> +impl<'c, C, P> Execute<&'c C> for StatementUnnamedBind<'_, P> where C: Prepare + 'c, P: AsParams, @@ -136,7 +136,7 @@ where } } -impl<'c, C> Execute<'c, C> for &std::path::Path +impl<'c, C> Execute<&'c C> for &std::path::Path where C: Query + Sync + 'c, { diff --git a/postgres/src/execute/sync_impl.rs b/postgres/src/execute/sync_impl.rs index 68b17d8e..5998f14c 100644 --- a/postgres/src/execute/sync_impl.rs +++ b/postgres/src/execute/sync_impl.rs @@ -11,7 +11,7 @@ use crate::{ use super::ExecuteBlocking; -impl<'s, C> ExecuteBlocking<'_, C> for &'s Statement +impl<'s, C> ExecuteBlocking<&C> for &'s Statement where C: Query, { @@ -30,7 +30,7 @@ where } } -impl ExecuteBlocking<'_, C> for &str +impl ExecuteBlocking<&C> for &str where C: Query, { @@ -49,7 +49,7 @@ where } } -impl<'c, C> ExecuteBlocking<'c, C> for StatementNamed<'_> +impl<'c, C> ExecuteBlocking<&'c C> for StatementNamed<'_> where C: Prepare + 'c, { @@ -68,7 +68,7 @@ where } } -impl<'s, C, P> ExecuteBlocking<'_, C> for StatementQuery<'s, P> +impl<'s, C, P> ExecuteBlocking<&C> for StatementQuery<'s, P> where C: Query, P: AsParams, @@ -88,7 +88,7 @@ where } } -impl<'c, C, P> ExecuteBlocking<'c, C> for StatementUnnamedBind<'_, P> +impl<'c, C, P> ExecuteBlocking<&'c C> for StatementUnnamedBind<'_, P> where C: Prepare + 'c, P: AsParams, @@ -108,20 +108,20 @@ where } } -impl<'c, C> ExecuteBlocking<'c, C> for &std::path::Path +impl ExecuteBlocking<&C> for &std::path::Path where - C: Query + 'c, + C: Query, { type ExecuteOutput = Result; type QueryOutput = Result; #[inline] - fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput { + fn execute_blocking(self, cli: &C) -> Self::ExecuteOutput { std::fs::read_to_string(self)?.execute_blocking(cli) } #[inline] - fn query_blocking(self, cli: &'c C) -> Self::QueryOutput { + fn query_blocking(self, cli: &C) -> Self::QueryOutput { std::fs::read_to_string(self)?.query_blocking(cli) } } diff --git a/postgres/src/lib.rs b/postgres/src/lib.rs index 259cf44a..3a3a820c 100644 --- a/postgres/src/lib.rs +++ b/postgres/src/lib.rs @@ -32,7 +32,7 @@ pub use self::{ config::Config, driver::Driver, error::Error, - execute::{Execute, ExecuteBlocking, ExecuteMut}, + execute::{Execute, ExecuteBlocking}, from_sql::FromSqlExt, query::{RowSimpleStream, RowSimpleStreamOwned, RowStream, RowStreamOwned}, session::Session, diff --git a/postgres/src/pipeline.rs b/postgres/src/pipeline.rs index 0806615c..cf462342 100644 --- a/postgres/src/pipeline.rs +++ b/postgres/src/pipeline.rs @@ -19,7 +19,7 @@ use super::{ column::Column, driver::codec::{self, encode::Encode, Response}, error::{Completed, Error}, - execute::{Execute, ExecuteMut}, + execute::Execute, iter::AsyncLendingIterator, query::Query, row::Row, @@ -31,7 +31,7 @@ use super::{ /// /// # Examples /// ```rust -/// use xitca_postgres::{iter::AsyncLendingIterator, pipeline::Pipeline, Client, Execute, ExecuteMut, Statement}; +/// use xitca_postgres::{iter::AsyncLendingIterator, pipeline::Pipeline, Client, Execute, Statement}; /// /// async fn pipeline(client: &Client) -> Result<(), Box> { /// // prepare a statement that will be called repeatedly. @@ -43,9 +43,9 @@ use super::{ /// /// // bind value param to statement and query with the pipeline. /// // pipeline can encode multiple queries locally before send it to database. -/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?; -/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?; -/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?; +/// statement.bind([] as [i32; 0]).query(&mut pipe)?; +/// statement.bind([] as [i32; 0]).query(&mut pipe)?; +/// statement.bind([] as [i32; 0]).query(&mut pipe)?; /// /// // query the pipeline and on success a streaming response will be returned. /// let mut res = pipe.query(client)?; @@ -235,20 +235,20 @@ impl<'b, const SYNC_MODE: bool> Pipeline<'_, Borrowed<'b>, SYNC_MODE> { } } -impl<'a, B, E, const SYNC_MODE: bool> ExecuteMut<'_, Pipeline<'a, B, SYNC_MODE>> for E +impl<'a, B, E, const SYNC_MODE: bool> Execute<&mut Pipeline<'a, B, SYNC_MODE>> for E where B: DerefMut, E: Encode, { - type ExecuteMutOutput = Self::QueryMutOutput; - type QueryMutOutput = Result<(), Error>; + type ExecuteOutput = Self::QueryOutput; + type QueryOutput = Result<(), Error>; #[inline] - fn execute_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteMutOutput { - self.query_mut(pipe) + fn execute(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteOutput { + self.query(pipe) } - fn query_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryMutOutput { + fn query(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryOutput { let len = pipe.buf.len(); self.encode::(&mut pipe.buf) @@ -264,7 +264,7 @@ pub struct PipelineQuery<'a, 'b> { pub(crate) buf: &'b [u8], } -impl<'p, C, B, const SYNC_MODE: bool> Execute<'_, C> for Pipeline<'p, B, SYNC_MODE> +impl<'p, C, B, const SYNC_MODE: bool> Execute<&C> for Pipeline<'p, B, SYNC_MODE> where C: Query, B: DerefMut, @@ -304,7 +304,7 @@ where } } -impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<'_, C> for Pipeline<'p, B, SYNC_MODE> +impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<&C> for Pipeline<'p, B, SYNC_MODE> where C: Query, B: DerefMut, diff --git a/postgres/src/pool.rs b/postgres/src/pool.rs index b63e5895..53e5ba91 100644 --- a/postgres/src/pool.rs +++ b/postgres/src/pool.rs @@ -19,7 +19,7 @@ use super::{ copy::{r#Copy, CopyIn, CopyOut}, driver::codec::{encode::Encode, Response}, error::Error, - execute::{Execute, ExecuteMut}, + execute::Execute, iter::AsyncLendingIterator, prepare::Prepare, query::Query, @@ -83,10 +83,9 @@ impl Pool { /// return as [Error] pub async fn get(&self) -> Result, Error> { let _permit = self.permits.acquire().await.expect("Semaphore must not be closed"); - let conn = self.conn.lock().unwrap().pop_front(); - let conn = match conn { - Some(conn) => conn, - None => self.connect().await?, + let conn = match self.conn.lock().unwrap().pop_front() { + Some(conn) if !conn.client.closed() => conn, + _ => self.connect().await?, }; Ok(PoolConnection { pool: self, @@ -113,16 +112,28 @@ impl Pool { /// a set of public is exposed to interact with them. /// /// # Caching -/// PoolConnection contains cache set of [`Statement`] to speed up regular used sql queries. By default -/// when calling [`Execute::execute`] on a [`StatementNamed`] the pool connection does nothing and function -/// the same as a regular [`Client`]. In order to utilize the cache caller must execute the named statement -/// through [`ExecuteMut::execute_mut`]. This method call will look up local statement cache hand out a copy -/// of in the type of [`Arc`]. If no copy is found in the cache pool connection will prepare a -/// new statement and insert it into the cache. +/// PoolConnection contains cache set of [`Statement`] to speed up regular used sql queries. when calling +/// [`Execute::execute`] on a [`StatementNamed`] with &[`PoolConnection`] the pool connection does nothing +/// special and function the same as a regular [`Client`]. In order to utilize the cache caller must execute +/// the named statement with &mut [`PoolConnection`]. With a mutable reference of pool connection it will do +/// local cache look up for statement and hand out one in the type of [`Arc`] if any found. If no +/// copy is found in the cache pool connection will prepare a new statement and insert it into the cache. +/// ## Examples +/// ``` +/// # use xitca_postgres::{pool::Pool, Execute, Error, Statement}; +/// # async fn cached(pool: &Pool) -> Result<(), Error> { +/// let mut conn = pool.get().await?; +/// // prepare a statement without caching +/// Statement::named("SELECT 1", &[]).execute(&conn).await?; +/// // prepare a statement with caching from conn. +/// Statement::named("SELECT 1", &[]).execute(&mut conn).await?; +/// # Ok(()) +/// # } +/// ``` /// /// * When to use caching or not: /// - query statement repeatedly called intensely can benefit from cache. -/// - query statement with low latency requirement can benefit from upfront cached. +/// - query statement with low latency requirement can benefit from upfront cache. /// - rare query statement can benefit from no caching by reduce resource usage from the server side. For low /// latency of rare query consider use [`Statement::unnamed`] as alternative. pub struct PoolConnection<'a> { @@ -269,9 +280,6 @@ impl r#Copy for PoolConnection<'_> { impl Drop for PoolConnection<'_> { fn drop(&mut self) { let conn = self.conn.take().unwrap(); - if conn.client.closed() { - return; - } self.pool.conn.lock().unwrap().push_back(conn); } } @@ -290,26 +298,26 @@ impl PoolClient { } } -impl<'c, 's> ExecuteMut<'c, PoolConnection<'_>> for StatementNamed<'s> +impl<'c, 's> Execute<&'c mut PoolConnection<'_>> for StatementNamed<'s> where 's: 'c, { - type ExecuteMutOutput = StatementCacheFuture<'c>; - type QueryMutOutput = Self::ExecuteMutOutput; + type ExecuteOutput = StatementCacheFuture<'c>; + type QueryOutput = Self::ExecuteOutput; - fn execute_mut(self, cli: &'c mut PoolConnection) -> Self::ExecuteMutOutput { + fn execute(self, cli: &'c mut PoolConnection) -> Self::ExecuteOutput { match cli.conn().statements.get(self.stmt) { Some(stmt) => StatementCacheFuture::Cached(stmt.clone()), None => StatementCacheFuture::Prepared(Box::pin(async move { - let stmt = self.execute(cli).await?.leak(); + let stmt = self.execute(&*cli).await?.leak(); Ok(cli.insert_cache(self.stmt, stmt)) })), } } #[inline] - fn query_mut(self, cli: &'c mut PoolConnection) -> Self::QueryMutOutput { - self.execute_mut(cli) + fn query(self, cli: &'c mut PoolConnection) -> Self::QueryOutput { + self.execute(cli) } } @@ -350,7 +358,7 @@ mod test { let mut conn = pool.get().await.unwrap(); - let stmt = Statement::named("SELECT 1", &[]).execute_mut(&mut conn).await.unwrap(); + let stmt = Statement::named("SELECT 1", &[]).execute(&mut conn).await.unwrap(); stmt.execute(&conn.consume()).await.unwrap(); } } diff --git a/postgres/tests/runtime.rs b/postgres/tests/runtime.rs index 1502877b..b383596e 100644 --- a/postgres/tests/runtime.rs +++ b/postgres/tests/runtime.rs @@ -6,7 +6,7 @@ use xitca_postgres::{ pipeline::Pipeline, statement::Statement, types::Type, - Client, Execute, ExecuteMut, Postgres, + Client, Execute, Postgres, }; async fn connect(s: &str) -> Client { @@ -313,8 +313,8 @@ async fn pipeline() { let mut pipe = Pipeline::new(); - stmt.query_mut(&mut pipe).unwrap(); - stmt.query_mut(&mut pipe).unwrap(); + stmt.query(&mut pipe).unwrap(); + stmt.query(&mut pipe).unwrap(); let mut res = pipe.query(&cli).unwrap(); diff --git a/postgres/tests/sync.rs b/postgres/tests/sync.rs index 1c2ae2cc..a75887d3 100644 --- a/postgres/tests/sync.rs +++ b/postgres/tests/sync.rs @@ -5,7 +5,7 @@ use xitca_postgres::{ pipeline::Pipeline, statement::Statement, types::Type, - Client, ExecuteBlocking, ExecuteMut, Postgres, + Client, Execute, ExecuteBlocking, Postgres, }; fn connect() -> Client { @@ -112,8 +112,8 @@ fn pipeline_blocking() { let mut pipe = Pipeline::new(); - stmt.bind(["alice"]).query_mut(&mut pipe).unwrap(); - stmt.bind(["bob"]).query_mut(&mut pipe).unwrap(); + stmt.bind(["alice"]).query(&mut pipe).unwrap(); + stmt.bind(["bob"]).query(&mut pipe).unwrap(); let rows_affected = pipe.execute_blocking(&cli).unwrap(); assert_eq!(rows_affected, 2);