Skip to content

Commit

Permalink
Improve readable pipe reading; throw on invalid length
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Oct 16, 2015
1 parent ddd1783 commit 166e0f0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 60 deletions.
97 changes: 41 additions & 56 deletions src/Pipe/ReadablePipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Icicle\Stream\Exception\BusyError;
use Icicle\Stream\Exception\ClosedException;
use Icicle\Stream\Exception\FailureException;
use Icicle\Stream\Exception\InvalidArgumentError;
use Icicle\Stream\Exception\UnreadableException;
use Icicle\Stream\ReadableStreamInterface;
use Icicle\Stream\StreamResource;
Expand All @@ -32,16 +33,6 @@ class ReadablePipe extends StreamResource implements ReadableStreamInterface
*/
private $poll;

/**
* @var int
*/
private $length = 0;

/**
* @var string|null
*/
private $byte;

/**
* @var string
*/
Expand Down Expand Up @@ -99,43 +90,48 @@ public function read($length = 0, $byte = null, $timeout = 0)
throw new UnreadableException('The stream is no longer readable.');
}

$this->length = (int) $length;
if (0 >= $this->length) {
$this->length = self::CHUNK_SIZE;
$length = (int) $length;
if (0 > $length) {
throw new InvalidArgumentError('The length must be a non-negative integer.');
} elseif (0 === $length) {
$length = self::CHUNK_SIZE;
}

$this->byte = (string) $byte;
$this->byte = strlen($this->byte) ? $this->byte[0] : null;
$byte = (string) $byte;
$byte = strlen($byte) ? $byte[0] : null;

$resource = $this->getResource();
$data = $this->fetch($resource);

if ('' !== $data) {
yield $data;
return;
}
do {
$data = $this->fetch($resource, $length, $byte);

if ($this->eof($resource)) { // Close only if no data was read and at EOF.
$this->close();
yield $data; // Resolve with empty string on EOF.
return;
}
if ('' !== $data) {
yield $data;
return;
}

if (null === $this->poll) {
$this->poll = $this->createPoll();
}
if ($this->eof($resource)) { // Close only if no data was read and at EOF.
$this->close();
yield $data; // Resolve with empty string on EOF.
return;
}

$this->poll->listen($timeout);
if (null === $this->poll) {
$this->poll = $this->createPoll();
}

$this->deferred = new Deferred(function () {
$this->poll->cancel();
});
$this->poll->listen($timeout);

try {
yield $this->deferred->getPromise();
} finally {
$this->deferred = null;
}
$this->deferred = new Deferred(function () {
$this->poll->cancel();
});

try {
yield $this->deferred->getPromise();
} finally {
$this->deferred = null;
}
} while (true);
}

/**
Expand Down Expand Up @@ -172,8 +168,6 @@ public function poll($timeout = 0)
throw new FailureException('Stream buffer is not empty. Perform another read before polling.');
}

$this->length = 0;

if (null === $this->poll) {
$this->poll = $this->createPoll();
}
Expand Down Expand Up @@ -203,25 +197,27 @@ public function isReadable()
* Reads data from the stream socket resource based on set length and read-to byte.
*
* @param resource $resource
* @param int $length
* @param string|null $byte
*
* @return string
*/
private function fetch($resource)
private function fetch($resource, $length = self::CHUNK_SIZE, $byte = null)
{
if ('' === $this->buffer) {
$data = (string) fread($resource, $this->length);
$data = (string) fread($resource, $length);

if (null === $this->byte || '' === $data) {
if (null === $byte || '' === $data) {
return $data;
}

$this->buffer = $data;
}

if (null !== $this->byte && false !== ($position = strpos($this->buffer, $this->byte))) {
if (null !== $byte && false !== ($position = strpos($this->buffer, $byte))) {
++$position; // Include byte in result.
} else {
$position = $this->length;
$position = $length;
}

$data = (string) substr($this->buffer, 0, $position);
Expand Down Expand Up @@ -250,18 +246,7 @@ private function createPoll()
return;
}

if (0 === $this->length) {
$this->deferred->resolve('');
return;
}

$data = $this->fetch($resource);

$this->deferred->resolve($data);

if ('' === $data && $this->eof($resource)) { // Close only if no data was read and at EOF.
$this->close();
}
$this->deferred->resolve($this->buffer);
});
}
}
9 changes: 5 additions & 4 deletions tests/Pipe/ReadablePipeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Icicle\Stream\Exception\BusyError;
use Icicle\Stream\Exception\ClosedException;
use Icicle\Stream\Exception\FailureException;
use Icicle\Stream\Exception\InvalidArgumentError;
use Icicle\Stream\Exception\UnreadableException;
use Icicle\Stream\Pipe\ReadablePipe;
use Icicle\Stream\Pipe\WritablePipe;
Expand Down Expand Up @@ -193,9 +194,9 @@ public function testReadWithInvalidLength()

$callback = $this->createCallback(1);
$callback->method('__invoke')
->with($this->identicalTo(self::WRITE_STRING));
->with($this->isInstanceOf(InvalidArgumentError::class));

$promise->done($callback);
$promise->done($this->createCallback(0), $callback);

Loop\run();
}
Expand Down Expand Up @@ -509,9 +510,9 @@ public function testReadToWithInvalidLength()

$callback = $this->createCallback(1);
$callback->method('__invoke')
->with($this->identicalTo(substr(self::WRITE_STRING, 0, $offset + 1)));
->with($this->isInstanceOf(InvalidArgumentError::class));

$promise->done($callback);
$promise->done($this->createCallback(0), $callback);

Loop\run();
}
Expand Down

0 comments on commit 166e0f0

Please sign in to comment.