diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 5ded8d2..7665861 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -8,6 +8,7 @@ * @module */ +import type * as crons from "../crons.js"; import type * as example from "../example.js"; import type { @@ -24,6 +25,7 @@ import type { * ``` */ declare const fullApi: ApiFromModules<{ + crons: typeof crons; example: typeof example; }>; declare const fullApiWithMounts: typeof fullApi; @@ -53,6 +55,7 @@ export declare const components: { { fnArgs: any; fnHandle: string; + fnName: string; fnType: "action" | "mutation" | "unknown"; options: { actionTimeoutMs?: number; @@ -98,6 +101,7 @@ export declare const components: { { fnArgs: any; fnHandle: string; + fnName: string; fnType: "action" | "mutation" | "unknown"; options: { actionTimeoutMs?: number; diff --git a/example/convex/crons.ts b/example/convex/crons.ts new file mode 100644 index 0000000..7bf8b20 --- /dev/null +++ b/example/convex/crons.ts @@ -0,0 +1,18 @@ +import { cronJobs } from "convex/server"; +import { internal } from "./_generated/api"; + +const crons = cronJobs(); + +crons.interval( + "start background work", + { minutes: 1 }, // every minute + internal.example.startBackgroundWork, +); + +crons.interval( + "start foreground work", + { seconds: 20 }, // every 20 seconds + internal.example.startForegroundWork, +); + +export default crons; \ No newline at end of file diff --git a/example/convex/example.ts b/example/convex/example.ts index af00acb..dee6ea8 100644 --- a/example/convex/example.ts +++ b/example/convex/example.ts @@ -1,5 +1,5 @@ -import { mutation, action, query, ActionCtx } from "./_generated/server"; -import { api, components } from "./_generated/api"; +import { mutation, action, query, internalMutation, internalAction } from "./_generated/server"; +import { api, components, internal } from "./_generated/api"; import { WorkId, WorkPool } from "@convex-dev/workpool"; 
import { v } from "convex/values"; @@ -91,17 +91,44 @@ export const enqueueAndWait = action({ }, }); -export const doSomethingInPool = action({ +async function sampleWork() { + const index = Math.floor(Math.random() * 3000) + 1; + const url = `https://xkcd.com/${index}`; + const response = await fetch(url); + const text = await response.text(); + const titleMatch = text.match(/<title>(.*?)<\/title>/); + console.log(`xkcd ${index} title: ${titleMatch?.[1]}`); +} + +// Example background work: scraping from a website. +export const backgroundWork = internalAction({ args: {}, - handler: async (ctx, _args): Promise<number> => { - // poolCtx is a drop-in replacement for ctx that does all work in the pool. - const poolCtx = pool.ctx(ctx); - return await doSomething(poolCtx); + handler: async () => { + await sampleWork(); + }, +}); + +export const startBackgroundWork = internalMutation({ + args: {}, + handler: async (ctx, _args) => { + for (let i = 0; i < 20; i++) { + await lowpriPool.enqueueAction(ctx, internal.example.backgroundWork, {}); + } + }, +}); + +// Example foreground work: calling an API on behalf of a user. 
+export const foregroundWork = internalAction({ + args: {}, + handler: async () => { + await sampleWork(); + }, +}); + +export const startForegroundWork = internalMutation({ + args: {}, + handler: async (ctx, _args) => { + await pool.enqueueAction(ctx, internal.example.foregroundWork, {}); }, }); -async function doSomething(ctx: ActionCtx): Promise<number> { - const data1 = await ctx.runMutation(api.example.addMutation, { data: 1 }); - const data2 = await ctx.runAction(api.example.addAction, { data: 2 }); - return data1 + data2; -} diff --git a/src/client/index.ts b/src/client/index.ts index 3ebb403..d31d848 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -9,6 +9,7 @@ import { GenericDataModel, GenericMutationCtx, GenericQueryCtx, + getFunctionName, } from "convex/server"; import { GenericId } from "convex/values"; import { api } from "../component/_generated/api"; @@ -19,6 +20,8 @@ export type WorkId<ReturnType> = string & { __returnType: ReturnType }; export class WorkPool { constructor( private component: UseApi<typeof api>, + // TODO(emma) reduce the number of options. consider removing the timeout options. + // consider removing the debounceMs option and the heartbeats. private options: { /** How many actions/mutations can be running at once within this pool. * Min 1, Max 300. 
@@ -74,6 +77,7 @@ export class WorkPool { const fnHandle = await createFunctionHandle(fn); const id = await ctx.runMutation(this.component.lib.enqueue, { fnHandle, + fnName: getFunctionName(fn), fnArgs, fnType: "action", runAtTime: Date.now(), @@ -89,6 +93,7 @@ export class WorkPool { const fnHandle = await createFunctionHandle(fn); const id = await ctx.runMutation(this.component.lib.enqueue, { fnHandle, + fnName: getFunctionName(fn), fnArgs, fnType: "mutation", runAtTime: Date.now(), @@ -112,6 +117,7 @@ export class WorkPool { const fnHandle = await createFunctionHandle(fn); const id = await ctx.runMutation(this.component.lib.enqueue, { fnHandle, + fnName: getFunctionName(fn), fnArgs, fnType: "unknown", runAtTime, @@ -146,6 +152,7 @@ export class WorkPool { } return undefined; } + // TODO(emma) consider removing. Apps can do this with `tryResult` if they want, and this is a tight resource-intensive loop. async pollResult<ReturnType>( ctx: RunQueryCtx & RunActionCtx, id: WorkId<ReturnType>, @@ -163,6 +170,10 @@ export class WorkPool { await new Promise<void>((resolve) => setTimeout(resolve, 50)); } } + // TODO(emma): just make this a wrapper around the scheduler. + // don't need to do the runAction/runMutation here. + // Also we can consider deleting this method entirely; just make them use + // enqueueMutation and enqueueAction. 
ctx<DataModel extends GenericDataModel>( ctx: GenericActionCtx<DataModel> ): GenericActionCtx<DataModel> { diff --git a/src/component/_generated/api.d.ts b/src/component/_generated/api.d.ts index bf78513..982bdc9 100644 --- a/src/component/_generated/api.d.ts +++ b/src/component/_generated/api.d.ts @@ -10,6 +10,7 @@ import type * as lib from "../lib.js"; import type * as logging from "../logging.js"; +import type * as stats from "../stats.js"; import type { ApiFromModules, @@ -27,6 +28,7 @@ import type { declare const fullApi: ApiFromModules<{ lib: typeof lib; logging: typeof logging; + stats: typeof stats; }>; export type Mounts = { lib: { @@ -38,6 +40,7 @@ export type Mounts = { { fnArgs: any; fnHandle: string; + fnName: string; fnType: "action" | "mutation" | "unknown"; options: { actionTimeoutMs?: number; diff --git a/src/component/lib.ts b/src/component/lib.ts index 5811e33..ec85097 100644 --- a/src/component/lib.ts +++ b/src/component/lib.ts @@ -14,12 +14,14 @@ import { api, internal } from "./_generated/api"; import { createLogger, logLevel } from "./logging"; import { components } from "./_generated/api"; import { Crons } from "@convex-dev/crons"; +import { recordCompleted, recordStarted } from "./stats"; const crons = new Crons(components.crons); export const enqueue = mutation({ args: { fnHandle: v.string(), + fnName: v.string(), fnArgs: v.any(), fnType: v.union( v.literal("action"), @@ -40,7 +42,7 @@ export const enqueue = mutation({ }), }, returns: v.id("pendingWork"), - handler: async (ctx, { fnHandle, options, fnArgs, fnType, runAtTime }) => { + handler: async (ctx, { fnHandle, fnName, options, fnArgs, fnType, runAtTime }) => { const debounceMs = options.debounceMs ?? 
50; await ensurePoolExists(ctx, { maxParallelism: options.maxParallelism, @@ -55,6 +57,7 @@ export const enqueue = mutation({ }); const workId = await ctx.db.insert("pendingWork", { fnHandle, + fnName, fnArgs, fnType, runAtTime, @@ -179,6 +182,7 @@ export const mainLoop = internalMutation({ error: work.error, workId: work.workId, }); + recordCompleted(work.workId, work.error ? "failure" : "success"); didSomething = true; }) ); @@ -200,6 +204,7 @@ export const mainLoop = internalMutation({ workId: work.workId, error: "Canceled", }); + recordCompleted(work.workId, "canceled"); } await ctx.db.delete(work._id); didSomething = true; @@ -227,6 +232,7 @@ export const mainLoop = internalMutation({ workId: work.workId, ...result, }); + recordCompleted(work.workId, result.error ? "failure" : "success"); didSomething = true; } }) @@ -272,6 +278,7 @@ async function beginWork( if (!options) { throw new Error("cannot begin work with no pool"); } + recordStarted(work._id, work.fnName, work._creationTime, work.runAtTime); const { mutationTimeoutMs, actionTimeoutMs, unknownTimeoutMs } = options; if (work.fnType === "action") { return { diff --git a/src/component/schema.ts b/src/component/schema.ts index 6fe9b2f..2bcf100 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -49,6 +49,11 @@ export default defineSchema({ }), // State across all pools. + // TODO(emma) change this to use a boolean or enum of statuses, instead of using runAtTime. + // Status like "running", "waitingForJobCompletion", "idle". + // Currently there's a problem if enqueue is called from a mutation that takes longer than + // debounceMs to complete, and a mainLoop finishes and restarts in that time window. Then the enqueue will OCC with the mainLoop. + // But if we have fixed statuses, we don't need to write it so frequently so it won't OCC. Chat with @ian about details. 
mainLoop: defineTable({ fn: v.optional(v.id("_scheduled_functions")), generation: v.number(), @@ -62,6 +67,7 @@ export default defineSchema({ v.literal("unknown") ), fnHandle: v.string(), + fnName: v.string(), fnArgs: v.any(), runAtTime: v.number(), }).index("runAtTime", ["runAtTime"]), diff --git a/src/component/stats.ts b/src/component/stats.ts new file mode 100644 index 0000000..048d1a2 --- /dev/null +++ b/src/component/stats.ts @@ -0,0 +1,39 @@ +import { Id } from "./_generated/dataModel"; + +/** + * Record stats about work execution. Intended to be queried by Axiom or Datadog. + */ + +/** + * Sample axiom dashboard query: + +workpool +| extend parsed_message = iff( + isnotnull(parse_json(trim("'", tostring(["data.message"])))), + parse_json(trim("'", tostring(["data.message"]))), + parse_json('{}') +) +| extend lagSinceEnqueued = parsed_message["lagSinceEnqueued"] +| extend fnName = parsed_message["fnName"] +| summarize avg(todouble(lagSinceEnqueued)) by bin_auto(_time), tostring(fnName) + + */ + +/** Logs one JSON "started" event; the clock is read once so startedAt and lagSinceEnqueued agree. */ +export function recordStarted(workId: Id<"pendingWork">, fnName: string, enqueuedAt: number, runAtTime: number) { + const startedAt = Date.now(); + console.log(JSON.stringify({ + workId, + event: "started", + fnName, + enqueuedAt, + runAtTime, + startedAt, + lagSinceEnqueued: startedAt - enqueuedAt, + })); +} + +/** Logs one JSON "completed" event carrying the work id, completion time and final status. */ +export function recordCompleted(workId: Id<"pendingWork">, status: "success" | "failure" | "canceled") { + console.log(JSON.stringify({ workId, event: "completed", completedAt: Date.now(), status })); +}