diff --git a/src/Driver/DefaultHttpDriverFactory.php b/src/Driver/DefaultHttpDriverFactory.php index 46dc8233..74004905 100644 --- a/src/Driver/DefaultHttpDriverFactory.php +++ b/src/Driver/DefaultHttpDriverFactory.php @@ -9,6 +9,8 @@ final class DefaultHttpDriverFactory implements HttpDriverFactory { /** + * @param positive-int $headerSizeLimit + * @param non-negative-int $bodySizeLimit * @param bool $allowHttp2Upgrade Requires HTTP/2 support to be enabled. * @param bool $pushEnabled Requires HTTP/2 support to be enabled. */ diff --git a/src/Driver/Http3Driver.php b/src/Driver/Http3Driver.php index 8b00b533..cb10472e 100644 --- a/src/Driver/Http3Driver.php +++ b/src/Driver/Http3Driver.php @@ -40,12 +40,15 @@ class Http3Driver extends ConnectionHttpDriver { + /** @psalm-suppress PropertyNotSetInConstructor */ private Client $client; /** @var \WeakMap */ private \WeakMap $requestStreams; + /** @psalm-suppress PropertyNotSetInConstructor */ private Http3Parser $parser; + /** @psalm-suppress PropertyNotSetInConstructor */ private Http3Writer $writer; private QPack $qpack; private int $highestStreamId = 0; @@ -55,9 +58,13 @@ class Http3Driver extends ConnectionHttpDriver private array $settings = []; /** @var DeferredFuture> */ private DeferredFuture $parsedSettings; - /** @var array */ + private array $streamHandlers = []; + /** + * @param positive-int $headerSizeLimit + * @param non-negative-int $bodySizeLimit + */ public function __construct( RequestHandler $requestHandler, ErrorHandler $errorHandler, @@ -69,6 +76,7 @@ public function __construct( parent::__construct($requestHandler, $errorHandler, $logger); $this->qpack = new QPack; + /** @var \WeakMap See https://github.com/vimeo/psalm/issues/7131 */ $this->requestStreams = new \WeakMap; $this->closeCancellation = new DeferredCancellation; $this->settings[Http3Settings::MAX_FIELD_SECTION_SIZE->value] = $this->headerSizeLimit; @@ -82,13 +90,13 @@ public function getSettings(): \array return $this->parsedSettings->getFuture()->await(); } - public function addSetting(Http3Settings|int $setting, int $value) + public function addSetting(Http3Settings|int $setting, int $value): void { $this->settings[\is_int($setting) ? $setting : $setting->value] = $value; } /** @param \Closure(string $buf, QuicSocket $stream): void $handler */ - public function addStreamHandler(int $type, \Closure $handler) + public function addStreamHandler(int $type, \Closure $handler): void { $this->streamHandlers[$type] = $handler; } @@ -226,6 +234,7 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti { /** @psalm-suppress RedundantPropertyInitializationCheck */ \assert(!isset($this->client), "The driver has already been setup"); + \assert($connection instanceof QuicConnection); $this->client = $client; $this->writer = new Http3Writer($connection, $this->settings); @@ -404,7 +413,9 @@ public function handleConnection(Client $client, QuicConnection|Socket $connecti $this->updatePriority($stream, $headers["priority"]); } + /** @var EventLoop\Suspension|null $dataSuspension */ $dataSuspension = null; + $bodySizeLimit = $this->bodySizeLimit; $body = new RequestBody( new ReadableIterableStream($bodyQueue->pipe()), function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) { @@ -459,6 +470,8 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) { $dataSuspension->suspend(); } } elseif ($type === Http3Frame::HEADERS) { + [$headers, $pseudo] = $data; + // Trailers must not contain pseudo-headers. if (!empty($pseudo)) { throw new Http3StreamException( @@ -576,10 +589,12 @@ function (int $bodySize) use (&$bodySizeLimit, &$dataSuspension) { } } - public function updatePriority(QuicSocket $socket, array|string $headers) + public function updatePriority(QuicSocket $socket, array|string $headers): void { - [$urgency, $incremental] = Http3Parser::parsePriority($headers); - $socket->setPriority($urgency + 124 /* 127 is default for QUIC, 3 is default for HTTP */, $incremental); + if ([$urgency, $incremental] = Http3Parser::parsePriority($headers)) { + /** @psalm-suppress PossiblyNullArgument https://github.com/vimeo/psalm/issues/10668 */ + $socket->setPriority($urgency + 124 /* 127 is default for QUIC, 3 is default for HTTP */, $incremental); + } } public function getPendingRequestCount(): int diff --git a/src/Driver/Internal/Capsules/CapsuleWriter.php b/src/Driver/Internal/Capsules/CapsuleWriter.php index acb64913..5c3f8b16 100644 --- a/src/Driver/Internal/Capsules/CapsuleWriter.php +++ b/src/Driver/Internal/Capsules/CapsuleWriter.php @@ -4,5 +4,5 @@ interface CapsuleWriter { - public function write(int $type, string $buf); + public function write(int $type, string $buf): void; } diff --git a/src/Driver/Internal/Capsules/DatagramCapsule.php b/src/Driver/Internal/Capsules/DatagramCapsule.php index 16f25cac..d0ed5414 100644 --- a/src/Driver/Internal/Capsules/DatagramCapsule.php +++ b/src/Driver/Internal/Capsules/DatagramCapsule.php @@ -12,7 +12,7 @@ class DatagramCapsule implements CapsuleReader, DatagramStream { const TYPE = 0; private ?Suspension $datagramSuspension = null; - private ?Cancellation $cancellation; + private ?Cancellation $cancellation = null; private ?string $cancellationId = null; private ?string $nextDatagram = null; @@ -30,7 +30,7 @@ public function read(): ?array return null; } - $buf = \implode(\iterator_to_array($content)); + $buf = \implode(\is_array($content) ? $content : \iterator_to_array($content)); if (\strlen($buf) < $len) { return null; } @@ -38,6 +38,7 @@ public function read(): ?array if ($this->datagramSuspension) { $this->datagramSuspension->resume($buf); $this->datagramSuspension = null; + \assert($this->cancellationId !== null); $this->cancellation?->unsubscribe($this->cancellationId); $this->cancellation = null; } else { @@ -75,6 +76,7 @@ public function receive(?Cancellation $cancellation = null): ?string $this->datagramSuspension = EventLoop::getSuspension(); $this->cancellation = $cancellation; $this->cancellationId = $cancellation?->subscribe(function ($e) { + \assert($this->datagramSuspension !== null); $this->datagramSuspension->throw($e); $this->datagramSuspension = null; }); diff --git a/src/Driver/Internal/Capsules/Rfc9297Reader.php b/src/Driver/Internal/Capsules/Rfc9297Reader.php index 487deba4..b4233169 100644 --- a/src/Driver/Internal/Capsules/Rfc9297Reader.php +++ b/src/Driver/Internal/Capsules/Rfc9297Reader.php @@ -35,14 +35,15 @@ public function read(): ?array yield $this->buf; $length -= \strlen($this->buf); - if (null === ($this->buf = $this->read())) { + if (null === $buf = $this->stream->read()) { return; } + $this->buf = $buf; } yield \substr($this->buf, 0, $length); $this->buf = \substr($this->buf, $length); $this->activeReader = false; }; - return [$type, $length, $reader]; + return [$type, $length, $reader()]; } } diff --git a/src/Driver/Internal/Capsules/Rfc9297Writer.php b/src/Driver/Internal/Capsules/Rfc9297Writer.php index a5358b12..aca8b84b 100644 --- a/src/Driver/Internal/Capsules/Rfc9297Writer.php +++ b/src/Driver/Internal/Capsules/Rfc9297Writer.php @@ -11,7 +11,7 @@ public function __construct(private WritableStream $writer) { } - public function write(int $type, string $buf) + public function write(int $type, string $buf): void { $this->writer->write(Http3Writer::encodeVarint($type) . Http3Writer::encodeVarint(\strlen($buf)) . $buf); } diff --git a/src/Driver/Internal/Http3/Http3Parser.php b/src/Driver/Internal/Http3/Http3Parser.php index cc39d8d2..8d59bb0c 100644 --- a/src/Driver/Internal/Http3/Http3Parser.php +++ b/src/Driver/Internal/Http3/Http3Parser.php @@ -22,6 +22,7 @@ class Http3Parser private Queue $queue; /** @var array */ private array $datagramReceivers = []; + /** @psalm-suppress PropertyNotSetInConstructor */ private DeferredCancellation $datagramReceiveEmpty; /** @var array */ private array $datagramCloseHandlerInstalled = []; @@ -66,9 +67,11 @@ public static function decodeVarintFromStream(ReadableStream $stream, string &$b } $buf .= $chunk; } + /** @psalm-suppress PossiblyUndefinedVariable https://github.com/vimeo/psalm/issues/10548 */ return $int; } + /** @param positive-int $headerSizeLimit */ public function __construct(private QuicConnection $connection, private int $headerSizeLimit, private QPack $qpack) { $this->queue = new Queue; @@ -94,19 +97,27 @@ public static function decodeFrameTypeFromStream(QuicSocket $stream, string &$bu $off += $length; $frametype = self::decodeVarintFromStream($stream, $buf, $off); } + /** @psalm-suppress PossiblyUndefinedVariable https://github.com/vimeo/psalm/issues/10548 */ return $frame; } - public static function readFullFrame(QuicSocket $stream, string &$buf, int &$off, $maxSize): ?array + /** + * @param positive-int $maxSize + * @return list{Http3Frame, string}|null + */ + public static function readFullFrame(QuicSocket $stream, string &$buf, int &$off, int $maxSize): ?array { - $type = self::decodeFrameTypeFromStream($stream, $buf, $off); + if (null === $type = self::decodeFrameTypeFromStream($stream, $buf, $off)) { + return null; + } if (null === $frame = self::readFrameWithoutType($stream, $buf, $off, $maxSize)) { return null; } return [$type, $frame]; } - public static function readFrameWithoutType(QuicSocket $stream, string &$buf, int &$off, $maxSize): ?string + /** @param positive-int $maxSize */ + public static function readFrameWithoutType(QuicSocket $stream, string &$buf, int &$off, int $maxSize): ?string { $length = self::decodeVarintFromStream($stream, $buf, $off); if ($length < 0) { @@ -168,6 +179,7 @@ private function readHttpMessage(QuicSocket $stream, string &$buf, int &$off): \ return; } if ($frame === Http3Frame::PUSH_PROMISE) { + /** @psalm-suppress PossiblyNullArgument https://github.com/vimeo/psalm/issues/10668 */ yield Http3Frame::PUSH_PROMISE => $this->parsePushPromise($contents); } else { break; @@ -177,6 +189,7 @@ private function readHttpMessage(QuicSocket $stream, string &$buf, int &$off): \ throw new Http3ConnectionException("A request or response stream may not start with any other frame than HEADERS", Http3Error::H3_FRAME_UNEXPECTED); } $headerOff = 0; + /** @psalm-suppress PossiblyNullArgument https://github.com/vimeo/psalm/issues/10668 */ yield Http3Frame::HEADERS => self::processHeaders($this->qpack->decode($contents, $headerOff)); $hadData = false; while (null !== $type = self::decodeFrameTypeFromStream($stream, $buf, $off)) { @@ -208,7 +221,7 @@ private function readHttpMessage(QuicSocket $stream, string &$buf, int &$off): \ $buf = ""; $off = 0; while (true) { - if (null === $buf = $stream->read()) { + if (null === $chunk = $stream->read()) { if ($stream->wasReset()) { yield null => null; } else { @@ -216,11 +229,12 @@ private function readHttpMessage(QuicSocket $stream, string &$buf, int &$off): \ } return; } - if (\strlen($buf) < $length) { - yield Http3Frame::DATA => $buf; - $length -= \strlen($buf); + if (\strlen($chunk) < $length) { + yield Http3Frame::DATA => $chunk; + $length -= \strlen($chunk); } else { - yield Http3Frame::DATA => \substr($buf, 0, $length); + yield Http3Frame::DATA => \substr($chunk, 0, $length); + $buf = $chunk; $off = $length; break; } @@ -244,9 +258,11 @@ private function readHttpMessage(QuicSocket $stream, string &$buf, int &$off): \ } while ([$frame, $contents] = self::readFullFrame($stream, $buf, $off, $this->headerSizeLimit)) { if ($frame === Http3Frame::PUSH_PROMISE) { + /** @psalm-suppress PossiblyNullArgument https://github.com/vimeo/psalm/issues/10668 */ yield Http3Frame::PUSH_PROMISE => $this->parsePushPromise($contents); } else { - throw new Http3ConnectionException("Expecting only push promises after a message frame, found {$frame->type}", Http3Error::H3_FRAME_UNEXPECTED); + /** @psalm-suppress PossiblyNullPropertyFetch https://github.com/vimeo/psalm/issues/10668 */ + throw new Http3ConnectionException("Expecting only push promises after a message frame, found {$frame->name}", Http3Error::H3_FRAME_UNEXPECTED); } } } @@ -281,7 +297,8 @@ private static function processHeaders(array $decoded): ?array return [$headers, $pseudo]; } - /** @return ConcurrentIterator */ + // I'm unable to suppress https://github.com/vimeo/psalm/issues/10669 + /* @return ConcurrentIterator */ public function process(): ConcurrentIterator { EventLoop::queue(function () { @@ -290,6 +307,10 @@ public function process(): ConcurrentIterator try { $off = 0; $buf = $stream->read(); + if ($buf === null) { + return; // Nothing happens. That's allowed. Just bye then. + } + $type = self::decodeVarintFromStream($stream, $buf, $off); if ($stream->isWritable()) { // client-initiated bidirectional stream @@ -320,6 +341,7 @@ public function process(): ConcurrentIterator if ($frame !== Http3Frame::SETTINGS) { throw new Http3ConnectionException("A settings frame must be the first frame on the control stream", Http3Error::H3_MISSING_SETTINGS); } + /** @psalm-suppress PossiblyNullArgument https://github.com/vimeo/psalm/issues/10668 */ $this->parseSettings($contents); while (true) { @@ -335,6 +357,7 @@ public function process(): ConcurrentIterator } $tmpOff = 0; + /** @psalm-suppress PossiblyNullArgument https://github.com/vimeo/psalm/issues/10668 */ if (0 > $id = self::decodeVarint($contents, $tmpOff)) { if (!$stream->getConnection()->isClosed()) { throw new Http3ConnectionException("The control stream was closed", Http3Error::H3_CLOSED_CRITICAL_STREAM); @@ -394,6 +417,7 @@ public function process(): ConcurrentIterator } // Note: format is shared with HTTP/2 + /** @return list{int<0, 7>, bool}|null */ public static function parsePriority(array|string $headers): ?array { $urgency = 3; @@ -404,7 +428,7 @@ public static function parsePriority(array|string $headers): ?array if ($number instanceof Number) { $value = $number->item; if (\is_int($value) && $value >= 0 && $value <= 7) { - $urgency = $number->item; + $urgency = $value; } } } @@ -491,6 +515,7 @@ public function receiveDatagram(QuicSocket $stream, ?Cancellation $cancellation try { return $suspension->suspend(); } finally { + /** @psalm-suppress PossiblyNullArgument https://github.com/vimeo/psalm/issues/10553 */ $cancellation?->unsubscribe($cancellationId); } } diff --git a/src/Driver/Internal/Http3/QPack.php b/src/Driver/Internal/Http3/QPack.php index f0846c9d..a8ca5ff4 100644 --- a/src/Driver/Internal/Http3/QPack.php +++ b/src/Driver/Internal/Http3/QPack.php @@ -114,7 +114,10 @@ class QPack ["x-frame-options", "sameorigin"], ]; - /** @return positive-int */ + /** + * @psalm-suppress MoreSpecificReturnType + * @return non-negative-int + */ private static function decodeDynamicInteger(string $input, int $maxBits, int &$off): int { if (!isset($input[$off])) { @@ -123,6 +126,7 @@ private static function decodeDynamicInteger(string $input, int $maxBits, int &$ $int = \ord($input[$off++]) & $maxBits; if ($maxBits !== $int) { + /** @psalm-suppress LessSpecificReturnStatement https://github.com/vimeo/psalm/issues/10667 */ return $int; } @@ -141,6 +145,7 @@ private static function decodeDynamicInteger(string $input, int $maxBits, int &$ } } while ($c & 0x80); + /** @psalm-suppress InvalidReturnStatement https://github.com/vimeo/psalm/issues/9902 */ return $int; } @@ -156,7 +161,10 @@ public static function decodeDynamicField(string $input, int $startBits, int &$o $off += $length; if ($huffman) { - return HPackNative::huffmanDecode($string); + $string = HPackNative::huffmanDecode($string); + if ($string === null) { + throw new QPackException(Http3Error::QPACK_DECOMPRESSION_FAILED, 'Invalid huffman encoded sequence'); + } } return $string; diff --git a/src/Driver/Internal/Http3/QPackException.php b/src/Driver/Internal/Http3/QPackException.php index b03d87ef..3be17145 100644 --- a/src/Driver/Internal/Http3/QPackException.php +++ b/src/Driver/Internal/Http3/QPackException.php @@ -4,7 +4,7 @@ class QPackException extends \Exception { - public function __construct(public Http3Error $error, $message = null) + public function __construct(public Http3Error $error, string $message = null) { parent::__construct($message ?? "Encoding error of type: " . $error->name); } diff --git a/src/Driver/Internal/StreamHttpDriver.php b/src/Driver/Internal/StreamHttpDriver.php index eec49c77..c11a95b2 100644 --- a/src/Driver/Internal/StreamHttpDriver.php +++ b/src/Driver/Internal/StreamHttpDriver.php @@ -10,6 +10,7 @@ abstract class StreamHttpDriver extends ConnectionHttpDriver { public function handleConnection(Client $client, QuicConnection|Socket $connection): void { + assert($connection instanceof Socket); $this->handleClient($client, $connection, $connection); } } diff --git a/src/Driver/Internal/UnbufferedBodyStream.php b/src/Driver/Internal/UnbufferedBodyStream.php index 42a71396..b0117f06 100644 --- a/src/Driver/Internal/UnbufferedBodyStream.php +++ b/src/Driver/Internal/UnbufferedBodyStream.php @@ -3,11 +3,17 @@ namespace Amp\Http\Server\Driver\Internal; use Amp\ByteStream\ReadableStream; +use Amp\ByteStream\ReadableStreamIteratorAggregate; use Amp\Cancellation; use Amp\Http\Server\RequestBody; -class UnbufferedBodyStream implements ReadableStream +/** + * @implements \IteratorAggregate + */ +class UnbufferedBodyStream implements ReadableStream, \IteratorAggregate { + use ReadableStreamIteratorAggregate; + private int $dataRead = 0; public function __construct(private RequestBody $body)