Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove ExecuteMut trait #1146

Merged
merged 1 commit into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions http/src/h1/dispatcher_uring_unreal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ use core::mem::MaybeUninit;
use std::rc::Rc;

use http::StatusCode;
use httparse::Status;
use httparse::{Header, Status};
use xitca_io::{
bytes::{Buf, BytesMut},
net::io_uring::TcpStream,
};
use xitca_service::{AsyncFn, Service};

pub use httparse::Request;

use crate::date::{DateTime, DateTimeHandle, DateTimeService};

pub type Error = Box<dyn std::error::Error + Send + Sync>;

pub struct Request<'a> {
pub method: &'a str,
pub path: &'a str,
pub headers: &'a mut [Header<'a>],
}

pub struct Response<'a, const STEP: usize = 1> {
buf: &'a mut BytesMut,
date: &'a DateTimeHandle,
Expand Down Expand Up @@ -49,10 +53,14 @@ impl<'a> Response<'a, 2> {
self.want_write_date = false;
}

let key = key.as_bytes();
let val = val.as_bytes();

self.buf.reserve(key.len() + val.len() + 4);
self.buf.extend_from_slice(b"\r\n");
self.buf.extend_from_slice(key.as_bytes());
self.buf.extend_from_slice(key);
self.buf.extend_from_slice(b": ");
self.buf.extend_from_slice(val.as_bytes());
self.buf.extend_from_slice(val);
self
}

Expand Down Expand Up @@ -114,7 +122,7 @@ impl<F, C> Dispatcher<F, C> {

impl<F, C> Service<TcpStream> for Dispatcher<F, C>
where
F: for<'h, 'b> AsyncFn<(Request<'h, 'b>, Response<'h>, &'h C), Output = Response<'h, 3>>,
F: for<'h, 'b> AsyncFn<(Request<'h>, Response<'h>, &'h C), Output = Response<'h, 3>>,
{
type Response = ();
type Error = Error;
Expand All @@ -135,12 +143,18 @@ where
read_buf = buf;

'inner: loop {
let mut headers = const { [MaybeUninit::uninit(); 8] };
let mut headers = [const { MaybeUninit::uninit() }; 8];

let mut req = Request::new(&mut []);
let mut req = httparse::Request::new(&mut []);

match req.parse_with_uninit_headers(&read_buf, &mut headers)? {
Status::Complete(len) => {
let req = Request {
path: req.path.unwrap(),
method: req.method.unwrap(),
headers: req.headers,
};

let res = Response {
buf: &mut write_buf,
date: self.date.get(),
Expand Down
7 changes: 7 additions & 0 deletions postgres/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# unreleased 0.3.0
## Remove
- remove `ExecuteMut` trait. It's role is replaced by `impl Execute<&mut C>`

## Change
- change `pool::Pool`'s dead connection detection lifecycle.

# 0.2.1
## Fix
- relax lifetime bound on various query types
Expand Down
2 changes: 1 addition & 1 deletion postgres/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "xitca-postgres"
version = "0.2.1"
version = "0.3.0"
edition = "2021"
license = "Apache-2.0"
description = "an async postgres client"
Expand Down
4 changes: 2 additions & 2 deletions postgres/examples/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

// implement Execute trait
impl<'p, 'c> Execute<'c, Client> for PrepareAndExecute<'p>
impl<'p, 'c> Execute<&'c Client> for PrepareAndExecute<'p>
where
// in execute methods both PrepareAndExecute<'p> and &'c Client are moved into async block
// and we use c as the output lifetime in Execute::ExecuteOutput's boxed async block.
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

fn query(self, cli: &'c Client) -> Self::QueryOutput {
Box::pin(async {
Box::pin(async move {
// prepare statement and query for async iterator of rows
let stmt = Statement::named(self.stmt, self.types).execute(cli).await?;
let stream = stmt.query(cli).await?;
Expand Down
25 changes: 6 additions & 19 deletions postgres/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod sync_impl;
///
/// [`Encode`]: crate::driver::codec::encode::Encode
/// [`IntoResponse`]: crate::driver::codec::response::IntoResponse
pub trait Execute<'c, C>
pub trait Execute<C>
where
Self: Sized,
{
Expand All @@ -36,34 +36,21 @@ where
type QueryOutput;

/// define how a statement is executed with single time response
fn execute(self, cli: &'c C) -> Self::ExecuteOutput;
fn execute(self, cli: C) -> Self::ExecuteOutput;

/// define how a statement is queried with repeated response
fn query(self, cli: &'c C) -> Self::QueryOutput;
}

/// mutable version of [`Execute`] trait where C type is mutably borrowed
pub trait ExecuteMut<'c, C>
where
Self: Sized,
{
type ExecuteMutOutput;
type QueryMutOutput;

fn execute_mut(self, cli: &'c mut C) -> Self::ExecuteMutOutput;

fn query_mut(self, cli: &'c mut C) -> Self::QueryMutOutput;
fn query(self, cli: C) -> Self::QueryOutput;
}

/// blocking version of [`Execute`] for synchronous environment
pub trait ExecuteBlocking<'c, C>
pub trait ExecuteBlocking<C>
where
Self: Sized,
{
type ExecuteOutput;
type QueryOutput;

fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput;
fn execute_blocking(self, cli: C) -> Self::ExecuteOutput;

fn query_blocking(self, cli: &'c C) -> Self::QueryOutput;
fn query_blocking(self, cli: C) -> Self::QueryOutput;
}
12 changes: 6 additions & 6 deletions postgres/src/execute/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{

use super::Execute;

impl<'s, C> Execute<'_, C> for &'s Statement
impl<'s, C> Execute<&C> for &'s Statement
where
C: Query,
{
Expand All @@ -36,7 +36,7 @@ where
}
}

impl<C> Execute<'_, C> for &str
impl<C> Execute<&C> for &str
where
C: Query,
{
Expand Down Expand Up @@ -76,7 +76,7 @@ where
}
}

impl<'c, C> Execute<'c, C> for StatementNamed<'_>
impl<'c, C> Execute<&'c C> for StatementNamed<'_>
where
C: Prepare + 'c,
{
Expand All @@ -96,7 +96,7 @@ where
}
}

impl<'s, C, P> Execute<'_, C> for StatementQuery<'s, P>
impl<'s, C, P> Execute<&C> for StatementQuery<'s, P>
where
C: Query,
P: AsParams,
Expand All @@ -115,7 +115,7 @@ where
}
}

impl<'c, C, P> Execute<'c, C> for StatementUnnamedBind<'_, P>
impl<'c, C, P> Execute<&'c C> for StatementUnnamedBind<'_, P>
where
C: Prepare + 'c,
P: AsParams,
Expand All @@ -136,7 +136,7 @@ where
}
}

impl<'c, C> Execute<'c, C> for &std::path::Path
impl<'c, C> Execute<&'c C> for &std::path::Path
where
C: Query + Sync + 'c,
{
Expand Down
18 changes: 9 additions & 9 deletions postgres/src/execute/sync_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{

use super::ExecuteBlocking;

impl<'s, C> ExecuteBlocking<'_, C> for &'s Statement
impl<'s, C> ExecuteBlocking<&C> for &'s Statement
where
C: Query,
{
Expand All @@ -30,7 +30,7 @@ where
}
}

impl<C> ExecuteBlocking<'_, C> for &str
impl<C> ExecuteBlocking<&C> for &str
where
C: Query,
{
Expand All @@ -49,7 +49,7 @@ where
}
}

impl<'c, C> ExecuteBlocking<'c, C> for StatementNamed<'_>
impl<'c, C> ExecuteBlocking<&'c C> for StatementNamed<'_>
where
C: Prepare + 'c,
{
Expand All @@ -68,7 +68,7 @@ where
}
}

impl<'s, C, P> ExecuteBlocking<'_, C> for StatementQuery<'s, P>
impl<'s, C, P> ExecuteBlocking<&C> for StatementQuery<'s, P>
where
C: Query,
P: AsParams,
Expand All @@ -88,7 +88,7 @@ where
}
}

impl<'c, C, P> ExecuteBlocking<'c, C> for StatementUnnamedBind<'_, P>
impl<'c, C, P> ExecuteBlocking<&'c C> for StatementUnnamedBind<'_, P>
where
C: Prepare + 'c,
P: AsParams,
Expand All @@ -108,20 +108,20 @@ where
}
}

impl<'c, C> ExecuteBlocking<'c, C> for &std::path::Path
impl<C> ExecuteBlocking<&C> for &std::path::Path
where
C: Query + 'c,
C: Query,
{
type ExecuteOutput = Result<u64, Error>;
type QueryOutput = Result<RowSimpleStream, Error>;

#[inline]
fn execute_blocking(self, cli: &'c C) -> Self::ExecuteOutput {
fn execute_blocking(self, cli: &C) -> Self::ExecuteOutput {
std::fs::read_to_string(self)?.execute_blocking(cli)
}

#[inline]
fn query_blocking(self, cli: &'c C) -> Self::QueryOutput {
fn query_blocking(self, cli: &C) -> Self::QueryOutput {
std::fs::read_to_string(self)?.query_blocking(cli)
}
}
2 changes: 1 addition & 1 deletion postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub use self::{
config::Config,
driver::Driver,
error::Error,
execute::{Execute, ExecuteBlocking, ExecuteMut},
execute::{Execute, ExecuteBlocking},
from_sql::FromSqlExt,
query::{RowSimpleStream, RowSimpleStreamOwned, RowStream, RowStreamOwned},
session::Session,
Expand Down
26 changes: 13 additions & 13 deletions postgres/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::{
column::Column,
driver::codec::{self, encode::Encode, Response},
error::{Completed, Error},
execute::{Execute, ExecuteMut},
execute::Execute,
iter::AsyncLendingIterator,
query::Query,
row::Row,
Expand All @@ -31,7 +31,7 @@ use super::{
///
/// # Examples
/// ```rust
/// use xitca_postgres::{iter::AsyncLendingIterator, pipeline::Pipeline, Client, Execute, ExecuteMut, Statement};
/// use xitca_postgres::{iter::AsyncLendingIterator, pipeline::Pipeline, Client, Execute, Statement};
///
/// async fn pipeline(client: &Client) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// // prepare a statement that will be called repeatedly.
Expand All @@ -43,9 +43,9 @@ use super::{
///
/// // bind value param to statement and query with the pipeline.
/// // pipeline can encode multiple queries locally before send it to database.
/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query_mut(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query(&mut pipe)?;
/// statement.bind([] as [i32; 0]).query(&mut pipe)?;
///
/// // query the pipeline and on success a streaming response will be returned.
/// let mut res = pipe.query(client)?;
Expand Down Expand Up @@ -235,20 +235,20 @@ impl<'b, const SYNC_MODE: bool> Pipeline<'_, Borrowed<'b>, SYNC_MODE> {
}
}

impl<'a, B, E, const SYNC_MODE: bool> ExecuteMut<'_, Pipeline<'a, B, SYNC_MODE>> for E
impl<'a, B, E, const SYNC_MODE: bool> Execute<&mut Pipeline<'a, B, SYNC_MODE>> for E
where
B: DerefMut<Target = BytesMut>,
E: Encode<Output = &'a [Column]>,
{
type ExecuteMutOutput = Self::QueryMutOutput;
type QueryMutOutput = Result<(), Error>;
type ExecuteOutput = Self::QueryOutput;
type QueryOutput = Result<(), Error>;

#[inline]
fn execute_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteMutOutput {
self.query_mut(pipe)
fn execute(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::ExecuteOutput {
self.query(pipe)
}

fn query_mut(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryMutOutput {
fn query(self, pipe: &mut Pipeline<'a, B, SYNC_MODE>) -> Self::QueryOutput {
let len = pipe.buf.len();

self.encode::<SYNC_MODE>(&mut pipe.buf)
Expand All @@ -264,7 +264,7 @@ pub struct PipelineQuery<'a, 'b> {
pub(crate) buf: &'b [u8],
}

impl<'p, C, B, const SYNC_MODE: bool> Execute<'_, C> for Pipeline<'p, B, SYNC_MODE>
impl<'p, C, B, const SYNC_MODE: bool> Execute<&C> for Pipeline<'p, B, SYNC_MODE>
where
C: Query,
B: DerefMut<Target = BytesMut>,
Expand Down Expand Up @@ -304,7 +304,7 @@ where
}
}

impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<'_, C> for Pipeline<'p, B, SYNC_MODE>
impl<'p, C, B, const SYNC_MODE: bool> ExecuteBlocking<&C> for Pipeline<'p, B, SYNC_MODE>
where
C: Query,
B: DerefMut<Target = BytesMut>,
Expand Down
Loading
Loading