diff --git a/http/src/h1/dispatcher_unreal.rs b/http/src/h1/dispatcher_unreal.rs index 5ffbb094..48ef77f0 100644 --- a/http/src/h1/dispatcher_unreal.rs +++ b/http/src/h1/dispatcher_unreal.rs @@ -116,12 +116,19 @@ where let mut r_buf = BytesMut::with_capacity(4096); let mut w_buf = BytesMut::with_capacity(4096); + let mut read_closed = false; + loop { stream.ready(Interest::READABLE).await?; loop { match read_buf(&mut stream, &mut r_buf) { - Ok(0) if r_buf.is_empty() => return Ok(()), + Ok(0) => { + if core::mem::replace(&mut read_closed, true) { + return Ok(()); + } + break; + } Ok(_) => {} Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, Err(e) => return Err(e.into()), diff --git a/postgres/src/driver/generic.rs b/postgres/src/driver/generic.rs index 813170a4..fbdcfbc0 100644 --- a/postgres/src/driver/generic.rs +++ b/postgres/src/driver/generic.rs @@ -92,7 +92,7 @@ pub(crate) struct SharedState { } impl SharedState { - async fn wait(&self) -> WaitState { + fn wait(&self) -> impl Future + '_ { poll_fn(|cx| { let inner = self.guarded.lock().unwrap(); if !inner.buf.is_empty() { @@ -105,7 +105,6 @@ impl SharedState { Poll::Pending } }) - .await } } @@ -272,36 +271,43 @@ where } fn try_write(&mut self) -> io::Result<()> { - loop { - match self.write_state { - WriteState::WantFlush => { - match io::Write::flush(&mut self.io) { - Ok(_) => self.write_state = WriteState::Waiting, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} - Err(e) => return Err(e), - } - break; - } - WriteState::WantWrite => { - let mut inner = self.shared_state.guarded.lock().unwrap(); + debug_assert!( + matches!(self.write_state, WriteState::WantWrite | WriteState::WantFlush), + "try_write must not be called when WriteState is Wait or Closed" + ); + + if matches!(self.write_state, WriteState::WantWrite) { + let mut inner = self.shared_state.guarded.lock().unwrap(); + + let mut written = 0; - match io::Write::write(&mut self.io, &inner.buf) { - Ok(0) => return Err(io::ErrorKind::WriteZero.into()), - Ok(n) => { - inner.buf.advance(n); + loop { + match io::Write::write(&mut self.io, &inner.buf[written..]) { + Ok(0) => return Err(io::ErrorKind::WriteZero.into()), + Ok(n) => { + written += n; - if inner.buf.is_empty() { - self.write_state = WriteState::WantFlush; - } + if written == inner.buf.len() { + inner.buf.clear(); + self.write_state = WriteState::WantFlush; + break; } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, - Err(e) => return Err(e), } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + inner.buf.advance(written); + return Ok(()); + } + Err(e) => return Err(e), } - _ => unreachable!("try_write must not be called when WriteState is wait or closed"), } } + match io::Write::flush(&mut self.io) { + Ok(_) => self.write_state = WriteState::Waiting, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + Err(e) => return Err(e), + } + Ok(()) }