Skip to content

Commit

Permalink
Fix notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
Boldizsar Mezei committed May 7, 2024
1 parent 5eac8dc commit 08a77b6
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 23 deletions.
2 changes: 1 addition & 1 deletion packages/database/src/pg/impl/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const getKnex = () => {
},
pool: {
min: Number(process.env.DB_POOL_MIN || 0),
max: Number(process.env.DB_POOL_MAX || 20),
max: Number(process.env.DB_POOL_MAX || 10),
},
});
}
Expand Down
24 changes: 7 additions & 17 deletions packages/functions/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/* eslint-disable import/namespace */
/* eslint-disable @typescript-eslint/no-var-requires */
require('dotenv').config({ path: __dirname + '/.env' });
import { BaseRecord, PgChanges, database } from '@buildcore/database';
import { BaseRecord, database } from '@buildcore/database';
import { WEN_FUNC } from '@buildcore/interfaces';
import cors from 'cors';
import dayjs from 'dayjs';
import express from 'express';
import { get, head } from 'lodash';
import { get } from 'lodash';
import { flattenObject } from './common';
import * as onScheduled from './runtime/cron/index';
import { ScheduledFunction } from './runtime/cron/scheduled';
Expand Down Expand Up @@ -60,25 +60,15 @@ Object.entries(flattenObject(onTriggers)).forEach(([name, config]) => {
app.post(`/${name}`, jsonParser, loggingMiddleware(name), async (req, res) => {
const pubSubMessage = req.body.message.data;
const raw = Buffer.from(pubSubMessage, 'base64').toString().trim();
const processId = JSON.parse(raw).processId;
let snap: PgChanges | undefined = undefined;

const docRef = database().getCon()('changes').where({ uid: processId });
const change = JSON.parse(raw);
try {
snap = head(await docRef);
if (!snap) {
return;
}

await docRef.delete();

await (config as TriggeredFunction).handler({
...snap.change,
prev: snap.change!.prev || undefined,
curr: snap.change!.curr || undefined,
...change,
prev: change!.prev || undefined,
curr: change!.curr || undefined,
} as PgDocEvent<BaseRecord>);
} catch (error) {
logger.error('onTriggers-error', name, snap, error);
logger.error('onTriggers-error', name, change, error);
} finally {
res.sendStatus(200);
}
Expand Down
4 changes: 3 additions & 1 deletion packages/functions/test/notifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const notifier = async () => {
};

const notifyTriggers = async (channel: string, changeId: string) => {
const body = { message: { data: btoa(JSON.stringify({ processId: Number(changeId) })) } };
const change = await knex('changes').select('*').where({ uid: changeId });
const body = { message: { data: btoa(JSON.stringify(change[0].change)) } };

let error: any = undefined;
for (let i = 0; i < 5; ++i) {
Expand All @@ -80,6 +81,7 @@ const notifyTriggers = async (channel: string, changeId: string) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
await knex('change').delete().where({ uid: changeId });
throw error;
};

Expand Down
15 changes: 11 additions & 4 deletions packages/notifier/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,17 @@ const getTriggerTopic = (topic: string) => {
return triggerTopics[topic];
};

const notifyTriggers = async (channel: string, processId: string) => {
await getTriggerTopic(channel).publishMessage({
data: Buffer.from(JSON.stringify({ processId: Number(processId) })),
});
const notifyTriggers = async (channel: string, changeId: string) => {
try {
const change = await knex('changes').select('*').where({ uid: changeId });
await getTriggerTopic(channel).publishMessage({
data: Buffer.from(JSON.stringify(change[0].change)),
});
} catch (err) {
logger.error(err)
} finally {
await knex('changes').delete().where({ uid: changeId });
}
};

const upserTopic = pubSub.topic('onupsert');
Expand Down

0 comments on commit 08a77b6

Please sign in to comment.