Skip to content
This repository has been archived by the owner on Jan 5, 2025. It is now read-only.

Commit

Permalink
add migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
boristane committed Nov 30, 2024
1 parent 377d180 commit afc50e0
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 16 deletions.
19 changes: 3 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { DurableObject } from "cloudflare:workers";
import cronParser from "cron-parser";
import { migrate } from "./migrate";
import { migrations } from "./migrations/migrations";

export type Task = {
id: string;
Expand All @@ -24,22 +26,7 @@ export class Scheduler<Env> extends DurableObject<Env> {
constructor(state: DurableObjectState, env: Env) {
super(state, env);
void this.ctx.blockConcurrencyWhile(async () => {
// Create tasks table if it doesn't exist
this.ctx.storage.sql.exec(
`
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
name TEXT,
type TEXT NOT NULL,
payload TEXT,
time INTEGER,
delay INTEGER,
cron TEXT,
created_at INTEGER DEFAULT (unixepoch())
)
`
);

migrate(this.ctx.storage, migrations);
// Schedule the next task if any exist
await this.scheduleNextAlarm();
});
Expand Down
55 changes: 55 additions & 0 deletions src/migrate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
export type SQLMigration = {
version: number;
name: string;
sql: string;
};

export function migrate(storage: DurableObjectStorage, migrations: SQLMigration[]) {
createSchemaHistory(storage);
const current = getSchema(storage);

if (!migrations.length) {
if (current === null) {
return;
}
throw new Error("No migration and current schema is null");
}

const sorted = migrations.sort((a, b) => a.version - b.version);
const latest = sorted[migrations.length - 1];

if (latest.version < (current?.version || -1)) {
throw new Error(`Latest schema version in migrations [${latest.name} (version: ${latest.version})] is less than the current db schema: ${current}`);
}

if (current && latest.version === current.version) {
return;
}

const toApply = sorted.filter((m) => m.version > (current?.version || -1));
storage.transactionSync(() => {
for (let index = 0; index < toApply.length; index++) {
const migration = toApply[index];
storage.sql.exec(migration.sql);
storage.sql.exec("INSERT INTO schema_history (version, name) VALUES (?, ?)", migration.version, migration.name);
}
});
}

function createSchemaHistory(storage: DurableObjectStorage) {
storage.sql.exec(`
CREATE TABLE IF NOT EXISTS schema_history (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT (DATETIME('now','subsec'))
)
`);
}

function getSchema(storage: DurableObjectStorage): SQLMigration | null {
const [version] = [...storage.sql.exec("SELECT * FROM schema_history ORDER BY version DESC LIMIT 1")] as SQLMigration[];
if (!version) {
return null;
}
return version;
}
4 changes: 4 additions & 0 deletions src/migrations/migrations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { SQLMigration } from "../migrate";
import v00_create_tasks_table from "./v00_create_tasks_table";

export const migrations: SQLMigration[] = [v00_create_tasks_table] as const;
16 changes: 16 additions & 0 deletions src/migrations/v00_create_tasks_table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export default {
version: 0,
name: "create_tasks_table",
sql: `
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
name TEXT,
type TEXT NOT NULL,
payload TEXT,
time INTEGER,
delay INTEGER,
cron TEXT,
created_at INTEGER DEFAULT (unixepoch())
);
`,
};

0 comments on commit afc50e0

Please sign in to comment.