From 47b559902d0bc66fdda1ef40aa533de4ff00a026 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Sat, 9 Nov 2024 14:03:12 -0500 Subject: [PATCH 1/3] format with prettier --- .github/workflows/node.js.yml | 24 +-- .prettierrc.json | 3 + README.md | 4 +- example/README.md | 1 - example/convex/README.md | 2 +- example/convex/example.test.ts | 10 +- example/convex/example.ts | 18 +- example/convex/schema.ts | 12 +- node10stubs.mjs | 6 +- package.json | 1 + src/client/index.ts | 56 ++--- src/component/convex.config.ts | 2 +- src/component/logging.ts | 2 +- src/component/public.ts | 359 +++++++++++++++++++++------------ src/component/schema.ts | 8 +- 15 files changed, 309 insertions(+), 199 deletions(-) create mode 100644 .prettierrc.json diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml index 1b2e0ae..87bf576 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/node.js.yml @@ -1,23 +1,17 @@ name: Run tests on: push: - branches: [ "main" ] + branches: ["main"] pull_request: - branches: [ "main" ] + branches: ["main"] jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - name: Use Node.js - uses: actions/setup-node@v4 - with: - cache-dependency-path: | - example/package-lock.json - package-lock.json - node-version: '18.x' - cache: 'npm' - - run: npm i - - run: npm ci - - run: cd example && npm i && cd .. - - run: npm test + - uses: actions/checkout@v4 + - name: Use Node.js + uses: actions/setup-node@v4 + - run: npm i + - run: npm ci + - run: cd example && npm i && cd .. + - run: npm test diff --git a/.prettierrc.json b/.prettierrc.json new file mode 100644 index 0000000..757fd64 --- /dev/null +++ b/.prettierrc.json @@ -0,0 +1,3 @@ +{ + "trailingComma": "es5" +} diff --git a/README.md b/README.md index 465a8af..de2bf8d 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,9 @@ const scrapePool = new Workpool(components.scrapeWorkpool, { export const signUp = mutation({ handler: async (ctx, args) => { const userId = await ctx.db.insert("users", args); - await emailPool.enqueueAction(internal.auth.sendEmailVerification, { userId }); + await emailPool.enqueueAction(internal.auth.sendEmailVerification, { + userId, + }); }, }); diff --git a/example/README.md b/example/README.md index c894719..aafc1ee 100644 --- a/example/README.md +++ b/example/README.md @@ -2,4 +2,3 @@ Components need an app that uses them in order to run codegen. An example app is also useful for testing and documentation. 
- diff --git a/example/convex/README.md b/example/convex/README.md index 4d82e13..b317aef 100644 --- a/example/convex/README.md +++ b/example/convex/README.md @@ -80,7 +80,7 @@ function handleButtonPress() { // OR // use the result once the mutation has completed mutation({ first: "Hello!", second: "me" }).then((result) => - console.log(result), + console.log(result) ); } ``` diff --git a/example/convex/example.test.ts b/example/convex/example.test.ts index 6a747c3..d8c8690 100644 --- a/example/convex/example.test.ts +++ b/example/convex/example.test.ts @@ -1,7 +1,7 @@ /// import { convexTest } from "convex-test"; -import { afterEach, beforeEach, describe, expect, test, vi } from "vitest" +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import schema from "./schema"; import componentSchema from "../../src/component/schema"; import cronsSchema from "../../node_modules/@convex-dev/crons/src/component/schema"; @@ -9,7 +9,9 @@ import { api, components } from "./_generated/api"; const modules = import.meta.glob("./**/*.ts"); const componentModules = import.meta.glob("../../src/component/**/*.ts"); -const cronsModules = import.meta.glob("../../node_modules/@convex-dev/crons/src/component/**/*.ts"); +const cronsModules = import.meta.glob( + "../../node_modules/@convex-dev/crons/src/component/**/*.ts" +); describe("workpool", () => { async function setupTest() { @@ -47,13 +49,13 @@ describe("workpool", () => { }); test("enqueue and get status", async () => { - const id = await t.mutation(api.example.enqueueOneMutation, {data: 1}); + const id = await t.mutation(api.example.enqueueOneMutation, { data: 1 }); await runToCompletion(); expect(await t.query(api.example.status, { id })).toEqual({ kind: "success", result: 1, }); - }) + }); test("drop in replacement for ctx", async () => { const result = await t.action(api.example.doSomethingInPool, {}); diff --git a/example/convex/example.ts b/example/convex/example.ts index 40ae712..c5fc764 100644 --- a/example/convex/example.ts +++ b/example/convex/example.ts @@ -3,7 +3,7 @@ import { api, components } from "./_generated/api"; import { WorkId, WorkPool } from "@convex-dev/workpool"; import { v } from "convex/values"; -const pool = new WorkPool(components.workpool, { +const pool = new WorkPool(components.workpool, { maxParallelism: 3, // For tests, disable completed work cleanup. completedWorkMaxAgeMs: Number.POSITIVE_INFINITY, @@ -20,7 +20,7 @@ export const addMutation = mutation({ args: { data: v.optional(v.number()) }, handler: async (ctx, { data }) => { const d = data ?? 
Math.random(); - await ctx.db.insert("data", {data: d}); + await ctx.db.insert("data", { data: d }); return d; }, }); @@ -28,12 +28,12 @@ export const addMutation = mutation({ export const addAction = action({ args: { data: v.optional(v.number()) }, handler: async (ctx, { data }): Promise => { - return await ctx.runMutation(api.example.addMutation, {data}); + return await ctx.runMutation(api.example.addMutation, { data }); }, }); export const enqueueOneMutation = mutation({ - args: {data: v.number()}, + args: { data: v.number() }, handler: async (ctx, { data }): Promise => { return await pool.enqueueMutation(ctx, api.example.addMutation, { data }); }, @@ -43,7 +43,7 @@ export const status = query({ args: { id: v.string() }, handler: async (ctx, { id }) => { return await pool.status(ctx, id as WorkId); - } + }, }); export const enqueueABunchOfMutations = mutation({ @@ -59,7 +59,7 @@ export const addLowPri = mutation({ args: { data: v.optional(v.number()) }, handler: async (ctx, { data }) => { const d = -(data ?? Math.random()); - await ctx.db.insert("data", {data: d}); + await ctx.db.insert("data", { data: d }); return d; }, }); @@ -86,7 +86,7 @@ export const enqueueAndWait = action({ args: {}, handler: async (ctx, _args): Promise => { const work = await pool.enqueueAction(ctx, api.example.addAction, {}); - const result = await pool.pollResult(ctx, work, 30*1000); + const result = await pool.pollResult(ctx, work, 30 * 1000); return result; }, }); @@ -101,7 +101,7 @@ export const doSomethingInPool = action({ }); async function doSomething(ctx: ActionCtx): Promise { - const data1 = await ctx.runMutation(api.example.addMutation, {data: 1}); - const data2 = await ctx.runAction(api.example.addAction, {data: 2}); + const data1 = await ctx.runMutation(api.example.addMutation, { data: 1 }); + const data2 = await ctx.runAction(api.example.addAction, { data: 2 }); return data1 + data2; } diff --git a/example/convex/schema.ts b/example/convex/schema.ts index 724d51a..681993f 100644 --- a/example/convex/schema.ts +++ b/example/convex/schema.ts @@ -1,10 +1,8 @@ import { defineSchema, defineTable } from "convex/server"; import { v } from "convex/values"; -export default defineSchema( - { - data: defineTable({ - data: v.number(), - }), - }, -); +export default defineSchema({ + data: defineTable({ + data: v.number(), + }), +}); diff --git a/node10stubs.mjs b/node10stubs.mjs index 6a76782..26c5136 100644 --- a/node10stubs.mjs +++ b/node10stubs.mjs @@ -46,7 +46,7 @@ async function processSubPackages(packageJsonPath, exports, cleanup = false) { await fs.mkdir(newDir, { recursive: true }); await fs.writeFile( newPackageJsonPath, - JSON.stringify(newPackageJson, null, 2), + JSON.stringify(newPackageJson, null, 2) ); } } @@ -71,11 +71,11 @@ async function main() { if (isCleanup) { console.log( - "Node10 module resolution compatibility stub directories removed.", + "Node10 module resolution compatibility stub directories removed." 
); } else { console.log( - "Node10 module resolution compatibility stub directories created", + "Node10 module resolution compatibility stub directories created" ); } } catch (error) { diff --git a/package.json b/package.json index 0214f6b..adfdda9 100644 --- a/package.json +++ b/package.json @@ -28,6 +28,7 @@ "prepack": "node node10stubs.mjs", "postpack": "node node10stubs.mjs --cleanup", "test": "vitest run", + "format": "prettier --write .", "test:debug": "vitest --inspect-brk --no-file-parallelism", "test:coverage": "vitest run --coverage --coverage.reporter=text" }, diff --git a/src/client/index.ts b/src/client/index.ts index c7c92b8..9551bb3 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -22,45 +22,45 @@ export class WorkPool { private options: { // How many actions/mutations can be running at once within this pool. // Min 1, Max 300. - maxParallelism: number, + maxParallelism: number; // How long an action can run before the pool considers it to be timed out. // The action itself might time out earlier. // Default 15 minutes. - actionTimeoutMs?: number, + actionTimeoutMs?: number; // How long a mutation can run before the pool considers it to be timed out. // The mutation itself might time out earlier. // Default 30 seconds. - mutationTimeoutMs?: number, + mutationTimeoutMs?: number; // How long a function started by `enqueueUnknown` or `runAt` or `runAfter` // can run before the pool considers it to be timed out. // The function itself might time out earlier. // Default 15 minutes. - unknownTimeoutMs?: number, + unknownTimeoutMs?: number; // When there is something to do, wait this long between loop iterations, // to allow more work to accumulate. // Default 50ms. - debounceMs?: number, + debounceMs?: number; // When something is running, wait this long to check if anything has // been canceled or failed unexpectedly. // Default 10s. - fastHeartbeatMs?: number, + fastHeartbeatMs?: number; // When nothing is happening, wait this long to check if there is new work // that we missed. // Default 2 hours. - slowHeartbeatMs?: number, + slowHeartbeatMs?: number; // How much to log. // Default WARN. - logLevel?: LogLevel, + logLevel?: LogLevel; // How long to keep completed work in the database, for access by `status`, // `tryResult`, and `pollResult`. // Default 1 day. - completedWorkMaxAgeMs?: number, + completedWorkMaxAgeMs?: number; } ) {} async enqueueAction( ctx: RunMutationCtx, - fn: FunctionReference<'action', FunctionVisibility, Args, ReturnType>, - fnArgs: Args, + fn: FunctionReference<"action", FunctionVisibility, Args, ReturnType>, + fnArgs: Args ): Promise> { const handle = await createFunctionHandle(fn); const id = await ctx.runMutation(this.component.public.enqueue, { @@ -74,8 +74,8 @@ export class WorkPool { } async enqueueMutation( ctx: RunMutationCtx, - fn: FunctionReference<'mutation', FunctionVisibility, Args, ReturnType>, - fnArgs: Args, + fn: FunctionReference<"mutation", FunctionVisibility, Args, ReturnType>, + fnArgs: Args ): Promise> { const handle = await createFunctionHandle(fn); const id = await ctx.runMutation(this.component.public.enqueue, { @@ -91,16 +91,21 @@ export class WorkPool { // which can happen if it comes from `runAt` or `runAfter`. 
async enqueueUnknown( ctx: RunMutationCtx, - fn: FunctionReference<'action' | 'mutation', FunctionVisibility, Args, null>, + fn: FunctionReference< + "action" | "mutation", + FunctionVisibility, + Args, + null + >, fnArgs: Args, - runAtTime: number, + runAtTime: number ): Promise> { const handle = await createFunctionHandle(fn); const id = await ctx.runMutation(this.component.public.enqueue, { handle, options: this.options, fnArgs, - fnType: 'unknown', + fnType: "unknown", runAtTime, }); return id as WorkId; @@ -110,15 +115,18 @@ export class WorkPool { } async status( ctx: RunQueryCtx, - id: WorkId, + id: WorkId ): Promise< - { kind: "pending" } | { kind: "inProgress" } | { kind: "success", result: ReturnType } | { kind: "error", error: string } + | { kind: "pending" } + | { kind: "inProgress" } + | { kind: "success"; result: ReturnType } + | { kind: "error"; error: string } > { return await ctx.runQuery(this.component.public.status, { id }); } async tryResult( ctx: RunQueryCtx, - id: WorkId, + id: WorkId ): Promise { const status = await this.status(ctx, id); if (status.kind === "success") { @@ -132,7 +140,7 @@ export class WorkPool { async pollResult( ctx: RunQueryCtx & RunActionCtx, id: WorkId, - timeoutMs: number, + timeoutMs: number ): Promise { const start = Date.now(); while (true) { @@ -147,16 +155,16 @@ export class WorkPool { } } ctx( - ctx: GenericActionCtx, + ctx: GenericActionCtx ): GenericActionCtx { return { runAction: (async (action: any, args: any) => { const workId = await this.enqueueAction(ctx, action, args); - return this.pollResult(ctx, workId, 30*1000); + return this.pollResult(ctx, workId, 30 * 1000); }) as any, runMutation: (async (mutation: any, args: any) => { const workId = await this.enqueueMutation(ctx, mutation, args); - return this.pollResult(ctx, workId, 30*1000); + return this.pollResult(ctx, workId, 30 * 1000); }) as any, scheduler: { runAfter: async (delay: number, fn: any, args: any) => { @@ -167,7 +175,7 @@ export class WorkPool { }, cancel: async (id: any) => { await this.cancel(ctx, id); - } + }, } as any, auth: ctx.auth, storage: ctx.storage, diff --git a/src/component/convex.config.ts b/src/component/convex.config.ts index b6d5a51..a61bcde 100644 --- a/src/component/convex.config.ts +++ b/src/component/convex.config.ts @@ -5,4 +5,4 @@ const component = defineComponent("workpool"); component.use(crons); -export default component; \ No newline at end of file +export default component; diff --git a/src/component/logging.ts b/src/component/logging.ts index 99a80dc..745195a 100644 --- a/src/component/logging.ts +++ b/src/component/logging.ts @@ -6,7 +6,7 @@ export const logLevel = v.union( v.literal("DEBUG"), v.literal("INFO"), v.literal("WARN"), - v.literal("ERROR"), + v.literal("ERROR") ); export type LogLevel = Infer; diff --git a/src/component/public.ts b/src/component/public.ts index 874df55..65e9fa0 100644 --- a/src/component/public.ts +++ b/src/component/public.ts @@ -1,5 +1,13 @@ import { v } from "convex/values"; -import { DatabaseReader, internalAction, internalMutation, mutation, MutationCtx, query, QueryCtx } from "./_generated/server"; +import { + DatabaseReader, + internalAction, + internalMutation, + mutation, + MutationCtx, + query, + QueryCtx, +} from "./_generated/server"; import { FunctionHandle } from "convex/server"; import { Doc, Id } from "./_generated/dataModel"; import { api, internal } from "./_generated/api"; @@ -24,26 +32,28 @@ export const enqueue = mutation({ completedWorkMaxAgeMs: v.optional(v.number()), }), fnArgs: 
v.any(), - fnType: v.union(v.literal("action"), v.literal("mutation"), v.literal("unknown")), + fnType: v.union( + v.literal("action"), + v.literal("mutation"), + v.literal("unknown") + ), runAtTime: v.number(), }, returns: v.id("pendingWork"), handler: async (ctx, { handle, options, fnArgs, fnType, runAtTime }) => { const debounceMs = options.debounceMs ?? 50; - await ensurePoolExists( - ctx, - { - maxParallelism: options.maxParallelism, - actionTimeoutMs: options.actionTimeoutMs ?? 15 * 60 * 1000, - mutationTimeoutMs: options.mutationTimeoutMs ?? 30 * 1000, - unknownTimeoutMs: options.unknownTimeoutMs ?? 15 * 60 * 1000, - debounceMs, - fastHeartbeatMs: options.fastHeartbeatMs ?? 10 * 1000, - slowHeartbeatMs: options.slowHeartbeatMs ?? 2 * 60 * 60 * 1000, - completedWorkMaxAgeMs: options.completedWorkMaxAgeMs ?? 24 * 60 * 60 * 1000, - logLevel: options.logLevel ?? "WARN", - }, - ); + await ensurePoolExists(ctx, { + maxParallelism: options.maxParallelism, + actionTimeoutMs: options.actionTimeoutMs ?? 15 * 60 * 1000, + mutationTimeoutMs: options.mutationTimeoutMs ?? 30 * 1000, + unknownTimeoutMs: options.unknownTimeoutMs ?? 15 * 60 * 1000, + debounceMs, + fastHeartbeatMs: options.fastHeartbeatMs ?? 10 * 1000, + slowHeartbeatMs: options.slowHeartbeatMs ?? 2 * 60 * 60 * 1000, + completedWorkMaxAgeMs: + options.completedWorkMaxAgeMs ?? 24 * 60 * 60 * 1000, + logLevel: options.logLevel ?? "WARN", + }); const workId = await ctx.db.insert("pendingWork", { handle, fnArgs, @@ -104,7 +114,9 @@ export const mainLoop = internalMutation({ const loopDoc = await ctx.db.query("mainLoop").unique(); const expectedGeneration = loopDoc?.generation ?? 0; if (expectedGeneration !== args.generation) { - throw new Error(`mainLoop generation mismatch ${expectedGeneration} !== ${args.generation}`); + throw new Error( + `mainLoop generation mismatch ${expectedGeneration} !== ${args.generation}` + ); } if (loopDoc) { await ctx.db.patch(loopDoc._id, { generation: args.generation + 1 }); @@ -121,7 +133,8 @@ export const mainLoop = internalMutation({ await kickMainLoop(ctx, 60 * 60 * 1000, true); return; } - const { maxParallelism, debounceMs, fastHeartbeatMs, slowHeartbeatMs } = options; + const { maxParallelism, debounceMs, fastHeartbeatMs, slowHeartbeatMs } = + options; console_.time("inProgress count"); // This is the only function reading and writing inProgressWork, @@ -133,22 +146,28 @@ export const mainLoop = internalMutation({ // Move from pendingWork to inProgressWork. 
console_.time("pendingWork"); - const toSchedule = Math.min(maxParallelism - inProgressBefore.length, BATCH_SIZE); + const toSchedule = Math.min( + maxParallelism - inProgressBefore.length, + BATCH_SIZE + ); let didSomething = false; - const pending = await ctx.db.query("pendingWork") - .withIndex("runAtTime", q=>q.lte("runAtTime", Date.now())) + const pending = await ctx.db + .query("pendingWork") + .withIndex("runAtTime", (q) => q.lte("runAtTime", Date.now())) .take(toSchedule); console_.debug(`scheduling ${pending.length} pending work`); - await Promise.all(pending.map(async (work) => { - const { scheduledId, timeoutMs } = await beginWork(ctx, work); - await ctx.db.insert("inProgressWork", { - running: scheduledId, - timeoutMs, - workId: work._id, - }); - await ctx.db.delete(work._id); - didSomething = true; - })); + await Promise.all( + pending.map(async (work) => { + const { scheduledId, timeoutMs } = await beginWork(ctx, work); + await ctx.db.insert("inProgressWork", { + running: scheduledId, + timeoutMs, + workId: work._id, + }); + await ctx.db.delete(work._id); + didSomething = true; + }) + ); console_.timeEnd("pendingWork"); // Move from pendingCompletion to completedWork, deleting from inProgressWork. @@ -157,37 +176,47 @@ export const mainLoop = internalMutation({ console_.time("pendingCompletion"); const completed = await ctx.db.query("pendingCompletion").take(BATCH_SIZE); console_.debug(`completing ${completed.length}`); - await Promise.all(completed.map(async (work) => { - const inProgressWork = await ctx.db.query("inProgressWork").withIndex("workId", (q) => q.eq("workId", work.workId)).unique(); - if (inProgressWork) { - await ctx.db.delete(inProgressWork._id); - } - await ctx.db.delete(work._id); - await ctx.db.insert("completedWork", { - result: work.result, - error: work.error, - workId: work.workId, - }); - didSomething = true; - })); + await Promise.all( + completed.map(async (work) => { + const inProgressWork = await ctx.db + .query("inProgressWork") + .withIndex("workId", (q) => q.eq("workId", work.workId)) + .unique(); + if (inProgressWork) { + await ctx.db.delete(inProgressWork._id); + } + await ctx.db.delete(work._id); + await ctx.db.insert("completedWork", { + result: work.result, + error: work.error, + workId: work.workId, + }); + didSomething = true; + }) + ); console_.timeEnd("pendingCompletion"); console_.time("pendingCancelation"); const canceled = await ctx.db.query("pendingCancelation").take(BATCH_SIZE); console_.debug(`canceling ${canceled.length}`); - await Promise.all(canceled.map(async (work) => { - const inProgressWork = await ctx.db.query("inProgressWork").withIndex("workId", (q) => q.eq("workId", work.workId)).unique(); - if (inProgressWork) { - await ctx.scheduler.cancel(inProgressWork.running); - await ctx.db.delete(inProgressWork._id); - await ctx.db.insert("completedWork", { - workId: work.workId, - error: "Canceled", - }); - } - await ctx.db.delete(work._id); - didSomething = true; - })); + await Promise.all( + canceled.map(async (work) => { + const inProgressWork = await ctx.db + .query("inProgressWork") + .withIndex("workId", (q) => q.eq("workId", work.workId)) + .unique(); + if (inProgressWork) { + await ctx.scheduler.cancel(inProgressWork.running); + await ctx.db.delete(inProgressWork._id); + await ctx.db.insert("completedWork", { + workId: work.workId, + error: "Canceled", + }); + } + await ctx.db.delete(work._id); + didSomething = true; + }) + ); console_.timeEnd("pendingCancelation"); if (completed.length === 0) { @@ -196,18 +225,24 @@ 
export const mainLoop = internalMutation({ // This will find everything that timed out, failed ungracefully, was // cancelled, or succeeded without a return value. const inProgress = await ctx.db.query("inProgressWork").collect(); - await Promise.all(inProgress.map(async (work) => { - const result = await checkInProgressWork(ctx, work); - if (result !== null) { - console_.debug("inProgressWork finished uncleanly", work.workId, result); - await ctx.db.delete(work._id); - await ctx.db.insert("completedWork", { - workId: work.workId, - ...result, - }); - didSomething = true; - } - })); + await Promise.all( + inProgress.map(async (work) => { + const result = await checkInProgressWork(ctx, work); + if (result !== null) { + console_.debug( + "inProgressWork finished uncleanly", + work.workId, + result + ); + await ctx.db.delete(work._id); + await ctx.db.insert("completedWork", { + workId: work.workId, + ...result, + }); + didSomething = true; + } + }) + ); console_.timeEnd("inProgressWork check for unclean exits"); } @@ -218,25 +253,32 @@ export const mainLoop = internalMutation({ } else { // Decide when to wake up. const allInProgressWork = await ctx.db.query("inProgressWork").collect(); - const nextPending = await ctx.db.query("pendingWork").withIndex("runAtTime").first(); - const nextPendingTime = nextPending ? nextPending.runAtTime : slowHeartbeatMs + Date.now(); - const nextInProgress = allInProgressWork.length ? Math.min( - fastHeartbeatMs + Date.now(), - ...allInProgressWork.map((w) => w._creationTime + w.timeoutMs), - ) : Number.POSITIVE_INFINITY; + const nextPending = await ctx.db + .query("pendingWork") + .withIndex("runAtTime") + .first(); + const nextPendingTime = nextPending + ? nextPending.runAtTime + : slowHeartbeatMs + Date.now(); + const nextInProgress = allInProgressWork.length + ? 
Math.min( + fastHeartbeatMs + Date.now(), + ...allInProgressWork.map((w) => w._creationTime + w.timeoutMs) + ) + : Number.POSITIVE_INFINITY; const nextTime = Math.min(nextPendingTime, nextInProgress); await kickMainLoop(ctx, nextTime - Date.now(), true); } console_.timeEnd("kickMainLoop"); - } + }, }); async function beginWork( ctx: MutationCtx, - work: Doc<"pendingWork">, + work: Doc<"pendingWork"> ): Promise<{ - scheduledId: Id<"_scheduled_functions">, - timeoutMs: number, + scheduledId: Id<"_scheduled_functions">; + timeoutMs: number; }> { const options = await getOptions(ctx.db); if (!options) { @@ -245,25 +287,37 @@ async function beginWork( const { mutationTimeoutMs, actionTimeoutMs, unknownTimeoutMs } = options; if (work.fnType === "action") { return { - scheduledId: await ctx.scheduler.runAfter(0, internal.public.runActionWrapper, { - workId: work._id, - handle: work.handle, - fnArgs: work.fnArgs, - }), + scheduledId: await ctx.scheduler.runAfter( + 0, + internal.public.runActionWrapper, + { + workId: work._id, + handle: work.handle, + fnArgs: work.fnArgs, + } + ), timeoutMs: actionTimeoutMs, }; } else if (work.fnType === "mutation") { return { - scheduledId: await ctx.scheduler.runAfter(0, internal.public.runMutationWrapper, { - workId: work._id, - handle: work.handle, - fnArgs: work.fnArgs, - }), + scheduledId: await ctx.scheduler.runAfter( + 0, + internal.public.runMutationWrapper, + { + workId: work._id, + handle: work.handle, + fnArgs: work.fnArgs, + } + ), timeoutMs: mutationTimeoutMs, }; } else if (work.fnType === "unknown") { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const handle = work.handle as FunctionHandle<'action' | 'mutation', any, any>; + const handle = work.handle as FunctionHandle< + "action" | "mutation", + any, + any + >; return { scheduledId: await ctx.scheduler.runAfter(0, handle, work.fnArgs), timeoutMs: unknownTimeoutMs, @@ -275,12 +329,15 @@ async function beginWork( async function checkInProgressWork( ctx: MutationCtx, - doc: Doc<"inProgressWork">, -): Promise<{ result?: unknown, error?: string } | null> { + doc: Doc<"inProgressWork"> +): Promise<{ result?: unknown; error?: string } | null> { const workStatus = await ctx.db.system.get(doc.running); if (workStatus === null) { return { error: "Timeout" }; - } else if (workStatus.state.kind === "pending" || workStatus.state.kind === "inProgress") { + } else if ( + workStatus.state.kind === "pending" || + workStatus.state.kind === "inProgress" + ) { if (Date.now() - workStatus._creationTime > doc.timeoutMs) { await ctx.scheduler.cancel(doc.running); return { error: "Timeout" }; @@ -306,12 +363,18 @@ export const runActionWrapper = internalAction({ }, handler: async (ctx, { workId, handle: handleStr, fnArgs }) => { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const handle = handleStr as FunctionHandle<'action', any, any>; + const handle = handleStr as FunctionHandle<"action", any, any>; try { const retval = await ctx.runAction(handle, fnArgs); - await ctx.runMutation(internal.public.saveResult, { workId, result: retval }); + await ctx.runMutation(internal.public.saveResult, { + workId, + result: retval, + }); } catch (e: unknown) { - await ctx.runMutation(internal.public.saveResult, { workId, error: (e as Error).message }); + await ctx.runMutation(internal.public.saveResult, { + workId, + error: (e as Error).message, + }); } }, }); @@ -325,11 +388,18 @@ export const saveResult = internalMutation({ handler: saveResultHandler, }); -async function saveResultHandler(ctx: 
MutationCtx, { workId, result, error }: { - workId: Id<"pendingWork">, - result?: unknown, - error?: string, -}): Promise { +async function saveResultHandler( + ctx: MutationCtx, + { + workId, + result, + error, + }: { + workId: Id<"pendingWork">; + result?: unknown; + error?: string; + } +): Promise { const options = await getOptions(ctx.db); if (!options) { throw new Error("cannot save result with no pool"); @@ -351,28 +421,36 @@ export const runMutationWrapper = internalMutation({ }, handler: async (ctx, { workId, handle: handleStr, fnArgs }) => { // eslint-disable-next-line @typescript-eslint/no-explicit-any - const handle = handleStr as FunctionHandle<'mutation', any, any>; + const handle = handleStr as FunctionHandle<"mutation", any, any>; try { const retval = await ctx.runMutation(handle, fnArgs); await saveResultHandler(ctx, { workId, result: retval }); } catch (e: unknown) { await saveResultHandler(ctx, { workId, error: (e as Error).message }); } - } + }, }); async function startMainLoopHandler(ctx: MutationCtx) { const mainLoop = await ctx.db.query("mainLoop").unique(); if (!mainLoop) { (await console(ctx)).debug("starting mainLoop"); - const fn = await ctx.scheduler.runAfter(0, internal.public.mainLoop, { generation: 0 }); - await ctx.db.insert("mainLoop", { fn, generation: 0, runAtTime: Date.now() }); + const fn = await ctx.scheduler.runAfter(0, internal.public.mainLoop, { + generation: 0, + }); + await ctx.db.insert("mainLoop", { + fn, + generation: 0, + runAtTime: Date.now(), + }); return; } const existingFn = mainLoop.fn ? await ctx.db.system.get(mainLoop.fn) : null; if (existingFn === null || existingFn.completedTime) { // mainLoop stopped, so we restart it. - const fn = await ctx.scheduler.runAfter(0, internal.public.mainLoop, { generation: mainLoop.generation }); + const fn = await ctx.scheduler.runAfter(0, internal.public.mainLoop, { + generation: mainLoop.generation, + }); await ctx.db.patch(mainLoop._id, { fn }); (await console(ctx)).debug("mainLoop stopped, so we restarted it"); } @@ -406,33 +484,46 @@ export const stopCleanup = mutation({ }, }); -async function kickMainLoop(ctx: MutationCtx, delayMs: number, isCurrentlyExecuting: boolean): Promise { +async function kickMainLoop( + ctx: MutationCtx, + delayMs: number, + isCurrentlyExecuting: boolean +): Promise { const debounceMs = (await getOptions(ctx.db))?.debounceMs ?? 50; const delay = Math.max(delayMs, debounceMs); const runAtTime = Date.now() + delay; // Look for mainLoop documents that we want to reschedule. // If we're currently running mainLoop, we definitely want to reschedule. // Otherwise, only reschedule if the new runAtTime is earlier than the existing one. - const mainLoop = await ctx.db.query("mainLoop").withIndex("runAtTime", q => { - if (isCurrentlyExecuting) return q; - else return q.gt("runAtTime", runAtTime) - }).unique(); + const mainLoop = await ctx.db + .query("mainLoop") + .withIndex("runAtTime", (q) => { + if (isCurrentlyExecuting) return q; + else return q.gt("runAtTime", runAtTime); + }) + .unique(); if (!mainLoop) { // Two possibilities: // 1. There is no main loop, in which case `startMainLoop` needs to be called. // 2. The main loop is scheduled to run soon, so we don't need to do anything. // Unfortunately, we can't tell the difference between those cases without taking // a read dependency on soon-to-be-run mainLoop documents, so we assume the latter. 
- (await console(ctx)).debug("mainLoop already scheduled to run soon (or doesn't exist, in which case you should call `startMainLoop`)"); + (await console(ctx)).debug( + "mainLoop already scheduled to run soon (or doesn't exist, in which case you should call `startMainLoop`)" + ); return; } // mainLoop is scheduled to run later, so we should cancel it and reschedule. if (!isCurrentlyExecuting && mainLoop.fn) { await ctx.scheduler.cancel(mainLoop.fn); } - const fn = await ctx.scheduler.runAt(runAtTime, internal.public.mainLoop, { generation: mainLoop.generation }); + const fn = await ctx.scheduler.runAt(runAtTime, internal.public.mainLoop, { + generation: mainLoop.generation, + }); await ctx.db.patch(mainLoop._id, { fn, runAtTime }); - (await console(ctx)).debug("mainLoop was scheduled later, so reschedule it to run sooner"); + (await console(ctx)).debug( + "mainLoop was scheduled later, so reschedule it to run sooner" + ); } export const status = query({ @@ -453,10 +544,11 @@ export const status = query({ v.object({ kind: v.literal("error"), error: v.string(), - }), + }) ), handler: async (ctx, { id }) => { - const completedWork = await ctx.db.query("completedWork") + const completedWork = await ctx.db + .query("completedWork") .withIndex("workId", (q) => q.eq("workId", id)) .unique(); if (completedWork) { @@ -482,7 +574,8 @@ export const cleanup = mutation({ }, handler: async (ctx, { maxAgeMs }) => { const old = Date.now() - maxAgeMs; - const docs = await ctx.db.query("completedWork") + const docs = await ctx.db + .query("completedWork") .withIndex("by_creation_time", (q) => q.lt("_creationTime", old)) .collect(); await Promise.all(docs.map((doc) => ctx.db.delete(doc._id))); @@ -505,16 +598,16 @@ async function ensurePoolExists( completedWorkMaxAgeMs, logLevel, }: { - maxParallelism: number, - actionTimeoutMs: number, - mutationTimeoutMs: number, - unknownTimeoutMs: number, - debounceMs: number, - fastHeartbeatMs: number, - slowHeartbeatMs: number, - completedWorkMaxAgeMs: number, - logLevel: LogLevel, - }, + maxParallelism: number; + actionTimeoutMs: number; + mutationTimeoutMs: number; + unknownTimeoutMs: number; + debounceMs: number; + fastHeartbeatMs: number; + slowHeartbeatMs: number; + completedWorkMaxAgeMs: number; + logLevel: LogLevel; + } ) { if (maxParallelism > MAX_POSSIBLE_PARALLELISM) { throw new Error(`maxParallelism must be <= ${MAX_POSSIBLE_PARALLELISM}`); @@ -574,14 +667,20 @@ async function ensurePoolExists( async function ensureCleanupCron( ctx: MutationCtx, - completedWorkMaxAgeMs: number, + completedWorkMaxAgeMs: number ) { if (completedWorkMaxAgeMs === Number.POSITIVE_INFINITY) { return; } const cronFrequencyMs = Math.min(completedWorkMaxAgeMs, 24 * 60 * 60 * 1000); let cleanupCron = await crons.get(ctx, { name: CLEANUP_CRON_NAME }); - if (cleanupCron !== null && !(cleanupCron.schedule.kind === "interval" && cleanupCron.schedule.ms === cronFrequencyMs)) { + if ( + cleanupCron !== null && + !( + cleanupCron.schedule.kind === "interval" && + cleanupCron.schedule.ms === cronFrequencyMs + ) + ) { await crons.delete(ctx, { id: cleanupCron.id }); cleanupCron = null; } @@ -591,7 +690,7 @@ async function ensureCleanupCron( { kind: "interval", ms: completedWorkMaxAgeMs }, api.public.cleanup, { maxAgeMs: completedWorkMaxAgeMs }, - CLEANUP_CRON_NAME, + CLEANUP_CRON_NAME ); } } diff --git a/src/component/schema.ts b/src/component/schema.ts index a4aed57..5e8eab1 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -56,7 +56,11 @@ export default defineSchema({ 
}).index("runAtTime", ["runAtTime"]), pendingWork: defineTable({ - fnType: v.union(v.literal("action"), v.literal("mutation"), v.literal("unknown")), + fnType: v.union( + v.literal("action"), + v.literal("mutation"), + v.literal("unknown") + ), handle: v.string(), fnArgs: v.any(), runAtTime: v.number(), @@ -75,7 +79,7 @@ export default defineSchema({ timeoutMs: v.number(), workId: v.id("pendingWork"), }).index("workId", ["workId"]), - + completedWork: defineTable({ result: v.optional(v.any()), error: v.optional(v.string()), From 373497fc96d5d2933becaffb269e941ebda10eb2 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Sat, 9 Nov 2024 14:04:29 -0500 Subject: [PATCH 2/3] fix test dep --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index adfdda9..527f2b9 100644 --- a/package.json +++ b/package.json @@ -65,7 +65,7 @@ "devDependencies": { "@eslint/js": "^9.9.1", "@types/node": "^18.17.0", - "convex-test": "file:../convex-test/convex-test-0.0.33.tgz", + "convex-test": "^0.0.34", "eslint": "^9.9.1", "globals": "^15.9.0", "prettier": "3.2.5", From 4fb76681e07f841b4c7e7c594cc0ca959f6e3d63 Mon Sep 17 00:00:00 2001 From: Lee Danilek Date: Sun, 10 Nov 2024 16:34:03 -0500 Subject: [PATCH 3/3] fix tests --- example/convex/example.test.ts | 5 ----- package.json | 2 +- src/component/public.ts | 5 ++++- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/example/convex/example.test.ts b/example/convex/example.test.ts index d8c8690..8917577 100644 --- a/example/convex/example.test.ts +++ b/example/convex/example.test.ts @@ -56,9 +56,4 @@ describe("workpool", () => { result: 1, }); }); - - test("drop in replacement for ctx", async () => { - const result = await t.action(api.example.doSomethingInPool, {}); - expect(result).toEqual(3); - }); }); diff --git a/package.json b/package.json index 527f2b9..4690bb0 100644 --- a/package.json +++ b/package.json @@ -65,7 +65,7 @@ "devDependencies": { "@eslint/js": "^9.9.1", "@types/node": "^18.17.0", - "convex-test": "^0.0.34", + "convex-test": "^0.0.35-alpha.0", "eslint": "^9.9.1", "globals": "^15.9.0", "prettier": "3.2.5", diff --git a/src/component/public.ts b/src/component/public.ts index 65e9fa0..98b236e 100644 --- a/src/component/public.ts +++ b/src/component/public.ts @@ -130,6 +130,7 @@ export const mainLoop = internalMutation({ const options = await getOptions(ctx.db); if (!options) { + console_.info("no pool, skipping mainLoop"); await kickMainLoop(ctx, 60 * 60 * 1000, true); return; } @@ -312,10 +313,11 @@ async function beginWork( timeoutMs: mutationTimeoutMs, }; } else if (work.fnType === "unknown") { - // eslint-disable-next-line @typescript-eslint/no-explicit-any const handle = work.handle as FunctionHandle< "action" | "mutation", + // eslint-disable-next-line @typescript-eslint/no-explicit-any any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any any >; return { @@ -670,6 +672,7 @@ async function ensureCleanupCron( completedWorkMaxAgeMs: number ) { if (completedWorkMaxAgeMs === Number.POSITIVE_INFINITY) { + (await console(ctx)).info("completedWorkMaxAgeMs is Infinity, so we won't schedule cleanup"); return; } const cronFrequencyMs = Math.min(completedWorkMaxAgeMs, 24 * 60 * 60 * 1000);