diff --git a/bin/import.js b/bin/import.js
index 7270be8..edcd9ff 100755
--- a/bin/import.js
+++ b/bin/import.js
@@ -55,19 +55,20 @@ console.log()
 console.log(`${full ? "Full" : "Partial"} import with file ${file}, modified ${modified}.`)
 
 import csv from "csv-parser"
-import { backend } from "../src/config.js"
+import { connect } from "../src/config.js"
 
 (async () => {
+  const backend = await connect()
   const stream = fs.createReadStream(file)
-    .pipe(csv({
-      separator: "\t",
-      headers: ["ppn", "voc", "notation"],
-      quote: "",
-    }))
+  const csvTransform = csv({
+    separator: "\t",
+    headers: ["ppn", "voc", "notation"],
+    quote: "",
+  })
 
   if (full) {
     try {
-      await backend.batchImport(stream)
+      await backend.batchImport(backend.batchImportRequiresCSV ? stream.pipe(csvTransform) : stream)
     } catch (error) {
       console.error(error)
     }
@@ -77,6 +78,7 @@ import { backend } from "../src/config.js"
     let ppn
     let rows = []
     stream
+      .pipe(csvTransform)
       .on("data", row => {
         if (row.ppn === ppn) {
           rows.push(row)
diff --git a/package-lock.json b/package-lock.json
index b00c721..4798d37 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -16,7 +16,8 @@
         "express": "^4.18.1",
         "jskos-tools": "^1.0.26",
         "nodemon": "^2.0.19",
-        "pg": "^8.8.0"
+        "pg": "^8.8.0",
+        "pg-copy-streams": "^6.0.4"
       },
       "devDependencies": {
         "eslint": "^8.19.0",
@@ -2355,6 +2356,11 @@
         "url": "https://github.com/sponsors/ljharb"
       }
     },
+    "node_modules/obuf": {
+      "version": "1.1.2",
+      "resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
+      "integrity": "sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg=="
+    },
     "node_modules/on-finished": {
       "version": "2.4.1",
       "resolved": "https://registry.npmjs.org/on-finished/-/on-finished-2.4.1.tgz",
@@ -2526,6 +2532,14 @@
       "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz",
       "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ=="
     },
+    "node_modules/pg-copy-streams": {
+      "version": "6.0.4",
+      "resolved": "https://registry.npmjs.org/pg-copy-streams/-/pg-copy-streams-6.0.4.tgz",
+      "integrity": "sha512-FH6q2nFo0n2cFacLyIKorjDz8AOYtxrAANx1XMvYbKWNM2geY731gZstuP4mXMlqO6urRl9oIscFxf3GMIg3Ng==",
+      "dependencies": {
+        "obuf": "^1.1.2"
+      }
+    },
     "node_modules/pg-int8": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
@@ -5834,6 +5848,11 @@
       "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.12.2.tgz",
      "integrity": "sha512-z+cPxW0QGUp0mcqcsgQyLVRDoXFQbXOwBaqyF7VIgI4TWNQsDHrBpUQslRmIfAoYWdYzs6UlKJtB2XJpTaNSpQ=="
     },
+    "obuf": {
+      "version": "1.1.2",
+      "resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
+      "integrity": "sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg=="
+    },
     "on-finished": {
       "version": "2.4.1",
       "resolved": "https://registry.npmjs.org/on-finished/-/on-finished-2.4.1.tgz",
@@ -5957,6 +5976,14 @@
       "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-2.5.0.tgz",
       "integrity": "sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ=="
     },
+    "pg-copy-streams": {
+      "version": "6.0.4",
+      "resolved": "https://registry.npmjs.org/pg-copy-streams/-/pg-copy-streams-6.0.4.tgz",
+      "integrity": "sha512-FH6q2nFo0n2cFacLyIKorjDz8AOYtxrAANx1XMvYbKWNM2geY731gZstuP4mXMlqO6urRl9oIscFxf3GMIg3Ng==",
+      "requires": {
+        "obuf": "^1.1.2"
+      }
+    },
     "pg-int8": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/pg-int8/-/pg-int8-1.0.1.tgz",
diff --git a/package.json b/package.json
index f8f9224..510bc4a 100644
--- a/package.json
+++ b/package.json
@@ -35,7 +35,8 @@
     "express": "^4.18.1",
     "jskos-tools": "^1.0.26",
     "nodemon": "^2.0.19",
-    "pg": "^8.8.0"
+    "pg": "^8.8.0",
+    "pg-copy-streams": "^6.0.4"
   },
   "devDependencies": {
     "eslint": "^8.19.0",
diff --git a/src/backend/postgres.js b/src/backend/postgres.js
index b3f2862..f27e237 100644
--- a/src/backend/postgres.js
+++ b/src/backend/postgres.js
@@ -1,11 +1,13 @@
 import pg from "pg"
+import pgcs from "pg-copy-streams"
+const copyFrom = pgcs.from
 
 const { Pool } = pg
 
 export default class PostgreSQLBackend {
 
   // Establish connection to backend or throw error
-  constructor(config) {
+  async connect(config) {
     this.db = new Pool({
       user: "stefan" || config.user,
       password: "" || config.password,
@@ -16,31 +18,31 @@ export default class PostgreSQLBackend {
       connectionTimeoutMillis: 0,
     })
 
-    ;(async () => {
-      const client = await this.db.connect()
-      try {
-        const res = await client.query("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")
-        if (res.rowCount === 0) {
-          await client.query(`
-            CREATE TABLE subjects (
-              ppn TEXT NOT NULL,
-              voc TEXT NOT NULL,
-              notation TEXT NOT NULL
-            );
-
-            CREATE INDEX idx_notation on subjects (notation);
-            CREATE INDEX idx_ppn on subjects (ppn);
-
-            CREATE TABLE metadata (
-              key TEXT PRIMARY KEY,
-              value TEXT NOT NULL
-            );
-          `)
-        }
-      } finally {
-        client.release()
+    const client = await this.db.connect()
+    try {
+      const res = await client.query("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")
+      if (res.rowCount === 0) {
+        await client.query(`
+          CREATE TABLE subjects (
+            ppn TEXT NOT NULL,
+            voc TEXT NOT NULL,
+            notation TEXT
+          );
+
+          CREATE INDEX idx_notation on subjects (notation);
+          CREATE INDEX idx_ppn on subjects (ppn);
+
+          CREATE TABLE metadata (
+            key TEXT PRIMARY KEY,
+            value TEXT NOT NULL
+          );
+        `)
       }
-    })()
+    } catch (error) {
+      console.error(error)
+    } finally {
+      client.release()
+    }
 
     this.name = `PostgreSQL database ${config.database} (port ${config.port})`
   }
@@ -110,6 +112,10 @@
     }
   }
 
+  get batchImportRequiresCSV() {
+    return false
+  }
+
   async batchImport(data) {
     const client = await this.db.connect()
 
@@ -123,48 +129,20 @@
     console.timeEnd("drop indexes/data")
 
     // await client.query("BEGIN")
-    const bulkInsert = async (rows) => {
-      const keys = Object.keys(rows[0])
-      let valueStr = ""
-      let valueArray = []
-      let valueIndex = 1
-      for (let row of rows) {
-        if (valueStr) {
-          valueStr += ","
-        }
-        valueStr += "(" + keys.map((value, index) => `$${valueIndex + index}`) + ")"
-        valueArray = valueArray.concat(keys.map((value) => row[value]))
-        valueIndex += keys.length
-      }
-      await client.query(`INSERT INTO subjects (${keys.join(",")}) VALUES ${valueStr}`, valueArray)
-    }
-
-    let rows = []
-    let inserted = 0
-    console.time("insert")
-
-    for await (const row of data) {
-      rows.push(row)
-      if (rows.length === 2000) {
-        inserted += rows.length
-        await bulkInsert(rows)
-        rows = []
-        if (inserted % 1000000 === 0) {
-          // await client.query("COMMIT")
-          console.timeEnd("insert")
-          console.log(inserted)
-          console.time("insert")
-          // await client.query("BEGIN")
-        }
-      }
-    }
+    console.time("importing data")
+    await new Promise((resolve, reject) => {
+      // TODO: Can we require data files to be UTF8 so that we don't need to add ENCODING 'SQL_ASCII'?
+      // Note: QUOTE E`\b` is a workaround because we don't want any quote character. See https://stackoverflow.com/a/20402913.
+      const stream = client.query(copyFrom("COPY subjects FROM STDIN DELIMITER E'\t' ENCODING 'SQL_ASCII' CSV QUOTE E'\b' NULL AS ''"))
+      data.on("error", reject)
+      stream.on("error", reject)
+      stream.on("finish", resolve)
+      data.pipe(stream)
+    })
+    console.timeEnd("importing data")
 
-    inserted += rows.length
-    await bulkInsert(rows)
-    // await client.query("COMMIT")
-    console.timeEnd("insert")
-    console.log(inserted)
     // Recreate indexes
+    console.log("import complete, recreating indexes...")
     console.time("recreate indexes")
     await client.query("CREATE INDEX idx_notation on subjects (notation);")
     await client.query("CREATE INDEX idx_ppn on subjects (ppn);")
diff --git a/src/backend/sqlite.js b/src/backend/sqlite.js
index 27f0a54..aca1686 100644
--- a/src/backend/sqlite.js
+++ b/src/backend/sqlite.js
@@ -4,7 +4,7 @@ import fs from "fs"
 
 export default class SQLiteBackend {
   // Establish connection to backend or throw error
-  constructor(config) {
+  async connect(config) {
     const file = config.database
     // create database file if not exist or throw an error
     if (!fs.existsSync(file)) {
@@ -68,6 +68,10 @@ CREATE TABLE metadata (
     })()
   }
 
+  get batchImportRequiresCSV() {
+    return true
+  }
+
   async batchImport(data) {
     // Drop indexes to recreate later
     try {
diff --git a/src/config.js b/src/config.js
index a6a3ab3..599179f 100644
--- a/src/config.js
+++ b/src/config.js
@@ -24,6 +24,11 @@ if (!backendClass) {
   process.exit(1)
 }
 
-export const backend = new backendClass(config)
-
-console.log(`Configured ${schemes.length} vocabularies from ${config.schemesFile}. Using ${backend.name}.`)
+export const backend = new backendClass()
+// Connect immediately, but clients will still need to await connect()
+const backendConnectPromise = backend.connect(config)
+export const connect = async () => {
+  await backendConnectPromise
+  console.log(`Configured ${schemes.length} vocabularies from ${config.schemesFile}. Using ${backend.name}.`)
+  return backend
+}
diff --git a/src/index.js b/src/index.js
index f5503e8..875f9f3 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1,4 +1,4 @@
-import { backend, config, schemes, links } from "./config.js"
+import { connect, config, schemes, links } from "./config.js"
 import express from "express"
 import jskos from "jskos-tools"
 
@@ -13,6 +13,8 @@ const database = {
 }
 
 async function createServer() {
+  // Connect to backend
+  const backend = await connect()
   // this will likely warm up the backend cache as well
   // TODO: This is very slow and delays startup by multiple minutes. Find a better solution.