Skip to content

Commit

Permalink
Created transactions on Database to do atomic writes for blocks, resu…
Browse files Browse the repository at this point in the history
…lts and batches
  • Loading branch information
rpanic committed Nov 22, 2024
1 parent fc03133 commit bb74c44
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 76 deletions.
4 changes: 4 additions & 0 deletions packages/persistance/src/PrismaDatabaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,8 @@ export class PrismaDatabaseConnection
public async close() {
await this.prismaClient.$disconnect();
}

public async executeInTransaction(f: () => Promise<void>) {
await this.prismaClient.$transaction(f);
}
}
13 changes: 13 additions & 0 deletions packages/persistance/src/PrismaRedisDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
RedisConnection,
RedisConnectionConfig,
RedisConnectionModule,
RedisTransaction,
} from "./RedisConnection";

export interface PrismaRedisCombinedConfig {
Expand Down Expand Up @@ -47,6 +48,10 @@ export class PrismaRedisDatabase
return this.redis.redisClient;
}

public get currentMulti(): RedisTransaction {
return this.redis.currentMulti;
}

public create(childContainerProvider: ChildContainerProvider) {
super.create(childContainerProvider);
this.prisma.create(childContainerProvider);
Expand Down Expand Up @@ -77,4 +82,12 @@ export class PrismaRedisDatabase
await this.prisma.pruneDatabase();
await this.redis.pruneDatabase();
}

public async executeInTransaction(f: () => Promise<void>) {
// TODO Long-term we want to somehow make sure we can rollback one data source
// if commiting the other one's transaction fails
await this.prisma.executeInTransaction(async () => {
await this.redis.executeInTransaction(f);
});
}
}
19 changes: 19 additions & 0 deletions packages/persistance/src/RedisConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ export interface RedisConnectionConfig {
username?: string;
}

export type RedisTransaction = ReturnType<RedisClientType["multi"]>;

export interface RedisConnection {
get redisClient(): RedisClientType;
get currentMulti(): RedisTransaction;
}

export class RedisConnectionModule
Expand Down Expand Up @@ -82,4 +85,20 @@ export class RedisConnectionModule
public async pruneDatabase() {
await this.redisClient.flushDb();
}

private multi?: RedisTransaction;

public get currentMulti() {
if (this.multi === undefined) {
throw new Error("Redis multi was access outside of a transaction");
}
return this.multi;
}

public async executeInTransaction(f: () => Promise<void>) {
this.multi = this.redisClient.multi();
await f();
await this.multi.exec();
this.multi = undefined;
}
}
65 changes: 31 additions & 34 deletions packages/persistance/src/services/prisma/PrismaBlockStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,45 +98,42 @@ export class PrismaBlockStorage

const { prismaClient } = this.connection;

await prismaClient.$transaction([
prismaClient.transaction.createMany({
data: block.transactions.map((txr) =>
this.transactionMapper.mapOut(txr.tx)
),
skipDuplicates: true,
}),
await prismaClient.transaction.createMany({
data: block.transactions.map((txr) =>
this.transactionMapper.mapOut(txr.tx)
),
skipDuplicates: true,
});

prismaClient.block.create({
data: {
...encodedBlock,
beforeNetworkState:
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
duringNetworkState:
encodedBlock.duringNetworkState as Prisma.InputJsonObject,
await prismaClient.block.create({
data: {
...encodedBlock,
beforeNetworkState:
encodedBlock.beforeNetworkState as Prisma.InputJsonObject,
duringNetworkState:
encodedBlock.duringNetworkState as Prisma.InputJsonObject,

transactions: {
createMany: {
data: transactions.map((tx) => {
return {
status: tx.status,
statusMessage: tx.statusMessage,
txHash: tx.txHash,
transactions: {
createMany: {
data: transactions.map((tx) => {
return {
status: tx.status,
statusMessage: tx.statusMessage,
txHash: tx.txHash,

stateTransitions:
tx.stateTransitions as Prisma.InputJsonArray,
protocolTransitions:
tx.protocolTransitions as Prisma.InputJsonArray,
events: tx.events as Prisma.InputJsonArray,
};
}),
skipDuplicates: true,
},
stateTransitions: tx.stateTransitions as Prisma.InputJsonArray,
protocolTransitions:
tx.protocolTransitions as Prisma.InputJsonArray,
events: tx.events as Prisma.InputJsonArray,
};
}),
skipDuplicates: true,
},

batchHeight: undefined,
},
}),
]);

batchHeight: undefined,
},
});
}

public async pushResult(result: BlockResult): Promise<void> {
Expand Down
33 changes: 16 additions & 17 deletions packages/persistance/src/services/prisma/PrismaMessageStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,24 @@ export class PrismaMessageStorage implements MessageStorage {
);

const { prismaClient } = this.connection;
await prismaClient.$transaction([
prismaClient.transaction.createMany({
data: transactions,
skipDuplicates: true,
}),

prismaClient.incomingMessageBatch.create({
data: {
fromMessageHash,
toMessageHash,
messages: {
createMany: {
data: transactions.map((transaction) => ({
transactionHash: transaction.hash,
})),
},
await prismaClient.transaction.createMany({
data: transactions,
skipDuplicates: true,
});

await prismaClient.incomingMessageBatch.create({
data: {
fromMessageHash,
toMessageHash,
messages: {
createMany: {
data: transactions.map((transaction) => ({
transactionHash: transaction.hash,
})),
},
},
}),
]);
},
});
}
}
22 changes: 10 additions & 12 deletions packages/persistance/src/services/prisma/PrismaStateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,17 @@ export class PrismaStateService implements AsyncStateService {
mask: this.mask,
}));

await prismaClient.$transaction([
prismaClient.state.deleteMany({
where: {
path: {
in: this.cache.map((x) => new Decimal(x.key.toString())),
},
mask: this.mask,
await prismaClient.state.deleteMany({
where: {
path: {
in: this.cache.map((x) => new Decimal(x.key.toString())),
},
}),
prismaClient.state.createMany({
data,
}),
]);
mask: this.mask,
},
});
await prismaClient.state.createMany({
data,
});

this.cache = [];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class RedisMerkleTreeStore implements AsyncMerkleTreeStore {
}

try {
await this.connection.redisClient.mSet(array.flat(1));
this.connection.currentMulti.mSet(array.flat(1));
} catch (error) {
log.error(error);
}
Expand Down Expand Up @@ -62,8 +62,8 @@ export class RedisMerkleTreeStore implements AsyncMerkleTreeStore {
public writeNodes(nodes: MerkleTreeNode[]): void {
this.cache = this.cache.concat(nodes);
// TODO Filter distinct
// We might not even need this, since the distinctness filter might already
// be implicitely done by the layer above (i.e. cachedmtstore)
// We might not even need this, since the distinctness filter might already
// be implicitely done by the layer above (i.e. cachedmtstore)

// Leaving this for now until I get to implementing it
// const concat = this.cache.concat(nodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { AsyncStateService } from "../../state/async/AsyncStateService";
import { AsyncMerkleTreeStore } from "../../state/async/AsyncMerkleTreeStore";
import { BlockResult, BlockWithResult } from "../../storage/model/Block";
import { VerificationKeyService } from "../runtime/RuntimeVerificationKeyService";
import type { Database } from "../../storage/Database";

import { BlockProverParameters } from "./tasks/BlockProvingTask";
import { StateTransitionProofParameters } from "./tasks/StateTransitionTaskParameters";
Expand Down Expand Up @@ -81,6 +82,8 @@ export class BatchProducerModule extends SequencerModule {
@inject("BatchStorage") private readonly batchStorage: BatchStorage,
@inject("BlockTreeStore")
private readonly blockTreeStore: AsyncMerkleTreeStore,
@inject("Database")
private readonly database: Database,
private readonly traceService: TransactionTraceService,
private readonly blockFlowService: BlockTaskFlowService,
private readonly blockProofSerializer: BlockProofSerializer,
Expand All @@ -90,8 +93,11 @@ export class BatchProducerModule extends SequencerModule {
}

private async applyStateChanges(batch: BatchMetadata) {
await batch.stateService.mergeIntoParent();
await batch.merkleStore.mergeIntoParent();
// TODO Introduce Proven and Unproven BlockHashTree stores - for rollbacks
await this.database.executeInTransaction(async () => {
await batch.stateService.mergeIntoParent();
await batch.merkleStore.mergeIntoParent();
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
} from "../../../storage/model/Block";
import { CachedStateService } from "../../../state/state/CachedStateService";
import { MessageStorage } from "../../../storage/repositories/MessageStorage";
import { Database } from "../../../storage/Database";

import { TransactionExecutionService } from "./TransactionExecutionService";

Expand All @@ -51,7 +52,8 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
private readonly executionService: TransactionExecutionService,
@inject("MethodIdResolver")
private readonly methodIdResolver: MethodIdResolver,
@inject("Runtime") private readonly runtime: Runtime<RuntimeModulesRecord>
@inject("Runtime") private readonly runtime: Runtime<RuntimeModulesRecord>,
@inject("Database") private readonly database: Database
) {
super();
}
Expand Down Expand Up @@ -111,10 +113,12 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
this.blockTreeStore
);

await blockHashTreeStore.mergeIntoParent();
await treeStore.mergeIntoParent();
await this.database.executeInTransaction(async () => {
await blockHashTreeStore.mergeIntoParent();
await treeStore.mergeIntoParent();

await this.blockQueue.pushResult(result);
await this.blockQueue.pushResult(result);
});

return result;
}
Expand Down Expand Up @@ -216,9 +220,10 @@ export class BlockProducerModule extends SequencerModule<BlockConfig> {
);

if (block !== undefined) {
await cachedStateService.mergeIntoParent();

await this.blockQueue.pushBlock(block);
await this.database.executeInTransaction(async () => {
await cachedStateService.mergeIntoParent();
await this.blockQueue.pushBlock(block);
});
}

this.productionInProgress = false;
Expand Down
4 changes: 3 additions & 1 deletion packages/sequencer/src/storage/Database.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { StorageDependencyFactory } from "./StorageDependencyFactory";
import type { StorageDependencyFactory } from "./StorageDependencyFactory";

export interface Database extends StorageDependencyFactory {
/**
Expand All @@ -7,4 +7,6 @@ export interface Database extends StorageDependencyFactory {
* everything else will lead to unexpected behaviour and errors
*/
pruneDatabase(): Promise<void>;

executeInTransaction(f: () => Promise<void>): Promise<void>;
}
4 changes: 4 additions & 0 deletions packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ export class InMemoryDatabase extends SequencerModule implements Database {
// at some point that is after startup (which we don't do currently)
noop();
}

public async executeInTransaction(f: () => Promise<void>) {
await f();
}
}

0 comments on commit bb74c44

Please sign in to comment.