Skip to content
This repository has been archived by the owner on Jan 5, 2025. It is now read-only.

add migrations #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { DurableObject } from "cloudflare:workers";
import cronParser from "cron-parser";
import { migrate } from "./migrate";
import { migrations } from "./migrations/migrations";

export type Task = {
id: string;
Expand All @@ -24,22 +26,7 @@ export class Scheduler<Env> extends DurableObject<Env> {
constructor(state: DurableObjectState, env: Env) {
super(state, env);
void this.ctx.blockConcurrencyWhile(async () => {
// Create tasks table if it doesn't exist
this.ctx.storage.sql.exec(
`
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
name TEXT,
type TEXT NOT NULL,
payload TEXT,
time INTEGER,
delay INTEGER,
cron TEXT,
created_at INTEGER DEFAULT (unixepoch())
)
`
);

migrate(this.ctx.storage, migrations);
// Schedule the next task if any exist
await this.scheduleNextAlarm();
});
Expand Down
55 changes: 55 additions & 0 deletions src/migrate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
export type SQLMigration = {
version: number;
name: string;
sql: string;
};

export function migrate(storage: DurableObjectStorage, migrations: SQLMigration[]) {
createSchemaHistory(storage);
const current = getSchema(storage);

if (!migrations.length) {
if (current === null) {
return;
}
throw new Error("No migration and current schema is null");
}

const sorted = migrations.sort((a, b) => a.version - b.version);
const latest = sorted[migrations.length - 1];

if (latest.version < (current?.version || -1)) {
throw new Error(`Latest schema version in migrations [${latest.name} (version: ${latest.version})] is less than the current db schema: ${current}`);
}

if (current && latest.version === current.version) {
return;
}

const toApply = sorted.filter((m) => m.version > (current?.version || -1));
storage.transactionSync(() => {
for (let index = 0; index < toApply.length; index++) {
const migration = toApply[index];
storage.sql.exec(migration.sql);
storage.sql.exec("INSERT INTO schema_history (version, name) VALUES (?, ?)", migration.version, migration.name);
}
});
}

function createSchemaHistory(storage: DurableObjectStorage) {
storage.sql.exec(`
CREATE TABLE IF NOT EXISTS schema_history (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT (DATETIME('now','subsec'))
)
`);
}

function getSchema(storage: DurableObjectStorage): SQLMigration | null {
const [version] = [...storage.sql.exec("SELECT * FROM schema_history ORDER BY version DESC LIMIT 1")] as SQLMigration[];
if (!version) {
return null;
}
return version;
}
4 changes: 4 additions & 0 deletions src/migrations/migrations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { SQLMigration } from "../migrate";
import v00_create_tasks_table from "./v00_create_tasks_table";

export const migrations: SQLMigration[] = [v00_create_tasks_table] as const;
16 changes: 16 additions & 0 deletions src/migrations/v00_create_tasks_table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export default {
version: 0,
name: "create_tasks_table",
sql: `
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
name TEXT,
type TEXT NOT NULL,
payload TEXT,
time INTEGER,
delay INTEGER,
cron TEXT,
created_at INTEGER DEFAULT (unixepoch())
);
`,
};
Loading