Skip to content

Commit

Permalink
Live Query API (#104)
Browse files Browse the repository at this point in the history
* WIP Live Query API

* Incremental live queries

* Update readme

* add live query test

* add live query test
  • Loading branch information
samwillis authored Jul 25, 2024
1 parent b1832ae commit 5cfbed1
Show file tree
Hide file tree
Showing 9 changed files with 1,174 additions and 0 deletions.
121 changes: 121 additions & 0 deletions packages/pglite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,127 @@ PGlite supports the pl/pgsql procedural language extension, this is included and

In future we plan to support additional extensions, see the [roadmap](#roadmap).

## Live Queries

The "live" extension enables you to subscribe to a query and receve updated results when the underlying tables change.

To use the extension it needs adding to the PGlite instance when creating it:

```ts
import { PGlite } from "@electric-sql/pglite";
import { live } from "@electric-sql/pglite/live";

const pg = new PGlite({
extensions: {
live,
},
});
```

There are three methods on the `live` namespace:
- `live.query()` for basic live queries. With less machinery in PG it's quicker for small results sets and narrow rows.
- `live.incrementalQuery()` for incremental queries. It materialises the full result set on each update from only the changes emitted by the `live.changes` api. Perfect for feeding into React and good performance for large result sets and wide rows.
- `live.changes()` a lower level API that emits the changes (insert/update/delete) that can then be mapped to mutations in a UI or other datastore.

### live.query<T>()

This is very similar to a standard query, but takes an additional callback that receives the results whenever they change:

```js
const ret = pg.live.query("SELECT * FROM test ORDER BY rand;", [], (res) => {
// res is the same as a standard query result object
});
```

The returned value from the call is an object with this interface:

```ts
interface LiveQueryReturn<T> {
initialResults: Results<T>;
unsubscribe: () => Promise<void>;
refresh: () => Promise<void>;
}
```

- `initialResults` is the initial results set (also sent to the callback
- `unsubscribe` allow you to unsubscribe from the live query
- `refresh` allows you to force a refresh of the query

Internally it watches for the tables that the query depends on, and reruns the query whenever they are changed.

### live.incrementalQuery<T>()

Similar to above, but maintains a temporary table inside of Postgres of the previous state. When the tables it depends on change the query is re-run and diffed with the last state. Only the changes from the last version of the query are copied from WASM into JS.

It requires an additional `key` argument, the name of a column (often a PK) to key the diff on.

```ts
const ret = pg.live.incrementalQuery(
"SELECT * FROM test ORDER BY rand;", [], "id",
(res) => {
// res is the same as a standard query result object
}
);
```

The returned value is of the same type as the `query` method above.

### live.changes()

A lower level API which is the backend for the `incrementalQuery`, it emits the change that have happened. It requires a `key` to key the diff on:

```ts
const ret = pg.live.changes(
"SELECT * FROM test ORDER BY rand;", [], "id",
(res) => {
// res is a change result object
}
);
```

the returned value from the call is defined by this interface:

```ts
interface LiveChangesReturn<T = { [key: string]: any }> {
fields: { name: string; dataTypeID: number }[];
initialChanges: Array<Change<T>>;
unsubscribe: () => Promise<void>;
refresh: () => Promise<void>;
}
```

The results passed to the callback are array of `Change` objects:

```ts
type ChangeInsert<T> = {
__changed_columns__: string[];
__op__: "INSERT";
__after__: number;
} & T;

type ChangeDelete<T> = {
__changed_columns__: string[];
__op__: "DELETE";
__after__: undefined;
} & T;

type ChangeUpdate<T> = {
__changed_columns__: string[];
__op__: "UPDATE";
__after__: number;
} & T;

type Change<T> = ChangeInsert<T> | ChangeDelete<T> | ChangeUpdate<T>;
```

Each `Change` includes the new values along with:

- `__changed_columns__` the columns names that were changes
- `__op__` the operation that is required to update the state (`INSERT`, `UPDATE`, `DELETE`)
- `__after__` the `key` of the row that this row should be after, it will be included in `__changed_columns__` if it has been changed.

This API can be used to implement very efficient in-place DOM updates.

## ORM support.

- Drizzle ORM supports PGlite, see [their docs here](https://orm.drizzle.team/docs/get-started-postgresql#pglite).
Expand Down
54 changes: 54 additions & 0 deletions packages/pglite/examples/live-changes.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<button id="start">start</button><button id="add">Add</button>
<pre id="output"></pre>
<script type="module">
import { PGlite } from "../dist/index.js";
import { live } from "../dist/live/index.js";

const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 1000;
let lastClicked = 0;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
extensions: {
live,
},
});
window.pg = pg;

await pg.exec(`
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
rand float,
name TEXT
);
INSERT INTO test (name, rand)
SELECT 'test' || i || '${nameSuffix}', random()
FROM generate_series(1, ${counter}) AS i;
`);

startBtn.addEventListener("click", async () => {
lastClicked = performance.now();
pg.live.changes(
"SELECT * FROM test ORDER BY rand;",
null,
"id",
(changes) => {
const time = performance.now() - lastClicked;
console.log(`Update took ${time}ms`);
output.textContent = JSON.stringify(changes, null, 2);
}
);
});

addBtn.addEventListener("click", async () => {
lastClicked = performance.now();
await pg.query(
"INSERT INTO test (name, rand) VALUES ($1, random());",
[`test${++counter}${nameSuffix}`]
);
});
</script>
54 changes: 54 additions & 0 deletions packages/pglite/examples/live-incremental.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<button id="start">start</button><button id="add">Add</button>
<div id="output"></div>
<script type="module">
import { PGlite } from "../dist/index.js";
import { live } from "../dist/live/index.js";

const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 1000;
let lastClicked = 0;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
extensions: {
live,
},
});
window.pg = pg;

await pg.exec(`
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
rand FLOAT,
name TEXT
);
INSERT INTO test (name, rand)
SELECT 'test' || i || '${nameSuffix}', random()
FROM generate_series(1, ${counter}) AS i;
`);

startBtn.addEventListener("click", async () => {
lastClicked = performance.now();
pg.live.incrementalQuery(
"SELECT * FROM test ORDER BY rand;",
null,
"id",
(res) => {
const time = performance.now() - lastClicked;
console.log(`Update took ${time}ms`);
output.textContent = res.rows.map((row) => row.id).join(", ");
}
);
});

addBtn.addEventListener("click", async () => {
lastClicked = performance.now();
await pg.query(
"INSERT INTO test (name, rand) VALUES ($1, random());",
[`test${++counter}${nameSuffix}`]
);
});
</script>
49 changes: 49 additions & 0 deletions packages/pglite/examples/live.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<button id="start">start</button><button id="add">Add</button>
<div id="output"></div>
<script type="module">
import { PGlite } from "../dist/index.js";
import { live } from "../dist/live/index.js";

const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 1000;
let lastClicked = 0;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
extensions: {
live,
},
});
window.pg = pg;

await pg.exec(`
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
rand FLOAT,
name TEXT
);
INSERT INTO test (name, rand)
SELECT 'test' || i || '${nameSuffix}', random()
FROM generate_series(1, ${counter}) AS i;
`);

startBtn.addEventListener("click", async () => {
lastClicked = performance.now();
pg.live.query("SELECT * FROM test ORDER BY rand;", null, (res) => {
const time = performance.now() - lastClicked;
console.log(`Update took ${time}ms`);
output.textContent = res.rows.map((row) => row.id).join(", ");
});
});

addBtn.addEventListener("click", async () => {
lastClicked = performance.now();
await pg.query(
"INSERT INTO test (name, rand) VALUES ($1, random());",
[`test${++counter}${nameSuffix}`]
);
});
</script>
1 change: 1 addition & 0 deletions packages/pglite/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",
"./live": "./dist/live/index.js",
"./worker": "./dist/worker/index.js",
"./vector": "./dist/vector/index.js"
},
Expand Down
Loading

1 comment on commit 5cfbed1

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.