Skip to content

Commit

Permalink
Support rebinding even when busy
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 3, 2015
1 parent 32a5715 commit a2c0c7d
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 19 deletions.
8 changes: 5 additions & 3 deletions src/Pipe/DuplexPipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,12 @@ public function isWritable()

/**
* {@inheritdoc}
*
* @param float|int $timeout Timeout for poll and await if a read or write was pending.
*/
public function rebind()
public function rebind($timeout = 0)
{
$this->readable->rebind();
$this->writable->rebind();
$this->readable->rebind($timeout);
$this->writable->rebind($timeout);
}
}
13 changes: 8 additions & 5 deletions src/Pipe/ReadablePipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,19 @@ public function isReadable()

/**
* {@inheritdoc}
*
* @param float|int $timeout Timeout for poll if a read was pending.
*/
public function rebind()
public function rebind($timeout = 0)
{
if (null !== $this->deferred) {
throw new BusyError('Cannot rebind while the stream is busy.');
}

$pending = $this->poll->isPending();
$this->poll->free();

$this->poll = $this->createPoll();

if ($pending) {
$this->poll->listen($timeout);
}
}

/**
Expand Down
13 changes: 8 additions & 5 deletions src/Pipe/WritablePipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,19 @@ public function isWritable()

/**
* {@inheritdoc}
*
* @param float|int $timeout Timeout for await if a write was pending.
*/
public function rebind()
public function rebind($timeout = 0)
{
if (!$this->writeQueue->isEmpty()) {
throw new BusyError('Cannot rebind while the stream is busy.');
}

$pending = $this->await->isPending();
$this->await->free();

$this->await = $this->createAwait();

if ($pending) {
$this->await->listen($timeout);
}
}

/**
Expand Down
20 changes: 17 additions & 3 deletions tests/Pipe/ReadablePipeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Exception;
use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
use Icicle\Loop\Events\SocketEventInterface;
use Icicle\Loop\LoopInterface;
use Icicle\Promise\Exception\TimeoutException;
use Icicle\Stream\Exception\BusyError;
Expand Down Expand Up @@ -986,14 +987,27 @@ public function testRebind()

/**
* @depends testRebind
* @expectedException \Icicle\Stream\Exception\BusyError
*/
public function testRebindWhileBusy()
public function testRebindAfterRead()
{
list($readable, $writable) = $this->createStreams();

$promise = new Coroutine($readable->read());

$readable->rebind();
$timeout = 1;

$poll = $this->getMock(SocketEventInterface::class);
$poll->expects($this->once())
->method('listen')
->with($timeout);

$loop = $this->getMock(LoopInterface::class);
$loop->expects($this->once())
->method('poll')
->will($this->returnValue($poll));

Loop\loop($loop);

$readable->rebind($timeout);
}
}
20 changes: 17 additions & 3 deletions tests/Pipe/WritablePipeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

use Icicle\Coroutine\Coroutine;
use Icicle\Loop;
use Icicle\Loop\Events\SocketEventInterface;
use Icicle\Loop\LoopInterface;
use Icicle\Promise\Exception\TimeoutException;
use Icicle\Stream\Exception\ClosedException;
Expand Down Expand Up @@ -432,9 +433,8 @@ public function testRebind()

/**
* @depends testRebind
* @expectedException \Icicle\Stream\Exception\BusyError
*/
public function testRebindWhileBusy()
public function testRebindAfterPendingWrite()
{
list($readable, $writable) = $this->createStreams();

Expand All @@ -443,6 +443,20 @@ public function testRebindWhileBusy()
Loop\tick(false);
} while (!$promise->isPending());

$writable->rebind();
$timeout = 1;

$await = $this->getMock(SocketEventInterface::class);
$await->expects($this->once())
->method('listen')
->with($timeout);

$loop = $this->getMock(LoopInterface::class);
$loop->expects($this->once())
->method('await')
->will($this->returnValue($await));

Loop\loop($loop);

$writable->rebind($timeout);
}
}

0 comments on commit a2c0c7d

Please sign in to comment.