From c57cad7fc7d60ccd9ae80d99476fc17c600ae010 Mon Sep 17 00:00:00 2001 From: Pl217 Date: Thu, 14 Mar 2024 16:15:35 +0100 Subject: [PATCH] HPC-9460: Allow for read-only replica DB connection Since Knex doesn't support read-only replicas, we need to allow for a second connection to the replica server. This assumes master and replica DB are on separate servers, but if connection to replica isn't passed, the models behave just like before. The goal is to use master DB for write operations and for transactions and replica DB should be only used for read-only operations. There are lots of changes, but it's mostly passing around of connections, just so that newly introduced replica connection can be used in `find()` method of raw model. --- src/db/index.ts | 220 +++++++++++---------- src/db/models/authGrant.ts | 10 +- src/db/models/authTarget.ts | 4 +- src/db/models/reportingWindowAssignment.ts | 4 +- src/db/util/id-model.ts | 6 +- src/db/util/raw-model.ts | 27 ++- src/db/util/sequelize-model.ts | 14 +- src/db/util/versioned-model.ts | 31 ++- 8 files changed, 175 insertions(+), 141 deletions(-) diff --git a/src/db/index.ts b/src/db/index.ts index aa131349..521bec87 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -111,116 +111,128 @@ export const UTILS = { Cond, }; -const initializeTables = (conn: Knex) => ({ - attachment: attachment(conn), - attachmentPrototype: attachmentPrototype(conn), - attachmentVersion: attachmentVersion(conn), - authGrant: authGrant(conn), - authGrantee: authGrantee(conn), - authGrantLog: authGrantLog(conn), - authInvite: authInvite(conn), - authTarget: authTarget(conn), - authToken: authToken(conn), - blueprint: blueprint(conn), - budgetSegment: budgetSegment(conn), - budgetSegmentBreakdown: budgetSegmentBreakdown(conn), - budgetSegmentBreakdownEntity: budgetSegmentBreakdownEntity(conn), - cache: cache(conn), - category: category(conn), - categoryGroup: categoryGroup(conn), - categoryRef: categoryRef(conn), - client: client(conn), - conditionField: conditionField(conn), - conditionFieldReliesOn: conditionFieldReliesOn(conn), - conditionFieldType: conditionFieldType(conn), - currency: currency(conn), - disaggregationCategory: disaggregationCategory(conn), - disaggregationCategoryGroup: disaggregationCategoryGroup(conn), - disaggregationModel: disaggregationModel(conn), - emergency: emergency(conn), - emergencyLocation: emergencyLocation(conn), - endpointLog: endpointLog(conn), - endpointUsage: endpointUsage(conn), - entitiesAssociation: entitiesAssociation(conn), - entityPrototype: entityPrototype(conn), - expiredData: expiredData(conn), - externalData: externalData(conn), - externalReference: externalReference(conn), - fileAssetEntity: fileAssetEntity(conn), - fileRecord: fileRecord(conn), - flow: flow(conn), - flowLink: flowLink(conn), - flowObject: flowObject(conn), - flowObjectType: flowObjectType(conn), - form: form(conn), - globalCluster: globalCluster(conn), - globalClusterAssociation: globalClusterAssociation(conn), - globalIndicator: globalIndicator(conn), - governingEntity: governingEntity(conn), - governingEntityVersion: governingEntityVersion(conn), - highWater: highWater(conn), - iatiActivity: iatiActivity(conn), - iatiPublisher: iatiPublisher(conn), - iatiRecipientCountry: iatiRecipientCountry(conn), - iatiTransaction: iatiTransaction(conn), - job: job(conn), - jobAssociation: jobAssociation(conn), - legacy: legacy(conn), - location: location(conn), - lookup: lookup(conn), - measurement: measurement(conn), - measurementVersion: measurementVersion(conn), - operation: operation(conn), - operationCluster: operationCluster(conn), - organization: organization(conn), - organizationLocation: organizationLocation(conn), - participant: participant(conn), - participantCountry: participantCountry(conn), - participantOrganization: participantOrganization(conn), - plan: plan(conn), - planEmergency: planEmergency(conn), - planEntity: planEntity(conn), - planEntityVersion: planEntityVersion(conn), - planLocation: planLocation(conn), - planReportingPeriod: planReportingPeriod(conn), - planTag: planTag(conn), - planVersion: planVersion(conn), - planYear: planYear(conn), - procedureEntityPrototype: procedureEntityPrototype(conn), - procedureSection: procedureSection(conn), - procedureSectionField: procedureSectionField(conn), - project: project(conn), - projectContact: projectContact(conn), - projectGlobalClusters: projectGlobalClusters(conn), - projectLocations: projectLocations(conn), - projectVersion: projectVersion(conn), - projectVersionAttachment: projectVersionAttachment(conn), - projectVersionComment: projectVersionComment(conn), - projectVersionField: projectVersionField(conn), - projectVersionGoverningEntity: projectVersionGoverningEntity(conn), - projectVersionHistory: projectVersionHistory(conn), - projectVersionOrganization: projectVersionOrganization(conn), - projectVersionPlan: projectVersionPlan(conn), - projectVersionPlanEntity: projectVersionPlanEntity(conn), - reportDetail: reportDetail(conn), - reportFile: reportFile(conn), - reportingWindow: reportingWindow(conn), - reportingWindowAssignment: reportingWindowAssignment(conn), - tag: tag(conn), - task: task(conn), - unit: unit(conn), - unitType: unitType(conn), - usageYear: usageYear(conn), - workflowStatusOption: workflowStatusOption(conn), - workflowStatusOptionStep: workflowStatusOptionStep(conn), +const initializeTables = (masterConn: Knex, replicaConn?: Knex) => ({ + attachment: attachment(masterConn, replicaConn), + attachmentPrototype: attachmentPrototype(masterConn, replicaConn), + attachmentVersion: attachmentVersion(masterConn, replicaConn), + authGrant: authGrant(masterConn, replicaConn), + authGrantee: authGrantee(masterConn, replicaConn), + authGrantLog: authGrantLog(masterConn, replicaConn), + authInvite: authInvite(masterConn, replicaConn), + authTarget: authTarget(masterConn, replicaConn), + authToken: authToken(masterConn, replicaConn), + blueprint: blueprint(masterConn, replicaConn), + budgetSegment: budgetSegment(masterConn, replicaConn), + budgetSegmentBreakdown: budgetSegmentBreakdown(masterConn, replicaConn), + budgetSegmentBreakdownEntity: budgetSegmentBreakdownEntity( + masterConn, + replicaConn + ), + cache: cache(masterConn, replicaConn), + category: category(masterConn, replicaConn), + categoryGroup: categoryGroup(masterConn, replicaConn), + categoryRef: categoryRef(masterConn, replicaConn), + client: client(masterConn, replicaConn), + conditionField: conditionField(masterConn, replicaConn), + conditionFieldReliesOn: conditionFieldReliesOn(masterConn, replicaConn), + conditionFieldType: conditionFieldType(masterConn, replicaConn), + currency: currency(masterConn, replicaConn), + disaggregationCategory: disaggregationCategory(masterConn, replicaConn), + disaggregationCategoryGroup: disaggregationCategoryGroup( + masterConn, + replicaConn + ), + disaggregationModel: disaggregationModel(masterConn, replicaConn), + emergency: emergency(masterConn, replicaConn), + emergencyLocation: emergencyLocation(masterConn, replicaConn), + endpointLog: endpointLog(masterConn, replicaConn), + endpointUsage: endpointUsage(masterConn, replicaConn), + entitiesAssociation: entitiesAssociation(masterConn, replicaConn), + entityPrototype: entityPrototype(masterConn, replicaConn), + expiredData: expiredData(masterConn, replicaConn), + externalData: externalData(masterConn, replicaConn), + externalReference: externalReference(masterConn, replicaConn), + fileAssetEntity: fileAssetEntity(masterConn, replicaConn), + fileRecord: fileRecord(masterConn, replicaConn), + flow: flow(masterConn, replicaConn), + flowLink: flowLink(masterConn, replicaConn), + flowObject: flowObject(masterConn, replicaConn), + flowObjectType: flowObjectType(masterConn, replicaConn), + form: form(masterConn, replicaConn), + globalCluster: globalCluster(masterConn, replicaConn), + globalClusterAssociation: globalClusterAssociation(masterConn, replicaConn), + globalIndicator: globalIndicator(masterConn, replicaConn), + governingEntity: governingEntity(masterConn, replicaConn), + governingEntityVersion: governingEntityVersion(masterConn, replicaConn), + highWater: highWater(masterConn, replicaConn), + iatiActivity: iatiActivity(masterConn, replicaConn), + iatiPublisher: iatiPublisher(masterConn, replicaConn), + iatiRecipientCountry: iatiRecipientCountry(masterConn, replicaConn), + iatiTransaction: iatiTransaction(masterConn, replicaConn), + job: job(masterConn, replicaConn), + jobAssociation: jobAssociation(masterConn, replicaConn), + legacy: legacy(masterConn, replicaConn), + location: location(masterConn, replicaConn), + lookup: lookup(masterConn, replicaConn), + measurement: measurement(masterConn, replicaConn), + measurementVersion: measurementVersion(masterConn, replicaConn), + operation: operation(masterConn, replicaConn), + operationCluster: operationCluster(masterConn, replicaConn), + organization: organization(masterConn, replicaConn), + organizationLocation: organizationLocation(masterConn, replicaConn), + participant: participant(masterConn, replicaConn), + participantCountry: participantCountry(masterConn, replicaConn), + participantOrganization: participantOrganization(masterConn, replicaConn), + plan: plan(masterConn, replicaConn), + planEmergency: planEmergency(masterConn, replicaConn), + planEntity: planEntity(masterConn, replicaConn), + planEntityVersion: planEntityVersion(masterConn, replicaConn), + planLocation: planLocation(masterConn, replicaConn), + planReportingPeriod: planReportingPeriod(masterConn, replicaConn), + planTag: planTag(masterConn, replicaConn), + planVersion: planVersion(masterConn, replicaConn), + planYear: planYear(masterConn, replicaConn), + procedureEntityPrototype: procedureEntityPrototype(masterConn, replicaConn), + procedureSection: procedureSection(masterConn, replicaConn), + procedureSectionField: procedureSectionField(masterConn, replicaConn), + project: project(masterConn, replicaConn), + projectContact: projectContact(masterConn, replicaConn), + projectGlobalClusters: projectGlobalClusters(masterConn, replicaConn), + projectLocations: projectLocations(masterConn, replicaConn), + projectVersion: projectVersion(masterConn, replicaConn), + projectVersionAttachment: projectVersionAttachment(masterConn, replicaConn), + projectVersionComment: projectVersionComment(masterConn, replicaConn), + projectVersionField: projectVersionField(masterConn, replicaConn), + projectVersionGoverningEntity: projectVersionGoverningEntity( + masterConn, + replicaConn + ), + projectVersionHistory: projectVersionHistory(masterConn, replicaConn), + projectVersionOrganization: projectVersionOrganization( + masterConn, + replicaConn + ), + projectVersionPlan: projectVersionPlan(masterConn, replicaConn), + projectVersionPlanEntity: projectVersionPlanEntity(masterConn, replicaConn), + reportDetail: reportDetail(masterConn, replicaConn), + reportFile: reportFile(masterConn, replicaConn), + reportingWindow: reportingWindow(masterConn, replicaConn), + reportingWindowAssignment: reportingWindowAssignment(masterConn, replicaConn), + tag: tag(masterConn, replicaConn), + task: task(masterConn, replicaConn), + unit: unit(masterConn, replicaConn), + unitType: unitType(masterConn, replicaConn), + usageYear: usageYear(masterConn, replicaConn), + workflowStatusOption: workflowStatusOption(masterConn, replicaConn), + workflowStatusOptionStep: workflowStatusOptionStep(masterConn, replicaConn), }); export type Tables = ReturnType; export type Table = Tables[keyof Tables]; -const initializeRoot = (conn: Knex) => { - const _tables = initializeTables(conn); +const initializeRoot = (masterConn: Knex, replicaConn?: Knex) => { + const _tables = initializeTables(masterConn, replicaConn); return { ...UTILS, ..._tables, diff --git a/src/db/models/authGrant.ts b/src/db/models/authGrant.ts index 5ee41218..93631a02 100644 --- a/src/db/models/authGrant.ts +++ b/src/db/models/authGrant.ts @@ -8,8 +8,8 @@ import { AUTH_GRANTEE_ID } from './authGrantee'; import { AUTH_TARGET_ID } from './authTarget'; import type { ParticipantId } from './participant'; -const authGrantModel = (conn: Knex) => { - const authGrantLog = initAuthGrantLog(conn); +const authGrantModel = (masterConn: Knex, replicaConn?: Knex) => { + const authGrantLog = initAuthGrantLog(masterConn, replicaConn); const model = defineSequelizeModel({ tableName: 'authGrant', @@ -30,7 +30,7 @@ const authGrantModel = (conn: Knex) => { }, }, softDeletionEnabled: false, - })(conn); + })(masterConn, replicaConn); type UserData = UserDataOfModel; type Instance = InstanceOfModel; @@ -61,7 +61,7 @@ const authGrantModel = (conn: Knex) => { return await model.create(data, { trx }); }; - return trx ? createCallback(trx) : conn.transaction(createCallback); + return trx ? createCallback(trx) : masterConn.transaction(createCallback); }; const update = ( @@ -104,7 +104,7 @@ const authGrantModel = (conn: Knex) => { } }; - return trx ? updateCallback(trx) : conn.transaction(updateCallback); + return trx ? updateCallback(trx) : masterConn.transaction(updateCallback); }; const createOrUpdate = async ( diff --git a/src/db/models/authTarget.ts b/src/db/models/authTarget.ts index 72d622f8..ddafa383 100644 --- a/src/db/models/authTarget.ts +++ b/src/db/models/authTarget.ts @@ -22,7 +22,7 @@ const AUTH_TARGET_TYPE = { governingEntity: null, }; -export default (conn: Knex) => { +export default (masterConn: Knex, replicaConn?: Knex) => { const model = defineIDModel({ tableName: 'authTarget', fields: { @@ -47,7 +47,7 @@ export default (conn: Knex) => { }, idField: 'id', softDeletionEnabled: false, - })(conn); + })(masterConn, replicaConn); return { ...model, diff --git a/src/db/models/reportingWindowAssignment.ts b/src/db/models/reportingWindowAssignment.ts index 21b7f5f7..b5182185 100644 --- a/src/db/models/reportingWindowAssignment.ts +++ b/src/db/models/reportingWindowAssignment.ts @@ -106,14 +106,14 @@ export default defineVersionedModel({ }, }, }, - prepare: async (data, conn) => { + prepare: async (data, masterConn, replicaConn) => { let assigneeOperation: OperationId; let assigneeId: number; if (data.assignee.type === 'operation') { assigneeOperation = data.assignee.operation; assigneeId = data.assignee.operation; } else if (data.assignee.type === 'operationCluster') { - const oc = operationCluster(conn); + const oc = operationCluster(masterConn, replicaConn); const cluster = await oc.get(data.assignee.cluster); if (!cluster) { throw new Error( diff --git a/src/db/util/id-model.ts b/src/db/util/id-model.ts index 38fc4bb2..132220c2 100644 --- a/src/db/util/id-model.ts +++ b/src/db/util/id-model.ts @@ -59,7 +59,7 @@ export type ModelWithIdInitializer< | null | keyof F['generated'] | keyof F['generatedCompositeKey'], -> = (conn: Knex) => ModelWithId; +> = (masterConn: Knex, replicaConn?: Knex) => ModelWithId; const hasField = ( data: unknown, @@ -90,7 +90,7 @@ export const defineIDModel = FieldsWithSequelize, IDField > => - (conn) => { + (masterConn, replicaConn) => { const { idField, tableName } = opts; type Fields = FieldsWithSequelize; type ID = IdOf; @@ -102,7 +102,7 @@ export const defineIDModel = hasField(data, idField) ? `${tableName} ${data[idField]}` : `Unknown ${tableName}`, - })(conn); + })(masterConn, replicaConn); const get = (id: ID): Promise => model.findOne({ diff --git a/src/db/util/raw-model.ts b/src/db/util/raw-model.ts index dec163cb..895f54b2 100644 --- a/src/db/util/raw-model.ts +++ b/src/db/util/raw-model.ts @@ -104,7 +104,7 @@ export type InstanceDataOfModel> = M extends Model export type ModelInitializer< F extends FieldDefinition, AdditionalFindArgs = {}, -> = (conn: Knex) => Model; +> = (masterConn: Knex, replicaConn?: Knex) => Model; export const defineRawModel = (opts: { @@ -116,7 +116,7 @@ export const defineRawModel = */ genIdentifier?: (data: unknown) => string; }): ModelInitializer => - (conn): Model => { + (masterConn, replicaConn): Model => { const { tableName, fields, genIdentifier } = opts; const validateAndFilter = (row: unknown) => @@ -128,10 +128,15 @@ export const defineRawModel = const validator = dataValidator(fields); - const tbl = () => conn(tableName); + const masterTable = () => masterConn(tableName); + const replicaTable = replicaConn + ? () => replicaConn(tableName) + : masterTable; const create: CreateFn = async (data, options) => { - const builder = options?.trx ? tbl().transacting(options.trx) : tbl(); + const builder = options?.trx + ? masterTable().transacting(options.trx) + : masterTable(); const res = await builder.insert([data as any]).returning('*'); return validateAndFilter(res[0]); }; @@ -141,7 +146,9 @@ export const defineRawModel = return []; } - const builder = options?.trx ? tbl().transacting(options.trx) : tbl(); + const builder = options?.trx + ? masterTable().transacting(options.trx) + : masterTable(); const res = await builder.insert(data).returning('*'); return res.map(validateAndFilter); }; @@ -154,7 +161,7 @@ export const defineRawModel = skipValidation = false, trx, } = {}) => { - const builder = trx ? tbl().transacting(trx) : tbl(); + const builder = trx ? masterTable().transacting(trx) : replicaTable(); const query = builder.where(prepareCondition(where ?? {})).select('*'); if (limit !== undefined && limit > 0) { @@ -198,7 +205,7 @@ export const defineRawModel = skipValidation = false, trx, }) => { - const builder = trx ? tbl().transacting(trx) : tbl(); + const builder = trx ? masterTable().transacting(trx) : masterTable(); const res = await builder .where(prepareCondition(where || {})) .update(values as any) @@ -212,13 +219,13 @@ export const defineRawModel = }; const destroy: DestroyFn = async ({ where, trx }) => { - const builder = trx ? tbl().transacting(trx) : tbl(); + const builder = trx ? masterTable().transacting(trx) : masterTable(); const count = await builder.where(prepareCondition(where || {})).delete(); return count; }; const truncate: TruncateFn = async (trx) => { - const builder = trx ? tbl().transacting(trx) : tbl(); + const builder = trx ? masterTable().transacting(trx) : masterTable(); await builder.truncate(); }; @@ -226,7 +233,7 @@ export const defineRawModel = _internals: { type: 'single-table', tableName, - query: tbl, + query: masterTable, fields, }, create, diff --git a/src/db/util/sequelize-model.ts b/src/db/util/sequelize-model.ts index c21f1502..df30805c 100644 --- a/src/db/util/sequelize-model.ts +++ b/src/db/util/sequelize-model.ts @@ -94,7 +94,7 @@ export const defineSequelizeModel = FieldsWithSequelize, AdditionalFindArgsForSequelizeTables > => - (conn) => { + (masterConn, replicaConn) => { type Fields = FieldsWithSequelize; const fields: Fields = merge( @@ -106,7 +106,7 @@ export const defineSequelizeModel = const model = defineRawModel({ ...opts, fields, - })(conn); + })(masterConn, replicaConn); const find: FindFn = ( args @@ -154,8 +154,8 @@ export const defineSequelizeModel = return model.create( { ...data, - createdAt: conn.fn.now(3), - updatedAt: conn.fn.now(3), + createdAt: masterConn.fn.now(3), + updatedAt: masterConn.fn.now(3), }, opts ); @@ -165,8 +165,8 @@ export const defineSequelizeModel = return model.createMany( data.map((data) => ({ ...data, - createdAt: conn.fn.now(3), - updatedAt: conn.fn.now(3), + createdAt: masterConn.fn.now(3), + updatedAt: masterConn.fn.now(3), })), opts ); @@ -177,7 +177,7 @@ export const defineSequelizeModel = ...args, values: { ...args.values, - updatedAt: conn.fn.now(3), + updatedAt: masterConn.fn.now(3), }, }); }; diff --git a/src/db/util/versioned-model.ts b/src/db/util/versioned-model.ts index 9d826a49..5ffc983d 100644 --- a/src/db/util/versioned-model.ts +++ b/src/db/util/versioned-model.ts @@ -110,7 +110,10 @@ type VersionedModelInitializer< IDType, Data, LookupColumns extends LookupColumnDefinition, -> = (conn: Knex) => VersionedModel; +> = ( + masterConn: Knex, + replicaConn?: Knex +) => VersionedModel; export type InstanceOfVersionedModel> = M extends VersionedModel @@ -135,10 +138,14 @@ export const defineVersionedModel = data: t.Type; lookupColumns: { columns: LookupColumns; - prepare: (data: Data, conn: Knex) => Promise>; + prepare: ( + data: Data, + masterConn: Knex, + replicaConn?: Knex + ) => Promise>; }; }): VersionedModelInitializer => - (conn) => { + (masterConn, replicaConn) => { const name = opts.tableBaseName; const rootFields = merge( @@ -159,7 +166,7 @@ export const defineVersionedModel = idField: 'id', fields: rootFields, softDeletionEnabled: false, - })(conn); + })(masterConn, replicaConn); const versionModel = defineSequelizeModel({ tableName: `${name}Version`, @@ -196,7 +203,7 @@ export const defineVersionedModel = } return `unknown ${name}Version`; }, - })(conn); + })(masterConn, replicaConn); const createInstance = ({ root: { id }, @@ -237,7 +244,11 @@ export const defineVersionedModel = tablesToClear: () => [`${name}Version`, name], }, create: async (data, modifiedBy) => { - const lookupData = await opts.lookupColumns.prepare(data, conn); + const lookupData = await opts.lookupColumns.prepare( + data, + masterConn, + replicaConn + ); const root = await rootModel.create(lookupData as any); @@ -399,12 +410,16 @@ export const defineVersionedModel = `version ${prev.version} is not the latest version` ); } - const lookupData = await opts.lookupColumns.prepare(data, conn); + const lookupData = await opts.lookupColumns.prepare( + data, + masterConn, + replicaConn + ); // Create new version, update previous version's isLatest flag, // and update lookup tables - return await conn.transaction(async (trx) => { + return await masterConn.transaction(async (trx) => { // Make new version not the latest await versionModel.update({ values: {