diff --git a/packages/pglite/examples/live-changes.html b/packages/pglite/examples/live-changes.html index 9b4806391..b53465e68 100644 --- a/packages/pglite/examples/live-changes.html +++ b/packages/pglite/examples/live-changes.html @@ -7,9 +7,9 @@ const output = document.getElementById("output"); const startBtn = document.getElementById("start"); const addBtn = document.getElementById("add"); - let counter = 10; + let counter = 1000; let lastClicked = 0; - const nameLength = 100000; + const nameLength = 10000; const nameSuffix = "-".repeat(nameLength); const pg = new PGlite({ diff --git a/packages/pglite/examples/live-incremental.html b/packages/pglite/examples/live-incremental.html index 14f8bb8cd..104a35c89 100644 --- a/packages/pglite/examples/live-incremental.html +++ b/packages/pglite/examples/live-incremental.html @@ -1,5 +1,5 @@ -

+
diff --git a/packages/pglite/src/live/index.ts b/packages/pglite/src/live/index.ts index c3a62817d..d706a08f9 100644 --- a/packages/pglite/src/live/index.ts +++ b/packages/pglite/src/live/index.ts @@ -1,3 +1,4 @@ +import { after } from "node:test"; import type { Extension, PGliteInterface, @@ -24,7 +25,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { async query( query: string, params: any[] | undefined | null, - callback: (results: Results) => void, + callback: (results: Results) => void ) { const id = liveQueryCounter++; @@ -35,7 +36,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { // Create a temporary view with the query await tx.query( `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`, - params ?? [], + params ?? [] ); // Get the tables used in the view and add triggers to notify when they change @@ -65,7 +66,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { `table_change__${table.schema_name}__${table.table_name}`, async () => { refresh(); - }, + } ); unsubList.push(unsub); } @@ -96,7 +97,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { query: string, params: any[] | undefined | null, key: string, - callback: (changes: Array>) => void, + callback: (changes: Array>) => void ) { const id = liveQueryCounter++; let tables: { table_name: string; schema_name: string }[]; @@ -107,7 +108,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { // Create a temporary view with the query await tx.query( `CREATE OR REPLACE TEMP VIEW live_query_${id}_view AS ${query}`, - params ?? [], + params ?? [] ); // Get the tables used in the view and add triggers to notify when they change @@ -123,7 +124,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { WHERE table_name = 'live_query_${id}_view' `) ).rows, - { column_name: "__pos__", data_type: "integer" }, + { column_name: "__after__", data_type: "integer" }, ]; // Init state tables as empty temp table @@ -138,8 +139,8 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { await tx.exec(` PREPARE live_query_${id}_diff${curr} AS WITH - prev AS (SELECT ROW_NUMBER() OVER () as __pos__, * FROM live_query_${id}_state${prev}), - curr AS (SELECT ROW_NUMBER() OVER () as __pos__, * FROM live_query_${id}_state${curr}), + prev AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${prev}), + curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}), data_diff AS ( -- INSERT operations: Include all columns SELECT @@ -147,7 +148,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { ${columns .map( ({ column_name }) => - `curr."${column_name}" AS "${column_name}"`, + `curr."${column_name}" AS "${column_name}"` ) .join(",\n")}, ARRAY[]::text[] AS __changed_columns__ @@ -183,7 +184,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" THEN curr."${column_name}" ELSE NULL::${data_type} - END AS "${column_name}"`, + END AS "${column_name}"` ) .join(",\n")}, ARRAY(SELECT unnest FROM unnest(ARRAY[${columns @@ -194,10 +195,10 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" THEN '${column_name}' ELSE NULL - END`, + END` ) .join( - ", ", + ", " )}]) WHERE unnest IS NOT NULL) AS __changed_columns__ FROM curr INNER JOIN prev ON curr.${key} = prev.${key} @@ -219,7 +220,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { // Get the changes changes = await tx.query( - `EXECUTE live_query_${id}_diff${stateSwitch};`, + `EXECUTE live_query_${id}_diff${stateSwitch};` ); }); @@ -236,7 +237,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { `table_change__${table.schema_name}__${table.table_name}`, async () => { refresh(); - }, + } ); unsubList.push(unsub); } @@ -261,7 +262,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { // Fields const fields = changes!.fields.filter( (field) => - !["__pos__", "__op__", "__changed_columns__"].includes(field.name), + !["__after__", "__op__", "__changed_columns__"].includes(field.name) ); // Return the initial results @@ -277,10 +278,11 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { query: string, params: any[] | undefined | null, key: string, - callback: (results: Results>) => void, + callback: (results: Results>) => void ) { - let lastRows: any[] = []; - let lastRowsMap: Map = new Map(); + const rowsMap: Map = new Map(); + const afterMap: Map = new Map(); + let lastRows: Change[] = []; let firstRun = true; const { fields, unsubscribe, refresh } = await namespaceObj.changes( @@ -288,9 +290,7 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { params, key, (changes) => { - let posChanged = false; - const rows = [...lastRows]; - const rowsMap = new Map(lastRowsMap); + // Process the changes for (const change of changes) { const { __op__: op, @@ -299,51 +299,47 @@ const setup = async (pg: PGliteInterface, emscriptenOpts: any) => { } = change as typeof change & { [key: string]: any }; switch (op) { case "INSERT": - rows.push(obj); rowsMap.set(obj[key], obj); - posChanged = true; + afterMap.set(obj.__after__, obj[key]); break; case "DELETE": - const idx = rows.findIndex((r) => r[key] === obj[key]); - if (idx !== -1) { - rows.splice(idx, 1); - rowsMap.delete(obj[key]); - } + rowsMap.delete(obj[key]); + afterMap.delete(obj.__after__); break; case "UPDATE": - const oldObj = rowsMap.get(obj[key]); - if (oldObj) { - const newObj = { ...oldObj }; - for (const columnName of changedColumns) { - newObj[columnName] = obj[columnName]; + const newObj = { ...(rowsMap.get(obj[key]) ?? {}) }; + for (const columnName of changedColumns) { + newObj[columnName] = obj[columnName]; + if (columnName === "__after__") { + afterMap.set(obj.__after__, obj[key]); } - rowsMap.set(obj[key], newObj); - const idx = rows.findIndex((r) => r[key] === obj[key]); - if (idx !== -1) { - rows[idx] = newObj; - } else { - rows.push(newObj); - } - } - if (obj.__pos__ !== undefined) { - posChanged = true; } break; } } - if (posChanged) { - rows.sort((a, b) => a.__pos__ - b.__pos__); + + // Get the rows in order + const rows: Change[] = []; + let lastKey: any = null; + while (true) { + const nextKey = afterMap.get(lastKey); + const obj = rowsMap.get(nextKey); + if (!obj) { + break; + } + rows.push(obj); + lastKey = nextKey; } lastRows = rows; - lastRowsMap = rowsMap; + // Run the callback if (!firstRun) { callback({ rows, fields, }); } - }, + } ); firstRun = false; @@ -381,7 +377,7 @@ export const live = { */ async function getTablesForView( tx: Transaction | PGliteInterface, - viewName: string, + viewName: string ): Promise<{ table_name: string; schema_name: string }[]> { return ( await tx.query<{ @@ -402,7 +398,7 @@ async function getTablesForView( ) AND d.deptype = 'n'; `, - [viewName], + [viewName] ) ).rows.filter((row) => row.table_name !== viewName); } @@ -414,14 +410,14 @@ async function getTablesForView( */ async function addNotifyTriggersToTables( tx: Transaction | PGliteInterface, - tables: { table_name: string; schema_name: string }[], + tables: { table_name: string; schema_name: string }[] ) { const triggers = tables .filter( (table) => !tableNotifyTriggersAdded.has( - `${table.schema_name}_${table.table_name}`, - ), + `${table.schema_name}_${table.table_name}` + ) ) .map((table) => { return ` @@ -441,6 +437,6 @@ async function addNotifyTriggersToTables( await tx.exec(triggers); } tables.map((table) => - tableNotifyTriggersAdded.add(`${table.schema_name}_${table.table_name}`), + tableNotifyTriggersAdded.add(`${table.schema_name}_${table.table_name}`) ); } diff --git a/packages/pglite/src/live/interface.ts b/packages/pglite/src/live/interface.ts index 4631dfd36..9bbb7814a 100644 --- a/packages/pglite/src/live/interface.ts +++ b/packages/pglite/src/live/interface.ts @@ -62,19 +62,19 @@ export interface LiveChangesReturn { export type ChangeInsert = { __changed_columns__: string[]; __op__: "INSERT"; - __pos__: number; + __after__: number; } & T; export type ChangeDelete = {} & { __changed_columns__: string[]; __op__: "DELETE"; - __pos__: undefined; + __after__: undefined; } & T; export type ChangeUpdate = {} & { __changed_columns__: string[]; __op__: "UPDATE"; - __pos__: number; + __after__: number; } & T; export type Change = ChangeInsert | ChangeDelete | ChangeUpdate;