Skip to content

Commit

Permalink
Refactor connection results
Browse files Browse the repository at this point in the history
Using amphp/pipeline to simplify iterating result set rows.
  • Loading branch information
trowski committed Aug 14, 2022
1 parent 9412e74 commit 8482a68
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 168 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"require": {
"php": ">=8.1",
"amphp/amp": "^3",
"amphp/pipeline": "^1",
"amphp/socket": "^2",
"amphp/sql": "^2",
"amphp/sql-common": "^2"
Expand Down
37 changes: 16 additions & 21 deletions src/Internal/ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -878,12 +878,12 @@ private function handleQuery(string $packet): void
$this->parseOk($packet);

if ($this->metadata->statusFlags & self::SERVER_MORE_RESULTS_EXISTS) {
$result = new MysqlResultProxy;
$result->affectedRows = $this->metadata->affectedRows;
$result->insertId = $this->metadata->insertId;
$this->dequeueDeferred()->complete(new MysqlConnectionResult($result));
$this->result = $result;
$result->updateState(MysqlResultProxy::COLUMNS_FETCHED);
$this->result = new MysqlResultProxy(
affectedRows: $this->metadata->affectedRows,
insertId: $this->metadata->insertId
);
$this->result->updateState(MysqlResultProxyState::ColumnsFetched);
$this->dequeueDeferred()->complete(new MysqlConnectionResult($this->result));
$this->successfulResultFetch();
} else {
$this->parseCallback = null;
Expand All @@ -907,20 +907,16 @@ private function handleQuery(string $packet): void
}

$this->parseCallback = $this->handleTextColumnDefinition(...);
$this->dequeueDeferred()->complete(new MysqlConnectionResult($result = new MysqlResultProxy));
/* we need to resolve before assigning vars, so that a onResolve() handler won't have a partial result available */
$this->result = $result;
$result->setColumns(MysqlDataType::decodeUnsigned($packet));
$this->result = new MysqlResultProxy(MysqlDataType::decodeUnsigned($packet));
$this->dequeueDeferred()->complete(new MysqlConnectionResult($this->result));
}

/** @see 14.7.1 Binary Protocol Resultset */
private function handleExecute(string $packet): void
{
$this->parseCallback = $this->handleBinaryColumnDefinition(...);
$this->dequeueDeferred()->complete(new MysqlConnectionResult($result = new MysqlResultProxy));
/* we need to resolve before assigning vars, so that a onResolve() handler won't have a partial result available */
$this->result = $result;
$result->setColumns(\ord($packet));
$this->result = new MysqlResultProxy(\ord($packet));
$this->dequeueDeferred()->complete(new MysqlConnectionResult($this->result));
}

private function handleFieldList(string $packet): void
Expand Down Expand Up @@ -954,7 +950,7 @@ private function handleColumnDefinition(string $packet, \Closure $parseCallback)
\assert($this->result !== null, 'Connection result was in invalid state');

if (!$this->result->columnsToFetch--) {
$this->result->updateState(MysqlResultProxy::COLUMNS_FETCHED);
$this->result->updateState(MysqlResultProxyState::ColumnsFetched);
if (\ord($packet) === self::ERR_PACKET) {
$this->parseCallback = null;
$this->handleError($packet);
Expand Down Expand Up @@ -1000,7 +996,7 @@ private function prepareFields(string $packet): void
$result = $this->result;
$this->result = null;
$this->ready();
$result->updateState(MysqlResultProxy::COLUMNS_FETCHED);
$result->updateState(MysqlResultProxyState::ColumnsFetched);

return;
}
Expand Down Expand Up @@ -1074,12 +1070,13 @@ private function successfulResultFetch(): void
} else {
$this->parseCallback = null;
$this->query = null;
$this->result = null;
$deferred->complete();
$this->ready();
}

$result->updateState(MysqlResultProxy::ROWS_FETCHED);
$this->result = null;

$result->updateState(MysqlResultProxyState::Complete);
}

/** @see 14.6.4.1.1.3 Resultset Row */
Expand Down Expand Up @@ -1177,9 +1174,7 @@ private function handlePrepare(string $packet): void

$this->metadata->warnings = MysqlDataType::decodeUnsigned16($packet, $offset);

$this->result = new MysqlResultProxy;
$this->result->columnsToFetch = $params;
$this->result->columnCount = $columns;
$this->result = new MysqlResultProxy($columns, $params);
$this->refcount++;
\assert($this->query !== null, 'Invalid value for connection query');
$this->dequeueDeferred()->complete(new MysqlConnectionStatement($this, $this->query, $stmtId, $this->named, $this->result));
Expand Down
97 changes: 26 additions & 71 deletions src/Internal/MysqlConnectionResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,40 @@ final class MysqlConnectionResult implements MysqlResult, \IteratorAggregate
{
private readonly MysqlResultProxy $result;

private readonly \Generator $generator;
private readonly \Generator $iterator;

private ?Future $nextResult = null;

public function __construct(MysqlResultProxy $result)
{
$this->result = $result;
$this->generator = self::iterate($result);
}

public function getIterator(): \Traversable
{
return $this->generator;
$this->iterator = self::iterate($result);
}

private static function iterate(MysqlResultProxy $result): \Generator
{
$next = self::fetchRow($result);

try {
if (!($row = $next->await())) {
return;
}
if (!$result->rowIterator->continue()) {
return;
}

// Column names are only available once a result row has been fetched.
$columnNames = \array_column($result->columns, 'name');
// Column names are only available once a result row has been fetched.
$columnNames = \array_column($result->columns, 'name');

do {
$next = self::fetchRow($result);
yield \array_combine($columnNames, $row);
} while ($row = $next->await());
} finally {
if (!isset($row)) {
return; // Result fully consumed.
}
do {
$row = $result->rowIterator->getValue();
yield \array_combine($columnNames, $row);
} while ($result->rowIterator->continue());
}

EventLoop::queue(static function () use ($next, $result): void {
public function __destruct()
{
if ($this->iterator->valid()) {
$iterator = $this->iterator;
EventLoop::queue(static function () use ($iterator): void {
try {
// Discard remaining results if disposed.
while ($next->await()) {
$next = self::fetchRow($result);
// Discard remaining rows in the result set.
while ($iterator->valid()) {
$iterator->next();
}
} catch (\Throwable) {
// Ignore errors while discarding result.
Expand All @@ -62,47 +55,15 @@ private static function iterate(MysqlResultProxy $result): \Generator
}
}

/**
* @return Future<array|null>
*/
private static function fetchRow(MysqlResultProxy $result): Future
public function getIterator(): \Traversable
{
if ($result->userFetched < $result->fetchedRows) {
$row = $result->rows[$result->userFetched];
unset($result->rows[$result->userFetched]);
$result->userFetched++;
return Future::complete($row);
}

if ($result->state === MysqlResultProxy::ROWS_FETCHED) {
return Future::complete();
}

if ($result->exception) {
throw $result->exception;
}

$deferred = new DeferredFuture;

/* We need to increment the internal counter, else the next time fetch is called,
* it'll simply return the row we fetch here instead of fetching a new row
* since callback order on promises isn't defined, we can't do this via onResolve() */
$incRow = static function (?array $row) use ($result): ?array {
unset($result->rows[$result->userFetched++]);
return $row;
};

$result->deferreds[MysqlResultProxy::UNFETCHED][] = [$deferred, null, $incRow];
return $deferred->getFuture();
// Using a Generator to keep a reference to $this.
yield from $this->iterator;
}

public function getNextResult(): ?MysqlResult
{
if ($this->nextResult) {
return $this->nextResult->await();
}

$this->nextResult = async(function (): ?MysqlResult {
$this->nextResult ??= async(function (): ?MysqlResult {
$deferred = $this->result->next ??= new DeferredFuture;
$result = $deferred->getFuture()->await();

Expand All @@ -121,7 +82,7 @@ public function getRowCount(): ?int
return $this->result->affectedRows;
}

public function getColumnCount(): ?int
public function getColumnCount(): int
{
return $this->result->columnCount;
}
Expand All @@ -133,12 +94,6 @@ public function getLastInsertId(): ?int

public function getColumnDefinitions(): ?array
{
if ($this->result->state >= MysqlResultProxy::COLUMNS_FETCHED) {
return $this->result->columns;
}

$deferred = new DeferredFuture;
$this->result->deferreds[MysqlResultProxy::COLUMNS_FETCHED][] = [$deferred, &$this->result->columns, null];
return $deferred->getFuture()->await();
return $this->result->getColumnDefinitions();
}
}
12 changes: 1 addition & 11 deletions src/Internal/MysqlConnectionStatement.php
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,7 @@ public function reset(): void

public function getColumnDefinitions(): ?array
{
if ($this->result->state >= MysqlResultProxy::COLUMNS_FETCHED) {
return $this->result->columns;
}

if (isset($this->result->deferreds[MysqlResultProxy::COLUMNS_FETCHED][0])) {
return $this->result->deferreds[MysqlResultProxy::COLUMNS_FETCHED][0][0]->promise();
}

$deferred = new DeferredFuture;
$this->result->deferreds[MysqlResultProxy::COLUMNS_FETCHED][0] = [$deferred, &$this->result->columns, null];
return $deferred->getFuture()->await();
return $this->result->getColumnDefinitions();
}

public function getLastUsedAt(): int
Expand Down
Loading

0 comments on commit 8482a68

Please sign in to comment.