Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logs for stats #5

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* @module
*/

import type * as crons from "../crons.js";
import type * as example from "../example.js";

import type {
Expand All @@ -24,6 +25,7 @@ import type {
* ```
*/
declare const fullApi: ApiFromModules<{
crons: typeof crons;
example: typeof example;
}>;
declare const fullApiWithMounts: typeof fullApi;
Expand Down Expand Up @@ -53,6 +55,7 @@ export declare const components: {
{
fnArgs: any;
fnHandle: string;
fnName: string;
fnType: "action" | "mutation" | "unknown";
options: {
actionTimeoutMs?: number;
Expand Down Expand Up @@ -98,6 +101,7 @@ export declare const components: {
{
fnArgs: any;
fnHandle: string;
fnName: string;
fnType: "action" | "mutation" | "unknown";
options: {
actionTimeoutMs?: number;
Expand Down
18 changes: 18 additions & 0 deletions example/convex/crons.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { cronJobs } from "convex/server";
import { internal } from "./_generated/api";

const crons = cronJobs();

crons.interval(
"start background work",
{ minutes: 1 }, // every minute
internal.example.startBackgroundWork,
);

crons.interval(
"start foreground work",
{ seconds: 20 }, // every 20 seconds
internal.example.startForegroundWork,
);

export default crons;
51 changes: 39 additions & 12 deletions example/convex/example.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { mutation, action, query, ActionCtx } from "./_generated/server";
import { api, components } from "./_generated/api";
import { mutation, action, query, internalMutation, internalAction } from "./_generated/server";
import { api, components, internal } from "./_generated/api";
import { WorkId, WorkPool } from "@convex-dev/workpool";
import { v } from "convex/values";

Expand Down Expand Up @@ -91,17 +91,44 @@ export const enqueueAndWait = action({
},
});

export const doSomethingInPool = action({
async function sampleWork() {
const index = Math.floor(Math.random() * 3000) + 1;
const url = `https://xkcd.com/${index}`;
const response = await fetch(url);
const text = await response.text();
const titleMatch = text.match(/<title>(.*)<\/title>/);
console.log(`xkcd ${index} title: ${titleMatch?.[1]}`);
}

// Example background work: scraping from a website.
export const backgroundWork = internalAction({
args: {},
handler: async (ctx, _args): Promise<number> => {
// poolCtx is a drop-in replacement for ctx that does all work in the pool.
const poolCtx = pool.ctx(ctx);
return await doSomething(poolCtx);
handler: async () => {
await sampleWork();
},
});

export const startBackgroundWork = internalMutation({
args: {},
handler: async (ctx, _args) => {
for (let i = 0; i < 20; i++) {
await lowpriPool.enqueueAction(ctx, internal.example.backgroundWork, {});
}
},
});

// Example foreground work: calling an API on behalf of a user.
export const foregroundWork = internalAction({
args: {},
handler: async () => {
await sampleWork();
},
});

export const startForegroundWork = internalMutation({
args: {},
handler: async (ctx, _args) => {
await pool.enqueueAction(ctx, internal.example.foregroundWork, {});
},
});

async function doSomething(ctx: ActionCtx): Promise<number> {
const data1 = await ctx.runMutation(api.example.addMutation, { data: 1 });
const data2 = await ctx.runAction(api.example.addAction, { data: 2 });
return data1 + data2;
}
11 changes: 11 additions & 0 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
GenericDataModel,
GenericMutationCtx,
GenericQueryCtx,
getFunctionName,
} from "convex/server";
import { GenericId } from "convex/values";
import { api } from "../component/_generated/api";
Expand All @@ -19,6 +20,8 @@ export type WorkId<ReturnType> = string & { __returnType: ReturnType };
export class WorkPool {
constructor(
private component: UseApi<typeof api>,
// TODO(emma) reduce the number of options. consider removing the timeout options.
// consider removing the debounceMs option and the heartbeats.
private options: {
/** How many actions/mutations can be running at once within this pool.
* Min 1, Max 300.
Expand Down Expand Up @@ -74,6 +77,7 @@ export class WorkPool {
const fnHandle = await createFunctionHandle(fn);
const id = await ctx.runMutation(this.component.lib.enqueue, {
fnHandle,
fnName: getFunctionName(fn),
fnArgs,
fnType: "action",
runAtTime: Date.now(),
Expand All @@ -89,6 +93,7 @@ export class WorkPool {
const fnHandle = await createFunctionHandle(fn);
const id = await ctx.runMutation(this.component.lib.enqueue, {
fnHandle,
fnName: getFunctionName(fn),
fnArgs,
fnType: "mutation",
runAtTime: Date.now(),
Expand All @@ -112,6 +117,7 @@ export class WorkPool {
const fnHandle = await createFunctionHandle(fn);
const id = await ctx.runMutation(this.component.lib.enqueue, {
fnHandle,
fnName: getFunctionName(fn),
fnArgs,
fnType: "unknown",
runAtTime,
Expand Down Expand Up @@ -146,6 +152,7 @@ export class WorkPool {
}
return undefined;
}
// TODO(emma) consider removing. Apps can do this with `tryResult` if they want, and this is a tight resource-intensive loop.
async pollResult<ReturnType>(
ctx: RunQueryCtx & RunActionCtx,
id: WorkId<ReturnType>,
Expand All @@ -163,6 +170,10 @@ export class WorkPool {
await new Promise<void>((resolve) => setTimeout(resolve, 50));
}
}
// TODO(emma): just make this a wrapper around the scheduler.
// don't need to do the runAction/runMutation here.
// Also we can consider deleting this method entirely; just make them use
// enqueueMutation and enqueueAction.
ctx<DataModel extends GenericDataModel>(
ctx: GenericActionCtx<DataModel>
): GenericActionCtx<DataModel> {
Expand Down
3 changes: 3 additions & 0 deletions src/component/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import type * as lib from "../lib.js";
import type * as logging from "../logging.js";
import type * as stats from "../stats.js";

import type {
ApiFromModules,
Expand All @@ -27,6 +28,7 @@ import type {
declare const fullApi: ApiFromModules<{
lib: typeof lib;
logging: typeof logging;
stats: typeof stats;
}>;
export type Mounts = {
lib: {
Expand All @@ -38,6 +40,7 @@ export type Mounts = {
{
fnArgs: any;
fnHandle: string;
fnName: string;
fnType: "action" | "mutation" | "unknown";
options: {
actionTimeoutMs?: number;
Expand Down
9 changes: 8 additions & 1 deletion src/component/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import { api, internal } from "./_generated/api";
import { createLogger, logLevel } from "./logging";
import { components } from "./_generated/api";
import { Crons } from "@convex-dev/crons";
import { recordCompleted, recordStarted } from "./stats";

const crons = new Crons(components.crons);

export const enqueue = mutation({
args: {
fnHandle: v.string(),
fnName: v.string(),
fnArgs: v.any(),
fnType: v.union(
v.literal("action"),
Expand All @@ -40,7 +42,7 @@ export const enqueue = mutation({
}),
},
returns: v.id("pendingWork"),
handler: async (ctx, { fnHandle, options, fnArgs, fnType, runAtTime }) => {
handler: async (ctx, { fnHandle, fnName, options, fnArgs, fnType, runAtTime }) => {
const debounceMs = options.debounceMs ?? 50;
await ensurePoolExists(ctx, {
maxParallelism: options.maxParallelism,
Expand All @@ -55,6 +57,7 @@ export const enqueue = mutation({
});
const workId = await ctx.db.insert("pendingWork", {
fnHandle,
fnName,
fnArgs,
fnType,
runAtTime,
Expand Down Expand Up @@ -179,6 +182,7 @@ export const mainLoop = internalMutation({
error: work.error,
workId: work.workId,
});
recordCompleted(work.workId, work.error ? "failure" : "success");
didSomething = true;
})
);
Expand All @@ -200,6 +204,7 @@ export const mainLoop = internalMutation({
workId: work.workId,
error: "Canceled",
});
recordCompleted(work.workId, "canceled");
}
await ctx.db.delete(work._id);
didSomething = true;
Expand Down Expand Up @@ -227,6 +232,7 @@ export const mainLoop = internalMutation({
workId: work.workId,
...result,
});
recordCompleted(work.workId, result.error ? "failure" : "success");
didSomething = true;
}
})
Expand Down Expand Up @@ -272,6 +278,7 @@ async function beginWork(
if (!options) {
throw new Error("cannot begin work with no pool");
}
recordStarted(work._id, work.fnName, work._creationTime, work.runAtTime);
const { mutationTimeoutMs, actionTimeoutMs, unknownTimeoutMs } = options;
if (work.fnType === "action") {
return {
Expand Down
6 changes: 6 additions & 0 deletions src/component/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ export default defineSchema({
}),

// State across all pools.
// TODO(emma) change this to use a boolean or enum of statuses, instead of using runAtTime.
// Status like "running", "waitingForJobCompletion", "idle".
// Currently there's a problem if enqueue is called from a mutation that takes longer than
// debounceMs to complete, and a mainLoop finishes and restarts in that time window. Then the enqueue will OCC with the mainLoop.
// But if we have fixed statuses, we don't need to write it so frequently so it won't OCC. Chat with @ian about details.
mainLoop: defineTable({
fn: v.optional(v.id("_scheduled_functions")),
generation: v.number(),
Expand All @@ -62,6 +67,7 @@ export default defineSchema({
v.literal("unknown")
),
fnHandle: v.string(),
fnName: v.string(),
fnArgs: v.any(),
runAtTime: v.number(),
}).index("runAtTime", ["runAtTime"]),
Expand Down
36 changes: 36 additions & 0 deletions src/component/stats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Id } from "./_generated/dataModel";

/**
* Record stats about work execution. Intended to be queried by Axiom or Datadog.
*/

/**
* Sample axiom dashboard query:

workpool
| extend parsed_message = iff(
isnotnull(parse_json(trim("'", tostring(["data.message"])))),
parse_json(trim("'", tostring(["data.message"]))),
parse_json('{}')
)
| extend lagSinceEnqueued = parsed_message["lagSinceEnqueued"]
| extend fnName = parsed_message["fnName"]
| summarize avg(todouble(lagSinceEnqueued)) by bin_auto(_time), tostring(fnName)

*/

export function recordStarted(workId: Id<"pendingWork">, fnName: string, enqueuedAt: number, runAtTime: number) {
console.log(JSON.stringify({
workId,
event: "started",
fnName,
enqueuedAt,
runAtTime,
startedAt: Date.now(),
lagSinceEnqueued: Date.now() - enqueuedAt,
}));
}

export function recordCompleted(workId: Id<"pendingWork">, status: "success" | "failure" | "canceled") {
console.log(JSON.stringify({ workId, completedAt: Date.now(), status }));
}