Skip to content

Commit

Permalink
remove ExecuteMut trait (#1146)
Browse files Browse the repository at this point in the history
  • Loading branch information
fakeshadow authored Oct 5, 2024
1 parent d3066ba commit c40616e
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 88 deletions.
30 changes: 22 additions & 8 deletions http/src/h1/dispatcher_uring_unreal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ use core::mem::MaybeUninit;
use std::rc::Rc;

use http::StatusCode;
use httparse::Status;
use httparse::{Header, Status};
use xitca_io::{
bytes::{Buf, BytesMut},
net::io_uring::TcpStream,
};
use xitca_service::{AsyncFn, Service};

pub use httparse::Request;

use crate::date::{DateTime, DateTimeHandle, DateTimeService};

pub type Error = Box<dyn std::error::Error + Send + Sync>;

pub struct Request<'a> {
pub method: &'a str,
pub path: &'a str,
pub headers: &'a mut [Header<'a>],
}

pub struct Response<'a, const STEP: usize = 1> {
buf: &'a mut BytesMut,
date: &'a DateTimeHandle,
Expand Down Expand Up @@ -49,10 +53,14 @@ impl<'a> Response<'a, 2> {
self.want_write_date = false;
}

let key = key.as_bytes();
let val = val.as_bytes();

self.buf.reserve(key.len() + val.len() + 4);
self.buf.extend_from_slice(b"\r\n");
self.buf.extend_from_slice(key.as_bytes());
self.buf.extend_from_slice(key);
self.buf.extend_from_slice(b": ");
self.buf.extend_from_slice(val.as_bytes());
self.buf.extend_from_slice(val);
self
}

Expand Down Expand Up @@ -114,7 +122,7 @@ impl<F, C> Dispatcher<F, C> {

impl<F, C> Service<TcpStream> for Dispatcher<F, C>
where
F: for<'h, 'b> AsyncFn<(Request<'h, 'b>, Response<'h>, &'h C), Output = Response<'h, 3>>,
F: for<'h, 'b> AsyncFn<(Request<'h>, Response<'h>, &'h C), Output = Response<'h, 3>>,
{
type Response = ();
type Error = Error;
Expand All @@ -135,12 +143,18 @@ where
read_buf = buf;

'inner: loop {
let mut headers = const { [MaybeUninit::uninit(); 8] };
let mut headers = [const { MaybeUninit::uninit() }; 8];

let mut req = Request::new(&mut []);
let mut req = httparse::Request::new(&mut []);

match req.parse_with_uninit_headers(&read_buf, &mut headers)? {
Status::Complete(len) => {
let req = Request {
path: req.path.unwrap(),
method: req.method.unwrap(),
headers: req.headers,
};

let res = Response {
buf: &mut write_buf,
date: self.date.get(),
Expand Down
7 changes: 7 additions & 0 deletions postgres/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# unreleased 0.3.0
## Remove
- remove `ExecuteMut` trait. It's role is replaced by `impl Execute<&mut C>`

## Change
- change `pool::Pool`'s dead connection detection lifecycle.

# 0.2.1
## Fix
- relax lifetime bound on various query types
Expand Down
2 changes: 1 addition & 1 deletion postgres/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "xitca-postgres"
version = "0.2.1"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"
description = "an async postgres client"
Expand Down
4 changes: 2 additions & 2 deletions postgres/examples/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

// implement Execute trait
impl<'p, 'c> Execute<'c, Client> for PrepareAndExecute<'p>
impl<'p, 'c> Execute<&'c Client> for PrepareAndExecute<'p>
where
// in execute methods both PrepareAndExecute<'p> and &'c Client are moved into async block
// and we use c as the output lifetime in Execute::ExecuteOutput's boxed async block.
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

fn query(self, cli: &'c Client) -> Self::QueryOutput {
Box::pin(async {
Box::pin(async move {
// prepare statement and query for async iterator of rows
let stmt = Statement::named(self.stmt, self.types).execute(cli).await?;
let stream = stmt.query(cli).await?;
Expand Down
25 changes: 6 additions & 19 deletions postgres/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod sync_impl;
///
/// [`Encode`]: crate::driver::codec::encode::Encode
/// [`IntoResponse`]: crate::driver::codec::response::IntoResponse
pub trait Execute<'c, C>
pub trait Execute<C>
where
Self: Sized,
{
Expand All @@ -36,34 +36,21 @@ where
type QueryOutput;

/// define how a statement is executed with single time response
fn execute(self, cli: &'c C) -> Self::ExecuteOutput;
fn execute(self, cli: C) -> Self::ExecuteOutput;

/// define how a statement is queried with repeated response
fn query(self, cli: &'c C) -> Self::QueryOutput;
}

/// mutable version of [`Execute`] trait where C type is mutably borrowed
pub trait ExecuteMut<'c, C>
where
Self: Sized,
{
type ExecuteMutOutput;
type QueryMutOutput;

fn execute_mut(self, cli: &'c mut C) -> Self::ExecuteMutOutput;

fn query_mut(self, cli: &'c mut C) -> Self::QueryMutOutput;
fn query(self, cli: C) -> Self::QueryOutput;
}

/// blocking version of [`Execute`] for synchronous environment
pub trait ExecuteBlocking<'c, C>
pub trait ExecuteBlocking<C>
where
Self: Sized,
{
type ExecuteOutput;
type QueryOutput;

fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput;
fn execute_blocking(self, cli: C) -> Self::ExecuteOutput;

fn query_blocking(self, cli: &'c C) -> Self::QueryOutput;
fn query_blocking(self, cli: C) -> Self::QueryOutput;
}
12 changes: 6 additions & 6 deletions postgres/src/execute/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{

use super::Execute;

impl<'s, C> Execute<'_, C> for &'s Statement
impl<'s, C> Execute<&C> for &'s Statement
where
C: Query,
{
Expand All @@ -36,7 +36,7 @@ where
}
}

impl<C> Execute<'_, C> for &str
impl<C> Execute<&C> for &str
where
C: Query,
{
Expand Down Expand Up @@ -76,7 +76,7 @@ where
}
}

impl<'c, C> Execute<'c, C> for StatementNamed<'_>
impl<'c, C> Execute<&'c C> for StatementNamed<'_>
where
C: Prepare + 'c,
{
Expand All @@ -96,7 +96,7 @@ where
}
}

impl<'s, C, P> Execute<'_, C> for StatementQuery<'s, P>
impl<'s, C, P> Execute<&C> for StatementQuery<'s, P>
where
C: Query,
P: AsParams,
Expand All @@ -115,7 +115,7 @@ where
}
}

impl<'c, C, P> Execute<'c, C> for StatementUnnamedBind<'_, P>
impl<'c, C, P> Execute<&'c C> for StatementUnnamedBind<'_, P>
where
C: Prepare + 'c,
P: AsParams,
Expand All @@ -136,7 +136,7 @@ where
}
}

impl<'c, C> Execute<'c, C> for &std::path::Path
impl<'c, C> Execute<&'c C> for &std::path::Path
where
C: Query + Sync + 'c,
{
Expand Down
18 changes: 9 additions & 9 deletions postgres/src/execute/sync_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{

use super::ExecuteBlocking;

impl<'s, C> ExecuteBlocking<'_, C> for &'s Statement
impl<'s, C> ExecuteBlocking<&C> for &'s Statement
where
C: Query,
{
Expand All @@ -30,7 +30,7 @@ where
}
}

impl<C> ExecuteBlocking<'_, C> for &str
impl<C> ExecuteBlocking<&C> for &str
where
C: Query,
{
Expand All @@ -49,7 +49,7 @@ where
}
}

impl<'c, C> ExecuteBlocking<'c, C> for StatementNamed<'_>
impl<'c, C> ExecuteBlocking<&'c C> for StatementNamed<'_>
where
C: Prepare + 'c,
{
Expand All @@ -68,7 +68,7 @@ where
}
}

impl<'s, C, P> ExecuteBlocking<'_, C> for StatementQuery<'s, P>
impl<'s, C, P> ExecuteBlocking<&C> for StatementQuery<'s, P>
where
C: Query,
P: AsParams,
Expand All @@ -88,7 +88,7 @@ where
}
}

impl<'c, C, P> ExecuteBlocking<'c, C> for StatementUnnamedBind<'_, P>
impl<'c, C, P> ExecuteBlocking<&'c C> for StatementUnnamedBind<'_, P>
where
C: Prepare + 'c,
P: AsParams,
Expand All @@ -108,20 +108,20 @@ where
}
}

impl<'c, C> ExecuteBlocking<'c, C> for &std::path::Path
impl<C> ExecuteBlocking<&C> for &std::path::Path
where
C: Query + 'c,
C: Query,
{
type ExecuteOutput = Result<u64, Error>;
type QueryOutput = Result<RowSimpleStream, Error>;

#[inline]
fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput {
fn execute_blocking(self, cli: &C) -> Self::ExecuteOutput {
std::fs::read_to_string(self)?.execute_blocking(cli)
}

#[inline]
fn query_blocking(self, cli: &'c C) -> Self::QueryOutput {
fn query_blocking(self, cli: &C) -> Self::QueryOutput {
std::fs::read_to_string(self)?.query_blocking(cli)
}
}
2 changes: 1 addition & 1 deletion postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub use self::{
config::Config,
driver::Driver,
error::Error,
execute::{Execute, ExecuteBlocking, ExecuteMut},
execute::{Execute, ExecuteBlocking},
from_sql::FromSqlExt,
query::{RowSimpleStream, RowSimpleStreamOwned, RowStream, RowStreamOwned},
session::Session,
Expand Down
26 changes: 13 additions & 13 deletions postgres/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{
column::Column,
driver::codec::{self, encode::Encode, Response},
error::{Completed, Error},
execute::{Execute, ExecuteMut},
execute::Execute,
iter::AsyncLendingIterator,
query::Query,
row::Row,
Expand All @@ -31,7 +31,7 @@ use super::{
///
/// # Examples
/// ```rust
/// use xitca_postgres::{iter::AsyncLendingIterator, pipeline::Pipeline, Client, Execute, ExecuteMut, Statement};
/// use xitca_postgres::{iter::AsyncLendingIterator, pipeline::Pipeline, Client, Execute, Statement};
///
/// async fn pipeline(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// // prepare a statement that will be called repeatedly.
Expand All @@ -43,9 +43,9 @@ use super::{
///
/// // bind value param to statement and query with the pipeline.
/// // pipeline can encode multiple queries locally before send it to database.
/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query(&mut pipe)?;
///
/// // query the pipeline and on success a streaming response will be returned.
/// let mut res = pipe.query(client)?;
Expand Down Expand Up @@ -235,20 +235,20 @@ impl<'b, const SYNC_MODE: bool> Pipeline<'_, Borrowed<'b>, SYNC_MODE> {
}
}

impl<'a, B, E, const SYNC_MODE: bool> ExecuteMut<'_, Pipeline<'a, B, SYNC_MODE>> for E
impl<'a, B, E, const SYNC_MODE: bool> Execute<&mut Pipeline<'a, B, SYNC_MODE>> for E
where
B: DerefMut<Target = BytesMut>,
E: Encode<Output = &'a [Column]>,
{
type ExecuteMutOutput = Self::QueryMutOutput;
type QueryMutOutput = Result<(), Error>;
type ExecuteOutput = Self::QueryOutput;
type QueryOutput = Result<(), Error>;

#[inline]
fn execute_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteMutOutput {
self.query_mut(pipe)
fn execute(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteOutput {
self.query(pipe)
}

fn query_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryMutOutput {
fn query(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryOutput {
let len = pipe.buf.len();

self.encode::<SYNC_MODE>(&mut pipe.buf)
Expand All @@ -264,7 +264,7 @@ pub struct PipelineQuery<'a, 'b> {
pub(crate) buf: &'b [u8],
}

impl<'p, C, B, const SYNC_MODE: bool> Execute<'_, C> for Pipeline<'p, B, SYNC_MODE>
impl<'p, C, B, const SYNC_MODE: bool> Execute<&C> for Pipeline<'p, B, SYNC_MODE>
where
C: Query,
B: DerefMut<Target = BytesMut>,
Expand Down Expand Up @@ -304,7 +304,7 @@ where
}
}

impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<'_, C> for Pipeline<'p, B, SYNC_MODE>
impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<&C> for Pipeline<'p, B, SYNC_MODE>
where
C: Query,
B: DerefMut<Target = BytesMut>,
Expand Down
Loading

0 comments on commit c40616e

Please sign in to comment.