diff --git a/.env b/.env
index 3e64455..7eff976 100644
--- a/.env
+++ b/.env
@@ -9,6 +9,7 @@ POSTGRES_PORT=127.0.0.1:5512
 POSTGRES_DB=query
 POSTGRES_USER=productopener
 POSTGRES_PASSWORD=productopener
+POSTGRES_SHM_SIZE=256m
 COMMON_NET_NAME=po_default
 MONGO_URI=mongodb://localhost:27017
 #REDIS_URL=redis://localhost:6379
diff --git a/.github/workflows/container-deploy.yml b/.github/workflows/container-deploy.yml
index bf173ac..9fa9069 100644
--- a/.github/workflows/container-deploy.yml
+++ b/.github/workflows/container-deploy.yml
@@ -34,6 +34,7 @@ jobs:
           echo "COMMON_NET_NAME=po_webnet" >> $GITHUB_ENV
           echo "MONGO_URI=mongodb://10.1.0.200:27017" >> $GITHUB_ENV
           echo "REDIS_URL=redis://redis:6379" >> $GITHUB_ENV
+          echo "POSTGRES_SHM_SIZE=1g" >> $GITHUB_ENV
       - name: Set various variable for production deployment
         if: matrix.env == 'off-query-org'
         run: |
@@ -46,6 +47,7 @@
           # mongodb and redis (through stunnel)
           echo "MONGO_URI=mongodb://10.1.0.102:27017" >> $GITHUB_ENV
           echo "REDIS_URL=redis://10.1.0.122:6379" >> $GITHUB_ENV
+          echo "POSTGRES_SHM_SIZE=256m" >> $GITHUB_ENV
       - name: Wait for container build workflow
         uses: tomchv/wait-my-workflow@v1.1.0
         id: wait-build
@@ -124,6 +126,7 @@
           echo "COMMON_NET_NAME=${{ env.COMMON_NET_NAME }}" >> .env
           echo "MONGO_URI=${{ env.MONGO_URI }}" >> .env
           echo "REDIS_URL=${{ env.REDIS_URL }}" >> .env
+          echo "POSTGRES_SHM_SIZE=${{ env.POSTGRES_SHM_SIZE }}" >> .env
           echo "LOG_LEVEL=log" >> .env
diff --git a/docker-compose.yml b/docker-compose.yml
index 2ff1050..1f245f9 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -6,6 +6,7 @@ services:
       - POSTGRES_USER
       - POSTGRES_PASSWORD
      - POSTGRES_DB
+    shm_size: ${POSTGRES_SHM_SIZE}
     volumes:
       - dbdata:/var/lib/postgresql/data
     networks:
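Context for the three configuration changes above: Docker caps a container's /dev/shm at 64 MB by default, and PostgreSQL allocates dynamic shared memory for parallel query workers there, so heavily concurrent imports can fail with errors like "could not resize shared memory segment ... No space left on device". Making `shm_size` configurable per environment (1g for one deployment target, 256m for production and the local default) is the usual remedy. As a minimal sketch, a guard like the following could catch a malformed value before Compose interpolates it; `isValidShmSize` is hypothetical and not part of this change:

```ts
// Hypothetical guard, not part of this PR: check that POSTGRES_SHM_SIZE looks
// like a size Docker Compose accepts (an integer with an optional b/k/m/g or
// kb/mb/gb suffix) before it is interpolated into shm_size.
function isValidShmSize(value: string): boolean {
  return /^\d+(b|kb?|mb?|gb?)?$/i.test(value);
}

console.assert(isValidShmSize('256m'));
console.assert(isValidShmSize('1g'));
console.assert(!isValidShmSize('1 GB')); // spaces are not valid
```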
diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts
index 7f50450..a6b4b5e 100644
--- a/src/domain/services/import.service.spec.ts
+++ b/src/domain/services/import.service.spec.ts
@@ -1,6 +1,6 @@
 import { DomainModule } from '../domain.module';
 import { ImportService } from './import.service';
-import { EntityManager } from '@mikro-orm/core';
+import { EntityManager } from '@mikro-orm/postgresql';
 import { Product } from '../entities/product';
 import { ProductIngredientsTag } from '../entities/product-tags';
 import { createTestingModule, randomCode } from '../../../test/test.helper';
@@ -13,7 +13,6 @@ import { createClient } from 'redis';
 import { GenericContainer } from 'testcontainers';
 import { setTimeout } from 'timers/promises';
 
-let index = 0;
 const lastModified = 1692032161;
 
 function testProducts() {
@@ -36,24 +35,30 @@ function testProducts() {
   return { products, productIdExisting, productIdNew };
 }
-// Need to specify argumet list bellow so that calls in the assertion is typed
-// eslint-disable-next-line @typescript-eslint/no-unused-vars
-const findMock = jest.fn((_filter, _projection) => ({
-  next: async () => {
-    return index++ <= mockedProducts.length ? mockedProducts[index - 1] : null;
-  },
-  close: jest.fn(),
-}));
+const findCalls: any[] = [];
 
 jest.mock('mongodb', () => {
   return {
     MongoClient: jest.fn(() => ({
       connect: jest.fn(),
-      db: () => ({
-        collection: () => ({
-          find: findMock,
-        }),
-      }),
+      db: () => {
+        let index = 0;
+        return {
+          collection: () => ({
+            find: (...args: any[]) => {
+              findCalls.push(args);
+              return {
+                next: async () => {
+                  return index < mockedProducts.length
+                    ? mockedProducts[index++]
+                    : null;
+                },
+                close: jest.fn(),
+              };
+            },
+          }),
+        };
+      },
       close: jest.fn(),
     })),
   };
 });
@@ -61,12 +66,12 @@
 
 let mockedProducts = [];
 function mockMongoDB(productList) {
-  index = 0;
   mockedProducts = productList;
 }
 
 // Import tests can sometimes take a little time in GitHub
-jest.setTimeout(10000);
+// Also allow a little time for the testcontainer to start
+jest.setTimeout(300000);
 
 describe('importFromMongo', () => {
   it('should import a new product update existing products and delete missing products', async () => {
@@ -103,7 +108,7 @@
 
     await importService.importFromMongo();
 
-    // THEN: New product is addeded, updated product is updated and other product is unchanged
+    // THEN: New product is added, updated product is updated and other product is unchanged
     expect(deleteMock).toHaveBeenCalledTimes(1);
     let updateId = deleteMock.mock.calls[0][0];
     // Re-format updateId the way Postgres provides it
@@ -213,12 +218,12 @@
 
     // WHEN: Doing an incremental import from MongoDB
     mockMongoDB(products);
-    findMock.mockClear();
+    findCalls.length = 0;
     await importService.importFromMongo('');
 
     // THEN: Mongo find is called with the setting as a parameter
-    expect(findMock).toHaveBeenCalledTimes(2); // Called for normal an obsolete prodocuts
-    expect(findMock.mock.calls[0][0].last_modified_t.$gt).toBe(
+    expect(findCalls).toHaveLength(2); // Called for normal and obsolete products
+    expect(findCalls[0][0].last_modified_t.$gt).toBe(
       Math.floor(startFrom.getTime() / 1000),
     );
 
@@ -228,9 +233,9 @@
     });
   });
 
-  it('should cope with nul charactes', async () => {
+  it('should cope with nul characters', async () => {
     await createTestingModule([DomainModule], async (app) => {
-      // WHEN: Impoting data containing nul characters
+      // WHEN: Importing data containing nul characters
       const { productIdNew } = testProducts();
       mockMongoDB([
         {
@@ -269,7 +274,6 @@
 
     // WHEN: Doing an import from MongoDB
     mockMongoDB(testData);
-    findMock.mockClear();
     await importService.importFromMongo('');
 
     // THEN: The last modified date is set correctly
@@ -346,9 +350,30 @@
   });
 });
 
+describe('importWithFilter', () => {
+  it('should not get an error with concurrent imports', async () => {
+    await createTestingModule([DomainModule], async (app) => {
+      const importService = app.get(ImportService);
+
+      // WHEN: Doing an incremental import from MongoDB
+      const { products, productIdExisting, productIdNew } = testProducts();
+      mockMongoDB(products);
+      const imports = [];
+      // Need more than 10 concurrent imports to start to see errors
+      for (let i = 0; i < 11; i++) {
+        imports.push(
+          importService.importWithFilter(
+            { code: { $in: [productIdExisting, productIdNew] } },
+            ProductSource.EVENT,
+          ),
+        );
+      }
+      await Promise.all(imports);
+    });
+  });
+});
+
 describe('receiveMessages', () => {
-  // Allow a little time for the testcontainer to start
-  jest.setTimeout(30000);
   it('should call importwithfilter when a message is received', async () => {
     await createTestingModule([DomainModule], async (app) => {
       // GIVEN: Redis is running
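Two things are happening in the test changes above: the Mongo `find` mock now keeps its cursor position (`index`) in a closure created per `db()` call rather than at module scope, and recorded call arguments go into a plain `findCalls` array instead of a `jest.fn()`, since the cursor object is now rebuilt per connection. The per-connection closure is what lets the new `importWithFilter` test run eleven imports concurrently without the fake cursors advancing each other. A stripped-down sketch of the same pattern (`makeDb` and `drain` are illustrative names, not code from this repo):

```ts
// Illustrative only: each call to makeDb() owns an independent cursor, so two
// concurrent consumers both see the full list, mirroring the mock above.
const docs = [{ code: 'A' }, { code: 'B' }];

function makeDb() {
  let index = 0; // per-connection cursor position
  return {
    find: () => ({
      next: async () => (index < docs.length ? docs[index++] : null),
    }),
  };
}

async function drain(db: ReturnType<typeof makeDb>): Promise<string[]> {
  const cursor = db.find();
  const seen: string[] = [];
  for (let doc = await cursor.next(); doc; doc = await cursor.next()) {
    seen.push(doc.code);
  }
  return seen;
}

// Both "connections" drain independently and each sees ['A', 'B'].
Promise.all([drain(makeDb()), drain(makeDb())]).then((results) => {
  console.assert(results.every((codes) => codes.join() === 'A,B'));
});
```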
diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts
index 3ceef49..08009a3 100644
--- a/src/domain/services/import.service.ts
+++ b/src/domain/services/import.service.ts
@@ -131,7 +131,7 @@
     await client.close();
 
     // Tags are popualted using raw SQL from the data field
-    await this.updateTags(updateId, fullImport);
+    await this.updateTags(updateId, fullImport, source);
 
     // If doing a full import delete all products that weren't updated
     if (fullImport) {
@@ -236,14 +236,21 @@
    * SQL is then run to insert this into the individual tag tables.
    * This was found to be quicker than using ORM functionality
    */
-  async updateTags(updateId: string, fullImport = false) {
+  async updateTags(
+    updateId: string,
+    fullImport = false,
+    source = ProductSource.FULL_LOAD,
+  ) {
+    // Commit after each tag for bulk (non-Redis) loads to minimise server snapshot size
+    const commitPerTag = source !== ProductSource.EVENT;
+
     this.logger.debug(`Updating tags for updateId: ${updateId}`);
     const connection = this.em.getConnection();
 
     // Fix ingredients
     let logText = `Updated ingredients`;
-    await connection.execute('begin');
+    if (commitPerTag) await connection.execute('begin');
     const deleted = await connection.execute(
       `delete from product_ingredient
       where product_id in (select id from product
@@ -329,7 +336,7 @@
       affectedRows = results['affectedRows'];
       logText += ` > ${affectedRows}`;
     }
-    await connection.execute('commit');
+    if (commitPerTag) await connection.execute('commit');
     this.logger.debug(logText + ' rows');
 
     for (const [tag, entity] of Object.entries(ProductTagMap.MAPPED_TAGS)) {
@@ -337,7 +344,7 @@
       // Get the underlying table name for the entity
       const tableName = this.em.getMetadata(entity).tableName;
 
-      await connection.execute('begin');
+      if (commitPerTag) await connection.execute('begin');
 
       // Delete existing tags for products that were imorted on this run
       const deleted = await connection.execute(
@@ -359,8 +366,7 @@
         'run',
       );
 
-      // Commit after each tag to minimise server snapshot size
-      await connection.execute('commit');
+      if (commitPerTag) await connection.execute('commit');
 
       // If this is a full load we can flag the tag as now available for query
       if (fullImport) {
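The `source` parameter threaded into `updateTags` is what the new concurrency test exercises: per-tag `begin`/`commit` pairs keep the server-side snapshot small during bulk loads, but issuing explicit transaction statements from concurrent Redis-event imports is presumably what produced the errors, so `EVENT` imports now skip them and run each statement non-transactionally. The general shape of the pattern, sketched against a node-postgres style client (`withOptionalTransaction` is an illustrative helper, not code from this repo; it also adds a rollback on failure, which the raw `execute` calls above leave to connection cleanup):

```ts
import { Client } from 'pg';

// Illustrative helper: wrap a step in its own transaction only when the
// caller is a bulk load; event-driven callers run the step directly.
async function withOptionalTransaction(
  conn: Client,
  useTransaction: boolean, // corresponds to commitPerTag above
  step: () => Promise<void>,
): Promise<void> {
  if (!useTransaction) return step();
  await conn.query('begin');
  try {
    await step();
    await conn.query('commit');
  } catch (e) {
    await conn.query('rollback');
    throw e;
  }
}
```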
diff --git a/src/mikro-orm.config.ts b/src/mikro-orm.config.ts
index 7fff296..2fd8482 100644
--- a/src/mikro-orm.config.ts
+++ b/src/mikro-orm.config.ts
@@ -7,6 +7,7 @@ import {
   defineConfig,
 } from '@mikro-orm/core';
 import { SCHEMA } from './constants';
+import { Logger } from '@nestjs/common';
 
 class DateTimeNtzType extends DateTimeType {
   getColumnType(): string {
@@ -14,6 +15,8 @@ class DateTimeNtzType extends DateTimeType {
   }
 }
 
+const logger = new Logger('MikroOrm');
+
 export default defineConfig({
   entities: ['./dist/domain/entities'],
   entitiesTs: ['./src/domain/entities'],
@@ -47,6 +50,17 @@ export default defineConfig({
     path: 'dist/migrations',
     pathTs: 'src/migrations',
   },
+  pool: {
+    afterCreate: function (conn: any, done: any) {
+      // issue a query to verify SQL connection is working
+      conn.query('select 1 as result', function (err) {
+        conn.on('notice', function (msg) {
+          logger.error('Notice from PostgreSQL: ' + msg.message);
+        });
+        done(err, conn);
+      });
+    },
+  },
   // Uncomment the below and 'app.useLogger(new Logger());' to the test to see Mikro-ORM logs
   // debug: ['query', 'query-params'],
 });
diff --git a/test/test.helper.ts b/test/test.helper.ts
index 05e0da6..f15a199 100644
--- a/test/test.helper.ts
+++ b/test/test.helper.ts
@@ -1,5 +1,6 @@
 import { MikroORM, RequestContext } from '@mikro-orm/core';
 import { logger } from '@mikro-orm/nestjs';
+import { ConsoleLogger } from '@nestjs/common';
 import { Test, TestingModule } from '@nestjs/testing';
 import { randomBytes } from 'crypto';
 
@@ -7,20 +8,41 @@ export function randomCode() {
   return 'TEST-' + randomBytes(20).toString('base64');
 }
 
+class TestLogger extends ConsoleLogger {
+  errors = new Array<string>();
+  expectedErrors = 0;
+  constructor() {
+    super();
+    this.setLogLevels(['error']);
+  }
+  error(message: string, ...rest: any[]) {
+    this.errors.push(message);
+    if (this.errors.length > this.expectedErrors) {
+      super.error(message, ...rest);
+    }
+  }
+  assertExpectedErrors() {
+    expect(this.errors).toHaveLength(this.expectedErrors);
+  }
+}
+
 export async function createTestingModule(
   imports: any[],
-  callback: { (app: TestingModule): Promise<void> },
+  callback: { (app: TestingModule, logger: TestLogger): Promise<void> },
 ) {
+  const testLogger = new TestLogger();
   const app = await Test.createTestingModule({
     imports: imports,
   }).compile();
+  app.useLogger(testLogger);
 
   await app.init();
   const orm = app.get(MikroORM);
   try {
     await RequestContext.createAsync(orm.em, async () => {
-      await callback(app);
+      await callback(app, testLogger);
     });
+    testLogger.assertExpectedErrors();
   } catch (e) {
     (e.errors ?? [e]).map((e) => logger.error(e.message, e.stack));
     throw e;
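With the test.helper.ts change, every test runs under a `TestLogger` that records error-level output, and `createTestingModule` asserts after the callback that the number of logged errors matches `expectedErrors` (zero by default). A concurrency regression that merely logged errors, rather than throwing, would therefore now fail the suite. A hypothetical test body showing how the new second callback argument could be used:

```ts
// Hypothetical usage of the new callback signature; the test body here is
// illustrative, not a test from this PR.
await createTestingModule([DomainModule], async (app, testLogger) => {
  // This test deliberately triggers one handled failure, so one logged
  // error is expected; it is recorded but not echoed to the console.
  testLogger.expectedErrors = 1;

  // ... exercise a code path that logs exactly one error ...
});
// createTestingModule calls assertExpectedErrors() after the callback, so an
// unexpected second error (which would also be echoed) fails the test.
```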