Skip to content

Commit

Permalink
Handle stopping
Browse files Browse the repository at this point in the history
  • Loading branch information
bwoebi committed Jan 12, 2024
1 parent de9d951 commit eb70ad6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 43 deletions.
1 change: 0 additions & 1 deletion src/Driver/DefaultHttpDriverFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public function createHttpDriver(
streamTimeout: $this->streamTimeout,
headerSizeLimit: $this->headerSizeLimit,
bodySizeLimit: $this->bodySizeLimit,
pushEnabled: $this->pushEnabled,
);
}

Expand Down
132 changes: 92 additions & 40 deletions src/Driver/Http3Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

namespace Amp\Http\Server\Driver;

use Amp\ByteStream\ClosedException;
use Amp\ByteStream\ReadableIterableStream;
use Amp\CancelledException;
use Amp\DeferredCancellation;
use Amp\DeferredFuture;
use Amp\Http\Http2\Http2ConnectionException;
use Amp\Http\Http2\Http2Parser;
use Amp\Http\Http2\Http2StreamException;
use Amp\Http\InvalidHeaderException;
use Amp\Http\Server\ClientException;
use Amp\Http\Server\Driver\Internal\ConnectionHttpDriver;
use Amp\Http\Server\Driver\Internal\Http2Stream;
use Amp\Http\Server\Driver\Internal\Http3\Http3ConnectionException;
use Amp\Http\Server\Driver\Internal\Http3\Http3Error;
use Amp\Http\Server\Driver\Internal\Http3\Http3Frame;
Expand All @@ -25,7 +25,6 @@
use Amp\Http\Server\RequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Server\Trailers;
use Amp\NullCancellation;
use Amp\Pipeline\Queue;
use Amp\Quic\QuicConnection;
use Amp\Quic\QuicSocket;
Expand All @@ -39,16 +38,16 @@

class Http3Driver extends ConnectionHttpDriver
{
private bool $allowsPush;

private Client $client;

Check failure on line 41 in src/Driver/Http3Driver.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

PropertyNotSetInConstructor

src/Driver/Http3Driver.php:41:20: PropertyNotSetInConstructor: Property Amp\Http\Server\Driver\Http3Driver::$client is not defined in constructor of Amp\Http\Server\Driver\Http3Driver or in any private or final methods called in the constructor (see https://psalm.dev/074)
private QuicConnection $connection;

/** @var \WeakMap<Request, QuicSocket> */
private \WeakMap $requestStreams;

private Http3Writer $writer;

Check failure on line 46 in src/Driver/Http3Driver.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

PropertyNotSetInConstructor

src/Driver/Http3Driver.php:46:25: PropertyNotSetInConstructor: Property Amp\Http\Server\Driver\Http3Driver::$writer is not defined in constructor of Amp\Http\Server\Driver\Http3Driver or in any private or final methods called in the constructor (see https://psalm.dev/074)
private QPack $qpack;
private int $highestStreamId = 0;
private bool $stopping = false;
private DeferredCancellation $closeCancellation;

public function __construct(
RequestHandler $requestHandler,
Expand All @@ -57,15 +56,13 @@ public function __construct(
private readonly int $streamTimeout = Http2Driver::DEFAULT_STREAM_TIMEOUT,
private readonly int $headerSizeLimit = Http2Driver::DEFAULT_HEADER_SIZE_LIMIT,
private readonly int $bodySizeLimit = Http2Driver::DEFAULT_BODY_SIZE_LIMIT,
private bool $pushEnabled = true,
private readonly ?string $settings = null,
) {
parent::__construct($requestHandler, $errorHandler, $logger);

$this->allowsPush = $pushEnabled;

$this->qpack = new QPack;
$this->requestStreams = new \WeakMap;

Check failure on line 64 in src/Driver/Http3Driver.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

PropertyTypeCoercion

src/Driver/Http3Driver.php:64:33: PropertyTypeCoercion: $this->requestStreams expects 'WeakMap<Amp\Http\Server\Request, Amp\Quic\QuicSocket>', parent type 'WeakMap<object, mixed>' provided (see https://psalm.dev/198)
$this->closeCancellation = new DeferredCancellation;
}

// TODO copied from Http2Driver...
Expand All @@ -88,7 +85,6 @@ protected function write(Request $request, Response $response): void
{
/** @var QuicSocket $stream */
$stream = $this->requestStreams[$request];
unset($this->requestStreams[$request]);

$status = $response->getStatus();
$headers = [
Expand All @@ -108,9 +104,6 @@ protected function write(Request $request, Response $response): void

foreach ($response->getPushes() as $push) {
$headers["link"][] = "<{$push->getUri()}>; rel=preload";
if ($this->allowsPush) {
// TODO $this->sendPushPromise($request, $id, $push);
}
}

$this->writer->sendHeaderFrame($stream, $this->encodeHeaders($headers));
Expand All @@ -119,23 +112,31 @@ protected function write(Request $request, Response $response): void
return;
}

$cancellation = new NullCancellation; // TODO just dummy
try {
$cancellation = $this->closeCancellation->getCancellation();

$body = $response->getBody();
$chunk = $body->read($cancellation);
$body = $response->getBody();
$chunk = $body->read($cancellation);

while ($chunk !== null) {
$this->writer->sendData($stream, $chunk);
while ($chunk !== null) {
$this->writer->sendData($stream, $chunk);

$chunk = $body->read($cancellation);
}
$chunk = $body->read($cancellation);
}

if ($trailers !== null) {
$trailers = $trailers->await($cancellation);
$this->writer->sendHeaderFrame($stream, $this->encodeHeaders($trailers->getHeaders()));
if ($trailers !== null) {
$trailers = $trailers->await($cancellation);
$this->writer->sendHeaderFrame($stream, $this->encodeHeaders($trailers->getHeaders()));
}

$stream->end();
if (!$stream->isClosed()) {
$stream->endReceiving();
}
} catch (CancelledException) {
}

$stream->end();
unset($this->requestStreams[$request]);
}

public function getApplicationLayerProtocols(): array
Expand All @@ -149,11 +150,12 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti
\assert(!isset($this->client), "The driver has already been setup");

$this->client = $client;
$this->connection = $connection;
$this->writer = new Http3Writer($connection, [[Http3Settings::MAX_FIELD_SECTION_SIZE, $this->headerSizeLimit]]);

Check failure on line 153 in src/Driver/Http3Driver.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

PossiblyInvalidArgument

src/Driver/Http3Driver.php:153:41: PossiblyInvalidArgument: Argument 1 of Amp\Http\Server\Driver\Internal\Http3\Http3Writer::__construct expects Amp\Quic\QuicConnection, but possibly different type Amp\Quic\QuicConnection|Amp\Socket\Socket provided (see https://psalm.dev/092)
$largestPushId = (1 << 62) - 1;
$maxAllowedPushId = 0;

$connection->onClose($this->closeCancellation->cancel(...));

$parser = new Http3Parser($connection, $this->headerSizeLimit, $this->qpack);

Check failure on line 159 in src/Driver/Http3Driver.php

View workflow job for this annotation

GitHub Actions / PHP 8.1

PossiblyInvalidArgument

src/Driver/Http3Driver.php:159:35: PossiblyInvalidArgument: Argument 1 of Amp\Http\Server\Driver\Internal\Http3\Http3Parser::__construct expects Amp\Quic\QuicConnection, but possibly different type Amp\Quic\QuicConnection|Amp\Socket\Socket provided (see https://psalm.dev/092)
try {
foreach ($parser->process() as $frame) {
Expand All @@ -164,11 +166,16 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti
break;

case Http3Frame::HEADERS:
EventLoop::queue(function () use ($parser, $frame) {
/** @var QuicSocket $stream */
[, $stream, $generator] = $frame;
if ($this->stopping) {
[, $stream] = $frame;
$stream->close(Http3Error::H3_NO_ERROR->value);
break;
}
EventLoop::queue(function () use ($parser, $stream, $generator) {
try {
/** @var QuicSocket $stream */
$stream = $frame[1];
$generator = $frame[2];
$streamId = $stream->getId();

[$headers, $pseudo] = $generator->current();
foreach ($pseudo as $name => $value) {
Expand Down Expand Up @@ -208,7 +215,7 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti

$host = $matches[1];
$port = isset($matches[2])
? (int)$matches[2]
? (int) $matches[2]
: ($address instanceof InternetAddress ? $address->getPort() : null);

if ($position = \strpos($target, "#")) {
Expand Down Expand Up @@ -284,7 +291,7 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti
);
}

$expectedLength = (int)$contentLength;
$expectedLength = (int) $contentLength;
} else {
$expectedLength = null;
}
Expand Down Expand Up @@ -314,6 +321,11 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) {
$trailers
);
$this->requestStreams[$request] = $stream;

if ($this->highestStreamId < $streamId) {
$this->highestStreamId = $streamId;
}

async($this->handleRequest(...), $request);

$generator->next();
Expand Down Expand Up @@ -400,17 +412,16 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) {
break;

case Http3Frame::GOAWAY:
$maxPushId = $frame[1];
[, $maxPushId] = $frame;
if ($maxPushId > $largestPushId) {
$parser->abort(new Http3ConnectionException("A GOAWAY id must not be larger than a prior one", Http3Error::H3_ID_ERROR));
break;
}
$this->pushEnabled = false;
// TODO abort pending server pushes
// Nothing to do here, we don't support pushes.
break;

case Http3Frame::MAX_PUSH_ID:
$maxPushId = $frame[1];
[, $maxPushId] = $frame;
if ($maxPushId < $maxAllowedPushId) {
$parser->abort(new Http3ConnectionException("A MAX_PUSH_ID id must not be smaller than a prior one", Http3Error::H3_ID_ERROR));
break;
Expand All @@ -419,8 +430,13 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) {
break;

case Http3Frame::CANCEL_PUSH:
$pushId = $frame[1];
// TODO stop push
[, $pushId] = $frame;
// Without pushes sent, this frame is always invalid
$parser->abort(new Http3ConnectionException("An CANCEL_PUSH for a not promised $pushId was received", Http3Error::H3_ID_ERROR));
break;

case Http3Frame::PUSH_PROMISE:
$parser->abort(new Http3ConnectionException("A push stream must not be initiated by the client", Http3Error::H3_STREAM_CREATION_ERROR));
break;

default:
Expand All @@ -437,11 +453,47 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) {

public function getPendingRequestCount(): int
{
return 0;
return $this->requestStreams->count();
}

public function stop(): void
{
// TODO emit goaway frames
if ($this->stopping) {
return;
}

$this->stopping = true;
$this->writer->sendGoaway($this->highestStreamId);

/** @psalm-suppress RedundantCondition */
\assert($this->logger->debug(\sprintf(
"Gracefully shutting down HTTP/3 client @ %s #%d; last-id: %d",
$this->client->getRemoteAddress()->toString(),
$this->client->getId(),
$this->highestStreamId,
)) || true);


$outstanding = $this->requestStreams->count();
if ($outstanding === 0) {
$this->writer->close();
return;
}

$deferred = new DeferredFuture;
foreach ($this->requestStreams as $stream) {
$stream->onClose(function () use (&$outstanding, $deferred) {
if (--$outstanding === 0) {
$deferred->complete();
}
});
}

try {
$deferred->getFuture()->await($this->closeCancellation->getCancellation());
} catch (CancelledException) {
} finally {
$this->writer->close();
}
}
}
2 changes: 1 addition & 1 deletion src/Driver/Internal/Http3/Http3Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public function process(): ConcurrentIterator
throw new Http3ConnectionException("The push stream was closed too early", Http3Error::H3_FRAME_ERROR);
}
}
$this->queue->push([Http3StreamType::Push, $pushId, fn () => $this->readHttpMessage($stream, $buf, $off)]);
$this->queue->push([Http3Frame::PUSH_PROMISE, $pushId, fn () => $this->readHttpMessage($stream, $buf, $off)]);
break;

// We don't do anything with these streams yet, but we must not close them according to RFC 9204 Section 4.2
Expand Down
11 changes: 10 additions & 1 deletion src/Driver/Internal/Http3/Http3Writer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class Http3Writer
{
private $controlStream;
private QuicSocket $controlStream;

public function __construct(private QuicConnection $connection, private array $settings)
{
Expand Down Expand Up @@ -43,6 +43,11 @@ public function sendData(QuicSocket $stream, string $payload)
self::sendFrame($stream, Http3Frame::DATA, $payload);
}

public function sendGoaway(int $highestStreamId)
{
self::sendFrame($this->controlStream, Http3Frame::GOAWAY, self::encodeVarint($highestStreamId));
}

private function startControlStream()
{
$this->controlStream = $this->connection->openStream();
Expand All @@ -56,4 +61,8 @@ private function startControlStream()
self::sendFrame($this->controlStream, Http3Frame::SETTINGS, \implode($ints));
}

public function close()
{
$this->connection->close(Http3Error::H3_NO_ERROR->value);
}
}

0 comments on commit eb70ad6

Please sign in to comment.