diff --git a/src/codec/flate/decoder.rs b/src/codec/flate/decoder.rs index 70c28c2..c70667b 100644 --- a/src/codec/flate/decoder.rs +++ b/src/codec/flate/decoder.rs @@ -51,7 +51,7 @@ impl Decode for FlateDecoder { match self.decode(input, output, FlushDecompress::None)? { Status::Ok => Ok(false), Status::StreamEnd => Ok(true), - Status::BufError => Err(io::Error::new(io::ErrorKind::Other, "unexpected BufError")), + Status::BufError => Ok(true), // Waiting for more input. } } diff --git a/src/tokio/bufread/generic/decoder.rs b/src/tokio/bufread/generic/decoder.rs index 2f4d8c7..0d86843 100644 --- a/src/tokio/bufread/generic/decoder.rs +++ b/src/tokio/bufread/generic/decoder.rs @@ -68,21 +68,46 @@ impl Decoder { loop { *this.state = match this.state { State::Decoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - // Avoid attempting to reinitialise the decoder if the reader - // has returned EOF. - *this.multiple_members = false; - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - let done = this.decoder.decode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - if done { - State::Flushing - } else { - State::Decoding + let fill_buf_result = this.reader.as_mut().poll_fill_buf(cx); + + match fill_buf_result { + Poll::Pending => { + // Try to decode even if there is no new data. + // Some data may be left in the internal state of the decoder + // because there was not enough space in the output buffer. + let written_before = output.written().len(); + + let mut input: Vec = vec![]; + let mut input = PartialBuffer::new(input); + let done = this.decoder.decode(&mut input, output)?; + if output.written().len() == written_before { + return Poll::Pending; + } + + if done { + State::Flushing + } else { + State::Decoding + } + } + Poll::Ready(input) => { + let input = input?; + if input.is_empty() { + // Avoid attempting to reinitialise the decoder if the reader + // has returned EOF. + *this.multiple_members = false; + State::Flushing + } else { + let mut input = PartialBuffer::new(input); + let done = this.decoder.decode(&mut input, output)?; + let len = input.written().len(); + this.reader.as_mut().consume(len); + if done { + State::Flushing + } else { + State::Decoding + } + } } } }