Skip to content

Commit

Permalink
Poll for advisory lock
Browse files Browse the repository at this point in the history
Poll once per second trying to acquire the lock, rather
than waiting in a long-running transaction.

CREATE INDEX CONCURRENTLY needs to wait for all running txs
to finish before starting, which can lead to deadlocks.

Fixes ThomWright#36
  • Loading branch information
ThomWright committed Jan 31, 2020
1 parent cfff57b commit 3fc9fde
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/__tests__/fixtures/concurrent-index-2/1_create-table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE IF NOT EXISTS concurrent (
id integer
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- will acquire ROW EXCLUSIVE table-level locks for the rest of the (reasonably long-running) transaction
INSERT INTO concurrent (id) SELECT i FROM generate_series(1, 1000000) as t(i);

-- will attempt to acquire ACCESS EXCLUSIVE table-level lock
-- will deadlock if this same transaction is running concurrently:
-- - both transactions will be waiting for the other to release the conflicting ROW EXCLUSIVE locks
ALTER TABLE concurrent ADD PRIMARY KEY (id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- postgres-migrations disable-transaction

CREATE INDEX CONCURRENTLY concurrent_index ON concurrent (id);
30 changes: 30 additions & 0 deletions src/__tests__/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,36 @@ test("concurrent migrations", async t => {
t.truthy(exists)
})

// https://github.com/ThomWright/postgres-migrations/issues/36
test("concurrent migrations - index concurrently", async t => {
const databaseName = "migration-test-concurrent-no-tx"
const dbConfig = {
database: databaseName,
user: "postgres",
password: PASSWORD,
host: "localhost",
port,
}

await createDb(databaseName, dbConfig)

await migrate(dbConfig, "src/__tests__/fixtures/concurrent")

// will deadlock if one process has the advisory lock and tries to index concurrently
// while the other waits for the advisory lock
await Promise.all([
migrate(dbConfig, "src/__tests__/fixtures/concurrent-index-2", {
logger: msg => console.log("A", msg),
}),
migrate(dbConfig, "src/__tests__/fixtures/concurrent-index-2", {
logger: msg => console.log("B", msg),
}),
])

const exists = await doesTableExist(dbConfig, "concurrent")
t.truthy(exists)
})

// can't test with unconnected client because `pg` just hangs on the first query...
test("with connected client", async t => {
const databaseName = "migration-test-with-connected-client"
Expand Down
12 changes: 11 additions & 1 deletion src/with-lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@ export function withAdvisoryLock<T>(
try {
try {
log("Acquiring advisory lock...")
await client.query("SELECT pg_advisory_lock(-8525285245963000605);")
let acquired = false
while (!acquired) {
const lockResult = await client.query(
"SELECT pg_try_advisory_lock(-8525285245963000605);",
)
if (lockResult.rows[0].pg_try_advisory_lock === true) {
acquired = true
} else {
await new Promise(res => setTimeout(res, 1000))
}
}
log("... aquired advisory lock")
} catch (e) {
log(`Error acquiring advisory lock: ${e.message}`)
Expand Down

0 comments on commit 3fc9fde

Please sign in to comment.