Skip to content

Commit

Permalink
Fix notifier
Browse files Browse the repository at this point in the history
Fixes

Fix

Fix

Fix

Fix

Fixes

Fixes
  • Loading branch information
Boldizsar Mezei committed May 8, 2024
1 parent 5eac8dc commit 07fa77b
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 117 deletions.
2 changes: 1 addition & 1 deletion packages/database/src/pg/impl/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const getKnex = () => {
},
pool: {
min: Number(process.env.DB_POOL_MIN || 0),
max: Number(process.env.DB_POOL_MAX || 20),
max: Number(process.env.DB_POOL_MAX || 10),
},
});
}
Expand Down
26 changes: 8 additions & 18 deletions packages/functions/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/* eslint-disable import/namespace */
/* eslint-disable @typescript-eslint/no-var-requires */
require('dotenv').config({ path: __dirname + '/.env' });
import { BaseRecord, PgChanges, database } from '@buildcore/database';
import { BaseRecord, database } from '@buildcore/database';
import { WEN_FUNC } from '@buildcore/interfaces';
import cors from 'cors';
import dayjs from 'dayjs';
import express from 'express';
import { get, head } from 'lodash';
import { get } from 'lodash';
import { flattenObject } from './common';
import * as onScheduled from './runtime/cron/index';
import { ScheduledFunction } from './runtime/cron/scheduled';
Expand All @@ -28,7 +28,7 @@ app.use(cors());
app.use(traceMiddleware);

const httpRawParser = express.raw({ type: '*/*', limit: '100mb' });
const jsonParser = express.json();
const jsonParser = express.json({ limit: '50mb' });

const loggingMiddleware = (name: string) =>
isEmulatorEnv()
Expand Down Expand Up @@ -60,25 +60,15 @@ Object.entries(flattenObject(onTriggers)).forEach(([name, config]) => {
app.post(`/${name}`, jsonParser, loggingMiddleware(name), async (req, res) => {
const pubSubMessage = req.body.message.data;
const raw = Buffer.from(pubSubMessage, 'base64').toString().trim();
const processId = JSON.parse(raw).processId;
let snap: PgChanges | undefined = undefined;

const docRef = database().getCon()('changes').where({ uid: processId });
const change = JSON.parse(raw);
try {
snap = head(await docRef);
if (!snap) {
return;
}

await docRef.delete();

await (config as TriggeredFunction).handler({
...snap.change,
prev: snap.change!.prev || undefined,
curr: snap.change!.curr || undefined,
...change,
prev: change!.prev || undefined,
curr: change!.curr || undefined,
} as PgDocEvent<BaseRecord>);
} catch (error) {
logger.error('onTriggers-error', name, snap, error);
logger.error('onTriggers-error', name, change, error);
} finally {
res.sendStatus(200);
}
Expand Down
1 change: 0 additions & 1 deletion packages/functions/test-tangle/faucet.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Network, NetworkAddress, Timestamp } from '@buildcore/interfaces';

import { Client } from '@iota/sdk';
import { MnemonicService } from '../src/services/wallet/mnemonic';
import { Wallet } from '../src/services/wallet/wallet';
Expand Down
92 changes: 0 additions & 92 deletions packages/functions/test/milestone.sync.ts

This file was deleted.

4 changes: 3 additions & 1 deletion packages/functions/test/notifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const notifier = async () => {
};

const notifyTriggers = async (channel: string, changeId: string) => {
const body = { message: { data: btoa(JSON.stringify({ processId: Number(changeId) })) } };
const change = await knex('changes').select('*').where({ uid: changeId });
const body = { message: { data: Buffer.from(JSON.stringify(change[0].change)) } };

let error: any = undefined;
for (let i = 0; i < 5; ++i) {
Expand All @@ -80,6 +81,7 @@ const notifyTriggers = async (channel: string, changeId: string) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
await knex('changes').delete().where({ uid: changeId });
throw error;
};

Expand Down
15 changes: 11 additions & 4 deletions packages/notifier/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,17 @@ const getTriggerTopic = (topic: string) => {
return triggerTopics[topic];
};

const notifyTriggers = async (channel: string, processId: string) => {
await getTriggerTopic(channel).publishMessage({
data: Buffer.from(JSON.stringify({ processId: Number(processId) })),
});
const notifyTriggers = async (channel: string, changeId: string) => {
try {
const change = await knex('changes').select('*').where({ uid: changeId });
await getTriggerTopic(channel).publishMessage({
data: Buffer.from(JSON.stringify(change[0].change)),
});
} catch (err) {
logger.error(err);
} finally {
await knex('changes').delete().where({ uid: changeId });
}
};

const upserTopic = pubSub.topic('onupsert');
Expand Down

0 comments on commit 07fa77b

Please sign in to comment.