Skip to content

Commit

Permalink
Merge pull request #254 from lidofinance/develop
Browse files Browse the repository at this point in the history
Develop to Main
  • Loading branch information
AlexanderLukin authored Aug 6, 2024
2 parents 7025871 + c3ed63c commit 54cc81e
Show file tree
Hide file tree
Showing 13 changed files with 1,879 additions and 1,979 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Install node
uses: actions/setup-node@v3
with:
node-version: '16'
node-version: '18.20.1'
cache: 'yarn'

- name: Install dependencies
Expand All @@ -28,6 +28,7 @@ jobs:
- name: Run e2e tests
run: yarn test:e2e
env:
ETH_NETWORK: 1
CL_API_URLS: ${{ secrets.CL_API_URL }}
CL_API_GET_RESPONSE_TIMEOUT: 60000
TEST_EPOCH_NUMBER: 152978
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:16-alpine as building
FROM node:18.20.1-alpine as building

WORKDIR /app

Expand All @@ -9,7 +9,7 @@ COPY ./tsconfig*.json ./
COPY ./src ./src
RUN yarn build

FROM node:16-alpine
FROM node:18.20.1-alpine

WORKDIR /app

Expand Down
9 changes: 6 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@
"start": "nest start",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch",
"start:prod": "node dist/src/main --trace-warnings --sync --sub-fin-head --max-old-space-size=8192",
"start:prod": "NODE_OPTIONS=--max_old_space_size=8192 node dist/src/main --trace-warnings --sync --sub-fin-head",
"test": "jest --detectOpenHandles --forceExit",
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
"test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand",
"test:e2e": "jest --config ./test/jest-e2e.json --detectOpenHandles --forceExit",
"test:e2e": "NODE_OPTIONS=--experimental-vm-modules jest --config ./test/jest-e2e.json --detectOpenHandles --forceExit",
"lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix"
},
"author": "Lido",
"license": "MIT",
"description": "Consensus layer validators monitoring bot, that fetches Lido or User Node Operator keys from Execution layer and checks their performance in Consensus layer by: balance delta, attestations, proposes, sync committee participation.",
"dependencies": {
"@chainsafe/persistent-merkle-tree": "^0.7.1",
"@chainsafe/ssz": "^0.9.4",
"@clickhouse/client": "^0.0.11",
"@ethersproject/abstract-signer": "^5.4.0",
Expand All @@ -34,6 +35,7 @@
"@lido-nestjs/execution": "^1.11.1",
"@lido-nestjs/logger": "^1.3.2",
"@lido-nestjs/registry": "^7.4.0",
"@lodestar/types": "^1.15.1",
"@mikro-orm/core": "^5.3.1",
"@mikro-orm/knex": "^5.3.1",
"@mikro-orm/nestjs": "^5.1.0",
Expand Down Expand Up @@ -62,7 +64,8 @@
"retry-ts": "^0.1.3",
"stream-chain": "^2.2.5",
"stream-json": "^1.7.5",
"typechain": "^5.2.0"
"typechain": "^5.2.0",
"undici": "^6.6.2"
},
"devDependencies": {
"@golevelup/ts-jest": "^0.3.7",
Expand Down
6 changes: 4 additions & 2 deletions src/common/alertmanager/alerts/CriticalMissedAttestations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import { RegistrySourceOperator } from 'validators-registry';

import { Alert, AlertRequestBody, AlertRuleResult } from './BasicAlert';

const VALIDATORS_WITH_MISSED_ATTESTATION_COUNT_THRESHOLD = 1 / 3;
const validatorsWithMissedAttestationCountThreshold = (quantity: number) => {
return Math.min(quantity / 3, 1000);
};

export class CriticalMissedAttestations extends Alert {
constructor(config: ConfigService, storage: ClickhouseService, operators: RegistrySourceOperator[]) {
Expand All @@ -25,7 +27,7 @@ export class CriticalMissedAttestations extends Alert {
(a) => a.val_nos_id != null && +a.val_nos_module_id == operator.module && +a.val_nos_id == operator.index,
);
if (!missedAtt) continue;
if (missedAtt.amount > noStats.active_ongoing * VALIDATORS_WITH_MISSED_ATTESTATION_COUNT_THRESHOLD) {
if (missedAtt.amount > validatorsWithMissedAttestationCountThreshold(noStats.active_ongoing)) {
result[operator.name] = { ongoing: noStats.active_ongoing, missedAtt: missedAtt.amount };
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/common/alertmanager/alerts/CriticalNegativeDelta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import { RegistrySourceOperator } from 'validators-registry';

import { Alert, AlertRequestBody, AlertRuleResult } from './BasicAlert';

const VALIDATORS_WITH_NEGATIVE_DELTA_COUNT_THRESHOLD = 1 / 3;
const validatorsWithNegativeDeltaCountThreshold = (quantity: number) => {
return Math.min(quantity / 3, 1000);
};

export class CriticalNegativeDelta extends Alert {
constructor(config: ConfigService, storage: ClickhouseService, operators: RegistrySourceOperator[]) {
Expand All @@ -23,7 +25,7 @@ export class CriticalNegativeDelta extends Alert {
const operator = this.operators.find((o) => +noStats.val_nos_module_id == o.module && +noStats.val_nos_id == o.index);
const negDelta = negativeValidatorsCount.find((a) => +a.val_nos_module_id == operator.module && +a.val_nos_id == operator.index);
if (!negDelta) continue;
if (negDelta.amount > noStats.active_ongoing * VALIDATORS_WITH_NEGATIVE_DELTA_COUNT_THRESHOLD) {
if (negDelta.amount > validatorsWithNegativeDeltaCountThreshold(noStats.active_ongoing)) {
result[operator.name] = { ongoing: noStats.active_ongoing, negDelta: negDelta.amount };
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/common/config/env.validation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { plainToInstance, Expose, Transform } from 'class-transformer';
import { Expose, Transform, plainToInstance } from 'class-transformer';
import {
ArrayMinSize,
IsArray,
Expand All @@ -17,6 +17,7 @@ import {
ValidateIf,
validateSync,
} from 'class-validator';

import { Epoch } from 'common/consensus-provider/types';

import { Environment, LogFormat, LogLevel } from './interfaces';
Expand Down Expand Up @@ -133,7 +134,6 @@ export class EnvironmentVariables {
@Min(1)
@Max(5000000)
@Transform(({ value }) => parseInt(value, 10), { toClassOnly: true })
@ValidateIf((vars) => vars.VALIDATOR_REGISTRY_SOURCE == ValidatorRegistrySource.Lido && vars.NODE_ENV != Environment.test)
public ETH_NETWORK!: Network;

@IsArray()
Expand Down Expand Up @@ -178,7 +178,6 @@ export class EnvironmentVariables {
({ value, obj }) =>
dencunForkEpoch[obj.ETH_NETWORK] || (value != null && value.trim() !== '' ? parseInt(value, 10) : Number.MAX_SAFE_INTEGER),
)
@ValidateIf((vars) => vars.NODE_ENV !== Environment.test)
public DENCUN_FORK_EPOCH: Epoch;

@IsNumber()
Expand Down
126 changes: 59 additions & 67 deletions src/common/consensus-provider/consensus-provider.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { ContainerTreeViewType } from '@chainsafe/ssz/lib/view/container';
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { NonEmptyArray } from 'fp-ts/NonEmptyArray';
import { HTTPError, Response, got } from 'got-cjs';
import { request } from 'undici';
import { IncomingHttpHeaders } from 'undici/types/header';
import BodyReadable from 'undici/types/readable';

import { ConfigService } from 'common/config';
import { range } from 'common/functions/range';
Expand All @@ -24,36 +27,17 @@ import {
} from './intefaces';
import { BlockId, Epoch, Slot, StateId } from './types';

let ssz: typeof import('@lodestar/types').ssz;
let anySsz: typeof ssz.phase0 | typeof ssz.altair | typeof ssz.bellatrix | typeof ssz.capella | typeof ssz.deneb;
let ForkName: typeof import('@lodestar/params').ForkName;

interface RequestRetryOptions {
maxRetries?: number;
dataOnly?: boolean;
useFallbackOnRejected?: (last_error: any, current_error: any) => boolean;
useFallbackOnResolved?: (r: any) => boolean;
}

const REQUEST_TIMEOUT_POLICY_MS = {
// Starts when a socket is assigned.
// Ends when the hostname has been resolved.
lookup: undefined,
// Starts when lookup completes.
// Ends when the socket is fully connected.
// If lookup does not apply to the request, this event starts when the socket is assigned and ends when the socket is connected.
connect: 1000,
// Starts when connect completes.
// Ends when the handshake process completes.
secureConnect: undefined,
// Starts when the socket is connected.
// Resets when new data is transferred.
socket: undefined,
// Starts when the socket is connected.
// Ends when all data have been written to the socket.
send: undefined,
// Starts when request has been flushed.
// Ends when the headers are received.
// Will be redefined by `CL_API_GET_RESPONSE_TIMEOUT`
response: 1000,
};

@Injectable()
export class ConsensusProviderService {
protected apiUrls: string[];
Expand All @@ -68,12 +52,10 @@ export class ConsensusProviderService {
beaconHeadFinalityCheckpoints: 'eth/v1/beacon/states/head/finality_checkpoints',
blockInfo: (blockId: BlockId): string => `eth/v2/beacon/blocks/${blockId}`,
beaconHeaders: (blockId: BlockId): string => `eth/v1/beacon/headers/${blockId}`,
validatorsState: (stateId: StateId): string => `eth/v1/beacon/states/${stateId}/validators`,
attestationCommittees: (stateId: StateId, epoch: Epoch): string => `eth/v1/beacon/states/${stateId}/committees?epoch=${epoch}`,
syncCommittee: (stateId: StateId, epoch: Epoch): string => `eth/v1/beacon/states/${stateId}/sync_committees?epoch=${epoch}`,
proposerDutes: (epoch: Epoch): string => `eth/v1/validator/duties/proposer/${epoch}`,
attesterDuties: (epoch: Epoch): string => `eth/v1/validator/duties/attester/${epoch}`,
syncCommitteeDuties: (epoch: Epoch): string => `eth/v1/validator/duties/sync/${epoch}`,
state: (stateId: StateId): string => `eth/v2/debug/beacon/states/${stateId}`,
};

public constructor(
Expand Down Expand Up @@ -127,7 +109,9 @@ export class ConsensusProviderService {

if (nodeLatestSlot < this.latestSlot.slot) {
// we assume that the node must never return a slot less than the last saved slot
this.logger.error(`Received ${latestFrom} slot [${nodeLatestSlot}] is less than last [${this.latestSlot.slot}] slot received before, but shouldn't`);
this.logger.error(
`Received ${latestFrom} slot [${nodeLatestSlot}] is less than last [${this.latestSlot.slot}] slot received before, but shouldn't`,
);
return true;
}
if (nodeLatestSlot > this.latestSlot.slot) {
Expand Down Expand Up @@ -285,10 +269,18 @@ export class ConsensusProviderService {
return blockInfo;
}

public async getValidatorsState(stateId: StateId): Promise<Request> {
return await this.retryRequest(async (apiURL: string) => await this.apiGetStream(apiURL, this.endpoints.validatorsState(stateId)), {
dataOnly: false,
});
public async getState(stateId: StateId): Promise<ContainerTreeViewType<typeof anySsz.BeaconState.fields>> {
const { body, headers } = await this.retryRequest<{ body: BodyReadable; headers: IncomingHttpHeaders }>(
async (apiURL: string) => await this.apiGetStream(apiURL, this.endpoints.state(stateId), { accept: 'application/octet-stream' }),
{
dataOnly: false,
},
);
const forkName = headers['eth-consensus-version'] as keyof typeof ForkName;
const bodyBytes = new Uint8Array(await body.arrayBuffer());
// ugly hack to import ESModule to CommonJS project
ssz = await eval(`import('@lodestar/types').then((m) => m.ssz)`);
return ssz[forkName].BeaconState.deserializeToView(bodyBytes) as any as ContainerTreeViewType<typeof anySsz.BeaconState.fields>;
}

public async getBlockInfo(blockId: BlockId): Promise<BlockInfoResponse | void> {
Expand Down Expand Up @@ -322,13 +314,14 @@ export class ConsensusProviderService {
return blockInfo;
}

public async getAttestationCommitteesInfo(stateId: StateId, epoch: Epoch): Promise<Request> {
return await this.retryRequest(
public async getAttestationCommitteesInfo(stateId: StateId, epoch: Epoch): Promise<BodyReadable> {
const { body }: BodyReadable = await this.retryRequest(
async (apiURL: string) => await this.apiGetStream(apiURL, this.endpoints.attestationCommittees(stateId, epoch)),
{
dataOnly: false,
},
);
return body;
}

public async getSyncCommitteeInfo(stateId: StateId, epoch: Epoch): Promise<SyncCommitteeInfo> {
Expand Down Expand Up @@ -408,43 +401,42 @@ export class ConsensusProviderService {

@TrackCLRequest
protected async apiGet<T>(apiURL: string, subUrl: string): Promise<T> {
const res = await got
.get(urljoin(apiURL, subUrl), { timeout: { ...REQUEST_TIMEOUT_POLICY_MS, response: this.config.get('CL_API_GET_RESPONSE_TIMEOUT') } })
.catch((e) => {
if (e.response) {
throw new ResponseError(errRequest(e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});
if (res.statusCode !== 200) {
throw new ResponseError(errRequest(res.body, subUrl, apiURL), res.statusCode);
}
try {
return JSON.parse(res.body);
} catch (e) {
throw new ResponseError(`Error converting response body to JSON. Body: ${res.body}`);
const { body, statusCode } = await request(urljoin(apiURL, subUrl), {
method: 'GET',
headersTimeout: this.config.get('CL_API_GET_RESPONSE_TIMEOUT'),
}).catch((e) => {
if (e.response) {
throw new ResponseError(errRequest(e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});
if (statusCode !== 200) {
const errorText = await body.text();
throw new ResponseError(errRequest(errorText, subUrl, apiURL), statusCode);
}
return (await body.json()) as T;
}

@TrackCLRequest
protected async apiGetStream(apiURL: string, subUrl: string): Promise<Request> {
const readStream = got.stream.get(urljoin(apiURL, subUrl), {
timeout: { ...REQUEST_TIMEOUT_POLICY_MS, response: this.config.get('CL_API_GET_RESPONSE_TIMEOUT') },
protected async apiGetStream(
apiURL: string,
subUrl: string,
headersToSend?: Record<string, string>,
): Promise<{ body: BodyReadable; headers: IncomingHttpHeaders }> {
const { body, headers, statusCode } = await request(urljoin(apiURL, subUrl), {
method: 'GET',
headersTimeout: this.config.get('CL_API_GET_RESPONSE_TIMEOUT'),
headers: headersToSend,
}).catch((e) => {
if (e.response) {
throw new ResponseError(errRequest(e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});

return new Promise((resolve, reject) => {
readStream.on('response', (r: Response) => {
if (r.statusCode != 200) reject(new HTTPError(r));
resolve(readStream);
});
readStream.on('error', (e) => reject(e));
})
.then((r: Request) => r)
.catch((e) => {
if (e instanceof HTTPError) {
throw new ResponseError(errRequest(<string>e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});
if (statusCode !== 200) {
const errorText = await body.text();
throw new ResponseError(errRequest(errorText, subUrl, apiURL), statusCode);
}
return { body, headers };
}
}
7 changes: 3 additions & 4 deletions src/duty/attestation/attestation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ export class AttestationService {
public async check(epoch: Epoch, stateSlot: Slot): Promise<void> {
this.processedEpoch = epoch;
this.savedCanonSlotsAttProperties.clear();
const { attestations } = await this.getProcessedAttestations();
this.logger.log(`Getting attestation duties info`);
const committees = await this.getAttestationCommittees(stateSlot);
this.logger.log(`Getting attestations and duties info`);
const [attestations, committees] = await allSettled([this.getProcessedAttestations(), this.getAttestationCommittees(stateSlot)]);
this.logger.log(`Processing attestation duty info`);
const maxBatchSize = 5;
let index = 0;
Expand Down Expand Up @@ -169,7 +168,7 @@ export class AttestationService {
}
}
this.logger.debug(`All missed slots in getting attestations info process: ${allMissedSlots}`);
return { attestations, allMissedSlots };
return attestations;
}

@TrackTask('get-attestation-committees')
Expand Down
Loading

0 comments on commit 54cc81e

Please sign in to comment.