Adjust batch importing; vastly improve Postgres batch import (#26)
stefandesu committed Oct 27, 2022
1 parent 0f5cd0b commit 9262801
Showing 7 changed files with 98 additions and 79 deletions.
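At a glance: both backends now do their setup in an async connect(config) method instead of the constructor, expose a batchImportRequiresCSV getter that tells the importer whether batchImport() expects parsed CSV rows or the raw file stream, and the PostgreSQL backend replaces its batched INSERTs with COPY FROM STDIN. The implicit contract the importer relies on, sketched as a class (illustrative only; no such base class is declared anywhere in the code):

class ExampleBackend {
  // Open the connection, create tables if they are missing, set this.name.
  async connect(config) { /* ... */ }
  // true:  batchImport() expects parsed row objects from csv-parser (SQLite)
  // false: batchImport() consumes the raw TSV stream directly (PostgreSQL COPY)
  get batchImportRequiresCSV() { return true }
  // Import the data in whichever form the getter announced.
  async batchImport(data) { /* ... */ }
}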
16 changes: 9 additions & 7 deletions bin/import.js
@@ -55,19 +55,20 @@ console.log()
console.log(`${full ? "Full" : "Partial"} import with file ${file}, modified ${modified}.`)

import csv from "csv-parser"
import { backend } from "../src/config.js"
import { connect } from "../src/config.js"

(async () => {
const backend = await connect()
const stream = fs.createReadStream(file)
.pipe(csv({
separator: "\t",
headers: ["ppn", "voc", "notation"],
quote: "",
}))
const csvTransform = csv({
separator: "\t",
headers: ["ppn", "voc", "notation"],
quote: "",
})

if (full) {
try {
await backend.batchImport(stream)
await backend.batchImport(backend.batchImportRequiresCSV ? stream.pipe(csvTransform) : stream)
} catch (error) {
console.error(error)
}
@@ -77,6 +78,7 @@ import { backend } from "../src/config.js"
let ppn
let rows = []
stream
.pipe(csvTransform)
.on("data", row => {
if (row.ppn === ppn) {
rows.push(row)
29 changes: 28 additions & 1 deletion package-lock.json

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
@@ -35,7 +35,8 @@
"express": "^4.18.1",
"jskos-tools": "^1.0.26",
"nodemon": "^2.0.19",
"pg": "^8.8.0"
"pg": "^8.8.0",
"pg-copy-streams": "^6.0.4"
},
"devDependencies": {
"eslint": "^8.19.0",
108 changes: 43 additions & 65 deletions src/backend/postgres.js
@@ -1,11 +1,13 @@

import pg from "pg"
import pgcs from "pg-copy-streams"
const copyFrom = pgcs.from
const { Pool } = pg

export default class PostgreSQLBackend {

// Establish connection to backend or throw error
constructor(config) {
async connect(config) {
this.db = new Pool({
user: "stefan" || config.user,
password: "" || config.password,
@@ -16,31 +18,31 @@ export default class PostgreSQLBackend {
connectionTimeoutMillis: 0,
})

;(async () => {
const client = await this.db.connect()
try {
const res = await client.query("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")
if (res.rowCount === 0) {
await client.query(`
CREATE TABLE subjects (
ppn TEXT NOT NULL,
voc TEXT NOT NULL,
notation TEXT NOT NULL
);
CREATE INDEX idx_notation on subjects (notation);
CREATE INDEX idx_ppn on subjects (ppn);
CREATE TABLE metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
`)
}
} finally {
client.release()
const client = await this.db.connect()
try {
const res = await client.query("SELECT * FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")
if (res.rowCount === 0) {
await client.query(`
CREATE TABLE subjects (
ppn TEXT NOT NULL,
voc TEXT NOT NULL,
notation TEXT
);
CREATE INDEX idx_notation on subjects (notation);
CREATE INDEX idx_ppn on subjects (ppn);
CREATE TABLE metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
`)
}
})()
} catch (error) {
console.error(error)
} finally {
client.release()
}
this.name = `PostgreSQL database ${config.database} (port ${config.port})`
}

@@ -110,6 +112,10 @@ export default class PostgreSQLBackend {
}
}

get batchImportRequiresCSV() {
return false
}

async batchImport(data) {
const client = await this.db.connect()

@@ -123,48 +129,20 @@
console.timeEnd("drop indexes/data")
// await client.query("BEGIN")

const bulkInsert = async (rows) => {
const keys = Object.keys(rows[0])
let valueStr = ""
let valueArray = []
let valueIndex = 1
for (let row of rows) {
if (valueStr) {
valueStr += ","
}
valueStr += "(" + keys.map((value, index) => `$${valueIndex + index}`) + ")"
valueArray = valueArray.concat(keys.map((value) => row[value]))
valueIndex += keys.length
}
await client.query(`INSERT INTO subjects (${keys.join(",")}) VALUES ${valueStr}`, valueArray)
}

let rows = []
let inserted = 0
console.time("insert")

for await (const row of data) {
rows.push(row)
if (rows.length === 2000) {
inserted += rows.length
await bulkInsert(rows)
rows = []
if (inserted % 1000000 === 0) {
// await client.query("COMMIT")
console.timeEnd("insert")
console.log(inserted)
console.time("insert")
// await client.query("BEGIN")
}
}
}
console.time("importing data")
await new Promise((resolve, reject) => {
// TODO: Can we require data files to be UTF8 so that we don't need to add ENCODING 'SQL_ASCII'?
// Note: QUOTE E`\b` is a workaround because we don't want any quote character. See https://stackoverflow.com/a/20402913.
const stream = client.query(copyFrom("COPY subjects FROM STDIN DELIMITER E'\t' ENCODING 'SQL_ASCII' CSV QUOTE E'\b' NULL AS ''"))
data.on("error", reject)
stream.on("error", reject)
stream.on("finish", resolve)
data.pipe(stream)
})
console.timeEnd("importing data")

inserted += rows.length
await bulkInsert(rows)
// await client.query("COMMIT")
console.timeEnd("insert")
console.log(inserted)
// Recreate indexes
console.log("import complete, recreating indexes...")
console.time("recreate indexes")
await client.query("CREATE INDEX idx_notation on subjects (notation);")
await client.query("CREATE INDEX idx_ppn on subjects (ppn);")
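The speedup comes from this hunk: instead of collecting parsed rows and issuing multi-row INSERTs in batches of 2000, the backend now opens a COPY subjects FROM STDIN query through pg-copy-streams and pipes the raw TSV file straight into it, so rows never pass through JavaScript objects. A minimal self-contained sketch of the same technique, reusing the COPY statement from the commit but with stream.pipeline instead of the hand-wrapped Promise (connection settings are assumed to come from the usual PG* environment variables, and the file name is illustrative):

import fs from "node:fs"
import { pipeline } from "node:stream/promises"
import pg from "pg"
import pgcs from "pg-copy-streams"

const pool = new pg.Pool()
const client = await pool.connect()
try {
  // client.query() with a pg-copy-streams "from" query returns a Writable;
  // everything piped into it is sent to the server as the COPY payload.
  const copyStream = client.query(pgcs.from(
    "COPY subjects FROM STDIN DELIMITER E'\t' ENCODING 'SQL_ASCII' CSV QUOTE E'\b' NULL AS ''",
  ))
  await pipeline(fs.createReadStream("subjects.tsv"), copyStream)
} finally {
  client.release()
  await pool.end()
}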
6 changes: 5 additions & 1 deletion src/backend/sqlite.js
@@ -4,7 +4,7 @@ import fs from "fs"
export default class SQLiteBackend {

// Establish connection to backend or throw error
constructor(config) {
async connect(config) {
const file = config.database
// create database file if not exist or throw an error
if (!fs.existsSync(file)) {
@@ -68,6 +68,10 @@ CREATE TABLE metadata (
})()
}

get batchImportRequiresCSV() {
return true
}

async batchImport(data) {
// Drop indexes to recreate later
try {
11 changes: 8 additions & 3 deletions src/config.js
@@ -24,6 +24,11 @@ if (!backendClass) {
process.exit(1)
}

export const backend = new backendClass(config)

console.log(`Configured ${schemes.length} vocabularies from ${config.schemesFile}. Using ${backend.name}.`)
export const backend = new backendClass()
// Connect immediately, but clients will still need to await connect()
const backendConnectPromise = backend.connect(config)
export const connect = async () => {
await backendConnectPromise
console.log(`Configured ${schemes.length} vocabularies from ${config.schemesFile}. Using ${backend.name}.`)
return backend
}
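The new connection flow in config.js: the backend instance is still created when the module is loaded, backend.connect(config) is started immediately, and the exported connect() simply awaits that one promise before returning the backend. Within a process, connect() can therefore be awaited any number of times without reconnecting; a small sketch of that behavior (illustrative, not taken from the commit):

import { connect } from "./config.js"

// Every call resolves the same already-started connection promise
// and returns the same backend instance.
const a = await connect()
const b = await connect()
console.log(a === b) // true: one backend, one connection attempt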
4 changes: 3 additions & 1 deletion src/index.js
@@ -1,4 +1,4 @@
import { backend, config, schemes, links } from "./config.js"
import { connect, config, schemes, links } from "./config.js"

import express from "express"
import jskos from "jskos-tools"
@@ -13,6 +13,8 @@ const database = {
}

async function createServer() {
// Connect to backend
const backend = await connect()

// this will likely warm up the backend cache as well
// TODO: This is very slow and delays startup by multiple minutes. Find a better solution.
