diff --git a/packages/deployment/src/queue/BullQueue.ts b/packages/deployment/src/queue/BullQueue.ts index faf01f64e..c7fe8d5e0 100644 --- a/packages/deployment/src/queue/BullQueue.ts +++ b/packages/deployment/src/queue/BullQueue.ts @@ -28,11 +28,11 @@ export class BullQueue public createWorker( name: string, executor: (data: TaskPayload) => Promise, - options?: { concurrency: number } + options?: { concurrency?: number } ): Closeable { - const worker = new Worker( + const worker = new Worker( name, - async (job) => JSON.stringify(await executor(job.data)), + async (job) => await executor(job.data), { concurrency: options?.concurrency ?? 1, connection: this.config.redis, diff --git a/packages/protocol/src/index.ts b/packages/protocol/src/index.ts index aa605a21c..1a24ea093 100644 --- a/packages/protocol/src/index.ts +++ b/packages/protocol/src/index.ts @@ -19,7 +19,9 @@ export * from "./utils/StateTransitionReductionList"; export * from "./utils/utils"; export * from "./prover/block/BlockProver"; export * from "./prover/block/BlockProvable"; +export * from "./prover/block/accummulators/RuntimeVerificationKeyTree"; export * from "./prover/block/accummulators/BlockHashMerkleTree"; +export * from "./prover/block/services/RuntimeVerificationKeyRootService"; export * from "./prover/statetransition/StateTransitionProver"; export * from "./prover/statetransition/StateTransitionProvable"; export * from "./prover/statetransition/StateTransitionWitnessProvider"; @@ -29,7 +31,6 @@ export * from "./protocol/ProtocolModule"; export * from "./protocol/ProtocolEnvironment"; export * from "./protocol/ProvableTransactionHook"; export * from "./protocol/ProvableBlockHook"; -export * from "./protocol/VerificationKeyService"; export * from "./state/context/TransitionMethodExecutionContext"; export * from "./state/context/RuntimeMethodExecutionContext"; export * from "./state/protocol/ProtocolState"; diff --git a/packages/protocol/src/prover/block/BlockProvable.ts b/packages/protocol/src/prover/block/BlockProvable.ts index f2a2c0472..75bdd7bb1 100644 --- a/packages/protocol/src/prover/block/BlockProvable.ts +++ b/packages/protocol/src/prover/block/BlockProvable.ts @@ -15,6 +15,7 @@ import { RuntimeTransaction } from "../../model/transaction/RuntimeTransaction"; import { NetworkState } from "../../model/network/NetworkState"; import { BlockHashMerkleTreeWitness } from "./accummulators/BlockHashMerkleTree"; +import { RuntimeVerificationKeyAttestation } from "./accummulators/RuntimeVerificationKeyTree"; export class BlockProverPublicInput extends Struct({ transactionsHash: Field, @@ -75,7 +76,8 @@ export interface BlockProvable publicInput: BlockProverPublicInput, stateProof: StateTransitionProof, appProof: DynamicRuntimeProof, - executionData: BlockProverExecutionData + executionData: BlockProverExecutionData, + verificationKeyAttestation: RuntimeVerificationKeyAttestation ) => Promise; proveBlock: ( diff --git a/packages/protocol/src/prover/block/BlockProver.ts b/packages/protocol/src/prover/block/BlockProver.ts index 807cf4ff6..63fca7d66 100644 --- a/packages/protocol/src/prover/block/BlockProver.ts +++ b/packages/protocol/src/prover/block/BlockProver.ts @@ -19,10 +19,6 @@ import { ZkProgrammable, } from "@proto-kit/common"; -import { - MethodVKConfigData, - VerificationKeyService, -} from "../../protocol/VerificationKeyService"; import { DefaultProvableHashList } from "../../utils/ProvableHashList"; import { MethodPublicOutput } from "../../model/MethodPublicOutput"; import { ProtocolModule } from "../../protocol/ProtocolModule"; @@ -58,6 +54,12 @@ import { BlockHashMerkleTreeWitness, BlockHashTreeEntry, } from "./accummulators/BlockHashMerkleTree"; +import { + MethodVKConfigData, + MinimalVKTreeService, + RuntimeVerificationKeyAttestation, +} from "./accummulators/RuntimeVerificationKeyTree"; +import { RuntimeVerificationKeyRootService } from "./services/RuntimeVerificationKeyRootService"; const errors = { stateProofNotStartingAtZero: () => @@ -148,7 +150,7 @@ export class BlockProverProgrammable extends ZkProgrammable< public readonly runtime: ZkProgrammable, private readonly transactionHooks: ProvableTransactionHook[], private readonly blockHooks: ProvableBlockHook[], - private readonly verificationKeyService: VerificationKeyService + private readonly verificationKeyService: MinimalVKTreeService ) { super(); } @@ -165,6 +167,7 @@ export class BlockProverProgrammable extends ZkProgrammable< * @param stateTransitionProof * @param runtimeProof * @param executionData + * @param verificationKey * @returns The new BlockProver-state to be used as public output */ public async applyTransaction( @@ -407,7 +410,8 @@ export class BlockProverProgrammable extends ZkProgrammable< publicInput: BlockProverPublicInput, stateProof: StateTransitionProof, runtimeProof: DynamicRuntimeProof, - executionData: BlockProverExecutionData + executionData: BlockProverExecutionData, + verificationKeyWitness: RuntimeVerificationKeyAttestation ): Promise { const state: BlockProverState = { ...publicInput, @@ -418,35 +422,18 @@ export class BlockProverProgrammable extends ZkProgrammable< "ExecutionData Networkstate doesn't equal public input hash" ); - const zkProgramConfig = Provable.witness(MethodVKConfigData, () => - this.verificationKeyService.getVKConfig( - executionData.transaction.methodId.toBigInt() - ) - ); - const witness = Provable.witness( - VerificationKeyService.getWitnessType(), - () => - this.verificationKeyService.getWitness( - executionData.transaction.methodId.toBigInt() - ) - ); - const vkRecord = Provable.witness(VerificationKey, () => - this.verificationKeyService.getVkRecordEntry( - executionData.transaction.methodId.toBigInt() - ) - ); + // Verify the [methodId, vk] tuple against the baked-in vk tree root + const { verificationKey, witness: verificationKeyTreeWitness } = + verificationKeyWitness; const root = Field(this.verificationKeyService.getRoot()); - const calculatedRoot = witness.calculateRoot(zkProgramConfig.hash()); - root.assertEquals(calculatedRoot, errors.invalidZkProgramTreeRoot()); - - zkProgramConfig.methodId.assertEquals( - executionData.transaction.methodId, - errors.invalidZkProgramConfigMethodId() + const calculatedRoot = verificationKeyTreeWitness.calculateRoot( + new MethodVKConfigData({ + methodId: executionData.transaction.methodId, + vkHash: verificationKey.hash, + }).hash() ); - vkRecord.hash.assertEquals(zkProgramConfig.vkHash); - - runtimeProof.verify(vkRecord); + root.assertEquals(calculatedRoot, errors.invalidZkProgramTreeRoot()); const bundleInclusionState = this.addTransactionToBundle( state, @@ -459,7 +446,7 @@ export class BlockProverProgrammable extends ZkProgrammable< stateProof, runtimeProof, executionData, - vkRecord + verificationKey ); return new BlockProverPublicOutput({ @@ -817,19 +804,22 @@ export class BlockProverProgrammable extends ZkProgrammable< StateTransitionProofClass, RuntimeProofClass, BlockProverExecutionData, + RuntimeVerificationKeyAttestation, ], async method( publicInput: BlockProverPublicInput, stateProof: StateTransitionProof, appProof: DynamicRuntimeProof, - executionData: BlockProverExecutionData + executionData: BlockProverExecutionData, + verificationKeyAttestation: RuntimeVerificationKeyAttestation ) { return await proveTransaction( publicInput, stateProof, appProof, - executionData + executionData, + verificationKeyAttestation ); }, }, @@ -915,7 +905,7 @@ export class BlockProver extends ProtocolModule implements BlockProvable { transactionHooks: ProvableTransactionHook[], @injectAll("ProvableBlockHook") blockHooks: ProvableBlockHook[], - verificationKeyService: VerificationKeyService + verificationKeyService: RuntimeVerificationKeyRootService ) { super(); this.zkProgrammable = new BlockProverProgrammable( @@ -932,13 +922,15 @@ export class BlockProver extends ProtocolModule implements BlockProvable { publicInput: BlockProverPublicInput, stateProof: StateTransitionProof, appProof: DynamicRuntimeProof, - executionData: BlockProverExecutionData + executionData: BlockProverExecutionData, + verificationKeyAttestation: RuntimeVerificationKeyAttestation ): Promise { return this.zkProgrammable.proveTransaction( publicInput, stateProof, appProof, - executionData + executionData, + verificationKeyAttestation ); } diff --git a/packages/protocol/src/prover/block/accummulators/RuntimeVerificationKeyTree.ts b/packages/protocol/src/prover/block/accummulators/RuntimeVerificationKeyTree.ts new file mode 100644 index 000000000..b5a85d47d --- /dev/null +++ b/packages/protocol/src/prover/block/accummulators/RuntimeVerificationKeyTree.ts @@ -0,0 +1,24 @@ +import { createMerkleTree } from "@proto-kit/common"; +import { Field, Poseidon, Struct, VerificationKey } from "o1js"; + +export const treeFeeHeight = 10; +export class VKTree extends createMerkleTree(treeFeeHeight) {} +export class VKTreeWitness extends VKTree.WITNESS {} + +export class RuntimeVerificationKeyAttestation extends Struct({ + verificationKey: VerificationKey, + witness: VKTreeWitness, +}) {} + +export class MethodVKConfigData extends Struct({ + methodId: Field, + vkHash: Field, +}) { + public hash() { + return Poseidon.hash(MethodVKConfigData.toFields(this)); + } +} + +export interface MinimalVKTreeService { + getRoot: () => bigint; +} diff --git a/packages/protocol/src/prover/block/services/RuntimeVerificationKeyRootService.ts b/packages/protocol/src/prover/block/services/RuntimeVerificationKeyRootService.ts new file mode 100644 index 000000000..4a2211757 --- /dev/null +++ b/packages/protocol/src/prover/block/services/RuntimeVerificationKeyRootService.ts @@ -0,0 +1,20 @@ +import { injectable, Lifecycle, scoped } from "tsyringe"; + +import { MinimalVKTreeService } from "../accummulators/RuntimeVerificationKeyTree"; + +@injectable() +@scoped(Lifecycle.ContainerScoped) +export class RuntimeVerificationKeyRootService implements MinimalVKTreeService { + private injectedRoot?: bigint; + + public setRoot(root: bigint) { + this.injectedRoot = root; + } + + public getRoot() { + if (this.injectedRoot === undefined) { + throw new Error("VKTree root not set"); + } + return this.injectedRoot; + } +} diff --git a/packages/sdk/src/appChain/TestingAppChain.ts b/packages/sdk/src/appChain/TestingAppChain.ts index ce7d50c3f..409787297 100644 --- a/packages/sdk/src/appChain/TestingAppChain.ts +++ b/packages/sdk/src/appChain/TestingAppChain.ts @@ -22,6 +22,7 @@ import { BlockProducerModule, InMemoryDatabase, SequencerModulesRecord, + VanillaTaskWorkerModules, } from "@proto-kit/sequencer"; import { TypedClass } from "@proto-kit/common"; import { PrivateKey } from "o1js"; @@ -108,15 +109,7 @@ export class TestingAppChain< BlockTrigger: {}, Mempool: {}, BatchProducerModule: {}, - LocalTaskWorkerModule: { - StateTransitionTask: {}, - RuntimeProvingTask: {}, - StateTransitionReductionTask: {}, - BlockReductionTask: {}, - BlockProvingTask: {}, - BlockBuildingTask: {}, - CircuitCompilerTask: {}, - }, + LocalTaskWorkerModule: VanillaTaskWorkerModules.defaultConfig(), BaseLayer: {}, BlockProducerModule: {}, TaskQueue: { diff --git a/packages/sequencer/src/protocol/ProtocolStartupModule.ts b/packages/sequencer/src/protocol/ProtocolStartupModule.ts index e9eac0ba6..48df481e8 100644 --- a/packages/sequencer/src/protocol/ProtocolStartupModule.ts +++ b/packages/sequencer/src/protocol/ProtocolStartupModule.ts @@ -2,32 +2,34 @@ import { inject } from "tsyringe"; import { MandatoryProtocolModulesRecord, Protocol, - VerificationKeyService, - VKRecord, + RuntimeVerificationKeyRootService, } from "@proto-kit/protocol"; -import { log } from "@proto-kit/common"; +import { log, sleep } from "@proto-kit/common"; import { SequencerModule, sequencerModule, } from "../sequencer/builder/SequencerModule"; import { FlowCreator } from "../worker/flow/Flow"; +import { WorkerRegistrationFlow } from "../worker/WorkerRegistrationFlow"; import { CircuitCompilerTask } from "./production/tasks/CircuitCompilerTask"; +import { + VerificationKeyService, + VKRecord, +} from "./runtime/RuntimeVerificationKeyService"; @sequencerModule() export class ProtocolStartupModule extends SequencerModule { - private readonly verificationKeyService: VerificationKeyService; - public constructor( private readonly flowCreator: FlowCreator, - @inject("Protocol") protocol: Protocol, - private readonly compileTask: CircuitCompilerTask + @inject("Protocol") + private readonly protocol: Protocol, + private readonly compileTask: CircuitCompilerTask, + private readonly verificationKeyService: VerificationKeyService, + private readonly registrationFlow: WorkerRegistrationFlow ) { super(); - this.verificationKeyService = protocol.dependencyContainer.resolve( - VerificationKeyService - ); } public async start() { @@ -44,5 +46,19 @@ export class ProtocolStartupModule extends SequencerModule { log.info("Protocol circuits compiled"); await this.verificationKeyService.initializeVKTree(vks); + + const root = this.verificationKeyService.getRoot(); + + this.protocol.dependencyContainer + .resolve(RuntimeVerificationKeyRootService) + .setRoot(root); + + await this.registrationFlow.start({ + runtimeVerificationKeyRoot: root, + }); + + await sleep(500); + + log.info("Protocol circuits compiled successfully, commencing startup"); } } diff --git a/packages/sequencer/src/protocol/production/BatchProducerModule.ts b/packages/sequencer/src/protocol/production/BatchProducerModule.ts index 5a52ad479..967d34ac4 100644 --- a/packages/sequencer/src/protocol/production/BatchProducerModule.ts +++ b/packages/sequencer/src/protocol/production/BatchProducerModule.ts @@ -21,6 +21,7 @@ import { CachedMerkleTreeStore } from "../../state/merkle/CachedMerkleTreeStore" import { AsyncStateService } from "../../state/async/AsyncStateService"; import { AsyncMerkleTreeStore } from "../../state/async/AsyncMerkleTreeStore"; import { BlockResult, BlockWithResult } from "../../storage/model/Block"; +import { VerificationKeyService } from "../runtime/RuntimeVerificationKeyService"; import { BlockProverParameters } from "./tasks/BlockProvingTask"; import { StateTransitionProofParameters } from "./tasks/StateTransitionTaskParameters"; @@ -82,7 +83,8 @@ export class BatchProducerModule extends SequencerModule { private readonly blockTreeStore: AsyncMerkleTreeStore, private readonly traceService: TransactionTraceService, private readonly blockFlowService: BlockTaskFlowService, - private readonly blockProofSerializer: BlockProofSerializer + private readonly blockProofSerializer: BlockProofSerializer, + private readonly verificationKeyService: VerificationKeyService ) { super(); } @@ -248,6 +250,7 @@ export class BatchProducerModule extends SequencerModule { const result = await this.traceService.createTransactionTrace( tx, stateServices, + this.verificationKeyService, block.networkState.during, bundleTracker, eternalBundleTracker, diff --git a/packages/sequencer/src/protocol/production/TransactionTraceService.ts b/packages/sequencer/src/protocol/production/TransactionTraceService.ts index 6d242ab91..f16ea9e33 100644 --- a/packages/sequencer/src/protocol/production/TransactionTraceService.ts +++ b/packages/sequencer/src/protocol/production/TransactionTraceService.ts @@ -23,6 +23,7 @@ import type { BlockWithResult, } from "../../storage/model/Block"; import { AsyncMerkleTreeStore } from "../../state/async/AsyncMerkleTreeStore"; +import { VerificationKeyService } from "../runtime/RuntimeVerificationKeyService"; import type { TransactionTrace, BlockTrace } from "./BatchProducerModule"; import { StateTransitionProofParameters } from "./tasks/StateTransitionTaskParameters"; @@ -173,6 +174,7 @@ export class TransactionTraceService { stateService: CachedStateService; merkleStore: CachedMerkleTreeStore; }, + verificationKeyService: VerificationKeyService, networkState: NetworkState, bundleTracker: ProvableHashList, eternalBundleTracker: ProvableHashList, @@ -218,6 +220,10 @@ export class TransactionTraceService { const signedTransaction = tx.toProtocolTransaction(); + const verificationKeyAttestation = verificationKeyService.getAttestation( + tx.methodId.toBigInt() + ); + return { runtimeProver: { tx, @@ -244,6 +250,7 @@ export class TransactionTraceService { }, startingState: protocolStartingState, + verificationKeyAttestation, }, }; } diff --git a/packages/sequencer/src/protocol/production/tasks/BlockProvingTask.ts b/packages/sequencer/src/protocol/production/tasks/BlockProvingTask.ts index 8d439bd38..14a061db2 100644 --- a/packages/sequencer/src/protocol/production/tasks/BlockProvingTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/BlockProvingTask.ts @@ -8,14 +8,19 @@ import { Protocol, ProtocolModulesRecord, ReturnType, + RuntimeVerificationKeyAttestation, StateServiceProvider, StateTransitionProof, StateTransitionProvable, + VKTreeWitness, } from "@proto-kit/protocol"; import { DynamicProof, Field, Proof, Void } from "o1js"; import { Runtime } from "@proto-kit/module"; import { inject, injectable, Lifecycle, scoped } from "tsyringe"; -import { ProvableMethodExecutionContext } from "@proto-kit/common"; +import { + MOCK_VERIFICATION_KEY, + ProvableMethodExecutionContext, +} from "@proto-kit/common"; import { PairProofTaskSerializer, @@ -27,6 +32,7 @@ import { TaskSerializer, Task } from "../../../worker/flow/Task"; import { PreFilledStateService } from "../../../state/prefilled/PreFilledStateService"; import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule"; import { TaskStateRecord } from "../TransactionTraceService"; +import { VerificationKeyService } from "../../runtime/RuntimeVerificationKeyService"; import { CompileRegistry } from "./CompileRegistry"; import { JSONEncodableState } from "./RuntimeTaskParameters"; @@ -37,6 +43,7 @@ export interface BlockProverParameters { publicInput: BlockProverPublicInput; executionData: BlockProverExecutionData; startingState: TaskStateRecord; + verificationKeyAttestation: RuntimeVerificationKeyAttestation; } export type BlockProvingTaskParameters = PairingDerivedInput< @@ -87,7 +94,8 @@ export class BlockReductionTask MandatoryProtocolModulesRecord & ProtocolModulesRecord >, private readonly executionContext: ProvableMethodExecutionContext, - private readonly compileRegistry: CompileRegistry + private readonly compileRegistry: CompileRegistry, + private readonly verificationKeyService: VerificationKeyService ) { super(); this.blockProver = this.protocol.blockProver; @@ -175,6 +183,11 @@ export class BlockProvingTask startingState: DecodedStateSerializer.toJSON( input.params.startingState ), + + verificationKeyAttestation: + RuntimeVerificationKeyAttestation.toJSON( + input.params.verificationKeyAttestation + ), }, }; return JSON.stringify(jsonReadyObject); @@ -189,6 +202,9 @@ export class BlockProvingTask publicInput: ReturnType; executionData: ReturnType; startingState: JSONEncodableState; + verificationKeyAttestation: ReturnType< + typeof RuntimeVerificationKeyAttestation.toJSON + >; }; } = JSON.parse(json); @@ -208,6 +224,22 @@ export class BlockProvingTask startingState: DecodedStateSerializer.fromJSON( jsonReadyObject.params.startingState ), + + verificationKeyAttestation: + jsonReadyObject.params.verificationKeyAttestation + .verificationKey === MOCK_VERIFICATION_KEY.data + ? new RuntimeVerificationKeyAttestation({ + witness: new VKTreeWitness( + VKTreeWitness.fromJSON( + jsonReadyObject.params.verificationKeyAttestation + .witness + ) + ), + verificationKey: MOCK_VERIFICATION_KEY, + }) + : RuntimeVerificationKeyAttestation.fromJSON( + jsonReadyObject.params.verificationKeyAttestation + ), }, }; }, @@ -252,7 +284,8 @@ export class BlockProvingTask input.params.publicInput, stateTransitionProof, runtimeProofDynamic, - input.params.executionData + input.params.executionData, + input.params.verificationKeyAttestation ); } ); diff --git a/packages/sequencer/src/protocol/production/tasks/CircuitCompilerTask.ts b/packages/sequencer/src/protocol/production/tasks/CircuitCompilerTask.ts index a10db826f..fe6349326 100644 --- a/packages/sequencer/src/protocol/production/tasks/CircuitCompilerTask.ts +++ b/packages/sequencer/src/protocol/production/tasks/CircuitCompilerTask.ts @@ -1,10 +1,11 @@ import { inject, injectable, Lifecycle, scoped } from "tsyringe"; import { Runtime } from "@proto-kit/module"; -import { VKRecord } from "@proto-kit/protocol"; import { Field, VerificationKey } from "o1js"; +import { log } from "@proto-kit/common"; -import { TaskWorkerModule } from "../../../worker/worker/TaskWorkerModule"; -import { Task, TaskSerializer } from "../../../worker/flow/Task"; +import { TaskSerializer } from "../../../worker/flow/Task"; +import { VKRecord } from "../../runtime/RuntimeVerificationKeyService"; +import { UnpreparingTask } from "../../../worker/flow/UnpreparingTask"; type VKRecordLite = Record; @@ -63,10 +64,7 @@ export class VKResultSerializer implements TaskSerializer { @injectable() @scoped(Lifecycle.ContainerScoped) -export class CircuitCompilerTask - extends TaskWorkerModule - implements Task -{ +export class CircuitCompilerTask extends UnpreparingTask { public name = "compiledCircuit"; public constructor( @@ -84,6 +82,8 @@ export class CircuitCompilerTask } public async compute(): Promise { + log.info("Computing VKs"); + let methodCounter = 0; return await this.runtime.zkProgrammable.zkProgram.reduce< Promise @@ -116,8 +116,4 @@ export class CircuitCompilerTask }; }, Promise.resolve({})); } - - public async prepare(): Promise { - return await Promise.resolve(); - } } diff --git a/packages/protocol/src/protocol/VerificationKeyService.ts b/packages/sequencer/src/protocol/runtime/RuntimeVerificationKeyService.ts similarity index 59% rename from packages/protocol/src/protocol/VerificationKeyService.ts rename to packages/sequencer/src/protocol/runtime/RuntimeVerificationKeyService.ts index 2b05ddb44..fb52db301 100644 --- a/packages/protocol/src/protocol/VerificationKeyService.ts +++ b/packages/sequencer/src/protocol/runtime/RuntimeVerificationKeyService.ts @@ -1,26 +1,16 @@ -import { Field, Poseidon, Struct, VerificationKey } from "o1js"; +import { Field, VerificationKey } from "o1js"; import { - createMerkleTree, - InMemoryMerkleTreeStorage, ConfigurableModule, + InMemoryMerkleTreeStorage, ZkProgrammable, } from "@proto-kit/common"; -import { inject, injectable } from "tsyringe"; - -import { MethodPublicOutput } from "../model/MethodPublicOutput"; - -export const treeFeeHeight = 10; -export class VKTree extends createMerkleTree(treeFeeHeight) {} - -export interface MethodVKConfig { - methodId: bigint; - vkHash: string; - vk: VerificationKey; -} - -export interface VKTreeValues { - [methodId: string]: MethodVKConfig; -} +import { inject, injectable, Lifecycle, scoped } from "tsyringe"; +import { + MethodPublicOutput, + MethodVKConfigData, + RuntimeVerificationKeyAttestation, + VKTree, +} from "@proto-kit/protocol"; export interface VKIndexes { [methodId: string]: bigint; @@ -33,14 +23,6 @@ export type VKRecord = { }; }; -export class MethodVKConfigData extends Struct({ - methodId: Field, - vkHash: Field, -}) { - public hash() { - return Poseidon.hash(MethodVKConfigData.toFields(this)); - } -} export interface WithGetMethodId { getMethodId: (moduleName: string, methodName: string) => bigint; } @@ -49,7 +31,9 @@ export interface WithZkProgrammableAndGetMethodById { zkProgrammable: ZkProgrammable; methodIdResolver: WithGetMethodId; } + @injectable() +@scoped(Lifecycle.ContainerScoped) export class VerificationKeyService extends ConfigurableModule<{}> { public constructor( @inject("Runtime") @@ -61,13 +45,8 @@ export class VerificationKeyService extends ConfigurableModule<{}> { super(); } - public static getWitnessType() { - return VKTree.WITNESS; - } - private persistedVKTree?: { tree: VKTree; - values: VKTreeValues; indexes: VKIndexes; }; @@ -79,30 +58,24 @@ export class VerificationKeyService extends ConfigurableModule<{}> { const tree = new VKTree(new InMemoryMerkleTreeStorage()); const valuesVK: Record = {}; const indexes: VKIndexes = {}; - const values: VKTreeValues = {}; Object.entries(verificationKeys).forEach(([key, value]) => { const vkConfig = new MethodVKConfigData({ methodId: Field(key), vkHash: Field(value.vk.hash), }); - values[key] = { - methodId: BigInt(key), - vkHash: vkConfig.hash().toBigInt().toString(), - vk: value.vk, - }; indexes[key] = BigInt(value.index); tree.setLeaf(BigInt(value.index), vkConfig.hash()); valuesVK[key.toString()] = value.vk; }); - this.persistedVKTree = { tree, values, indexes }; + this.persistedVKTree = { tree, indexes }; this.persistedVKRecord = valuesVK; } public getVKTree() { if (this.persistedVKTree === undefined) { - throw new Error("ZkProgram Tree not intialized"); + throw new Error("ZkProgram Tree not initialized"); } return this.persistedVKTree; @@ -110,23 +83,25 @@ export class VerificationKeyService extends ConfigurableModule<{}> { public getVkRecord() { if (this.persistedVKRecord === undefined) { - throw new Error("VK record nots intialized"); + throw new Error("VK record not initialized"); } return this.persistedVKRecord; } - public getVkRecordEntry(methodId: bigint) { - const persistedVk = this.getVkRecord(); - return persistedVk[methodId.toString()]; - } + public getAttestation(methodId: bigint) { + const verificationKey = this.getVkRecord()[methodId.toString()]; + if (verificationKey === undefined) { + throw new Error( + `MethodId not registered in VerificationKeyService (${methodId})` + ); + } - public getVKConfig(methodId: bigint) { - const vkConfig = this.getVKTree().values[methodId.toString()]; + const witness = this.getWitness(methodId); - return new MethodVKConfigData({ - methodId: Field(vkConfig.methodId), - vkHash: Field(vkConfig.vkHash), + return new RuntimeVerificationKeyAttestation({ + verificationKey, + witness, }); } diff --git a/packages/sequencer/src/sequencer/executor/Sequencer.ts b/packages/sequencer/src/sequencer/executor/Sequencer.ts index 68238f490..69a1c8436 100644 --- a/packages/sequencer/src/sequencer/executor/Sequencer.ts +++ b/packages/sequencer/src/sequencer/executor/Sequencer.ts @@ -67,19 +67,6 @@ export class Sequencer * modules to start each */ public async start() { - // Set default STWitnessProvider inside protocol - // eslint-disable-next-line max-len - // TODO But what is the default? How do we deal with stages states (i.e. simulated state) in the DI container? - // const witnessProviderReference = this.protocol.dependencyContainer - // .resolve( - // StateTransitionWitnessProviderReference - // ); - // const witnessProvider = - // this.container.resolve( - // "StateTransitionWitnessProvider" - // ); - // witnessProviderReference.setWitnessProvider(witnessProvider); - this.useDependencyFactory(this.container.resolve(MethodIdFactory)); // Log startup info diff --git a/packages/sequencer/src/worker/WorkerRegistrationFlow.ts b/packages/sequencer/src/worker/WorkerRegistrationFlow.ts new file mode 100644 index 000000000..954099897 --- /dev/null +++ b/packages/sequencer/src/worker/WorkerRegistrationFlow.ts @@ -0,0 +1,44 @@ +import { injectable } from "tsyringe"; +import { log } from "@proto-kit/common"; + +import { Closeable } from "./queue/TaskQueue"; +import { FlowCreator } from "./flow/Flow"; +import { + WorkerRegistrationTask, + WorkerStartupPayload, +} from "./worker/startup/WorkerRegistrationTask"; + +@injectable() +export class WorkerRegistrationFlow implements Closeable { + public constructor( + private readonly flowCreator: FlowCreator, + private readonly task: WorkerRegistrationTask + ) {} + + flow?: Closeable; + + public async start(payload: WorkerStartupPayload): Promise { + const flow = this.flowCreator.createFlow("register-worker-flow", {}); + this.flow = flow; + + const loop = async () => { + // eslint-disable-next-line no-constant-condition + while (true) { + // eslint-disable-next-line no-await-in-loop + await flow.withFlow(async (res, rej) => { + log.trace("Pushing registration task"); + await flow.pushTask(this.task, payload, async (result) => { + // Here someone could inject things to happen when the worker registers + res(result); + }); + }); + } + }; + + void loop(); + } + + public async close() { + await this.flow?.close(); + } +} diff --git a/packages/sequencer/src/worker/flow/AbstractStartupTask.ts b/packages/sequencer/src/worker/flow/AbstractStartupTask.ts new file mode 100644 index 000000000..e5ec296bd --- /dev/null +++ b/packages/sequencer/src/worker/flow/AbstractStartupTask.ts @@ -0,0 +1,26 @@ +import { EventEmitter, EventEmittingComponent } from "@proto-kit/common"; + +import { TaskWorkerModule } from "../worker/TaskWorkerModule"; + +import { Task, TaskSerializer } from "./Task"; + +export type StartupTaskEvents = { + "startup-task-finished": []; +}; + +export abstract class AbstractStartupTask + extends TaskWorkerModule + implements EventEmittingComponent, Task +{ + abstract name: string; + + abstract prepare(): Promise; + + abstract compute(input: Input): Promise; + + abstract inputSerializer(): TaskSerializer; + + abstract resultSerializer(): TaskSerializer; + + events = new EventEmitter(); +} diff --git a/packages/sequencer/src/worker/flow/UnpreparingTask.ts b/packages/sequencer/src/worker/flow/UnpreparingTask.ts new file mode 100644 index 000000000..4ec71f076 --- /dev/null +++ b/packages/sequencer/src/worker/flow/UnpreparingTask.ts @@ -0,0 +1,26 @@ +import { noop } from "@proto-kit/common"; + +import { TaskWorkerModule } from "../worker/TaskWorkerModule"; + +import { Task, TaskSerializer } from "./Task"; + +/** + * Contract: + * Doesn't implement prepare() + */ +export abstract class UnpreparingTask + extends TaskWorkerModule + implements Task +{ + abstract name: string; + + public async prepare() { + noop(); + } + + abstract compute(input: Input): Promise; + + abstract inputSerializer(): TaskSerializer; + + abstract resultSerializer(): TaskSerializer; +} diff --git a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts index a0e2d691d..49231f271 100644 --- a/packages/sequencer/src/worker/queue/LocalTaskQueue.ts +++ b/packages/sequencer/src/worker/queue/LocalTaskQueue.ts @@ -31,7 +31,8 @@ export class LocalTaskQueue private workers: { [key: string]: { busy: boolean; - handler: (data: TaskPayload) => Promise; + handler: (data: TaskPayload) => Promise; + close: () => Promise; }; } = {}; @@ -48,6 +49,9 @@ export class LocalTaskQueue // Execute task in worker void this.workers[queueName].handler(task.payload).then((payload) => { + if (payload === "closed") { + return; + } log.trace("LocalTaskQueue got", JSON.stringify(payload)); // Notify listeners about result const listenerPromises = this.listeners[queueName].map( @@ -66,23 +70,42 @@ export class LocalTaskQueue public createWorker( queueName: string, - executor: (data: TaskPayload) => Promise + executor: (data: TaskPayload) => Promise, + options?: { concurrency?: number; singleUse?: boolean } ): Closeable { - this.workers[queueName] = { + const close = async () => { + this.workers[queueName] = { + busy: false, + + handler: async () => { + return "closed"; + }, + close: async () => {}, + }; + }; + + const worker = { busy: false, handler: async (data: TaskPayload) => { await sleep(this.config.simulatedDuration ?? 0); - return await executor(data); + const result = await executor(data); + + if (options?.singleUse ?? false) { + await close(); + } + + return result; }, + + close, }; + + this.workers[queueName] = worker; this.workNextTasks(); - return { - close: async () => { - noop(); - }, - }; + + return worker; } public async getQueue(queueName: string): Promise { @@ -93,9 +116,12 @@ export class LocalTaskQueue return { name: queueName, - addTask: async (payload: TaskPayload): Promise<{ taskId: string }> => { + addTask: async ( + payload: TaskPayload, + taskId?: string + ): Promise<{ taskId: string }> => { id += 1; - const nextId = String(id).toString(); + const nextId = taskId ?? String(id).toString(); this.queues[queueName].push({ payload, taskId: nextId }); this.workNextTasks(); diff --git a/packages/sequencer/src/worker/queue/TaskQueue.ts b/packages/sequencer/src/worker/queue/TaskQueue.ts index 7e8f1a207..af2cf289d 100644 --- a/packages/sequencer/src/worker/queue/TaskQueue.ts +++ b/packages/sequencer/src/worker/queue/TaskQueue.ts @@ -9,7 +9,8 @@ export interface TaskQueue { createWorker: ( name: string, - executor: (data: TaskPayload) => Promise + executor: (data: TaskPayload) => Promise, + options?: { concurrency?: number } ) => Closeable; } @@ -26,7 +27,10 @@ export interface InstantiatedQueue extends Closeable { /** * Adds a specific payload to the queue and returns a unique jobId */ - addTask: (payload: TaskPayload) => Promise<{ taskId: string }>; + addTask: ( + payload: TaskPayload, + taskId?: string + ) => Promise<{ taskId: string }>; /** * Registers a listener for the completion of jobs diff --git a/packages/sequencer/src/worker/worker/FlowTaskWorker.ts b/packages/sequencer/src/worker/worker/FlowTaskWorker.ts index 446a1a1f9..39f308db0 100644 --- a/packages/sequencer/src/worker/worker/FlowTaskWorker.ts +++ b/packages/sequencer/src/worker/worker/FlowTaskWorker.ts @@ -2,25 +2,21 @@ import { log } from "@proto-kit/common"; import { Closeable, TaskQueue } from "../queue/TaskQueue"; import { Task, TaskPayload } from "../flow/Task"; +import { AbstractStartupTask } from "../flow/AbstractStartupTask"; +import { UnpreparingTask } from "../flow/UnpreparingTask"; const errors = { notComputable: (name: string) => new Error(`Task ${name} not computable on selected worker`), }; -type InferTaskInput> = - TaskT extends Task ? Input : never; - -type InferTaskOutput> = - TaskT extends Task ? Output : never; - // Had to use any here, because otherwise you couldn't assign any tasks to it export class FlowTaskWorker[]> implements Closeable { private readonly queue: TaskQueue; - private workers: Closeable[] = []; + private workers: Record = {}; public constructor( mq: TaskQueue, @@ -32,6 +28,7 @@ export class FlowTaskWorker[]> // The array type is this weird, because we first want to extract the // element type, and after that, we expect multiple elements of that -> [] private initHandler(task: Task) { + log.debug(`Init task ${task.name}`); const queueName = task.name; return this.queue.createWorker(queueName, async (data) => { log.debug(`Received task in queue ${queueName}`); @@ -59,6 +56,8 @@ export class FlowTaskWorker[]> const payload = error instanceof Error ? error.message : JSON.stringify(error); + log.debug("Error in worker (detailed trace): ", error); + return { status: "error", taskId: data.taskId, @@ -70,28 +69,82 @@ export class FlowTaskWorker[]> }); } - public async start() { + public async prepareTasks(tasks: Task[]) { + log.info("Preparing tasks..."); + // Call all task's prepare() method // Call them in order of registration, because the prepare methods // might depend on each other or a result that is saved in a DI singleton - for (const task of this.tasks) { + for (const task of tasks) { // eslint-disable-next-line no-await-in-loop await task.prepare(); } - this.workers = this.tasks.map((task: Task) => - this.initHandler< - InferTaskInput, - InferTaskOutput - >(task) + const newWorkers = Object.fromEntries( + tasks + .filter((x) => !(x instanceof AbstractStartupTask)) + .map((task) => [task.name, this.initHandler(task)]) + ); + this.workers = { + ...this.workers, + ...newWorkers, + }; + } + + public async start() { + function isAbstractStartupTask( + task: Task + ): task is AbstractStartupTask { + return task instanceof AbstractStartupTask; + } + + function isUnpreparingTask( + task: Task + ): task is UnpreparingTask { + return task instanceof UnpreparingTask; + } + + const startupTasks = this.tasks.filter>( + isAbstractStartupTask ); + + const unpreparingTasks = this.tasks.filter(isUnpreparingTask); + + const normalTasks = this.tasks.filter( + (task) => !isUnpreparingTask(task) && !isAbstractStartupTask(task) + ); + + if (startupTasks.length > 0) { + this.workers = Object.fromEntries( + unpreparingTasks + .concat(startupTasks) + .map((task) => [task.name, this.initHandler(task)]) + ); + + let startupTasksLeft = startupTasks.length; + startupTasks.forEach((task) => { + // The callbacks promise not being awaited is fine here + task.events.on("startup-task-finished", async () => { + await this.workers[task.name].close(); + delete this.workers[task.name]; + startupTasksLeft -= 1; + + if (startupTasksLeft === 0) { + await this.prepareTasks(normalTasks); + } + }); + }); + } else { + await this.prepareTasks(normalTasks.concat(unpreparingTasks)); + } } public async close() { await Promise.all( - this.workers.map(async (worker) => { + Object.values(this.workers).map(async (worker) => { await worker.close(); }) ); + this.workers = {}; } } diff --git a/packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts b/packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts index 3eb3ac0b9..2bf35082c 100644 --- a/packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts +++ b/packages/sequencer/src/worker/worker/LocalTaskWorkerModule.ts @@ -30,6 +30,7 @@ import { CircuitCompilerTask } from "../../protocol/production/tasks/CircuitComp import { FlowTaskWorker } from "./FlowTaskWorker"; import { TaskWorkerModule } from "./TaskWorkerModule"; +import { WorkerRegistrationTask } from "./startup/WorkerRegistrationTask"; // Temporary workaround against the compiler emitting // import("common/dist") inside the library artifacts @@ -89,11 +90,16 @@ export class LocalTaskWorkerModule this.assertIsValidModuleName(moduleName); const task = this.resolve(moduleName); - log.info(`Setting up task ${task.name}`); + log.info(`Resolved task ${task.name}`); return task; }); - const worker = new FlowTaskWorker(this.taskQueue(), tasks); + const registrationTask = this.container.resolve(WorkerRegistrationTask); + + const worker = new FlowTaskWorker(this.taskQueue(), [ + registrationTask, + ...tasks, + ]); await worker.start(); } } diff --git a/packages/sequencer/src/worker/worker/startup/CloseWorkerError.ts b/packages/sequencer/src/worker/worker/startup/CloseWorkerError.ts new file mode 100644 index 000000000..e87e33b4a --- /dev/null +++ b/packages/sequencer/src/worker/worker/startup/CloseWorkerError.ts @@ -0,0 +1,3 @@ +export class CloseWorkerError extends Error { + removeQueueError = true; +} diff --git a/packages/sequencer/src/worker/worker/startup/WorkerRegistrationTask.ts b/packages/sequencer/src/worker/worker/startup/WorkerRegistrationTask.ts new file mode 100644 index 000000000..e08a7da99 --- /dev/null +++ b/packages/sequencer/src/worker/worker/startup/WorkerRegistrationTask.ts @@ -0,0 +1,81 @@ +import { log, noop } from "@proto-kit/common"; +import { inject, injectable } from "tsyringe"; +import { + Protocol, + RuntimeVerificationKeyRootService, +} from "@proto-kit/protocol"; + +import { Task } from "../../flow/Task"; +import { AbstractStartupTask } from "../../flow/AbstractStartupTask"; + +import { CloseWorkerError } from "./CloseWorkerError"; + +export type WorkerStartupPayload = { + runtimeVerificationKeyRoot: bigint; +}; + +@injectable() +export class WorkerRegistrationTask + extends AbstractStartupTask + implements Task +{ + private done = false; + + public constructor( + @inject("Protocol") private readonly protocol: Protocol + ) { + super(); + } + + public name = "worker-registration"; + + public async prepare() { + noop(); + } + + public async compute(input: WorkerStartupPayload) { + if (this.done) { + log.info("Done, trying to close worker"); + throw new CloseWorkerError("Already started"); + } + + const rootService = this.protocol.dependencyContainer.resolve( + RuntimeVerificationKeyRootService + ); + rootService.setRoot(input.runtimeVerificationKeyRoot); + + this.events.emit("startup-task-finished"); + + this.done = true; + return true; + } + + public inputSerializer() { + return { + toJSON: (payload: WorkerStartupPayload) => { + return JSON.stringify({ + runtimeVerificationKeyRoot: + payload.runtimeVerificationKeyRoot.toString(), + }); + }, + fromJSON: (payload: string) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const jsonObject = JSON.parse(payload); + + return { + runtimeVerificationKeyRoot: BigInt( + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + jsonObject.runtimeVerificationKeyRoot + ), + }; + }, + }; + } + + public resultSerializer() { + return { + toJSON: (payload: boolean) => String(payload), + fromJSON: (payload: string) => Boolean(payload), + }; + } +} diff --git a/packages/stack/src/scripts/worker/sequencer.ts b/packages/stack/src/scripts/worker/sequencer.ts index 41c00a5e1..99e9ef4b4 100644 --- a/packages/stack/src/scripts/worker/sequencer.ts +++ b/packages/stack/src/scripts/worker/sequencer.ts @@ -11,7 +11,6 @@ import { MinaBaseLayer, TimedBlockTrigger, DatabasePruneModule, - ProtocolStartupModule, } from "@proto-kit/sequencer"; import { VanillaGraphqlModules, @@ -31,7 +30,6 @@ export const sequencer = AppChain.from({ BaseLayer: MinaBaseLayer, BlockTrigger: TimedBlockTrigger, DatabasePruneModule: DatabasePruneModule, - ProtocolStartupModule: ProtocolStartupModule, GraphqlServer: GraphqlServer, Graphql: GraphqlSequencerModule.from({ modules: VanillaGraphqlModules.with({}),