diff --git a/packages/persistance/src/PrismaDatabaseConnection.ts b/packages/persistance/src/PrismaDatabaseConnection.ts index 5032f1cb..54f5cfb3 100644 --- a/packages/persistance/src/PrismaDatabaseConnection.ts +++ b/packages/persistance/src/PrismaDatabaseConnection.ts @@ -137,4 +137,8 @@ export class PrismaDatabaseConnection public async close() { await this.prismaClient.$disconnect(); } + + public async executeInTransaction(f: () => Promise) { + await this.prismaClient.$transaction(f); + } } diff --git a/packages/persistance/src/PrismaRedisDatabase.ts b/packages/persistance/src/PrismaRedisDatabase.ts index 597d0a5c..b3e2fb53 100644 --- a/packages/persistance/src/PrismaRedisDatabase.ts +++ b/packages/persistance/src/PrismaRedisDatabase.ts @@ -17,6 +17,7 @@ import { RedisConnection, RedisConnectionConfig, RedisConnectionModule, + RedisTransaction, } from "./RedisConnection"; export interface PrismaRedisCombinedConfig { @@ -47,6 +48,10 @@ export class PrismaRedisDatabase return this.redis.redisClient; } + public get currentMulti(): RedisTransaction { + return this.redis.currentMulti; + } + public create(childContainerProvider: ChildContainerProvider) { super.create(childContainerProvider); this.prisma.create(childContainerProvider); @@ -77,4 +82,12 @@ export class PrismaRedisDatabase await this.prisma.pruneDatabase(); await this.redis.pruneDatabase(); } + + public async executeInTransaction(f: () => Promise) { + // TODO Long-term we want to somehow make sure we can rollback one data source + // if commiting the other one's transaction fails + await this.prisma.executeInTransaction(async () => { + await this.redis.executeInTransaction(f); + }); + } } diff --git a/packages/persistance/src/RedisConnection.ts b/packages/persistance/src/RedisConnection.ts index e9bd6a9f..a0e92c80 100644 --- a/packages/persistance/src/RedisConnection.ts +++ b/packages/persistance/src/RedisConnection.ts @@ -14,8 +14,11 @@ export interface RedisConnectionConfig { username?: string; } +export type RedisTransaction = ReturnType; + export interface RedisConnection { get redisClient(): RedisClientType; + get currentMulti(): RedisTransaction; } export class RedisConnectionModule @@ -82,4 +85,20 @@ export class RedisConnectionModule public async pruneDatabase() { await this.redisClient.flushDb(); } + + private multi?: RedisTransaction; + + public get currentMulti() { + if (this.multi === undefined) { + throw new Error("Redis multi was access outside of a transaction"); + } + return this.multi; + } + + public async executeInTransaction(f: () => Promise) { + this.multi = this.redisClient.multi(); + await f(); + await this.multi.exec(); + this.multi = undefined; + } } diff --git a/packages/persistance/src/services/prisma/PrismaBlockStorage.ts b/packages/persistance/src/services/prisma/PrismaBlockStorage.ts index 52a06346..bbed9bec 100644 --- a/packages/persistance/src/services/prisma/PrismaBlockStorage.ts +++ b/packages/persistance/src/services/prisma/PrismaBlockStorage.ts @@ -98,45 +98,42 @@ export class PrismaBlockStorage const { prismaClient } = this.connection; - await prismaClient.$transaction([ - prismaClient.transaction.createMany({ - data: block.transactions.map((txr) => - this.transactionMapper.mapOut(txr.tx) - ), - skipDuplicates: true, - }), + await prismaClient.transaction.createMany({ + data: block.transactions.map((txr) => + this.transactionMapper.mapOut(txr.tx) + ), + skipDuplicates: true, + }); - prismaClient.block.create({ - data: { - ...encodedBlock, - beforeNetworkState: - encodedBlock.beforeNetworkState as Prisma.InputJsonObject, - duringNetworkState: - encodedBlock.duringNetworkState as Prisma.InputJsonObject, + await prismaClient.block.create({ + data: { + ...encodedBlock, + beforeNetworkState: + encodedBlock.beforeNetworkState as Prisma.InputJsonObject, + duringNetworkState: + encodedBlock.duringNetworkState as Prisma.InputJsonObject, - transactions: { - createMany: { - data: transactions.map((tx) => { - return { - status: tx.status, - statusMessage: tx.statusMessage, - txHash: tx.txHash, + transactions: { + createMany: { + data: transactions.map((tx) => { + return { + status: tx.status, + statusMessage: tx.statusMessage, + txHash: tx.txHash, - stateTransitions: - tx.stateTransitions as Prisma.InputJsonArray, - protocolTransitions: - tx.protocolTransitions as Prisma.InputJsonArray, - events: tx.events as Prisma.InputJsonArray, - }; - }), - skipDuplicates: true, - }, + stateTransitions: tx.stateTransitions as Prisma.InputJsonArray, + protocolTransitions: + tx.protocolTransitions as Prisma.InputJsonArray, + events: tx.events as Prisma.InputJsonArray, + }; + }), + skipDuplicates: true, }, - - batchHeight: undefined, }, - }), - ]); + + batchHeight: undefined, + }, + }); } public async pushResult(result: BlockResult): Promise { diff --git a/packages/persistance/src/services/prisma/PrismaMessageStorage.ts b/packages/persistance/src/services/prisma/PrismaMessageStorage.ts index 996eaf47..a844f1fd 100644 --- a/packages/persistance/src/services/prisma/PrismaMessageStorage.ts +++ b/packages/persistance/src/services/prisma/PrismaMessageStorage.ts @@ -51,25 +51,24 @@ export class PrismaMessageStorage implements MessageStorage { ); const { prismaClient } = this.connection; - await prismaClient.$transaction([ - prismaClient.transaction.createMany({ - data: transactions, - skipDuplicates: true, - }), - prismaClient.incomingMessageBatch.create({ - data: { - fromMessageHash, - toMessageHash, - messages: { - createMany: { - data: transactions.map((transaction) => ({ - transactionHash: transaction.hash, - })), - }, + await prismaClient.transaction.createMany({ + data: transactions, + skipDuplicates: true, + }); + + await prismaClient.incomingMessageBatch.create({ + data: { + fromMessageHash, + toMessageHash, + messages: { + createMany: { + data: transactions.map((transaction) => ({ + transactionHash: transaction.hash, + })), }, }, - }), - ]); + }, + }); } } diff --git a/packages/persistance/src/services/prisma/PrismaStateService.ts b/packages/persistance/src/services/prisma/PrismaStateService.ts index 32798700..8e34fd22 100644 --- a/packages/persistance/src/services/prisma/PrismaStateService.ts +++ b/packages/persistance/src/services/prisma/PrismaStateService.ts @@ -36,19 +36,17 @@ export class PrismaStateService implements AsyncStateService { mask: this.mask, })); - await prismaClient.$transaction([ - prismaClient.state.deleteMany({ - where: { - path: { - in: this.cache.map((x) => new Decimal(x.key.toString())), - }, - mask: this.mask, + await prismaClient.state.deleteMany({ + where: { + path: { + in: this.cache.map((x) => new Decimal(x.key.toString())), }, - }), - prismaClient.state.createMany({ - data, - }), - ]); + mask: this.mask, + }, + }); + await prismaClient.state.createMany({ + data, + }); this.cache = []; } diff --git a/packages/persistance/src/services/redis/RedisMerkleTreeStore.ts b/packages/persistance/src/services/redis/RedisMerkleTreeStore.ts index 5907b44e..3de87a29 100644 --- a/packages/persistance/src/services/redis/RedisMerkleTreeStore.ts +++ b/packages/persistance/src/services/redis/RedisMerkleTreeStore.ts @@ -34,7 +34,7 @@ export class RedisMerkleTreeStore implements AsyncMerkleTreeStore { } try { - await this.connection.redisClient.mSet(array.flat(1)); + this.connection.currentMulti.mSet(array.flat(1)); } catch (error) { log.error(error); } @@ -62,8 +62,8 @@ export class RedisMerkleTreeStore implements AsyncMerkleTreeStore { public writeNodes(nodes: MerkleTreeNode[]): void { this.cache = this.cache.concat(nodes); // TODO Filter distinct - // We might not even need this, since the distinctness filter might already - // be implicitely done by the layer above (i.e. cachedmtstore) + // We might not even need this, since the distinctness filter might already + // be implicitely done by the layer above (i.e. cachedmtstore) // Leaving this for now until I get to implementing it // const concat = this.cache.concat(nodes); diff --git a/packages/sequencer/src/protocol/production/BatchProducerModule.ts b/packages/sequencer/src/protocol/production/BatchProducerModule.ts index 967d34ac..af2d7815 100644 --- a/packages/sequencer/src/protocol/production/BatchProducerModule.ts +++ b/packages/sequencer/src/protocol/production/BatchProducerModule.ts @@ -22,6 +22,7 @@ import { AsyncStateService } from "../../state/async/AsyncStateService"; import { AsyncMerkleTreeStore } from "../../state/async/AsyncMerkleTreeStore"; import { BlockResult, BlockWithResult } from "../../storage/model/Block"; import { VerificationKeyService } from "../runtime/RuntimeVerificationKeyService"; +import type { Database } from "../../storage/Database"; import { BlockProverParameters } from "./tasks/BlockProvingTask"; import { StateTransitionProofParameters } from "./tasks/StateTransitionTaskParameters"; @@ -81,6 +82,8 @@ export class BatchProducerModule extends SequencerModule { @inject("BatchStorage") private readonly batchStorage: BatchStorage, @inject("BlockTreeStore") private readonly blockTreeStore: AsyncMerkleTreeStore, + @inject("Database") + private readonly database: Database, private readonly traceService: TransactionTraceService, private readonly blockFlowService: BlockTaskFlowService, private readonly blockProofSerializer: BlockProofSerializer, @@ -90,8 +93,11 @@ export class BatchProducerModule extends SequencerModule { } private async applyStateChanges(batch: BatchMetadata) { - await batch.stateService.mergeIntoParent(); - await batch.merkleStore.mergeIntoParent(); + // TODO Introduce Proven and Unproven BlockHashTree stores - for rollbacks + await this.database.executeInTransaction(async () => { + await batch.stateService.mergeIntoParent(); + await batch.merkleStore.mergeIntoParent(); + }); } /** diff --git a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts index e245b239..75609e26 100644 --- a/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts +++ b/packages/sequencer/src/protocol/production/sequencing/BlockProducerModule.ts @@ -25,6 +25,7 @@ import { } from "../../../storage/model/Block"; import { CachedStateService } from "../../../state/state/CachedStateService"; import { MessageStorage } from "../../../storage/repositories/MessageStorage"; +import { Database } from "../../../storage/Database"; import { TransactionExecutionService } from "./TransactionExecutionService"; @@ -51,7 +52,8 @@ export class BlockProducerModule extends SequencerModule { private readonly executionService: TransactionExecutionService, @inject("MethodIdResolver") private readonly methodIdResolver: MethodIdResolver, - @inject("Runtime") private readonly runtime: Runtime + @inject("Runtime") private readonly runtime: Runtime, + @inject("Database") private readonly database: Database ) { super(); } @@ -111,10 +113,12 @@ export class BlockProducerModule extends SequencerModule { this.blockTreeStore ); - await blockHashTreeStore.mergeIntoParent(); - await treeStore.mergeIntoParent(); + await this.database.executeInTransaction(async () => { + await blockHashTreeStore.mergeIntoParent(); + await treeStore.mergeIntoParent(); - await this.blockQueue.pushResult(result); + await this.blockQueue.pushResult(result); + }); return result; } @@ -216,9 +220,10 @@ export class BlockProducerModule extends SequencerModule { ); if (block !== undefined) { - await cachedStateService.mergeIntoParent(); - - await this.blockQueue.pushBlock(block); + await this.database.executeInTransaction(async () => { + await cachedStateService.mergeIntoParent(); + await this.blockQueue.pushBlock(block); + }); } this.productionInProgress = false; diff --git a/packages/sequencer/src/storage/Database.ts b/packages/sequencer/src/storage/Database.ts index 78a94772..6013be2a 100644 --- a/packages/sequencer/src/storage/Database.ts +++ b/packages/sequencer/src/storage/Database.ts @@ -1,4 +1,4 @@ -import { StorageDependencyFactory } from "./StorageDependencyFactory"; +import type { StorageDependencyFactory } from "./StorageDependencyFactory"; export interface Database extends StorageDependencyFactory { /** @@ -7,4 +7,6 @@ export interface Database extends StorageDependencyFactory { * everything else will lead to unexpected behaviour and errors */ pruneDatabase(): Promise; + + executeInTransaction(f: () => Promise): Promise; } diff --git a/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts b/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts index 96390958..159c5ceb 100644 --- a/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts +++ b/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts @@ -65,4 +65,8 @@ export class InMemoryDatabase extends SequencerModule implements Database { // at some point that is after startup (which we don't do currently) noop(); } + + public async executeInTransaction(f: () => Promise) { + await f(); + } }