Skip to content

Commit

Permalink
Close source stream on pipe completion
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 1, 2015
1 parent 3c5bd7f commit 2b083f7
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
# Changelog

### v0.4.2

- Changes
- Setting the third parameter of Stream\pipe() to true will now also close the source stream when piping completes.

---

### v0.4.1

- Bug Fixes
- Fixed issue in `Icicle\Stream\Pipe\ReadablePipe` where reading might fail if the other end of the pipe is closed.

---

### v0.4.0

- New Features
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,15 @@ Creates a duplex stream from the given stream resource (note only stream resourc
```php
Icicle\Stream\pipe(
ReadableStreamInterface $source
WritableStreamInterface $dest,
WritableStreamInterface $destination,
bool $end = true,
int $length = 0,
string|null $byte = null
float $timeout = 0
): Generator
```

Returns a generator that should be used within a coroutine or used to create a new coroutine. Pipes all data read from this stream to the writable stream. If `$length` is not `0`, only `$length` bytes will be piped to the writable stream. If `$byte` is not `null`, piping will end once `$byte` is encountered in the stream. The returned promise is fulfilled with the number of bytes piped once the writable stream is no longer writable, `$length` bytes have been piped, or `$byte` is encountered in the stream.
Returns a generator that should be used within a coroutine or used to create a new coroutine. Pipes all data read from this stream to the writable stream. If `$length` is not `0`, only `$length` bytes will be piped to the writable stream. If `$byte` is not `null`, piping will end once `$byte` is encountered in the stream. The returned promise is fulfilled with the number of bytes piped once the writable stream is no longer writable, `$length` bytes have been piped, or `$byte` is encountered in the stream. If `$end` is true, the source stream will be closed and the destination stream will be ended when piping completes.

Resolution | Type | Description
:-: | :-- | :--
Expand Down
29 changes: 21 additions & 8 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
*
* @param \Icicle\Stream\ReadableStreamInterface $source
* @param \Icicle\Stream\WritableStreamInterface $destination
* @param bool $end
* @param int $length
* @param string|null $byte
* @param float|int $timeout
* @param bool $end If true, calls close() on the source stream and calls end() on the destination stream when
* piping ends, either due to completion or on error.
* @param int $length The number of bytes to pipe. Use 0 for any number of bytes.
* @param string|null $byte Stop piping when the given byte is read from the source stream. Use null to ignore
* this parameter.
* @param float|int $timeout Number of seconds to wait while reading from the source or writing to the destination
* before failing. Use 0 for no timeout.
*
* @return \Generator
*
Expand Down Expand Up @@ -79,14 +82,24 @@ function pipe(
&& (0 === $length || 0 < $length -= $count)
);
} catch (\Exception $exception) {
if ($end && $destination->isWritable()) {
yield $destination->end();
if ($end) {
$source->close();
if ($destination->isWritable()) {
yield $destination->end();
} else {
$destination->close();
}
}
throw $exception;
}

if ($end && $destination->isWritable()) {
yield $destination->end();
if ($end) {
$source->close();
if ($destination->isWritable()) {
yield $destination->end();
} else {
$destination->close();
}
}

yield $bytes;
Expand Down
1 change: 1 addition & 0 deletions tests/FunctionsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public function testPipe()
$this->assertFalse($promise->isPending());
$this->assertTrue($promise->isFulfilled());
$this->assertSame(strlen(self::WRITE_STRING) * 3, $promise->wait());
$this->assertFalse($readable->isOpen());
}

/**
Expand Down

0 comments on commit 2b083f7

Please sign in to comment.