From 166e0f0ce2a2112502b08e2a79291a1130c65b0c Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 15 Oct 2015 20:12:52 -0500 Subject: [PATCH] Improve readable pipe reading; throw on invalid length --- src/Pipe/ReadablePipe.php | 97 ++++++++++++++------------------- tests/Pipe/ReadablePipeTest.php | 9 +-- 2 files changed, 46 insertions(+), 60 deletions(-) diff --git a/src/Pipe/ReadablePipe.php b/src/Pipe/ReadablePipe.php index 2054c55..e4b8560 100644 --- a/src/Pipe/ReadablePipe.php +++ b/src/Pipe/ReadablePipe.php @@ -16,6 +16,7 @@ use Icicle\Stream\Exception\BusyError; use Icicle\Stream\Exception\ClosedException; use Icicle\Stream\Exception\FailureException; +use Icicle\Stream\Exception\InvalidArgumentError; use Icicle\Stream\Exception\UnreadableException; use Icicle\Stream\ReadableStreamInterface; use Icicle\Stream\StreamResource; @@ -32,16 +33,6 @@ class ReadablePipe extends StreamResource implements ReadableStreamInterface */ private $poll; - /** - * @var int - */ - private $length = 0; - - /** - * @var string|null - */ - private $byte; - /** * @var string */ @@ -99,43 +90,48 @@ public function read($length = 0, $byte = null, $timeout = 0) throw new UnreadableException('The stream is no longer readable.'); } - $this->length = (int) $length; - if (0 >= $this->length) { - $this->length = self::CHUNK_SIZE; + $length = (int) $length; + if (0 > $length) { + throw new InvalidArgumentError('The length must be a non-negative integer.'); + } elseif (0 === $length) { + $length = self::CHUNK_SIZE; } - $this->byte = (string) $byte; - $this->byte = strlen($this->byte) ? $this->byte[0] : null; + $byte = (string) $byte; + $byte = strlen($byte) ? $byte[0] : null; $resource = $this->getResource(); - $data = $this->fetch($resource); - if ('' !== $data) { - yield $data; - return; - } + do { + $data = $this->fetch($resource, $length, $byte); - if ($this->eof($resource)) { // Close only if no data was read and at EOF. - $this->close(); - yield $data; // Resolve with empty string on EOF. - return; - } + if ('' !== $data) { + yield $data; + return; + } - if (null === $this->poll) { - $this->poll = $this->createPoll(); - } + if ($this->eof($resource)) { // Close only if no data was read and at EOF. + $this->close(); + yield $data; // Resolve with empty string on EOF. + return; + } - $this->poll->listen($timeout); + if (null === $this->poll) { + $this->poll = $this->createPoll(); + } - $this->deferred = new Deferred(function () { - $this->poll->cancel(); - }); + $this->poll->listen($timeout); - try { - yield $this->deferred->getPromise(); - } finally { - $this->deferred = null; - } + $this->deferred = new Deferred(function () { + $this->poll->cancel(); + }); + + try { + yield $this->deferred->getPromise(); + } finally { + $this->deferred = null; + } + } while (true); } /** @@ -172,8 +168,6 @@ public function poll($timeout = 0) throw new FailureException('Stream buffer is not empty. Perform another read before polling.'); } - $this->length = 0; - if (null === $this->poll) { $this->poll = $this->createPoll(); } @@ -203,25 +197,27 @@ public function isReadable() * Reads data from the stream socket resource based on set length and read-to byte. * * @param resource $resource + * @param int $length + * @param string|null $byte * * @return string */ - private function fetch($resource) + private function fetch($resource, $length = self::CHUNK_SIZE, $byte = null) { if ('' === $this->buffer) { - $data = (string) fread($resource, $this->length); + $data = (string) fread($resource, $length); - if (null === $this->byte || '' === $data) { + if (null === $byte || '' === $data) { return $data; } $this->buffer = $data; } - if (null !== $this->byte && false !== ($position = strpos($this->buffer, $this->byte))) { + if (null !== $byte && false !== ($position = strpos($this->buffer, $byte))) { ++$position; // Include byte in result. } else { - $position = $this->length; + $position = $length; } $data = (string) substr($this->buffer, 0, $position); @@ -250,18 +246,7 @@ private function createPoll() return; } - if (0 === $this->length) { - $this->deferred->resolve(''); - return; - } - - $data = $this->fetch($resource); - - $this->deferred->resolve($data); - - if ('' === $data && $this->eof($resource)) { // Close only if no data was read and at EOF. - $this->close(); - } + $this->deferred->resolve($this->buffer); }); } } \ No newline at end of file diff --git a/tests/Pipe/ReadablePipeTest.php b/tests/Pipe/ReadablePipeTest.php index e486030..b8f8389 100644 --- a/tests/Pipe/ReadablePipeTest.php +++ b/tests/Pipe/ReadablePipeTest.php @@ -16,6 +16,7 @@ use Icicle\Stream\Exception\BusyError; use Icicle\Stream\Exception\ClosedException; use Icicle\Stream\Exception\FailureException; +use Icicle\Stream\Exception\InvalidArgumentError; use Icicle\Stream\Exception\UnreadableException; use Icicle\Stream\Pipe\ReadablePipe; use Icicle\Stream\Pipe\WritablePipe; @@ -193,9 +194,9 @@ public function testReadWithInvalidLength() $callback = $this->createCallback(1); $callback->method('__invoke') - ->with($this->identicalTo(self::WRITE_STRING)); + ->with($this->isInstanceOf(InvalidArgumentError::class)); - $promise->done($callback); + $promise->done($this->createCallback(0), $callback); Loop\run(); } @@ -509,9 +510,9 @@ public function testReadToWithInvalidLength() $callback = $this->createCallback(1); $callback->method('__invoke') - ->with($this->identicalTo(substr(self::WRITE_STRING, 0, $offset + 1))); + ->with($this->isInstanceOf(InvalidArgumentError::class)); - $promise->done($callback); + $promise->done($this->createCallback(0), $callback); Loop\run(); }