From 08a77b6967ef87e598f07c0b1a3276d23d302017 Mon Sep 17 00:00:00 2001 From: Boldizsar Mezei Date: Wed, 8 May 2024 01:16:07 +0200 Subject: [PATCH] Fix notifier --- packages/database/src/pg/impl/knex.ts | 2 +- packages/functions/src/index.ts | 24 +++++++----------------- packages/functions/test/notifier.ts | 4 +++- packages/notifier/src/index.ts | 15 +++++++++++---- 4 files changed, 22 insertions(+), 23 deletions(-) diff --git a/packages/database/src/pg/impl/knex.ts b/packages/database/src/pg/impl/knex.ts index 43c887c51..ea011de47 100644 --- a/packages/database/src/pg/impl/knex.ts +++ b/packages/database/src/pg/impl/knex.ts @@ -15,7 +15,7 @@ export const getKnex = () => { }, pool: { min: Number(process.env.DB_POOL_MIN || 0), - max: Number(process.env.DB_POOL_MAX || 20), + max: Number(process.env.DB_POOL_MAX || 10), }, }); } diff --git a/packages/functions/src/index.ts b/packages/functions/src/index.ts index c549ce2ee..63a68712a 100644 --- a/packages/functions/src/index.ts +++ b/packages/functions/src/index.ts @@ -1,12 +1,12 @@ /* eslint-disable import/namespace */ /* eslint-disable @typescript-eslint/no-var-requires */ require('dotenv').config({ path: __dirname + '/.env' }); -import { BaseRecord, PgChanges, database } from '@buildcore/database'; +import { BaseRecord, database } from '@buildcore/database'; import { WEN_FUNC } from '@buildcore/interfaces'; import cors from 'cors'; import dayjs from 'dayjs'; import express from 'express'; -import { get, head } from 'lodash'; +import { get } from 'lodash'; import { flattenObject } from './common'; import * as onScheduled from './runtime/cron/index'; import { ScheduledFunction } from './runtime/cron/scheduled'; @@ -60,25 +60,15 @@ Object.entries(flattenObject(onTriggers)).forEach(([name, config]) => { app.post(`/${name}`, jsonParser, loggingMiddleware(name), async (req, res) => { const pubSubMessage = req.body.message.data; const raw = Buffer.from(pubSubMessage, 'base64').toString().trim(); - const processId = JSON.parse(raw).processId; - let snap: PgChanges | undefined = undefined; - - const docRef = database().getCon()('changes').where({ uid: processId }); + const change = JSON.parse(raw); try { - snap = head(await docRef); - if (!snap) { - return; - } - - await docRef.delete(); - await (config as TriggeredFunction).handler({ - ...snap.change, - prev: snap.change!.prev || undefined, - curr: snap.change!.curr || undefined, + ...change, + prev: change!.prev || undefined, + curr: change!.curr || undefined, } as PgDocEvent); } catch (error) { - logger.error('onTriggers-error', name, snap, error); + logger.error('onTriggers-error', name, change, error); } finally { res.sendStatus(200); } diff --git a/packages/functions/test/notifier.ts b/packages/functions/test/notifier.ts index 3965d6e4a..b94f94222 100644 --- a/packages/functions/test/notifier.ts +++ b/packages/functions/test/notifier.ts @@ -65,7 +65,8 @@ const notifier = async () => { }; const notifyTriggers = async (channel: string, changeId: string) => { - const body = { message: { data: btoa(JSON.stringify({ processId: Number(changeId) })) } }; + const change = await knex('changes').select('*').where({ uid: changeId }); + const body = { message: { data: btoa(JSON.stringify(change[0].change)) } }; let error: any = undefined; for (let i = 0; i < 5; ++i) { @@ -80,6 +81,7 @@ const notifyTriggers = async (channel: string, changeId: string) => { await new Promise((resolve) => setTimeout(resolve, 1000)); } } + await knex('change').delete().where({ uid: changeId }); throw error; }; diff --git a/packages/notifier/src/index.ts b/packages/notifier/src/index.ts index ee5714451..e61ab34c1 100644 --- a/packages/notifier/src/index.ts +++ b/packages/notifier/src/index.ts @@ -56,10 +56,17 @@ const getTriggerTopic = (topic: string) => { return triggerTopics[topic]; }; -const notifyTriggers = async (channel: string, processId: string) => { - await getTriggerTopic(channel).publishMessage({ - data: Buffer.from(JSON.stringify({ processId: Number(processId) })), - }); +const notifyTriggers = async (channel: string, changeId: string) => { + try { + const change = await knex('changes').select('*').where({ uid: changeId }); + await getTriggerTopic(channel).publishMessage({ + data: Buffer.from(JSON.stringify(change[0].change)), + }); + } catch (err) { + logger.error(err) + } finally { + await knex('changes').delete().where({ uid: changeId }); + } }; const upserTopic = pubSub.topic('onupsert');