Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Monitoring service as a cron job in indexer #111

Merged
merged 20 commits into from
Nov 6, 2023
Merged
10 changes: 8 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,11 @@
# DATABASE_URL="postgresql://johndoe:randompassword@localhost:5432/mydb?schema=public"
# DATABASE_URL="mongodb://root:[email protected]:27017/chainbridge-explorer-indexer?authSource=admin&retryWrites=true&w=majority"
DATABASE_URL="mongodb://mongo1:30001/chainbridge-explorer-indexer?replicaSet=my-replica-set&authSource=admin&retryWrites=true&w=majority"
CONFIG_SERVER_URL="http://localhost:8080"
STAGE="local"
STAGE="local"
SNS_REGION=
TOPIC_ARN=
CRON_TIME="* */10 * * * *"
INCIDENT_TIME_MINUTES=45
WARNING_TIME_MINUTES=15
INCIDENT_TEMPLATE_PATH="incidentTemplate.ejs"
WARNING_TEMPLATE_PATH="warningTemplate.ejs"
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ ENVIRONMENT="" # testnet || devnet
RPC_URL_CONFIG="[ { "id": DOMAIN_ID, "endpoint": DOMAIN_ENDPOINT } ]"
COINMARKETCAP_API_KEY=""
COINMARKETCAP_API_URL=""
SNS_REGION=
TOPIC_ARN=
CRON_TIME="* */10 * * * *"
INCIDENT_TIME_MINUTES=45
WARNING_TIME_MINUTES=15
INCIDENT_TEMPLATE_PATH="incidentTemplate.ejs"
WARNING_TEMPLATE_PATH="warningTemplate.ejs"
```

### Running locally
Expand Down
8 changes: 4 additions & 4 deletions license-config.json
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{
"ignore": [
"*.txt",
"**/*.txt",
"./data",
".github/",
".idea",
"*.yml",
"*.yaml",
"**/*.yml",
"**/*.yaml",
"README.md",
"prisma/",
".dockerignore",
Expand All @@ -18,7 +18,7 @@
"jest.config.js",
".eslintrc.**",
"./scripts/",
"*.ejs"
"**/*.ejs"
],
"license": "header.txt",
"licenseFormats": {
Expand Down
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@
"winston-transport": "^4.5.0"
},
"devDependencies": {
"@aws-sdk/client-sns": "^3.418.0",
"@buildwithsygma/sygma-sdk-core": "^2.1.0",
"@chainsafe/eslint-config": "^1.1.0",
"@polkadot/types": "10.9.1",
"@rushstack/eslint-patch": "^1.2.0",
"@types/chai": "^4.3.5",
"@types/cors": "^2.8.12",
"@types/cron": "^2.4.0",
"@types/ejs": "^3.1.3",
"@types/eslint": "^8",
"@types/mocha": "^10.0.1",
"@types/mongodb": "^4.0.7",
Expand All @@ -63,12 +66,15 @@
"@types/supertest": "^2.0.11",
"axios": "^1.4.0",
"chai": "^4.3.7",
"cron": "^2.4.3",
"cross-fetch": "3.1.5",
"docker-secret": "^1.2.4",
"dotenv": "^10.0.0",
"dotenv-cli": "^4.0.0",
"dotenv-flow": "^3.2.0",
"ejs": "^3.1.9",
"eslint": "^8.21.0",
"luxon": "^3.4.3",
"mocha": "^10.2.0",
"nodemon": "^2.0.12",
"nyc": "^15.1.0",
Expand Down
8 changes: 8 additions & 0 deletions src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { healthcheckRoute } from "./healthcheck"
import { OfacComplianceService } from "./services/ofac"
import AccountRepository from "./repository/account"
import CoinMarketCapService from "./services/coinmarketcap/coinmarketcap.service"
import { checkTransferStatus, startCronJob } from "./services/monitoringService"
import { NotificationSender } from "./services/monitoringService/notificationSender"

interface DomainIndexer {
listenToEvents(): Promise<void>
Expand Down Expand Up @@ -94,6 +96,12 @@ async function init(): Promise<{ domainIndexers: Array<DomainIndexer>; app: Fast

const domainsToIndex = getDomainsToIndex(sharedConfig.domains)
const domainIndexers: Array<DomainIndexer> = []

const notificationSender = new NotificationSender(process.env.SNS_REGION!)

const cronTime = process.env.CRON_TIME || "* */10 * * * *"
startCronJob(cronTime, checkTransferStatus, transferRepository, notificationSender)

for (const domain of domainsToIndex) {
const rpcURL = rpcUrlConfig.get(domain.id)
if (!rpcURL) {
Expand Down
24 changes: 22 additions & 2 deletions src/indexer/repository/transfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
The Licensed Work is (c) 2023 Sygma
SPDX-License-Identifier: LGPL-3.0-only
*/
import { PrismaClient, Transfer, TransferStatus } from "@prisma/client"
import { Deposit, Prisma, PrismaClient, Transfer, TransferStatus } from "@prisma/client"
import { ObjectId } from "mongodb"
import { DecodedDepositLog, DecodedFailedHandlerExecution, DecodedProposalExecutionLog } from "../services/evmIndexer/evmTypes"

export type TransferWithDeposit = Prisma.TransferGetPayload<{
include: {
deposit: true
}
}>

export type TransferMetadata = {
id: string
depositNonce: number
Expand All @@ -14,6 +20,8 @@ export type TransferMetadata = {
fromDomainId: string
toDomainId: string
resourceID: string
timestamp: Date
deposit: Deposit
resource: {
connect: {
id: string
Expand Down Expand Up @@ -158,7 +166,7 @@ class TransferRepository {
}

public async insertFailedTransfer(
{ depositNonce, domainId, message }: Pick<DecodedFailedHandlerExecution, "depositNonce" | "domainId" | "message">,
{ depositNonce, domainId, message, timestamp }: Pick<DecodedFailedHandlerExecution, "depositNonce" | "domainId" | "message" | "timestamp">,
toDomainId: number,
): Promise<Transfer> {
const transferData = {
Expand All @@ -175,6 +183,7 @@ class TransferRepository {
},
},
status: TransferStatus.failed,
timestamp: new Date(timestamp),
message,
}
return await this.transfer.create({ data: transferData })
Expand Down Expand Up @@ -245,6 +254,17 @@ class TransferRepository {
},
})
}

public async findTransfersByStatus(status: TransferStatus): Promise<Array<TransferWithDeposit>> {
return await this.transfer.findMany({
where: {
status: status,
},
include: {
deposit: true,
},
})
}
}

export default TransferRepository
11 changes: 11 additions & 0 deletions src/indexer/services/monitoringService/incidentTemplate.ejs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
INCIDENT,
Transfer with hash <%= txHash %> from domain with id <%= fromDomainId %> is pending for <%= durationInMins %> minutes.

Sincerely,
The Monitoring Service

Tags:

#txHash:<%= txHash %>
#fromDomainId:<%= fromDomainId %>
#durationInMins:<%= durationInMins %>
52 changes: 52 additions & 0 deletions src/indexer/services/monitoringService/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
The Licensed Work is (c) 2023 Sygma
SPDX-License-Identifier: LGPL-3.0-only
*/
import path from "path"
import { CronJob } from "cron"
import { DateTime } from "luxon"
import { TransferStatus } from "@prisma/client"
import ejs from "ejs"
import { convertMillisecondsToMinutes } from "../../../utils/helpers"
import TransferRepository, { TransferWithDeposit } from "../../repository/transfer"
import { logger } from "../../../utils/logger"
import { NotificationSender } from "./notificationSender"

export enum NotificationType {
INCIDENT = "Incident",
WARNING = "Warning",
}

export async function createMessage(templatePath: string, transfer: TransferWithDeposit, durationInMins: number): Promise<string> {
return await ejs.renderFile(path.join(__dirname, templatePath), {
txHash: transfer.deposit!.txHash,
fromDomainId: transfer.fromDomainId,
durationInMins: Math.round(durationInMins),
})
}

export async function checkTransferStatus(transferRepository: TransferRepository, notificationSender: NotificationSender): Promise<void> {
logger.info("Checking pending transfers")

const transfers = await transferRepository.findTransfersByStatus(TransferStatus.pending)
for (const transfer of transfers) {
const duration = Date.now() - transfer.deposit!.timestamp.getTime()
const durationInMins = convertMillisecondsToMinutes(duration)
mj52951 marked this conversation as resolved.
Show resolved Hide resolved
if (durationInMins > Number(process.env.WARNING_TIME_MINUTES)) {
const msg = await createMessage(process.env.WARNING_TEMPLATE_PATH!, transfer, durationInMins)
await notificationSender.sendNotification({ Message: msg, TopicArn: process.env.TOPIC_ARN })
logger.debug("Warning sent")
}
}
}

export function startCronJob(cronTime: string | Date | DateTime, fn: Function, ...args: Parameters<any>): void {
const cronJob = new CronJob(cronTime, () => {
try {
fn(...args)
} catch (err) {
logger.error("Error while executing cron job function", err)
}
})
cronJob.start()
}
23 changes: 23 additions & 0 deletions src/indexer/services/monitoringService/notificationSender/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
The Licensed Work is (c) 2023 Sygma
SPDX-License-Identifier: LGPL-3.0-only
*/
import { PublishCommand, PublishCommandInput, SNSClient } from "@aws-sdk/client-sns"
import { logger } from "../../../../utils/logger"

export class NotificationSender {
private snsClient: SNSClient

constructor(region: string) {
this.snsClient = new SNSClient({ region })
}

public async sendNotification(message: PublishCommandInput): Promise<void> {
try {
logger.debug(`Sending notification message: ${JSON.stringify(message)}`)
await this.snsClient.send(new PublishCommand(message))
} catch (err) {
logger.error(`Error while sending SNS notification: ${JSON.stringify(err)}`)
}
}
}
12 changes: 12 additions & 0 deletions src/indexer/services/monitoringService/warningTemplate.ejs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
WARNING,
Transfer with hash <%= txHash %> from domain with id <%= fromDomainId %> is pending for <%= durationInMins %> minutes.

Sincerely,
The Monitoring Service

Tags:

#type: deposit
#txHash:<%= txHash %>
#fromDomainId:<%= fromDomainId %>
#durationInMins:<%= durationInMins %>
1 change: 1 addition & 0 deletions src/indexer/utils/substrate/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export async function saveFailedHandlerExecution(
depositNonce: Number(depositNonce),
domainId: originDomainId,
message: Buffer.from(error).toString(),
timestamp: timestamp,
},
toDomainId,
)
Expand Down
4 changes: 4 additions & 0 deletions src/utils/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ export function decodeDataHash(data: string): { amount: string; destinationRecip
return result
}

export function convertMillisecondsToMinutes(duration: number): number {
return duration / 1000 / 60
mj52951 marked this conversation as resolved.
Show resolved Hide resolved
}

export function getHandlersMap(bridge: EvmBridgeConfig, provider: ethers.JsonRpcProvider): HandlersMap {
const erc20HandlerContract = Erc20HandlerFactory.connect(bridge.erc20HandlerAddress, provider as unknown as Signer)
const erc721HandlerContract = Erc721HandlerFactory.connect(bridge.erc721HandlerAddress, provider as unknown as Signer)
Expand Down
Loading
Loading