From 8482a6875fe57bf6df3784b8e60b2bf375dd1957 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 14 Aug 2022 13:05:28 -0500 Subject: [PATCH] Refactor connection results Using amphp/pipeline to simplify iterating result set rows. --- composer.json | 1 + src/Internal/ConnectionProcessor.php | 37 +++---- src/Internal/MysqlConnectionResult.php | 97 +++++------------- src/Internal/MysqlConnectionStatement.php | 12 +-- src/Internal/MysqlResultProxy.php | 115 ++++++++++------------ src/Internal/MysqlResultProxyState.php | 10 ++ test/LinkTest.php | 16 +++ test/initialize.php | 3 +- 8 files changed, 123 insertions(+), 168 deletions(-) create mode 100644 src/Internal/MysqlResultProxyState.php diff --git a/composer.json b/composer.json index 6b51209..ccb4978 100644 --- a/composer.json +++ b/composer.json @@ -15,6 +15,7 @@ "require": { "php": ">=8.1", "amphp/amp": "^3", + "amphp/pipeline": "^1", "amphp/socket": "^2", "amphp/sql": "^2", "amphp/sql-common": "^2" diff --git a/src/Internal/ConnectionProcessor.php b/src/Internal/ConnectionProcessor.php index ae53288..12517d5 100644 --- a/src/Internal/ConnectionProcessor.php +++ b/src/Internal/ConnectionProcessor.php @@ -878,12 +878,12 @@ private function handleQuery(string $packet): void $this->parseOk($packet); if ($this->metadata->statusFlags & self::SERVER_MORE_RESULTS_EXISTS) { - $result = new MysqlResultProxy; - $result->affectedRows = $this->metadata->affectedRows; - $result->insertId = $this->metadata->insertId; - $this->dequeueDeferred()->complete(new MysqlConnectionResult($result)); - $this->result = $result; - $result->updateState(MysqlResultProxy::COLUMNS_FETCHED); + $this->result = new MysqlResultProxy( + affectedRows: $this->metadata->affectedRows, + insertId: $this->metadata->insertId + ); + $this->result->updateState(MysqlResultProxyState::ColumnsFetched); + $this->dequeueDeferred()->complete(new MysqlConnectionResult($this->result)); $this->successfulResultFetch(); } else { $this->parseCallback = null; @@ -907,20 +907,16 @@ private function handleQuery(string $packet): void } $this->parseCallback = $this->handleTextColumnDefinition(...); - $this->dequeueDeferred()->complete(new MysqlConnectionResult($result = new MysqlResultProxy)); - /* we need to resolve before assigning vars, so that a onResolve() handler won't have a partial result available */ - $this->result = $result; - $result->setColumns(MysqlDataType::decodeUnsigned($packet)); + $this->result = new MysqlResultProxy(MysqlDataType::decodeUnsigned($packet)); + $this->dequeueDeferred()->complete(new MysqlConnectionResult($this->result)); } /** @see 14.7.1 Binary Protocol Resultset */ private function handleExecute(string $packet): void { $this->parseCallback = $this->handleBinaryColumnDefinition(...); - $this->dequeueDeferred()->complete(new MysqlConnectionResult($result = new MysqlResultProxy)); - /* we need to resolve before assigning vars, so that a onResolve() handler won't have a partial result available */ - $this->result = $result; - $result->setColumns(\ord($packet)); + $this->result = new MysqlResultProxy(\ord($packet)); + $this->dequeueDeferred()->complete(new MysqlConnectionResult($this->result)); } private function handleFieldList(string $packet): void @@ -954,7 +950,7 @@ private function handleColumnDefinition(string $packet, \Closure $parseCallback) \assert($this->result !== null, 'Connection result was in invalid state'); if (!$this->result->columnsToFetch--) { - $this->result->updateState(MysqlResultProxy::COLUMNS_FETCHED); + $this->result->updateState(MysqlResultProxyState::ColumnsFetched); if (\ord($packet) === self::ERR_PACKET) { $this->parseCallback = null; $this->handleError($packet); @@ -1000,7 +996,7 @@ private function prepareFields(string $packet): void $result = $this->result; $this->result = null; $this->ready(); - $result->updateState(MysqlResultProxy::COLUMNS_FETCHED); + $result->updateState(MysqlResultProxyState::ColumnsFetched); return; } @@ -1074,12 +1070,13 @@ private function successfulResultFetch(): void } else { $this->parseCallback = null; $this->query = null; - $this->result = null; $deferred->complete(); $this->ready(); } - $result->updateState(MysqlResultProxy::ROWS_FETCHED); + $this->result = null; + + $result->updateState(MysqlResultProxyState::Complete); } /** @see 14.6.4.1.1.3 Resultset Row */ @@ -1177,9 +1174,7 @@ private function handlePrepare(string $packet): void $this->metadata->warnings = MysqlDataType::decodeUnsigned16($packet, $offset); - $this->result = new MysqlResultProxy; - $this->result->columnsToFetch = $params; - $this->result->columnCount = $columns; + $this->result = new MysqlResultProxy($columns, $params); $this->refcount++; \assert($this->query !== null, 'Invalid value for connection query'); $this->dequeueDeferred()->complete(new MysqlConnectionStatement($this, $this->query, $stmtId, $this->named, $this->result)); diff --git a/src/Internal/MysqlConnectionResult.php b/src/Internal/MysqlConnectionResult.php index 2bfdccb..3aff048 100644 --- a/src/Internal/MysqlConnectionResult.php +++ b/src/Internal/MysqlConnectionResult.php @@ -13,47 +13,40 @@ final class MysqlConnectionResult implements MysqlResult, \IteratorAggregate { private readonly MysqlResultProxy $result; - private readonly \Generator $generator; + private readonly \Generator $iterator; private ?Future $nextResult = null; public function __construct(MysqlResultProxy $result) { $this->result = $result; - $this->generator = self::iterate($result); - } - - public function getIterator(): \Traversable - { - return $this->generator; + $this->iterator = self::iterate($result); } private static function iterate(MysqlResultProxy $result): \Generator { - $next = self::fetchRow($result); - - try { - if (!($row = $next->await())) { - return; - } + if (!$result->rowIterator->continue()) { + return; + } - // Column names are only available once a result row has been fetched. - $columnNames = \array_column($result->columns, 'name'); + // Column names are only available once a result row has been fetched. + $columnNames = \array_column($result->columns, 'name'); - do { - $next = self::fetchRow($result); - yield \array_combine($columnNames, $row); - } while ($row = $next->await()); - } finally { - if (!isset($row)) { - return; // Result fully consumed. - } + do { + $row = $result->rowIterator->getValue(); + yield \array_combine($columnNames, $row); + } while ($result->rowIterator->continue()); + } - EventLoop::queue(static function () use ($next, $result): void { + public function __destruct() + { + if ($this->iterator->valid()) { + $iterator = $this->iterator; + EventLoop::queue(static function () use ($iterator): void { try { - // Discard remaining results if disposed. - while ($next->await()) { - $next = self::fetchRow($result); + // Discard remaining rows in the result set. + while ($iterator->valid()) { + $iterator->next(); } } catch (\Throwable) { // Ignore errors while discarding result. @@ -62,47 +55,15 @@ private static function iterate(MysqlResultProxy $result): \Generator } } - /** - * @return Future - */ - private static function fetchRow(MysqlResultProxy $result): Future + public function getIterator(): \Traversable { - if ($result->userFetched < $result->fetchedRows) { - $row = $result->rows[$result->userFetched]; - unset($result->rows[$result->userFetched]); - $result->userFetched++; - return Future::complete($row); - } - - if ($result->state === MysqlResultProxy::ROWS_FETCHED) { - return Future::complete(); - } - - if ($result->exception) { - throw $result->exception; - } - - $deferred = new DeferredFuture; - - /* We need to increment the internal counter, else the next time fetch is called, - * it'll simply return the row we fetch here instead of fetching a new row - * since callback order on promises isn't defined, we can't do this via onResolve() */ - $incRow = static function (?array $row) use ($result): ?array { - unset($result->rows[$result->userFetched++]); - return $row; - }; - - $result->deferreds[MysqlResultProxy::UNFETCHED][] = [$deferred, null, $incRow]; - return $deferred->getFuture(); + // Using a Generator to keep a reference to $this. + yield from $this->iterator; } public function getNextResult(): ?MysqlResult { - if ($this->nextResult) { - return $this->nextResult->await(); - } - - $this->nextResult = async(function (): ?MysqlResult { + $this->nextResult ??= async(function (): ?MysqlResult { $deferred = $this->result->next ??= new DeferredFuture; $result = $deferred->getFuture()->await(); @@ -121,7 +82,7 @@ public function getRowCount(): ?int return $this->result->affectedRows; } - public function getColumnCount(): ?int + public function getColumnCount(): int { return $this->result->columnCount; } @@ -133,12 +94,6 @@ public function getLastInsertId(): ?int public function getColumnDefinitions(): ?array { - if ($this->result->state >= MysqlResultProxy::COLUMNS_FETCHED) { - return $this->result->columns; - } - - $deferred = new DeferredFuture; - $this->result->deferreds[MysqlResultProxy::COLUMNS_FETCHED][] = [$deferred, &$this->result->columns, null]; - return $deferred->getFuture()->await(); + return $this->result->getColumnDefinitions(); } } diff --git a/src/Internal/MysqlConnectionStatement.php b/src/Internal/MysqlConnectionStatement.php index 5434069..c1baa4f 100644 --- a/src/Internal/MysqlConnectionStatement.php +++ b/src/Internal/MysqlConnectionStatement.php @@ -159,17 +159,7 @@ public function reset(): void public function getColumnDefinitions(): ?array { - if ($this->result->state >= MysqlResultProxy::COLUMNS_FETCHED) { - return $this->result->columns; - } - - if (isset($this->result->deferreds[MysqlResultProxy::COLUMNS_FETCHED][0])) { - return $this->result->deferreds[MysqlResultProxy::COLUMNS_FETCHED][0][0]->promise(); - } - - $deferred = new DeferredFuture; - $this->result->deferreds[MysqlResultProxy::COLUMNS_FETCHED][0] = [$deferred, &$this->result->columns, null]; - return $deferred->getFuture()->await(); + return $this->result->getColumnDefinitions(); } public function getLastUsedAt(): int diff --git a/src/Internal/MysqlResultProxy.php b/src/Internal/MysqlResultProxy.php index 05bb977..139ce82 100644 --- a/src/Internal/MysqlResultProxy.php +++ b/src/Internal/MysqlResultProxy.php @@ -4,101 +4,88 @@ use Amp\DeferredFuture; use Amp\Mysql\MysqlColumnDefinition; +use Amp\Pipeline\ConcurrentIterator; +use Amp\Pipeline\Queue; /** @internal */ final class MysqlResultProxy { - public int $columnCount = 0; + public int $columnsToFetch = 0; + /** @var list */ public array $columns = []; + public array $params = []; - public int $columnsToFetch = 0; - public array $rows = []; - public int $fetchedRows = 0; - public int $userFetched = 0; - public ?int $insertId = null; + private readonly Queue $rowQueue; - public ?int $affectedRows = null; + public readonly ConcurrentIterator $rowIterator; - public array $deferreds = [ - self::UNFETCHED => [], - self::COLUMNS_FETCHED => [], - self::ROWS_FETCHED => [] - ]; + private ?DeferredFuture $columnDeferred = null; - public int $state = self::UNFETCHED; + private MysqlResultProxyState $state = MysqlResultProxyState::Initial; public ?DeferredFuture $next = null; - public const UNFETCHED = 0; - public const COLUMNS_FETCHED = 1; - public const ROWS_FETCHED = 2; + public function __construct( + public readonly int $columnCount = 0, + ?int $columnsToFetch = null, + public readonly ?int $affectedRows = null, + public readonly ?int $insertId = null, + ) { + $this->rowQueue = new Queue(); + $this->rowIterator = $this->rowQueue->iterate(); - public const SINGLE_ROW_FETCH = 255; - - public ?\Throwable $exception = null; - - public function setColumns(int $columns): void - { - $this->columnCount = $this->columnsToFetch = $columns; + $this->columnsToFetch = $columnsToFetch ?? $this->columnCount; } - public function updateState(int $state): void + public function getColumnDefinitions(): array { - $this->state = $state; - if ($state === self::ROWS_FETCHED) { - $this->rowFetched(null); - } - if (empty($this->deferreds[$state])) { - return; + if ($this->state === MysqlResultProxyState::Initial) { + $this->columnDeferred ??= new DeferredFuture(); + $this->columnDeferred->getFuture()->await(); } - foreach ($this->deferreds[$state] as [$deferred, $rows, $cb]) { - \assert($deferred instanceof DeferredFuture); - $deferred->complete($cb ? $cb($rows) : $rows); - } - $this->deferreds[$state] = []; + + return $this->columns; } - public function rowFetched(?array $row): void + public function updateState(MysqlResultProxyState $state): void { - if ($row !== null) { - $this->rows[$this->fetchedRows++] = $row; - } - [$entry, , $cb] = \current($this->deferreds[self::UNFETCHED]); - if ($entry !== null) { - unset($this->deferreds[self::UNFETCHED][\key($this->deferreds[self::UNFETCHED])]); - \assert($entry instanceof DeferredFuture); - $entry->complete($cb && $row ? $cb($row) : $row); + if ($this->state === MysqlResultProxyState::Complete) { + throw new \RuntimeException('Result set already complete'); } + + match ($state) { + MysqlResultProxyState::Complete => $this->rowQueue->complete(), + MysqlResultProxyState::ColumnsFetched => $this->columnsFetched(), + MysqlResultProxyState::Initial => throw new \RuntimeException('Cannot reset to initial state'), + }; + + $this->state = $state; } - public function error(\Throwable $e): void + private function columnsFetched(): void { - $this->exception = $e; - - foreach ($this->deferreds as $deferreds) { - foreach ($deferreds as [$deferred]) { - \assert($deferred instanceof DeferredFuture); - $deferred->error($e); - } - } + $this->columnDeferred?->complete(); + $this->columnDeferred = null; + } - $this->deferreds = []; + public function rowFetched(array $row): void + { + $this->rowQueue->push($row); } - /** - * @codeCoverageIgnore - */ - public function __debugInfo(): array + public function error(\Throwable $e): void { - $tmp = clone $this; - foreach ($tmp->deferreds as &$type) { - foreach ($type as &$entry) { - unset($entry[0], $entry[2]); - } + if ($this->state === MysqlResultProxyState::Complete) { + return; } - return (array) $tmp; + $this->state = MysqlResultProxyState::Complete; + + $this->rowQueue->error($e); + + $this->columnDeferred?->error($e); + $this->columnDeferred = null; } } diff --git a/src/Internal/MysqlResultProxyState.php b/src/Internal/MysqlResultProxyState.php new file mode 100644 index 0000000..84c071c --- /dev/null +++ b/src/Internal/MysqlResultProxyState.php @@ -0,0 +1,10 @@ +assertNull($resultset->getNextResult()); } + public function testQueryWithUnconsumedTupleResult() + { + $db = $this->getLink(); + + $result = $db->query("SELECT * FROM main"); + + $this->assertInstanceOf(Result::class, $result); + + unset($result); // Force destruction of result object. + + $result = $db->query("SELECT * FROM main"); + + $this->assertInstanceOf(Result::class, $result); + } + public function testPrepared() { $db = $this->getLink(true); diff --git a/test/initialize.php b/test/initialize.php index c5bdab1..6edd61e 100644 --- a/test/initialize.php +++ b/test/initialize.php @@ -7,7 +7,8 @@ function initialize(\mysqli $db): void $db->query("CREATE DATABASE test"); $db->query("CREATE TABLE test.main (id INT NOT NULL AUTO_INCREMENT, a INT, b INT, c DATETIME, PRIMARY KEY (id))"); - $epoch = date('1970-01-01 00:00:00'); + + $epoch = '1970-01-01 00:00:00'; $db->query("INSERT INTO test.main (a, b, c) VALUES (1, 2, '$epoch'), (2, 3, '$epoch'), (3, 4, '$epoch'), (4, 5, '$epoch'), (5, 6, '$epoch')"); $db->query("CREATE TABLE test.json (a JSON)");