Skip to content

Commit

Permalink
Make sure closing the UpgradedSocket doesn't close the whole connection
Browse files Browse the repository at this point in the history
  • Loading branch information
bwoebi committed Feb 16, 2024
1 parent e88a602 commit 23d2b28
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 12 deletions.
6 changes: 6 additions & 0 deletions src/Driver/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ interface Client extends Closable
*/
public function getId(): int;

/**
* @inheritDoc
* @param int $close An optional close reason for streams which support a close reason.
*/
public function close(int $reason = 0): void;

/**
* @return SocketAddress Remote client address.
*/
Expand Down
22 changes: 20 additions & 2 deletions src/Driver/Http2Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
use Amp\Http\Server\Trailers;
use Amp\Pipeline\Queue;
use Amp\Socket\InternetAddress;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;
use League\Uri;
use Psr\Log\LoggerInterface as PsrLogger;
use Revolt\EventLoop;
Expand Down Expand Up @@ -1160,7 +1162,23 @@ private function upgrade(Request $request, Response $response, int $id): void
throw new \Error('Response was not upgraded');
}

$client = $request->getClient();
$client = new class($request, $id) extends SocketClient {
public function __construct(private Request $request, int $id) {
parent::__construct($request->getClient(), $id);
}

public function close(int $reason = 0): void {
// Nothing to do here, closing the output stream is enough
}

public function isClosed(): bool {
return $this->request->getBody()->isClosed();
}

public function onClose(\Closure $onClose): void {
$this->request->getBody()->onClose($onClose);
}
};

// The input RequestBody are parsed raw DATA frames - exactly what we need (see CONNECT)
$inputStream = new UnbufferedBodyStream($request->getBody());
Expand All @@ -1169,7 +1187,7 @@ private function upgrade(Request $request, Response $response, int $id): void
// The output of an upgraded connection is just DATA frames
$outputPipe = new Pipe(0);

$upgraded = new UpgradedSocket($client, $inputStream, $outputPipe->getSink(), $id);
$upgraded = new UpgradedSocket($client, $inputStream, $outputPipe->getSink());

try {
$upgradeHandler($upgraded, $request, $response);
Expand Down
4 changes: 1 addition & 3 deletions src/Driver/Http3Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,6 @@ private function upgrade(QuicSocket $stream, Request $request, Response $respons
throw new \Error('Response was not upgraded');
}

$client = $request->getClient();

// The input RequestBody are parsed raw DATA frames - exactly what we need (see CONNECT)
$inputStream = new UnbufferedBodyStream($request->getBody());
$request->setBody(""); // hide the body from the upgrade handler, it's available in the UpgradedSocket
Expand All @@ -220,7 +218,7 @@ private function upgrade(QuicSocket $stream, Request $request, Response $respons
$settings = $this->parsedSettings->getFuture()->await();
$datagramStream = empty($settings[Http3Settings::H3_DATAGRAM->value]) ? null : new Http3DatagramStream($this->parser->receiveDatagram(...), $this->writer->writeDatagram(...), $this->writer->maxDatagramSize(...), $stream);

$upgraded = new UpgradedSocket($client, $inputStream, $outputPipe->getSink(), $stream->getId(), $datagramStream);
$upgraded = new UpgradedSocket(new SocketClient($stream, $stream->getId()), $inputStream, $outputPipe->getSink(), $datagramStream);

try {
$upgradeHandler($upgraded, $request, $response);
Expand Down
14 changes: 8 additions & 6 deletions src/Driver/SocketClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
namespace Amp\Http\Server\Driver;

use Amp\Quic\QuicConnection;
use Amp\Quic\QuicSocket;
use Amp\Socket\Socket;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;

final class SocketClient implements Client
class SocketClient implements Client
{
private readonly int $id;

public function __construct(
private readonly Socket|QuicConnection $socket,
private readonly Client|Socket|QuicConnection $socket,
int $id = null
) {
$this->id = createClientId();
$this->id = $id ?? createClientId();
}

public function getId(): int
Expand All @@ -37,9 +39,9 @@ public function getTlsInfo(): ?TlsInfo
return $this->socket->getTlsInfo();
}

public function close(): void
public function close(int $reason = 0): void
{
$this->socket->close();
$this->socket->close($reason);
}

public function onClose(\Closure $onClose): void
Expand All @@ -54,6 +56,6 @@ public function isClosed(): bool

public function isQuicClient(): bool
{
return $this->socket instanceof QuicConnection;
return $this->socket instanceof QuicConnection || $this->socket instanceof QuicSocket;
}
}
1 change: 0 additions & 1 deletion src/Driver/UpgradedSocket.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public function __construct(
private readonly Client $client,
private readonly ReadableStream $readableStream,
private readonly WritableStream $writableStream,
public readonly int $id = 0,
public readonly ?DatagramStream $datagramClient = null,
) {
}
Expand Down

0 comments on commit 23d2b28

Please sign in to comment.