Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Proving: Fixed bullmq workers to support proving async #218

Open
wants to merge 2 commits into
base: feature/enable-proving
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions packages/deployment/src/queue/BullQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface BullQueueConfig {
port: number;
username?: string;
password?: string;
db?: number;
};
retryAttempts?: number;
}
Expand All @@ -25,17 +26,41 @@ export class BullQueue
extends SequencerModule<BullQueueConfig>
implements TaskQueue
{
private activePromise?: Promise<void>;

public createWorker(
name: string,
executor: (data: TaskPayload) => Promise<TaskPayload>,
options?: { concurrency?: number }
): Closeable {
const worker = new Worker<TaskPayload, TaskPayload>(
name,
async (job) => await executor(job.data),
async (job) => {
// This weird promise logic is needed to make sure the worker is not proving in parallel
// This is by far not optimal - since it still picks up 1 task per queue but waits until
// computing them, so that leads to bad performance over multiple workers.
// For that we need to restructure tasks to be flowing through a single queue however
while (this.activePromise !== undefined) {
// eslint-disable-next-line no-await-in-loop
await this.activePromise;
}
let resOutside: () => void = () => {};
const promise = new Promise<void>((res) => {
resOutside = res;
});
this.activePromise = promise;

const result = await executor(job.data);
this.activePromise = undefined;
void resOutside();

return result;
},
{
concurrency: options?.concurrency ?? 1,
connection: this.config.redis,
stalledInterval: 60000, // 1 minute
lockDuration: 60000, // 1 minute

metrics: { maxDataPoints: MetricsTime.ONE_HOUR * 24 },
}
Expand Down Expand Up @@ -68,6 +93,7 @@ export class BullQueue
name: queueName,

async addTask(payload: TaskPayload): Promise<{ taskId: string }> {
log.debug("Adding task: ", payload);
const job = await queue.add(queueName, payload, {
attempts: retryAttempts ?? 2,
});
Expand All @@ -76,14 +102,25 @@ export class BullQueue

async onCompleted(listener: (payload: TaskPayload) => Promise<void>) {
events.on("completed", async (result) => {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
await listener(JSON.parse(result.returnvalue) as TaskPayload);
log.debug("Completed task: ", result);
try {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
await listener(result.returnvalue as unknown as TaskPayload);
} catch (e) {
// Catch error explicitly since this promise is dangling,
// therefore any error will be voided as well
log.error(e);
}
});
events.on("error", async (error) => {
log.error("Error in worker", error);
});
await events.waitUntilReady();
},

async close(): Promise<void> {
await events.close();
await queue.drain();
await queue.close();
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ export class StateTransitionProverProgrammable extends ZkProgrammable<
index = 0
) {
const witness = Provable.witness(RollupMerkleTreeWitness, () =>
this.witnessProvider.getWitness(transition.path)
this.witnessProvider.getWitness(Field(transition.path.toString()))
);

const membershipValid = witness.checkMembership(
Expand Down
8 changes: 6 additions & 2 deletions packages/sequencer/src/worker/worker/FlowTaskWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class FlowTaskWorker<Tasks extends Task<any, any>[]>
log.debug(`Init task handler ${task.name}`);
const queueName = task.name;
return this.queue.createWorker(queueName, async (data) => {
log.debug(`Received task in queue ${queueName}`);
log.debug(`Received task ${data.taskId} in queue ${queueName}`);

try {
// Use first handler that returns a non-undefined result
Expand All @@ -51,12 +51,16 @@ export class FlowTaskWorker<Tasks extends Task<any, any>[]>
payload: await task.resultSerializer().toJSON(output),
};

log.debug(
`Responding to task ${data.taskId} with ${result.payload.slice(0, 100)}`
);

return result;
} catch (error: unknown) {
const payload =
error instanceof Error ? error.message : JSON.stringify(error);

log.debug("Error in worker (detailed trace): ", error);
log.info("Error in worker (detailed trace): ", error);

return {
status: "error",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ export class WorkerRegistrationFlow implements Closeable {

flow?: Closeable;

public async start(payload: WorkerStartupPayload): Promise<void> {
public async start(
payload: Omit<WorkerStartupPayload, "salt">
): Promise<void> {
const flow = this.flowCreator.createFlow("register-worker-flow", {});
this.flow = flow;

Expand All @@ -28,6 +30,7 @@ export class WorkerRegistrationFlow implements Closeable {
// eslint-disable-next-line no-await-in-loop
await flow.withFlow(async (res, rej) => {
log.trace("Pushing registration task");

await flow.pushTask(this.task, payload, async (result) => {
// Here someone could inject things to happen when the worker registers
res(result);
Expand Down
31 changes: 31 additions & 0 deletions packages/sequencer/test-integration/workers/ChildProcessWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { spawn, ChildProcess } from "node:child_process";

export class ChildProcessWorker {
process?: ChildProcess;

start(forwardLogs: boolean = true) {
const s = spawn("node", [
"--experimental-vm-modules",
"--experimental-wasm-modules",
"../../node_modules/jest/bin/jest.js",
"./test-integration/workers/worker.test.ts",
]);
s.on("error", (err) => {
console.error(err);
});
if (forwardLogs) {
s.stdout.on("data", (data) => {
process.stdout.write(data);
});
}
s.stderr.on("data", (data) => {
process.stderr.write(data);
});

this.process = s;
}

kill() {
this?.process?.kill();
}
}
6 changes: 6 additions & 0 deletions packages/sequencer/test-integration/workers/WorkerModules.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { LocalTaskWorkerModule, TaskQueue, TypedClass } from "../../src";

export interface MinimumWorkerModules {
TaskQueue: TypedClass<TaskQueue>;
LocalTaskWorkerModule: TypedClass<LocalTaskWorkerModule<any>>;
}
50 changes: 50 additions & 0 deletions packages/sequencer/test-integration/workers/modules.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { Runtime } from "@proto-kit/module";
import { Protocol } from "@proto-kit/protocol";
import { VanillaProtocolModules } from "@proto-kit/library";
import { ModulesConfig } from "@proto-kit/common";
import { BullQueueConfig } from "@proto-kit/deployment";

import { ProvenBalance } from "../../test/integration/mocks/ProvenBalance";
import { ProtocolStateTestHook } from "../../test/integration/mocks/ProtocolStateTestHook";

export const runtimeClass = Runtime.from({
modules: {
Balance: ProvenBalance,
},

config: {
Balance: {},
},
});

export const protocolClass = Protocol.from({
modules: VanillaProtocolModules.mandatoryModules({
ProtocolStateTestHook,
}),
});

export const runtimeProtocolConfig: ModulesConfig<{
Runtime: typeof runtimeClass;
Protocol: typeof protocolClass;
}> = {
Runtime: {
Balance: {},
},
Protocol: {
AccountState: {},
BlockProver: {},
StateTransitionProver: {},
BlockHeight: {},
LastStateRoot: {},
ProtocolStateTestHook: {},
},
};

export const BullConfig: BullQueueConfig = {
redis: {
host: "localhost",
port: 6379,
password: "password",
db: 1,
},
};
66 changes: 66 additions & 0 deletions packages/sequencer/test-integration/workers/worker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import "reflect-metadata";
import { AppChain } from "@proto-kit/sdk";
import { BullQueue } from "@proto-kit/deployment";
import { container } from "tsyringe";
import { log, sleep } from "@proto-kit/common";

import {
LocalTaskWorkerModule,
Sequencer,
VanillaTaskWorkerModules,
} from "../../src";

import {
BullConfig,
protocolClass,
runtimeClass,
runtimeProtocolConfig,
} from "./modules";
import { MinimumWorkerModules } from "./WorkerModules";

describe("worker", () => {
it("spin up and wait", async () => {
const sequencerClass = Sequencer.from({
modules: {
TaskQueue: BullQueue,
LocalTaskWorkerModule: LocalTaskWorkerModule.from(
VanillaTaskWorkerModules.withoutSettlement()
),
} satisfies MinimumWorkerModules,
});

const app = AppChain.from({
Runtime: runtimeClass,
Sequencer: sequencerClass,
Protocol: protocolClass,
modules: {},
});

app.configure({
...runtimeProtocolConfig,
Sequencer: {
TaskQueue: BullConfig,
LocalTaskWorkerModule: VanillaTaskWorkerModules.defaultConfig(),
},
});

console.log("Starting worker...");

log.setLevel("DEBUG");

await app.start(false, container.createChildContainer());

console.log("Worker started...");

const ready = await new Promise<boolean>((res) => {
app
.resolve("Sequencer")
.resolve("LocalTaskWorkerModule")
.containerEvents.on("ready", res);
});

console.log("Ready received!");

await sleep(10000000);
}, 10000000);
});
Loading