Merge pull request #44 from /issues/43
Avoid intermediate commits when importing from Redis
john-gom authored Jun 3, 2024
2 parents 503994d + a8b1869 commit ec22dab
Showing 7 changed files with 107 additions and 35 deletions.
1 change: 1 addition & 0 deletions .env
@@ -9,6 +9,7 @@ POSTGRES_PORT=127.0.0.1:5512
POSTGRES_DB=query
POSTGRES_USER=productopener
POSTGRES_PASSWORD=productopener
+POSTGRES_SHM_SIZE=256m
COMMON_NET_NAME=po_default
MONGO_URI=mongodb://localhost:27017
#REDIS_URL=redis://localhost:6379
3 changes: 3 additions & 0 deletions .github/workflows/container-deploy.yml
@@ -34,6 +34,7 @@ jobs:
echo "COMMON_NET_NAME=po_webnet" >> $GITHUB_ENV
echo "MONGO_URI=mongodb://10.1.0.200:27017" >> $GITHUB_ENV
echo "REDIS_URL=redis://redis:6379" >> $GITHUB_ENV
echo "POSTGRES_SHM_SIZE=1g" >> $GITHUB_ENV
- name: Set various variables for production deployment
if: matrix.env == 'off-query-org'
run: |
@@ -46,6 +47,7 @@
# mongodb and redis (through stunnel)
echo "MONGO_URI=mongodb://10.1.0.102:27017" >> $GITHUB_ENV
echo "REDIS_URL=redis://10.1.0.122:6379" >> $GITHUB_ENV
echo "POSTGRES_SHM_SIZE=256m" >> $GITHUB_ENV
- name: Wait for container build workflow
uses: tomchv/[email protected]
id: wait-build
@@ -124,6 +126,7 @@ jobs:
echo "COMMON_NET_NAME=${{ env.COMMON_NET_NAME }}" >> .env
echo "MONGO_URI=${{ env.MONGO_URI }}" >> .env
echo "REDIS_URL=${{ env.REDIS_URL }}" >> .env
echo "POSTGRES_SHM_SIZE=${{ env.POSTGRES_SHM_SIZE }}" >> .env
echo "LOG_LEVEL=log" >> .env
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -6,6 +6,7 @@ services:
- POSTGRES_USER
- POSTGRES_PASSWORD
- POSTGRES_DB
+    shm_size: ${POSTGRES_SHM_SIZE}
volumes:
- dbdata:/var/lib/postgresql/data
networks:
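Note on the change above: Docker gives a container only 64 MB of /dev/shm by default, and PostgreSQL's parallel workers allocate dynamic shared memory there, so large imports can fail with `could not resize shared memory segment ... No space left on device`. Parameterising `shm_size` lets each deployment size it independently; the workflow above sets 1g for one environment and 256m for the off-query-org production deployment.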
77 changes: 51 additions & 26 deletions src/domain/services/import.service.spec.ts
@@ -1,6 +1,6 @@
import { DomainModule } from '../domain.module';
import { ImportService } from './import.service';
-import { EntityManager } from '@mikro-orm/core';
+import { EntityManager } from '@mikro-orm/postgresql';
import { Product } from '../entities/product';
import { ProductIngredientsTag } from '../entities/product-tags';
import { createTestingModule, randomCode } from '../../../test/test.helper';
@@ -13,7 +13,6 @@ import { createClient } from 'redis';
import { GenericContainer } from 'testcontainers';
import { setTimeout } from 'timers/promises';

-let index = 0;
const lastModified = 1692032161;

function testProducts() {
@@ -36,37 +35,43 @@ function testProducts() {
return { products, productIdExisting, productIdNew };
}

-// Need to specify argumet list bellow so that calls in the assertion is typed
-// eslint-disable-next-line @typescript-eslint/no-unused-vars
-const findMock = jest.fn((_filter, _projection) => ({
-  next: async () => {
-    return index++ <= mockedProducts.length ? mockedProducts[index - 1] : null;
-  },
-  close: jest.fn(),
-}));
+const findCalls = [];

jest.mock('mongodb', () => {
return {
MongoClient: jest.fn(() => ({
connect: jest.fn(),
-    db: () => ({
-      collection: () => ({
-        find: findMock,
-      }),
-    }),
+    db: () => {
+      let index = 0;
+      return {
+        collection: () => ({
+          find: (...args: any) => {
+            findCalls.push(args);
+            return {
+              next: async () => {
+                return index++ <= mockedProducts.length
+                  ? mockedProducts[index - 1]
+                  : null;
+              },
+              close: jest.fn(),
+            };
+          },
+        }),
+      };
+    },
close: jest.fn(),
})),
};
});

let mockedProducts = [];
function mockMongoDB(productList) {
-  index = 0;
mockedProducts = productList;
}

// Import tests can sometimes take a little time in GitHub
-jest.setTimeout(10000);
+// Plus allow a little time for the testcontainer to start
+jest.setTimeout(300000);

describe('importFromMongo', () => {
it('should import a new product update existing products and delete missing products', async () => {
@@ -103,7 +108,7 @@ describe('importFromMongo', () => {

await importService.importFromMongo();

-    // THEN: New product is addeded, updated product is updated and other product is unchanged
+    // THEN: New product is added, updated product is updated and other product is unchanged
expect(deleteMock).toHaveBeenCalledTimes(1);
let updateId = deleteMock.mock.calls[0][0];
// Re-format updateId the way Postgres provides it
@@ -213,12 +218,12 @@ describe('importFromMongo', () => {

// WHEN: Doing an incremental import from MongoDB
mockMongoDB(products);
-    findMock.mockClear();
+    findCalls.length = 0;
await importService.importFromMongo('');

// THEN: Mongo find is called with the setting as a parameter
-    expect(findMock).toHaveBeenCalledTimes(2); // Called for normal and obsolete products
-    expect(findMock.mock.calls[0][0].last_modified_t.$gt).toBe(
+    expect(findCalls).toHaveLength(2); // Called for normal and obsolete products
+    expect(findCalls[0][0].last_modified_t.$gt).toBe(
Math.floor(startFrom.getTime() / 1000),
);

@@ -228,9 +233,9 @@
});
});

-  it('should cope with nul charactes', async () => {
+  it('should cope with nul characters', async () => {
await createTestingModule([DomainModule], async (app) => {
-    // WHEN: Impoting data containing nul characters
+    // WHEN: Importing data containing nul characters
const { productIdNew } = testProducts();
mockMongoDB([
{
@@ -269,7 +274,6 @@

// WHEN: Doing an import from MongoDB
mockMongoDB(testData);
-    findMock.mockClear();
await importService.importFromMongo('');

// THEN: The last modified date is set correctly
@@ -346,9 +350,30 @@ describe('ProductTag', () => {
});
});

+describe('importWithFilter', () => {
+  it('should not get an error with concurrent imports', async () => {
+    await createTestingModule([DomainModule], async (app) => {
+      const importService = app.get(ImportService);
+
+      // WHEN: Doing an incremental import from MongoDB
+      const { products, productIdExisting, productIdNew } = testProducts();
+      mockMongoDB(products);
+      const imports = [];
+      // Need more than 10 concurrent imports to start to see errors
+      for (let i = 0; i < 11; i++) {
+        imports.push(
+          importService.importWithFilter(
+            { code: { $in: [productIdExisting, productIdNew] } },
+            ProductSource.EVENT,
+          ),
+        );
+      }
+      await Promise.all(imports);
+    });
+  });
+});

describe('receiveMessages', () => {
-  // Allow a little time for the testcontainer to start
-  jest.setTimeout(30000);
  it('should call importWithFilter when a message is received', async () => {
await createTestingModule([DomainModule], async (app) => {
// GIVEN: Redis is running
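A note on the new concurrency test above: needing more than 10 concurrent imports before errors appear is consistent with the default connection-pool maximum of 10 in the knex pool that MikroORM's PostgreSQL driver sits on, though that reading is an inference, not something stated in the diff. The sketch below shows one classic hazard of issuing explicit `begin`/`commit` as ordinary pooled queries, using the plain `pg` driver for brevity; the names and pool size are illustrative, not taken from this repository.

```ts
import { Pool } from 'pg'; // illustrative; the project itself goes through MikroORM

const pool = new Pool({ max: 10 }); // assumed default-sized pool

// Anti-pattern: each pool.query() may check out a *different* client, so the
// BEGIN, the work and the COMMIT are not guaranteed to share a session. The
// transaction opened on client A is simply left dangling.
async function unsafeImport(code: string) {
  await pool.query('begin'); // may run on client A
  await pool.query('select $1::text as code', [code]); // may run on client B
  await pool.query('commit'); // may run on client C, where no transaction is open
}

// Remedy: pin one client for the whole transaction.
async function safeImport(code: string) {
  const client = await pool.connect();
  try {
    await client.query('begin');
    await client.query('select $1::text as code', [code]);
    await client.query('commit');
  } finally {
    client.release();
  }
}
```

Pinning a single client, as in `safeImport`, is the usual remedy; this commit instead takes the route of not opening per-tag transactions at all for event-driven imports, as the service changes below show.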
20 changes: 13 additions & 7 deletions src/domain/services/import.service.ts
@@ -131,7 +131,7 @@ export class ImportService {
await client.close();

// Tags are populated using raw SQL from the data field
-    await this.updateTags(updateId, fullImport);
+    await this.updateTags(updateId, fullImport, source);

// If doing a full import delete all products that weren't updated
if (fullImport) {
@@ -236,14 +236,21 @@
* SQL is then run to insert this into the individual tag tables.
* This was found to be quicker than using ORM functionality
*/
-  async updateTags(updateId: string, fullImport = false) {
+  async updateTags(
+    updateId: string,
+    fullImport = false,
+    source = ProductSource.FULL_LOAD,
+  ) {
+    // Commit after each tag for bulk (non-Redis) loads to minimise server snapshot size
+    const commitPerTag = source !== ProductSource.EVENT;

this.logger.debug(`Updating tags for updateId: ${updateId}`);

const connection = this.em.getConnection();

// Fix ingredients
let logText = `Updated ingredients`;
-    await connection.execute('begin');
+    if (commitPerTag) await connection.execute('begin');
const deleted = await connection.execute(
`delete from product_ingredient
where product_id in (select id from product
@@ -329,15 +336,15 @@
affectedRows = results['affectedRows'];
logText += ` > ${affectedRows}`;
}
-    await connection.execute('commit');
+    if (commitPerTag) await connection.execute('commit');
this.logger.debug(logText + ' rows');

for (const [tag, entity] of Object.entries(ProductTagMap.MAPPED_TAGS)) {
let logText = `Updated ${tag}`;
// Get the underlying table name for the entity
const tableName = this.em.getMetadata(entity).tableName;

-      await connection.execute('begin');
+      if (commitPerTag) await connection.execute('begin');

// Delete existing tags for products that were imported on this run
const deleted = await connection.execute(
@@ -359,8 +366,7 @@
'run',
);

-      // Commit after each tag to minimise server snapshot size
-      await connection.execute('commit');
+      if (commitPerTag) await connection.execute('commit');

// If this is a full load we can flag the tag as now available for query
if (fullImport) {
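This is the core of the change: `updateTags` now receives the import `source` and only manages its own transactions for bulk loads. A condensed sketch of the pattern, with invented helper names; only the `begin`/`commit` strings and the delete-then-insert shape come from the code above:

```ts
// Hypothetical condensed form of the logic above.
type Execute = (sql: string) => Promise<unknown>;

async function refreshTagTable(
  execute: Execute,
  deleteSql: string,
  insertSql: string,
  commitPerTag: boolean, // false for ProductSource.EVENT imports
) {
  // Bulk loads commit after every tag to minimise server snapshot size.
  // Event-driven imports skip BEGIN/COMMIT so the whole import stays in
  // the caller's single transaction.
  if (commitPerTag) await execute('begin');
  await execute(deleteSql);
  await execute(insertSql);
  if (commitPerTag) await execute('commit');
}
```

For `ProductSource.EVENT` imports the begin/commit pair is skipped entirely, presumably so the whole event import commits or rolls back as one unit instead of leaving intermediate commits behind.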
14 changes: 14 additions & 0 deletions src/mikro-orm.config.ts
@@ -7,13 +7,16 @@ import {
defineConfig,
} from '@mikro-orm/core';
import { SCHEMA } from './constants';
+import { Logger } from '@nestjs/common';

class DateTimeNtzType extends DateTimeType {
getColumnType(): string {
return 'timestamp';
}
}

+const logger = new Logger('MikroOrm');

export default defineConfig({
entities: ['./dist/domain/entities'],
entitiesTs: ['./src/domain/entities'],
@@ -47,6 +50,17 @@ export default defineConfig({
path: 'dist/migrations',
pathTs: 'src/migrations',
},
+  pool: {
+    afterCreate: function (conn: any, done: any) {
+      // issue a query to verify SQL connection is working
+      conn.query('select 1 as result', function (err) {
+        conn.on('notice', function (msg) {
+          logger.error('Notice from PostgreSQL: ' + msg.message);
+        });
+        done(err, conn);
+      });
+    },
+  },
// Uncomment the below and add 'app.useLogger(new Logger());' to the test to see Mikro-ORM logs
// debug: ['query', 'query-params'],
});
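The `pool.afterCreate` entry above is the connection-pool hook that knex (which MikroORM's PostgreSQL driver is built on) invokes for every new connection: it runs a probe query and attaches a `notice` listener so server-side NOTICE messages reach the application log, at error level, which keeps them visible under the default log level. A quick way to see the hook fire, assuming this config file can be imported as shown:

```ts
import { MikroORM } from '@mikro-orm/postgresql';
import config from './mikro-orm.config';

// Ask PostgreSQL to emit a NOTICE; the afterCreate listener above should
// forward it to the Nest logger as 'Notice from PostgreSQL: ...'.
async function demoNotice() {
  const orm = await MikroORM.init(config);
  await orm.em
    .getConnection()
    .execute(`do $$ begin raise notice 'notice hook is wired up'; end $$`);
  await orm.close();
}

demoNotice().catch(console.error);
```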
26 changes: 24 additions & 2 deletions test/test.helper.ts
@@ -1,26 +1,48 @@
import { MikroORM, RequestContext } from '@mikro-orm/core';
import { logger } from '@mikro-orm/nestjs';
+import { ConsoleLogger } from '@nestjs/common';
import { Test, TestingModule } from '@nestjs/testing';
import { randomBytes } from 'crypto';

export function randomCode() {
return 'TEST-' + randomBytes(20).toString('base64');
}

+class TestLogger extends ConsoleLogger {
+  errors = new Array<string>();
+  expectedErrors = 0;
+  constructor() {
+    super();
+    this.setLogLevels(['error']);
+  }
+  error(message: string, ...rest: any[]) {
+    this.errors.push(message);
+    if (this.errors.length > this.expectedErrors) {
+      super.error(message, ...rest);
+    }
+  }
+  assertExpectedErrors() {
+    expect(this.errors).toHaveLength(this.expectedErrors);
+  }
+}

export async function createTestingModule(
imports: any[],
-  callback: { (app: TestingModule): Promise<void> },
+  callback: { (app: TestingModule, logger: TestLogger): Promise<void> },
) {
+  const testLogger = new TestLogger();
const app = await Test.createTestingModule({
imports: imports,
}).compile();
+  app.useLogger(testLogger);
await app.init();

const orm = app.get(MikroORM);
try {
await RequestContext.createAsync(orm.em, async () => {
-      await callback(app);
+      await callback(app, testLogger);
});
+    testLogger.assertExpectedErrors();
} catch (e) {
(e.errors ?? [e]).map((e) => logger.error(e.message, e.stack));
throw e;
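With this change every test gets a `TestLogger` that records errors, suppresses the ones declared via `expectedErrors`, and fails the run in `assertExpectedErrors()` if the final count differs, so an import that logs unexpectedly now fails its test. A hypothetical usage sketch; the import paths and test body are illustrative:

```ts
import { createTestingModule } from './test.helper';
import { DomainModule } from '../src/domain/domain.module'; // illustrative path

it('should tolerate exactly one expected error', async () => {
  await createTestingModule([DomainModule], async (app, logger) => {
    // Declare the error up front: it is recorded but not printed, and
    // assertExpectedErrors() in the helper fails the test if the final
    // count differs from this value.
    logger.expectedErrors = 1;
    // ... exercise a code path that is expected to log exactly one error ...
  });
});
```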
