Skip to content

Commit

Permalink
Fix notifier
Browse files Browse the repository at this point in the history
Fixes
  • Loading branch information
Boldizsar Mezei committed May 7, 2024
1 parent 5eac8dc commit d0892e3
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 175 deletions.
2 changes: 1 addition & 1 deletion packages/database/src/pg/impl/knex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const getKnex = () => {
},
pool: {
min: Number(process.env.DB_POOL_MIN || 0),
max: Number(process.env.DB_POOL_MAX || 20),
max: Number(process.env.DB_POOL_MAX || 10),
},
});
}
Expand Down
24 changes: 7 additions & 17 deletions packages/functions/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/* eslint-disable import/namespace */
/* eslint-disable @typescript-eslint/no-var-requires */
require('dotenv').config({ path: __dirname + '/.env' });
import { BaseRecord, PgChanges, database } from '@buildcore/database';
import { BaseRecord, database } from '@buildcore/database';
import { WEN_FUNC } from '@buildcore/interfaces';
import cors from 'cors';
import dayjs from 'dayjs';
import express from 'express';
import { get, head } from 'lodash';
import { get } from 'lodash';
import { flattenObject } from './common';
import * as onScheduled from './runtime/cron/index';
import { ScheduledFunction } from './runtime/cron/scheduled';
Expand Down Expand Up @@ -60,25 +60,15 @@ Object.entries(flattenObject(onTriggers)).forEach(([name, config]) => {
app.post(`/${name}`, jsonParser, loggingMiddleware(name), async (req, res) => {
const pubSubMessage = req.body.message.data;
const raw = Buffer.from(pubSubMessage, 'base64').toString().trim();
const processId = JSON.parse(raw).processId;
let snap: PgChanges | undefined = undefined;

const docRef = database().getCon()('changes').where({ uid: processId });
const change = JSON.parse(raw);
try {
snap = head(await docRef);
if (!snap) {
return;
}

await docRef.delete();

await (config as TriggeredFunction).handler({
...snap.change,
prev: snap.change!.prev || undefined,
curr: snap.change!.curr || undefined,
...change,
prev: change!.prev || undefined,
curr: change!.curr || undefined,
} as PgDocEvent<BaseRecord>);
} catch (error) {
logger.error('onTriggers-error', name, snap, error);
logger.error('onTriggers-error', name, change, error);
} finally {
res.sendStatus(200);
}
Expand Down
36 changes: 11 additions & 25 deletions packages/functions/test-tangle/faucet.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Network, NetworkAddress, Timestamp } from '@buildcore/interfaces';

import { Client } from '@iota/sdk';
import { database } from '@buildcore/database';
import { Network, NetworkAddress, Timestamp, getMilestoneCol } from '@buildcore/interfaces';
import { MnemonicService } from '../src/services/wallet/mnemonic';
import { Wallet } from '../src/services/wallet/wallet';
import { AddressDetails } from '../src/services/wallet/wallet.service';
Expand Down Expand Up @@ -31,10 +30,7 @@ export const requestFundsFromFaucet = async (
? { expiresAt, returnAddressBech32: faucetAddress.bech32 }
: undefined,
});
const ledgerInclusionState = await awaitLedgerInclusionState(blockId);
if (ledgerInclusionState === 'included') {
return { blockId, faucetAddress };
}
await awaitLedgerInclusionState(network, blockId);
} catch (e) {
console.log(e);
} finally {
Expand All @@ -55,10 +51,7 @@ export const requestFundsForManyFromFaucet = async (
try {
await MnemonicService.store(faucetAddress.bech32, faucetAddress.mnemonic, network);
const blockId = await wallet.sendToMany(faucetAddress, targets, {});
const ledgerInclusionState = await awaitLedgerInclusionState(blockId);
if (ledgerInclusionState === 'included') {
return blockId;
}
await awaitLedgerInclusionState(network, blockId);
} catch (e) {
console.log(e);
} finally {
Expand Down Expand Up @@ -86,10 +79,7 @@ export const requestMintedTokenFromFaucet = async (
nativeTokens: [{ id: tokenId, amount: BigInt(amount) }],
storageDepositSourceAddress: targetAddress.bech32,
});
const ledgerInclusionState = await awaitLedgerInclusionState(blockId);
if (ledgerInclusionState === 'included') {
return blockId;
}
await awaitLedgerInclusionState(Network.RMS, blockId);
} catch {
// do nothing
} finally {
Expand All @@ -100,20 +90,16 @@ export const requestMintedTokenFromFaucet = async (
throw Error('Could not get native tokens from faucet');
};

export const awaitLedgerInclusionState = async (blockId: string) => {
let ledgerInclusionState: string | undefined = '';
const client = new Client({ nodes: ['https://api.testnet.shimmer.network'] });
export const awaitLedgerInclusionState = async (network: Network, blockId: string) => {
await wait(async () => {
ledgerInclusionState = await getLedgerInclusionState(client, blockId);
return ledgerInclusionState !== undefined;
const block = await database()
.getCon()(getMilestoneCol(network) + '_transactions')
.where({ uid: blockId })
.first();
return block !== undefined;
}, 120);
await client.destroy();
return ledgerInclusionState;
};

const getLedgerInclusionState = async (client: Client, blockId: string) =>
(await client.getBlockMetadata(blockId)).ledgerInclusionState;

export const getFaucetMnemonic = (network: Network) => {
const urls = network === Network.ATOI ? ATOI_FAUCET_MNEMONIC : RMS_FAUCET_MNEMONIC;
return urls[getRandomIndex(urls)];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('Minted nft trading', () => {
);
await MnemonicService.store(address.bech32, address.mnemonic, Network.RMS);

await awaitLedgerInclusionState(blockId);
await awaitLedgerInclusionState(helper.network, blockId);

await helper.walletService!.send(
address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe('Minted nft trading', () => {
);
await MnemonicService.store(address.bech32, address.mnemonic, Network.RMS);

await awaitLedgerInclusionState(blockId);
await awaitLedgerInclusionState(helper.network, blockId);

await helper.walletService!.send(
address,
Expand Down
2 changes: 1 addition & 1 deletion packages/functions/test-tangle/swap/swap_1.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('Swap control test', () => {
const blockId = await h.wallet.send(source, swapOrder.payload.targetAddress!, 0, {
nativeTokens: [{ id: MINTED_TOKEN_ID_1, amount: BigInt(5) }],
});
await awaitLedgerInclusionState(blockId);
await awaitLedgerInclusionState(h.network, blockId);
await MnemonicService.store(source.bech32, source.mnemonic, h.network);

await h.wallet.send(source, swapOrder.payload.targetAddress!, 0, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,13 @@ describe('Collection minting', () => {
helper.guardianAddress!,
collectionMetadata,
);
const collectionBlockState = await awaitLedgerInclusionState(collectionMintingBlock);
if (collectionBlockState !== 'included') {
fail();
}
await awaitLedgerInclusionState(helper.network, collectionMintingBlock);

const [nftMintingBlock, nftId] = await mintNft(helper.guardianAddress!, {
collectionId,
...nftMetadata,
});
const nftBlockState = await awaitLedgerInclusionState(nftMintingBlock);
if (nftBlockState !== 'included') {
fail();
}
await awaitLedgerInclusionState(helper.network, nftMintingBlock);

mockWalletReturnValue(helper.guardian!, { network: helper.network });
const depositOrder = await testEnv.wrap<Transaction>(WEN_FUNC.depositNft);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,13 @@ describe('Collection minting', () => {
helper.guardianAddress!,
collectionMetadata,
);
const collectionBlockState = await awaitLedgerInclusionState(collectionMintingBlock);
if (collectionBlockState !== 'included') {
fail();
}
await awaitLedgerInclusionState(helper.network, collectionMintingBlock);

const [nftMintingBlock, nftId] = await mintNft(helper.guardianAddress!, {
collectionId,
...nftMetadata,
});
const nftBlockState = await awaitLedgerInclusionState(nftMintingBlock);
if (nftBlockState !== 'included') {
fail();
}
await awaitLedgerInclusionState(helper.network, nftMintingBlock);

mockWalletReturnValue(helper.guardian!, { network: helper.network });
const depositOrder = await testEnv.wrap<Transaction>(WEN_FUNC.depositNft);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,13 @@ describe('Collection minting', () => {
helper.guardianAddress!,
collectionMetadata,
);
const collectionBlockState = await awaitLedgerInclusionState(collectionMintingBlock);
if (collectionBlockState !== 'included') {
fail();
}
await awaitLedgerInclusionState(helper.network, collectionMintingBlock);

const [nftMintingBlock, nftId] = await mintNft(helper.guardianAddress!, {
collectionId,
...nftMetadata,
});
const nftBlockState = await awaitLedgerInclusionState(nftMintingBlock);
if (nftBlockState !== 'included') {
fail();
}
await awaitLedgerInclusionState(helper.network, nftMintingBlock);

mockWalletReturnValue(helper.guardian!, { network: helper.network });
const depositOrder = await testEnv.wrap<Transaction>(WEN_FUNC.depositNft);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,13 @@ describe('Collection minting', () => {
helper.guardianAddress!,
collectionMetadata,
);
const collectionBlockState = await awaitLedgerInclusionState(collectionMintingBlock);
if (collectionBlockState !== 'included') {
fail();
}
await awaitLedgerInclusionState(helper.network, collectionMintingBlock);

const [nftMintingBlock, nftId] = await mintNft(helper.guardianAddress!, {
collectionId,
...nftMetadata,
});
const nftBlockState = await awaitLedgerInclusionState(nftMintingBlock);
if (nftBlockState !== 'included') {
fail();
}
await awaitLedgerInclusionState(helper.network, nftMintingBlock);

mockWalletReturnValue(helper.guardian!, { network: helper.network });
const depositOrder = await testEnv.wrap<Transaction>(WEN_FUNC.depositNft);
Expand Down
92 changes: 0 additions & 92 deletions packages/functions/test/milestone.sync.ts

This file was deleted.

4 changes: 3 additions & 1 deletion packages/functions/test/notifier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const notifier = async () => {
};

const notifyTriggers = async (channel: string, changeId: string) => {
const body = { message: { data: btoa(JSON.stringify({ processId: Number(changeId) })) } };
const change = await knex('changes').select('*').where({ uid: changeId });
const body = { message: { data: btoa(JSON.stringify(change[0].change)) } };

let error: any = undefined;
for (let i = 0; i < 5; ++i) {
Expand All @@ -80,6 +81,7 @@ const notifyTriggers = async (channel: string, changeId: string) => {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
await knex('change').delete().where({ uid: changeId });
throw error;
};

Expand Down
15 changes: 11 additions & 4 deletions packages/notifier/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,17 @@ const getTriggerTopic = (topic: string) => {
return triggerTopics[topic];
};

const notifyTriggers = async (channel: string, processId: string) => {
await getTriggerTopic(channel).publishMessage({
data: Buffer.from(JSON.stringify({ processId: Number(processId) })),
});
const notifyTriggers = async (channel: string, changeId: string) => {
try {
const change = await knex('changes').select('*').where({ uid: changeId });
await getTriggerTopic(channel).publishMessage({
data: Buffer.from(JSON.stringify(change[0].change)),
});
} catch (err) {
logger.error(err)
} finally {
await knex('changes').delete().where({ uid: changeId });
}
};

const upserTopic = pubSub.topic('onupsert');
Expand Down

0 comments on commit d0892e3

Please sign in to comment.