Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(observability): utilise new logger in indexer #522

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/indexer/mvm.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"dependencies": {
"@akashnetwork/database": "1.0.0",
"@akashnetwork/env-loader": "1.0.1"
"@akashnetwork/env-loader": "1.0.1",
"@akashnetwork/logging": "2.0.2"
},
"devDependencies": {
"@akashnetwork/dev-config": "1.0.0"
Expand Down
1 change: 1 addition & 0 deletions apps/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"@akashnetwork/akash-api": "^1.3.0",
"@akashnetwork/database": "*",
"@akashnetwork/env-loader": "*",
"@akashnetwork/logging": "*",
"@cosmjs/crypto": "^0.32.4",
"@cosmjs/encoding": "^0.32.4",
"@cosmjs/math": "^0.32.4",
Expand Down
27 changes: 14 additions & 13 deletions apps/indexer/src/chain/chainSync.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { Block, Message } from "@akashnetwork/database/dbSchemas";
import { Day, Transaction } from "@akashnetwork/database/dbSchemas/base";
import { LoggerService } from "@akashnetwork/logging";
import { fromBase64 } from "@cosmjs/encoding";
import { decodeTxRaw } from "@cosmjs/proto-signing";
import { asyncify, eachLimit } from "async";
Expand All @@ -25,6 +26,8 @@ import {
import { nodeAccessor } from "./nodeAccessor";
import { statsProcessor } from "./statsProcessor";

const logger = LoggerService.forContext("ChainSync");

export const setMissingBlock = (height: number) => (missingBlock = height);
let missingBlock: number;

Expand Down Expand Up @@ -83,13 +86,13 @@ export async function syncBlocks() {
const latestHeightInCache = await getLatestHeightInCache();

if (latestHeightInCache >= latestBlockToDownload) {
console.log("No blocks to download");
logger.info("No blocks to download");
} else {
let startHeight = !env.KEEP_CACHE ? latestInsertedHeight + 1 : Math.max(latestHeightInCache, 1);

// If database is empty
if (latestInsertedHeight === 0) {
console.log("Starting from scratch");
logger.info("Starting from scratch");
startHeight = activeChain.startHeight || 1;
}

Expand All @@ -101,13 +104,11 @@ export async function syncBlocks() {

const maxDownloadGroupSize = 1_000;
if (latestBlockToDownload - startHeight > maxDownloadGroupSize) {
console.log("Limiting download to " + maxDownloadGroupSize + " blocks");
logger.info("Limiting download to " + maxDownloadGroupSize + " blocks");
latestBlockToDownload = startHeight + maxDownloadGroupSize;
}

console.log("Starting download at block #" + startHeight);
console.log("Will end download at block #" + latestBlockToDownload);
console.log(latestBlockToDownload - startHeight + 1 + " blocks to download");
const blocksCount = latestBlockToDownload - startHeight + 1
logger.info({ event: 'DOWNLOAD', startHeight, latestBlockToDownload, blocksCount });

await benchmark.measureAsync("downloadBlocks", async () => {
await downloadBlocks(startHeight, latestBlockToDownload);
Expand Down Expand Up @@ -151,7 +152,7 @@ export async function syncBlocks() {

async function insertBlocks(startHeight: number, endHeight: number) {
const blockCount = endHeight - startHeight + 1;
console.log("Inserting " + blockCount + " blocks into database");
logger.info("Inserting " + blockCount + " blocks into database");

let lastInsertedBlock = (await Block.findOne({
include: [
Expand Down Expand Up @@ -242,7 +243,7 @@ async function insertBlocks(startHeight: number, endHeight: number) {
const blockDate = new Date(Date.UTC(blockDatetime.getUTCFullYear(), blockDatetime.getUTCMonth(), blockDatetime.getUTCDate()));

if (!lastInsertedBlock || !isEqual(blockDate, lastInsertedBlock.day.date)) {
console.log("Creating day: ", blockDate, i);
logger.info(`Creating day: ${blockDate} ${i}`);
const [newDay, created] = await Day.findOrCreate({
where: {
date: blockDate
Expand All @@ -256,7 +257,7 @@ async function insertBlocks(startHeight: number, endHeight: number) {
});

if (!created) {
console.warn(`Day ${blockDate} already exists in database`);
logger.warn(`Day ${blockDate} already exists in database`);
}

blockEntry.dayId = newDay.id;
Expand Down Expand Up @@ -287,15 +288,15 @@ async function insertBlocks(startHeight: number, endHeight: number) {
blocksToAdd = [];
txsToAdd = [];
msgsToAdd = [];
console.log(`Blocks added to db: ${i - startHeight + 1} / ${blockCount} (${(((i - startHeight + 1) * 100) / blockCount).toFixed(2)}%)`);
logger.info(`Blocks added to db: ${i - startHeight + 1} / ${blockCount} (${(((i - startHeight + 1) * 100) / blockCount).toFixed(2)}%)`);

if (lastInsertedBlock) {
lastInsertedBlock.day.lastBlockHeightYet = lastInsertedBlock.height;
await lastInsertedBlock.day.save({ transaction: insertDbTransaction });
}
});
} catch (error) {
console.log(error, txsToAdd);
logger.info(`${error}, ${txsToAdd}`);
}
}
}
Expand All @@ -319,7 +320,7 @@ async function downloadBlocks(startHeight: number, endHeight: number) {
if (Date.now() - lastLogDate > 500) {
lastLogDate = Date.now();
console.clear();
console.log("Progress: " + ((downloadedCount * 100) / missingBlockCount).toFixed(2) + "%");
logger.info("Progress: " + ((downloadedCount * 100) / missingBlockCount).toFixed(2) + "%");

if (!isProd) {
nodeAccessor.displayTable();
Expand Down
6 changes: 4 additions & 2 deletions apps/indexer/src/chain/dataStore.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { LoggerService } from "@akashnetwork/logging";
import fs from "fs";
import { Level } from "level";
import path from "path";
Expand All @@ -6,6 +7,7 @@ import { dataFolderPath } from "@src/shared/constants";
import { bytesToHumanReadableSize } from "@src/shared/utils/files";

const LevelNotFoundCode = "LEVEL_NOT_FOUND";
const logger = LoggerService.forContext("DataStore");

if (!fs.existsSync(dataFolderPath)) {
fs.mkdirSync(dataFolderPath, { recursive: true });
Expand Down Expand Up @@ -37,10 +39,10 @@ export const getCacheSize = async function () {
};

export const deleteCache = async function () {
console.log("Deleting cache...");
logger.info("Deleting cache...");
await blocksDb.clear();
await blockResultsDb.clear();
console.log("Deleted");
logger.info("Deleted");
};

export async function getCachedBlockByHeight(height: number) {
Expand Down
7 changes: 5 additions & 2 deletions apps/indexer/src/chain/genesisImporter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { LoggerService } from "@akashnetwork/logging";
import fs from "fs";
import { ungzip } from "node-gzip";
import path from "path";
Expand All @@ -7,19 +8,21 @@ import { dataFolderPath } from "@src/shared/constants";
import { download } from "@src/shared/utils/download";
import { IGenesis } from "./genesisTypes";

const logger = LoggerService.forContext("GenesisImports");

export async function getGenesis(): Promise<IGenesis> {
const ext = path.extname(activeChain.genesisFileUrl);
const filename = path.basename(activeChain.genesisFileUrl);

let genesisLocalPath = dataFolderPath + "/" + filename;

if (!fs.existsSync(genesisLocalPath)) {
console.log("Downloading genesis file: " + activeChain.genesisFileUrl);
logger.info("Downloading genesis file: " + activeChain.genesisFileUrl);
await download(activeChain.genesisFileUrl, genesisLocalPath);
}

if (ext === ".gz") {
console.log("Extracting genesis file...");
logger.info("Extracting genesis file...");
const decompressed = await ungzip(fs.readFileSync(genesisLocalPath).buffer);
genesisLocalPath = genesisLocalPath.replace(".gz", "");
fs.writeFileSync(genesisLocalPath, decompressed);
Expand Down
9 changes: 5 additions & 4 deletions apps/indexer/src/chain/nodeAccessor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { LoggerService } from "@akashnetwork/logging";
import fs from "fs";

import { concurrentNodeQuery, dataFolderPath } from "@src/shared/constants";
Expand All @@ -10,7 +11,7 @@ interface NodeAccessorSettings {
}

const savedNodeInfoPath = dataFolderPath + "/nodeStatus.json";

const logger = LoggerService.forContext("NodeAccessor");
class NodeAccessor {
private nodes: NodeInfo[];
private settings: NodeAccessorSettings;
Expand All @@ -21,7 +22,7 @@ class NodeAccessor {
}

private async saveNodeStatus() {
console.log("Saving node status...");
logger.info("Saving node status...");
const statuses = this.nodes.map(x => x.getSavedNodeInfo());

await fs.promises.writeFile(savedNodeInfoPath, JSON.stringify(statuses, null, 2));
Expand All @@ -35,13 +36,13 @@ class NodeAccessor {

public async loadNodeStatus() {
if (!fs.existsSync(savedNodeInfoPath)) {
console.log("No saved node status found");
logger.info("No saved node status found");
await this.refetchNodeStatus();
await this.saveNodeStatus();
return;
}

console.log("Loading saved node status...");
logger.info("Loading saved node status...");
const file = await fs.promises.readFile(savedNodeInfoPath, "utf-8");
const savedNodes = JSON.parse(file) as SavedNodeInfo[];

Expand Down
15 changes: 9 additions & 6 deletions apps/indexer/src/chain/statsProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { Block, Message } from "@akashnetwork/database/dbSchemas";
import { AkashMessage } from "@akashnetwork/database/dbSchemas/akash";
import { Transaction } from "@akashnetwork/database/dbSchemas/base";
import { LoggerService } from "@akashnetwork/logging";
import { fromBase64 } from "@cosmjs/encoding";
import { decodeTxRaw } from "@cosmjs/proto-signing";
import { sha256 } from "js-sha256";
Expand All @@ -16,11 +17,13 @@ import { decodeMsg } from "@src/shared/utils/protobuf";
import { setMissingBlock } from "./chainSync";
import { getGenesis } from "./genesisImporter";

const logger = LoggerService.forContext("StatsProcessor");

class StatsProcessor {
private cacheInitialized: boolean = false;

public async rebuildStatsTables() {
console.log('Setting "isProcessed" to false');
logger.info('Setting "isProcessed" to false');
await Message.update(
{
isProcessed: false,
Expand All @@ -41,7 +44,7 @@ class StatsProcessor {
{ where: { isProcessed: true } }
);

console.log("Rebuilding stats tables...");
logger.info("Rebuilding stats tables...");

for (const indexer of activeIndexers) {
await indexer.recreateTables();
Expand All @@ -58,7 +61,7 @@ class StatsProcessor {
}

public async processMessages() {
console.log("Querying unprocessed messages...");
logger.info("Querying unprocessed messages...");

const shouldProcessEveryBlocks = activeIndexers.some(indexer => indexer.runForEveryBlocks);

Expand All @@ -78,7 +81,7 @@ class StatsProcessor {
const hasNewBlocks = !previousProcessedBlock || maxDbHeight > previousProcessedBlock.height;

if (!hasNewBlocks) {
console.log("No new blocks to process");
logger.info("No new blocks to process");
return;
}

Expand All @@ -94,7 +97,7 @@ class StatsProcessor {
let firstBlockToProcess = firstUnprocessedHeight;
let lastBlockToProcess = Math.min(maxDbHeight, firstBlockToProcess + groupSize, lastBlockToSync);
while (firstBlockToProcess <= Math.min(maxDbHeight, lastBlockToSync)) {
console.log(`Loading blocks ${firstBlockToProcess} to ${lastBlockToProcess}`);
logger.info(`Loading blocks ${firstBlockToProcess} to ${lastBlockToProcess}`);

const getBlocksTimer = benchmark.startTimer("getBlocks");
const blocks = await Block.findAll({
Expand Down Expand Up @@ -150,7 +153,7 @@ class StatsProcessor {
decodeTimer.end();

for (const msg of transaction.messages) {
console.log(`Processing message ${msg.type} - Block #${block.height}`);
logger.info(`Processing message ${msg.type} - Block #${block.height}`);

const encodedMessage = decodedTx.body.messages[msg.index].value;

Expand Down
9 changes: 6 additions & 3 deletions apps/indexer/src/db/buildDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { Block, Message } from "@akashnetwork/database/dbSchemas";
import { Day, Transaction } from "@akashnetwork/database/dbSchemas/base";
import { MonitoredValue } from "@akashnetwork/database/dbSchemas/base/monitoredValue";
import { LoggerService } from "@akashnetwork/logging";

import { getGenesis } from "@src/chain/genesisImporter";
import { indexers } from "@src/indexers";
import { ExecutionMode, executionMode } from "@src/shared/constants";
import { sequelize } from "./dbConnection";

const logger = LoggerService.forContext("BuildDatabase");

/**
* Initiate database schema
*/
export const initDatabase = async () => {
console.log(`Connecting to db (${sequelize.config.host}/${sequelize.config.database})...`);
logger.info(`Connecting to db (${sequelize.config.host}/${sequelize.config.database})...`);
await sequelize.authenticate();
console.log("Connection has been established successfully.");
logger.info("Connection has been established successfully.");

if (executionMode === ExecutionMode.RebuildAll) {
await Day.drop({ cascade: true });
Expand All @@ -41,7 +44,7 @@ export const initDatabase = async () => {
if (!activeChain.startHeight) {
const firstBlock = await Block.findOne();
if (!firstBlock) {
console.log("First time syncing, seeding from genesis file...");
logger.info("First time syncing, seeding from genesis file...");

const genesis = await getGenesis();
for (const indexer of indexers) {
Expand Down
9 changes: 6 additions & 3 deletions apps/indexer/src/db/keybaseProvider.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { Validator } from "@akashnetwork/database/dbSchemas/base";
import { LoggerService } from "@akashnetwork/logging";
import fetch from "node-fetch";
import { Op } from "sequelize";

const logger = LoggerService.forContext("KeybaseProvider");

export async function fetchValidatorKeybaseInfos() {
const validators = await Validator.findAll({
where: {
Expand All @@ -12,11 +15,11 @@ export async function fetchValidatorKeybaseInfos() {
const requests = validators.map(async validator => {
try {
if (!/^[A-F0-9]{16}$/.test(validator.identity)) {
console.warn("Invalid identity " + validator.identity + " for validator " + validator.operatorAddress);
logger.warn("Invalid identity " + validator.identity + " for validator " + validator.operatorAddress);
return Promise.resolve();
}

console.log("Fetching keybase info for " + validator.operatorAddress);
logger.info("Fetching keybase info for " + validator.operatorAddress);
const response = await fetch(`https://keybase.io/_/api/1.0/user/lookup.json?key_suffix=${validator.identity}`);

if (response.status === 200) {
Expand All @@ -31,7 +34,7 @@ export async function fetchValidatorKeybaseInfos() {

await validator.save();
} catch (err) {
console.error("Error while fetching keybase info for " + validator.operatorAddress);
logger.error("Error while fetching keybase info for " + validator.operatorAddress);
throw err;
}
});
Expand Down
9 changes: 6 additions & 3 deletions apps/indexer/src/db/priceHistoryProvider.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { activeChain } from "@akashnetwork/database/chainDefinitions";
import { Day } from "@akashnetwork/database/dbSchemas/base";
import { LoggerService } from "@akashnetwork/logging";
import { isSameDay } from "date-fns";
import fetch from "node-fetch";

Expand All @@ -9,15 +10,17 @@ interface PriceHistoryResponse {
total_volumes: Array<Array<number>>;
}

const logger = LoggerService.forContext("PriceHistoryProvider");

export const syncPriceHistory = async () => {
if (!activeChain.coinGeckoId) {
console.log("No coin gecko id defined for this chain. Skipping price history sync.");
logger.info("No coin gecko id defined for this chain. Skipping price history sync.");
return;
}

const endpointUrl = `https://api.coingecko.com/api/v3/coins/${activeChain.coinGeckoId}/market_chart?vs_currency=usd&days=360`;

console.log("Fetching latest market data from " + endpointUrl);
logger.info("Fetching latest market data from " + endpointUrl);

const response = await fetch(endpointUrl);
const data: PriceHistoryResponse = await response.json();
Expand All @@ -26,7 +29,7 @@ export const syncPriceHistory = async () => {
price: pDate[1]
}));

console.log(`There are ${apiPrices.length} prices to update.`);
logger.info(`There are ${apiPrices.length} prices to update.`);

const days = await Day.findAll();

Expand Down
Loading
Loading