Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live Query API #104

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions packages/pglite/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,127 @@ PGlite supports the pl/pgsql procedural language extension, this is included and

In future we plan to support additional extensions, see the [roadmap](#roadmap).

## Live Queries

The "live" extension enables you to subscribe to a query and receve updated results when the underlying tables change.

To use the extension it needs adding to the PGlite instance when creating it:

```ts
import { PGlite } from "@electric-sql/pglite";
import { live } from "@electric-sql/pglite/live";

const pg = new PGlite({
extensions: {
live,
},
});
```

There are three methods on the `live` namespace:
- `live.query()` for basic live queries. With less machinery in PG it's quicker for small results sets and narrow rows.
- `live.incrementalQuery()` for incremental queries. It materialises the full result set on each update from only the changes emitted by the `live.changes` api. Perfect for feeding into React and good performance for large result sets and wide rows.
- `live.changes()` a lower level API that emits the changes (insert/update/delete) that can then be mapped to mutations in a UI or other datastore.

### live.query<T>()

This is very similar to a standard query, but takes an additional callback that receives the results whenever they change:

```js
const ret = pg.live.query("SELECT * FROM test ORDER BY rand;", [], (res) => {
// res is the same as a standard query result object
});
```

The returned value from the call is an object with this interface:

```ts
interface LiveQueryReturn<T> {
initialResults: Results<T>;
unsubscribe: () => Promise<void>;
refresh: () => Promise<void>;
}
```

- `initialResults` is the initial results set (also sent to the callback
- `unsubscribe` allow you to unsubscribe from the live query
- `refresh` allows you to force a refresh of the query

Internally it watches for the tables that the query depends on, and reruns the query whenever they are changed.

### live.incrementalQuery<T>()

Similar to above, but maintains a temporary table inside of Postgres of the previous state. When the tables it depends on change the query is re-run and diffed with the last state. Only the changes from the last version of the query are copied from WASM into JS.

It requires an additional `key` argument, the name of a column (often a PK) to key the diff on.

```ts
const ret = pg.live.incrementalQuery(
"SELECT * FROM test ORDER BY rand;", [], "id",
(res) => {
// res is the same as a standard query result object
}
);
```

The returned value is of the same type as the `query` method above.

### live.changes()

A lower level API which is the backend for the `incrementalQuery`, it emits the change that have happened. It requires a `key` to key the diff on:

```ts
const ret = pg.live.changes(
"SELECT * FROM test ORDER BY rand;", [], "id",
(res) => {
// res is a change result object
}
);
```

the returned value from the call is defined by this interface:

```ts
interface LiveChangesReturn<T = { [key: string]: any }> {
fields: { name: string; dataTypeID: number }[];
initialChanges: Array<Change<T>>;
unsubscribe: () => Promise<void>;
refresh: () => Promise<void>;
}
```

The results passed to the callback are array of `Change` objects:

```ts
type ChangeInsert<T> = {
__changed_columns__: string[];
__op__: "INSERT";
__after__: number;
} & T;

type ChangeDelete<T> = {
__changed_columns__: string[];
__op__: "DELETE";
__after__: undefined;
} & T;

type ChangeUpdate<T> = {
__changed_columns__: string[];
__op__: "UPDATE";
__after__: number;
} & T;

type Change<T> = ChangeInsert<T> | ChangeDelete<T> | ChangeUpdate<T>;
```

Each `Change` includes the new values along with:

- `__changed_columns__` the columns names that were changes
- `__op__` the operation that is required to update the state (`INSERT`, `UPDATE`, `DELETE`)
- `__after__` the `key` of the row that this row should be after, it will be included in `__changed_columns__` if it has been changed.

This API can be used to implement very efficient in-place DOM updates.

## ORM support.

- Drizzle ORM supports PGlite, see [their docs here](https://orm.drizzle.team/docs/get-started-postgresql#pglite).
Expand Down
54 changes: 54 additions & 0 deletions packages/pglite/examples/live-changes.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<button id="start">start</button><button id="add">Add</button>
<pre id="output"></pre>
<script type="module">
import { PGlite } from "../dist/index.js";
import { live } from "../dist/live/index.js";

const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 1000;
let lastClicked = 0;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
extensions: {
live,
},
});
window.pg = pg;

await pg.exec(`
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
rand float,
name TEXT
);
INSERT INTO test (name, rand)
SELECT 'test' || i || '${nameSuffix}', random()
FROM generate_series(1, ${counter}) AS i;
`);

startBtn.addEventListener("click", async () => {
lastClicked = performance.now();
pg.live.changes(
"SELECT * FROM test ORDER BY rand;",
null,
"id",
(changes) => {
const time = performance.now() - lastClicked;
console.log(`Update took ${time}ms`);
output.textContent = JSON.stringify(changes, null, 2);
}
);
});

addBtn.addEventListener("click", async () => {
lastClicked = performance.now();
await pg.query(
"INSERT INTO test (name, rand) VALUES ($1, random());",
[`test${++counter}${nameSuffix}`]
);
});
</script>
54 changes: 54 additions & 0 deletions packages/pglite/examples/live-incremental.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<button id="start">start</button><button id="add">Add</button>
<div id="output"></div>
<script type="module">
import { PGlite } from "../dist/index.js";
import { live } from "../dist/live/index.js";

const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 1000;
let lastClicked = 0;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
extensions: {
live,
},
});
window.pg = pg;

await pg.exec(`
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
rand FLOAT,
name TEXT
);
INSERT INTO test (name, rand)
SELECT 'test' || i || '${nameSuffix}', random()
FROM generate_series(1, ${counter}) AS i;
`);

startBtn.addEventListener("click", async () => {
lastClicked = performance.now();
pg.live.incrementalQuery(
"SELECT * FROM test ORDER BY rand;",
null,
"id",
(res) => {
const time = performance.now() - lastClicked;
console.log(`Update took ${time}ms`);
output.textContent = res.rows.map((row) => row.id).join(", ");
}
);
});

addBtn.addEventListener("click", async () => {
lastClicked = performance.now();
await pg.query(
"INSERT INTO test (name, rand) VALUES ($1, random());",
[`test${++counter}${nameSuffix}`]
);
});
</script>
49 changes: 49 additions & 0 deletions packages/pglite/examples/live.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<button id="start">start</button><button id="add">Add</button>
<div id="output"></div>
<script type="module">
import { PGlite } from "../dist/index.js";
import { live } from "../dist/live/index.js";

const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 1000;
let lastClicked = 0;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
extensions: {
live,
},
});
window.pg = pg;

await pg.exec(`
CREATE TABLE IF NOT EXISTS test (
id SERIAL PRIMARY KEY,
rand FLOAT,
name TEXT
);
INSERT INTO test (name, rand)
SELECT 'test' || i || '${nameSuffix}', random()
FROM generate_series(1, ${counter}) AS i;
`);

startBtn.addEventListener("click", async () => {
lastClicked = performance.now();
pg.live.query("SELECT * FROM test ORDER BY rand;", null, (res) => {
const time = performance.now() - lastClicked;
console.log(`Update took ${time}ms`);
output.textContent = res.rows.map((row) => row.id).join(", ");
});
});

addBtn.addEventListener("click", async () => {
lastClicked = performance.now();
await pg.query(
"INSERT INTO test (name, rand) VALUES ($1, random());",
[`test${++counter}${nameSuffix}`]
);
});
</script>
1 change: 1 addition & 0 deletions packages/pglite/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",
"./live": "./dist/live/index.js",
"./worker": "./dist/worker/index.js",
"./vector": "./dist/vector/index.js"
},
Expand Down
Loading
Loading