Skip to content

Commit

Permalink
Use linked list rather than pos for ordering (smaller updates over wire)
Browse files Browse the repository at this point in the history
  • Loading branch information
samwillis committed Jun 21, 2024
1 parent c2f8c2a commit 1f0f7e0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 67 deletions.
4 changes: 2 additions & 2 deletions packages/pglite/examples/live-changes.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 10;
let counter = 1000;
let lastClicked = 0;
const nameLength = 100000;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
Expand Down
8 changes: 4 additions & 4 deletions packages/pglite/examples/live-incremental.html
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
<button id="start">start</button><button id="add">Add</button>
<pre id="output"></pre>
<div id="output"></div>
<script type="module">
import { PGlite } from "../dist/index.js";
import { live } from "../dist/live/index.js";

const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 10;
let counter = 1000;
let lastClicked = 0;
const nameLength = 100000;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
Expand Down Expand Up @@ -39,7 +39,7 @@
(res) => {
const time = performance.now() - lastClicked;
console.log(`Update took ${time}ms`);
output.textContent = JSON.stringify(res.rows, null, 2);
output.textContent = res.rows.map((row) => row.id).join(", ");
}
);
});
Expand Down
10 changes: 5 additions & 5 deletions packages/pglite/examples/live.html
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
<button id="start">start</button><button id="add">Add</button>
<pre id="output"></pre>
<div id="output"></div>
<script type="module">
import { PGlite } from "../dist/index.js";
import { live } from "../dist/live/index.js";

const output = document.getElementById("output");
const startBtn = document.getElementById("start");
const addBtn = document.getElementById("add");
let counter = 10;
let counter = 1000;
let lastClicked = 0;
const nameLength = 100000;
const nameLength = 10000;
const nameSuffix = "-".repeat(nameLength);

const pg = new PGlite({
Expand All @@ -35,15 +35,15 @@
pg.live.query("SELECT * FROM test ORDER BY rand;", null, (res) => {
const time = performance.now() - lastClicked;
console.log(`Update took ${time}ms`);
output.textContent = JSON.stringify(res.rows, null, 2);
output.textContent = res.rows.map((row) => row.id).join(", ");
});
});

addBtn.addEventListener("click", async () => {
lastClicked = performance.now();
await pg.query(
"INSERT INTO test (name, rand) VALUES ($1, random());",
[`test${++counter}{$nameSuffix}`]
[`test${++counter}${nameSuffix}`]
);
});
</script>
102 changes: 49 additions & 53 deletions packages/pglite/src/live/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { after } from "node:test";
import type {
Extension,
PGliteInterface,
Expand All @@ -24,7 +25,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
async query<T>(
query: string,
params: any[] | undefined | null,
callback: (results: Results<T>) => void,
callback: (results: Results<T>) => void
) {
const id = liveQueryCounter++;

Expand All @@ -35,7 +36,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
// Create a temporary view with the query
await tx.query(
`CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
params ?? [],
params ?? []
);

// Get the tables used in the view and add triggers to notify when they change
Expand Down Expand Up @@ -65,7 +66,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
`table_change__${table.schema_name}__${table.table_name}`,
async () => {
refresh();
},
}
);
unsubList.push(unsub);
}
Expand Down Expand Up @@ -96,7 +97,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
query: string,
params: any[] | undefined | null,
key: string,
callback: (changes: Array<Change<T>>) => void,
callback: (changes: Array<Change<T>>) => void
) {
const id = liveQueryCounter++;
let tables: { table_name: string; schema_name: string }[];
Expand All @@ -107,7 +108,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
// Create a temporary view with the query
await tx.query(
`CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`,
params ?? [],
params ?? []
);

// Get the tables used in the view and add triggers to notify when they change
Expand All @@ -123,7 +124,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
WHERE table_name = 'live_query_${id}_view'
`)
).rows,
{ column_name: "__pos__", data_type: "integer" },
{ column_name: "__after__", data_type: "integer" },
];

// Init state tables as empty temp table
Expand All @@ -138,16 +139,16 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
await tx.exec(`
PREPARE live_query_${id}_diff${curr} AS
WITH
prev AS (SELECT ROW_NUMBER() OVER () as __pos__, * FROM live_query_${id}_state${prev}),
curr AS (SELECT ROW_NUMBER() OVER () as __pos__, * FROM live_query_${id}_state${curr}),
prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}),
curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}),
data_diff AS (
-- INSERT operations: Include all columns
SELECT
'INSERT' AS __op__,
${columns
.map(
({ column_name }) =>
`curr."${column_name}" AS "${column_name}"`,
`curr."${column_name}" AS "${column_name}"`
)
.join(",\n")},
ARRAY[]::text[] AS __changed_columns__
Expand Down Expand Up @@ -183,7 +184,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN curr."${column_name}"
ELSE NULL::${data_type}
END AS "${column_name}"`,
END AS "${column_name}"`
)
.join(",\n")},
ARRAY(SELECT unnest FROM unnest(ARRAY[${columns
Expand All @@ -194,10 +195,10 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN '${column_name}'
ELSE NULL
END`,
END`
)
.join(
", ",
", "
)}]) WHERE unnest IS NOT NULL) AS __changed_columns__
FROM curr
INNER JOIN prev ON curr.${key} = prev.${key}
Expand All @@ -219,7 +220,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {

// Get the changes
changes = await tx.query<any>(
`EXECUTE live_query_${id}_diff${stateSwitch};`,
`EXECUTE live_query_${id}_diff${stateSwitch};`
);
});

Expand All @@ -236,7 +237,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
`table_change__${table.schema_name}__${table.table_name}`,
async () => {
refresh();
},
}
);
unsubList.push(unsub);
}
Expand All @@ -261,7 +262,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
// Fields
const fields = changes!.fields.filter(
(field) =>
!["__pos__", "__op__", "__changed_columns__"].includes(field.name),
!["__after__", "__op__", "__changed_columns__"].includes(field.name)
);

// Return the initial results
Expand All @@ -277,20 +278,19 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
query: string,
params: any[] | undefined | null,
key: string,
callback: (results: Results<Change<T>>) => void,
callback: (results: Results<Change<T>>) => void
) {
let lastRows: any[] = [];
let lastRowsMap: Map<any, any> = new Map();
const rowsMap: Map<any, any> = new Map();
const afterMap: Map<any, any> = new Map();
let lastRows: Change<T>[] = [];
let firstRun = true;

const { fields, unsubscribe, refresh } = await namespaceObj.changes<T>(
query,
params,
key,
(changes) => {
let posChanged = false;
const rows = [...lastRows];
const rowsMap = new Map(lastRowsMap);
// Process the changes
for (const change of changes) {
const {
__op__: op,
Expand All @@ -299,51 +299,47 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => {
} = change as typeof change & { [key: string]: any };
switch (op) {
case "INSERT":
rows.push(obj);
rowsMap.set(obj[key], obj);
posChanged = true;
afterMap.set(obj.__after__, obj[key]);
break;
case "DELETE":
const idx = rows.findIndex((r) => r[key] === obj[key]);
if (idx !== -1) {
rows.splice(idx, 1);
rowsMap.delete(obj[key]);
}
rowsMap.delete(obj[key]);
afterMap.delete(obj.__after__);
break;
case "UPDATE":
const oldObj = rowsMap.get(obj[key]);
if (oldObj) {
const newObj = { ...oldObj };
for (const columnName of changedColumns) {
newObj[columnName] = obj[columnName];
const newObj = { ...(rowsMap.get(obj[key]) ?? {}) };
for (const columnName of changedColumns) {
newObj[columnName] = obj[columnName];
if (columnName === "__after__") {
afterMap.set(obj.__after__, obj[key]);
}
rowsMap.set(obj[key], newObj);
const idx = rows.findIndex((r) => r[key] === obj[key]);
if (idx !== -1) {
rows[idx] = newObj;
} else {
rows.push(newObj);
}
}
if (obj.__pos__ !== undefined) {
posChanged = true;
}
break;
}
}
if (posChanged) {
rows.sort((a, b) => a.__pos__ - b.__pos__);

// Get the rows in order
const rows: Change<T>[] = [];
let lastKey: any = null;
while (true) {
const nextKey = afterMap.get(lastKey);
const obj = rowsMap.get(nextKey);
if (!obj) {
break;
}
rows.push(obj);
lastKey = nextKey;
}
lastRows = rows;
lastRowsMap = rowsMap;

// Run the callback
if (!firstRun) {
callback({
rows,
fields,
});
}
},
}
);

firstRun = false;
Expand Down Expand Up @@ -381,7 +377,7 @@ export const live = {
*/
async function getTablesForView(
tx: Transaction | PGliteInterface,
viewName: string,
viewName: string
): Promise<{ table_name: string; schema_name: string }[]> {
return (
await tx.query<{
Expand All @@ -402,7 +398,7 @@ async function getTablesForView(
)
AND d.deptype = 'n';
`,
[viewName],
[viewName]
)
).rows.filter((row) => row.table_name !== viewName);
}
Expand All @@ -414,14 +410,14 @@ async function getTablesForView(
*/
async function addNotifyTriggersToTables(
tx: Transaction | PGliteInterface,
tables: { table_name: string; schema_name: string }[],
tables: { table_name: string; schema_name: string }[]
) {
const triggers = tables
.filter(
(table) =>
!tableNotifyTriggersAdded.has(
`${table.schema_name}_${table.table_name}`,
),
`${table.schema_name}_${table.table_name}`
)
)
.map((table) => {
return `
Expand All @@ -441,6 +437,6 @@ async function addNotifyTriggersToTables(
await tx.exec(triggers);
}
tables.map((table) =>
tableNotifyTriggersAdded.add(`${table.schema_name}_${table.table_name}`),
tableNotifyTriggersAdded.add(`${table.schema_name}_${table.table_name}`)
);
}
6 changes: 3 additions & 3 deletions packages/pglite/src/live/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ export interface LiveChangesReturn<T = { [key: string]: any }> {
export type ChangeInsert<T> = {
__changed_columns__: string[];
__op__: "INSERT";
__pos__: number;
__after__: number;
} & T;

export type ChangeDelete<T> = {} & {
__changed_columns__: string[];
__op__: "DELETE";
__pos__: undefined;
__after__: undefined;
} & T;

export type ChangeUpdate<T> = {} & {
__changed_columns__: string[];
__op__: "UPDATE";
__pos__: number;
__after__: number;
} & T;

export type Change<T> = ChangeInsert<T> | ChangeDelete<T> | ChangeUpdate<T>;

0 comments on commit 1f0f7e0

Please sign in to comment.