Skip to content

Commit

Permalink
adds cache control to validator handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
soaresa committed Nov 26, 2024
1 parent e0504e5 commit 3c1a94b
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 75 deletions.
1 change: 1 addition & 0 deletions api/src/ol/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export const V0_TIMESTAMP = 1712696400;
export const VALIDATORS_CACHE_KEY = '__0L_VALIDATORS__';
export const VALIDATORS_VOUCHES_CACHE_KEY = '__0L_VALIDATORS_VOUCHES__';
export const VALIDATORS_VFN_STATUS_CACHE_KEY = '__0L_VALIDATORS_VFN_STATUS__';
export const VALIDATORS_HANDLERS_CACHE_KEY = '__0L_VALIDATORS_HANDLERS__';
export const TOP_BALANCE_ACCOUNTS_CACHE_KEY = '__0L_TOP_BALANCE_ACCOUNTS__';
export const COMMUNITY_WALLETS_CACHE_KEY = '__0L_COMMUNITY_WALLETS__';
export const COMMUNITY_WALLETS_STATS_CACHE_KEY = '__0L_COMMUNITY_WALLETS_STATS__';
Expand Down
66 changes: 54 additions & 12 deletions api/src/ol/validators/validators.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue, Job } from 'bullmq';
import { Logger } from '@nestjs/common';
import { redisClient } from '../../redis/redis.service.js';
import { ValidatorsService } from './validators.service.js';
import {
VALIDATORS_CACHE_KEY,
VALIDATORS_VOUCHES_CACHE_KEY,
VALIDATORS_VFN_STATUS_CACHE_KEY,
VALIDATORS_HANDLERS_CACHE_KEY,
} from '../constants.js';

@Processor('validators')
export class ValidatorsProcessor extends WorkerHost {
private readonly logger = new Logger(ValidatorsProcessor.name);
public constructor(
@InjectQueue('validators')
private readonly validatorsQueue: Queue,
Expand All @@ -21,6 +24,20 @@ export class ValidatorsProcessor extends WorkerHost {
}

public async onModuleInit() {
await this.validatorsQueue.add('updateValidatorsHandlersCache', undefined, {
repeat: {
every: 12 * 60 * 60 * 1000, // 12 hours
},
});
this.updateValidatorsHandlersCache();

await this.validatorsQueue.add('updateValidatorsCache', undefined, {
repeat: {
every: 30 * 1000, // 30 seconds
},
});
this.updateValidatorsCache();

await this.validatorsQueue.add('updateVfnStatusCache', undefined, {
repeat: {
every: 5 * 60 * 1000, // 5 minutes
Expand All @@ -35,19 +52,17 @@ export class ValidatorsProcessor extends WorkerHost {
});
this.updateValidatorsVouchesCache();

await this.validatorsQueue.add('updateValidatorsCache', undefined, {
repeat: {
every: 30 * 1000, // 30 seconds
},
});
this.updateValidatorsCache();
this.logger.log('ValidatorsProcessor initialized');
}

public async process(job: Job<void, any, string>) {
switch (job.name) {
case 'updateValidatorsCache':
await this.updateValidatorsCache();
break;
case 'updateValidatorsHandlersCache':
await this.updateValidatorsHandlersCache();
break;
case 'updateValidatorsVouchesCache':
await this.updateValidatorsVouchesCache();
break;
Expand All @@ -61,17 +76,44 @@ export class ValidatorsProcessor extends WorkerHost {
}

private async updateVfnStatusCache() {
const vfnStatus = await this.validatorsService.queryValidatorsVfnStatus();
await redisClient.set(VALIDATORS_VFN_STATUS_CACHE_KEY, JSON.stringify(vfnStatus));
try {
const vfnStatus = await this.validatorsService.queryValidatorsVfnStatus();
await redisClient.set(VALIDATORS_VFN_STATUS_CACHE_KEY, JSON.stringify(vfnStatus));
this.logger.log('VFN status cache updated');
} catch (error) {
this.logger.error('Error updating VFN status cache', error);
}
}

private async updateValidatorsCache() {
const validators = await this.validatorsService.queryValidators();
await redisClient.set(VALIDATORS_CACHE_KEY, JSON.stringify(validators));
try {
const validators = await this.validatorsService.queryValidators();
await redisClient.set(VALIDATORS_CACHE_KEY, JSON.stringify(validators));
this.logger.log('Validators cache updated');
} catch (error) {
this.logger.error('Error updating validators cache', error);
}
}

private async updateValidatorsHandlersCache() {
try {
const validatorsHandlers = await this.validatorsService.loadValidatorHandles();
await redisClient.set(
VALIDATORS_HANDLERS_CACHE_KEY,
JSON.stringify(JSON.stringify(Array.from(validatorsHandlers.entries()))),
);
this.logger.log('Validators handlers cache updated');
} catch (error) {
this.logger.error('Error updating validators handlers cache', error);
}
}

private async updateValidatorsVouchesCache() {
const validatorsVouches = await this.validatorsService.queryValidatorsVouches();
await redisClient.set(VALIDATORS_VOUCHES_CACHE_KEY, JSON.stringify(validatorsVouches));
try {
const validatorsVouches = await this.validatorsService.queryValidatorsVouches();
await redisClient.set(VALIDATORS_VOUCHES_CACHE_KEY, JSON.stringify(validatorsVouches));
} catch (error) {
this.logger.error('Error updating validators vouches cache', error);
}
}
}
132 changes: 69 additions & 63 deletions api/src/ol/validators/validators.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import Bluebird from 'bluebird';
import axios from 'axios';
import BN from 'bn.js';
Expand All @@ -24,8 +24,12 @@ import {
VALIDATORS_CACHE_KEY,
VALIDATORS_VFN_STATUS_CACHE_KEY,
VALIDATORS_VOUCHES_CACHE_KEY,
VALIDATORS_HANDLERS_CACHE_KEY,
} from '../constants.js';

import * as net from 'net';
import { OlConfig } from '../../config/config.interface.js';

// Regex to match the fullnode address pattern
const fullnodeRegex =
/^\/(ip4|dns)\/([\d\.]+|[\w\.-]+)\/tcp\/\d+\/noise-ik\/0x[a-fA-F0-9]+\/handshake\/\d+$/;
Expand All @@ -34,6 +38,7 @@ const fullnodeRegex =
export class ValidatorsService {
private readonly cacheEnabled: boolean;
private readonly validatorHandlesUrl: string | undefined;
private readonly logger = new Logger(ValidatorsService.name);
public constructor(
private readonly olService: OlService,
private readonly prisma: PrismaService,
Expand Down Expand Up @@ -69,6 +74,38 @@ export class ValidatorsService {
return validators;
}

public async getValidatorsHandlers(): Promise<Map<string, string>> {
if (this.cacheEnabled) {
const cacheHandlersString = await this.getFromCache<string>(VALIDATORS_HANDLERS_CACHE_KEY);
if (cacheHandlersString) {
try {
const map = new Map<string, string>();
const entries: [string, string][] = JSON.parse(cacheHandlersString);
entries.forEach((entry) => {
map.set(entry[0], entry[1]);
});
return map;
} catch (parseError) {
this.logger.error('Error parsing validators handlers cache', parseError);
}
}
}

let handlers = new Map<string, string>();
try {
handlers = await this.loadValidatorHandles();
} catch (error) {
this.logger.error('Error loading validators handlers', error);
} finally {
await this.setCache(
VALIDATORS_HANDLERS_CACHE_KEY,
JSON.stringify(Array.from(handlers.entries())),
);
}

return handlers;
}

public async getValidatorsVouches(): Promise<ValidatorVouches[]> {
if (this.cacheEnabled) {
const cachedVouches = await this.getFromCache<ValidatorVouches[]>(
Expand Down Expand Up @@ -143,7 +180,7 @@ export class ValidatorsService {
}),
);

let handles = await this.loadValidatorHandles();
let handles = await this.getValidatorsHandlers();
let allValidators = [...currentValidators, ...eligibleValidators];
return await Promise.all(
allValidators.map(async (validator) => {
Expand Down Expand Up @@ -291,7 +328,7 @@ export class ValidatorsService {
public async queryValidatorsVouches(): Promise<ValidatorVouches[]> {
const eligible = await this.olService.getEligibleValidators();
const active = await this.olService.getValidatorSet();
const handles = await this.loadValidatorHandles();
const handles = await this.getValidatorsHandlers();
const currentEpoch = await this.olService.aptosClient
.getLedgerInfo()
.then((info) => Number(info.epoch));
Expand Down Expand Up @@ -420,56 +457,29 @@ export class ValidatorsService {
const entryFee = Number(rewardRes[1]);
const clearingBid = Number(rewardRes[2]);

// Check Thermostat
/*const measureRes = await this.olService.aptosClient.view({
function: '0x1::proof_of_fee::query_reward_adjustment',
type_arguments: [],
arguments: [],
});
const didIncrement = measureRes[1] as boolean;
const amount = measureRes[2];
// Get current epoch
const epochRes = await this.olService.aptosClient.getLedgerInfo();
const currentEpoch = Number(epochRes.epoch);
// Create ThermostatMeasure object
const thermostatMeasure = new ThermostatMeasure({
nextEpoch: currentEpoch + 1,
amount: Number(nominalReward) + (didIncrement ? +1 : -1) * Number(amount),
percentage: Math.round((Number(amount) / Number(nominalReward)) * 100),
didIncrease: didIncrement,
});*/

return new ValidatorUtils({
vouchPrice: Number(vouchPriceRes.amount),
entryFee: entryFee,
clearingBid: clearingBid,
netReward: nominalReward - entryFee,
/*, thermostatMeasure */
});
}

// TODO cache this
async loadValidatorHandles(): Promise<Map<string, string>> {
if (!this.validatorHandlesUrl) {
return new Map<string, string>();
}
try {
const response = await axios.get(this.validatorHandlesUrl);
const data = response.data;

const validatorMap = new Map<string, string>();
Object.keys(data.validators).forEach((address) => {
let addressStr = address.replace(/^0x/, '').toUpperCase();
validatorMap.set(addressStr, data.validators[address]);
});
const response = await axios.get(this.validatorHandlesUrl);
const data = response.data;

return validatorMap;
} catch (error) {
console.error('Error loading validator handles from URL:', error);
return new Map<string, string>();
}
const validatorMap = new Map<string, string>();
Object.keys(data.validators).forEach((address) => {
let addressStr = address.replace(/^0x/, '').toUpperCase();
validatorMap.set(addressStr, data.validators[address]);
});

return validatorMap;
}

public async queryValidatorsVfnStatus(): Promise<VfnStatus[]> {
Expand Down Expand Up @@ -513,7 +523,7 @@ export class ValidatorsService {
const valIp = match[2];

// Check if the address is accessible
status = await checkAddressAccessibility(valIp, 6182)
status = await this.checkAddressAccessibility(valIp, 6182)
.then((res) => {
return res ? VfnStatusType.Accessible : VfnStatusType.NotAccessible;
})
Expand All @@ -526,33 +536,29 @@ export class ValidatorsService {

return status;
}
}

import * as net from 'net';
import { OlConfig } from '../../config/config.interface.js';

function checkAddressAccessibility(ip: string, port: number): Promise<boolean> {
return new Promise((resolve) => {
const socket = new net.Socket();
async checkAddressAccessibility(ip: string, port: number): Promise<boolean> {
return new Promise((resolve) => {
const socket = new net.Socket();

// Timeout in case the server is not accessible
socket.setTimeout(1000);
// Timeout in case the server is not accessible
socket.setTimeout(1000);

// Try to connect to the IP and port
socket.connect(port, ip, () => {
//console.log(`Connected to ${ip}:${port}`);
socket.end();
resolve(true);
});
// Try to connect to the IP and port
socket.connect(port, ip, () => {
socket.end();
resolve(true);
});

socket.on('error', () => {
//console.log(`Failed to connect to ${ip}:${port}`);
resolve(false);
});
socket.on('error', () => {
this.logger.warn(`Error connecting to ${ip}:${port}`);
resolve(false);
});

socket.on('timeout', () => {
//console.log(`Connection to ${ip}:${port} timed out`);
resolve(false);
socket.on('timeout', () => {
this.logger.warn(`Timeout connecting to ${ip}:${port}`);
resolve(false);
});
});
});
}
}

0 comments on commit 3c1a94b

Please sign in to comment.