From 4a1d8f06d092ab3d0e591070856214a28116d764 Mon Sep 17 00:00:00 2001 From: Defi Dev Date: Thu, 14 Dec 2023 01:27:57 +0000 Subject: [PATCH] improve data fetching --- app/App.ts | 72 ++----------- app/ConfigGetters.ts | 2 +- app/Network.ts | 157 +++++++++++++++++++++++----- app/Types.ts | 16 ++- app/agents/AbstractAgent.ts | 26 +++-- app/dataSources/AbstractSource.ts | 11 +- app/dataSources/BlockchainSource.ts | 13 ++- app/dataSources/SubgraphSource.ts | 48 +++++---- app/services/Logger.ts | 33 +++++- tests/e2e/RandaoAgent.test.ts | 54 +++++----- 10 files changed, 263 insertions(+), 169 deletions(-) diff --git a/app/App.ts b/app/App.ts index b7c84e2..97de284 100644 --- a/app/App.ts +++ b/app/App.ts @@ -1,13 +1,7 @@ -import { Config, AllNetworksConfig, IAgent, AgentConfig } from './Types'; +import { Config, AllNetworksConfig } from './Types'; import { Network } from './Network.js'; -import { toChecksummedAddress } from './Utils.js'; import { initApi } from './Api.js'; -import { getAgentVersionAndType } from './ConfigGetters.js'; -import { AgentRandao_2_3_0 } from './agents/Agent.2.3.0.randao.js'; -import { AgentLight_2_2_0 } from './agents/Agent.2.2.0.light.js'; -import { SubgraphSource } from './dataSources/SubgraphSource.js'; -import { BlockchainSource } from './dataSources/BlockchainSource.js'; -import logger, { updateSentryScope } from './services/Logger.js'; +import logger from './services/Logger.js'; export class App { private networks: { [key: string]: Network }; @@ -87,8 +81,7 @@ export class App { for (const [netName, netConfig] of Object.entries(allNetworkConfigs.details)) { if (allNetworkConfigs.enabled.includes(netName)) { - const agents = this.buildAgents(netName, netConfig.agents); - networks[netName] = new Network(netName, netConfig, this, agents); + networks[netName] = new Network(netName, netConfig, this); } else { logger.debug(`App: Skipping ${netName} network...`); } @@ -97,65 +90,16 @@ export class App { return networks; } - public buildAgents(networkName: string, agentsConfig: { [key: string]: AgentConfig }): IAgent[] { - const agents = []; - // TODO: get type & AgentConfig - for (const [address, agentConfig] of Object.entries(agentsConfig)) { - const checksummedAddress = toChecksummedAddress(address); - let { version, strategy } = agentConfig; - if (!version || !strategy) { - [version, strategy] = getAgentVersionAndType(checksummedAddress, networkName); - } - let agent; - - if (version.startsWith('2.') && strategy === 'randao') { - agent = new AgentRandao_2_3_0(checksummedAddress, agentConfig, networkName); - } else if (version.startsWith('2.') && strategy === 'light') { - agent = new AgentLight_2_2_0(checksummedAddress, agentConfig, networkName); - } else { - throw new Error( - `App: Not supported agent version/strategy: network=${networkName},version=${version},strategy=${strategy}`, - ); - } - - agents.push(agent); - } - return agents; - } - public async initNetworks(networks: { [netName: string]: Network }) { this.networks = networks; logger.debug('App: Network initialization start...'); - const inits = []; for (const network of Object.values(this.networks)) { - inits.push(network.init()); - - for (const agent of network.getAgents()) { - let dataSource; - // TODO: Add support for different agents. Now if there are multiple agents, the tags linked to the latest one. - updateSentryScope( - network.getName(), - network.getFlashbotsRpc(), - agent.address, - agent.getKeyAddress(), - agent.dataSourceType, - agent.subgraphUrl, - ); - if (agent.dataSourceType === 'subgraph') { - dataSource = new SubgraphSource(network, agent, agent.subgraphUrl); - } else if (agent.dataSourceType === 'blockchain') { - dataSource = new BlockchainSource(network, agent); - } else { - throw new Error(`App: missing dataSource for agent ${agent.address}`); - } - await agent.init(network, dataSource); - } + await network.init().catch(e => { + logger.error(`App: Network ${network.getName()} initialization failed ${e}`); + }); } - logger.debug('App: Waiting for all networks to be initialized...'); - try { - await Promise.all(inits); - } catch (e) { - logger.error(`App: Networks initialization failed ${e}`); + if (!Object.values(this.networks).some(n => !!n.getChainId())) { + logger.error('App: Networks initialization failed'); process.exit(1); } logger.info('App: Networks initialization done!'); diff --git a/app/ConfigGetters.ts b/app/ConfigGetters.ts index 3b907f2..ff23afe 100644 --- a/app/ConfigGetters.ts +++ b/app/ConfigGetters.ts @@ -88,7 +88,7 @@ export function getDefaultNetworkConfig(name) { export function getMaxBlocksSubgraphDelay(networkName) { return ( { - arbitrumOne: 1000, + arbitrumOne: 10000, }[networkName] || 10 ); } diff --git a/app/Network.ts b/app/Network.ts index 1cd309d..b880572 100644 --- a/app/Network.ts +++ b/app/Network.ts @@ -6,9 +6,12 @@ import { NetworkConfig, Resolver, } from './Types.js'; -import { bigintToHex } from './Utils.js'; +import { bigintToHex, toChecksummedAddress } from './Utils.js'; +import pIteration from 'p-iteration'; import { ethers } from 'ethers'; +import EventEmitter from 'events'; import { + getAgentVersionAndType, getAverageBlockTime, getDefaultNetworkConfig, getExternalLensAddress, @@ -17,10 +20,13 @@ import { } from './ConfigGetters.js'; import { getExternalLensAbi, getMulticall2Abi } from './services/AbiService.js'; import { EthersContractWrapperFactory } from './clients/EthersContractWrapperFactory.js'; -import EventEmitter from 'events'; -import { App } from './App'; -import logger from './services/Logger.js'; +import { App } from './App.js'; +import logger, { updateSentryScope } from './services/Logger.js'; import ContractEventsEmitter from './services/ContractEventsEmitter.js'; +import { SubgraphSource } from './dataSources/SubgraphSource.js'; +import { BlockchainSource } from './dataSources/BlockchainSource.js'; +import { AgentRandao_2_3_0 } from './agents/Agent.2.3.0.randao.js'; +import { AgentLight_2_2_0 } from './agents/Agent.2.2.0.light.js'; interface ResolverJobWithCallback { lastSuccessBlock?: bigint; @@ -61,6 +67,7 @@ export class Network { private currentBlockDelay: number; private latestBaseFee: bigint; + private agentsStartBlockNumber: bigint; private latestBlockNumber: bigint; private latestBlockTimestamp: bigint; @@ -76,7 +83,7 @@ export class Network { return new Error(`NetworkError${this.toString()}: ${args.join(' ')}`); } - constructor(name: string, networkConfig: NetworkConfig, app: App, agents: IAgent[]) { + constructor(name: string, networkConfig: NetworkConfig, app: App) { this.initialized = false; this.app = app; this.name = name; @@ -85,7 +92,7 @@ export class Network { this.maxBlockDelay = networkConfig.max_block_delay; this.maxNewBlockDelay = networkConfig.max_new_block_delay; this.networkConfig = networkConfig; - this.agents = agents; + this.agents = this.buildAgents(); this.flashbotsRpc = networkConfig?.flashbots?.rpc; this.flashbotsAddress = networkConfig?.flashbots?.address; @@ -231,6 +238,32 @@ export class Network { }; } + private buildAgents(): IAgent[] { + const agents = []; + // TODO: get type & AgentConfig + for (const [address, agentConfig] of Object.entries(this.networkConfig.agents)) { + const checksummedAddress = toChecksummedAddress(address); + let { version, strategy } = agentConfig; + if (!version || !strategy) { + [version, strategy] = getAgentVersionAndType(checksummedAddress, this.name); + } + let agent; + + if (version.startsWith('2.') && strategy === 'randao') { + agent = new AgentRandao_2_3_0(checksummedAddress, agentConfig, this.name); + } else if (version.startsWith('2.') && strategy === 'light') { + agent = new AgentLight_2_2_0(checksummedAddress, agentConfig, this.name); + } else { + throw new Error( + `App: Not supported agent version/strategy: network=${this.name},version=${version},strategy=${strategy}`, + ); + } + + agents.push(agent); + } + return agents; + } + private initProvider() { this.provider = new ethers.providers.WebSocketProvider(this.rpc); this.fixProvider(this.provider); @@ -323,6 +356,74 @@ export class Network { 'info', `The network '${this.getName()}' has been initialized. The last block number: ${this.latestBlockNumber}`, ); + + await this.initAgents(); + + if (this.agentsStartBlockNumber < this.latestBlockNumber) { + let startBlockNumber = Number(this.agentsStartBlockNumber); + let diff = Number(this.latestBlockNumber) - startBlockNumber; + this.latestBlockNumber = this.agentsStartBlockNumber; + this.contractEventsEmitter.setBlockLogsMode(true); + this.clog('info', `Sync diff between sources: ${diff}. Start fetching this blocks manually...`); + + const step = 10; + while (diff > step) { + const count = diff > step ? step : diff; + const before = this.nowMs(); + const blocks = await pIteration.map(Array.from(Array(Number(count)).keys()), n => { + return this.queryBlock(+n + 1); + }); + + blocks.forEach(block => this._handleNewBlock(block, before)); + + if (this.contractEventsEmitter.blockLogsMode) { + this.contractEventsEmitter.emitByBlockLogs( + await this.provider.getLogs({ + fromBlock: Number(this.agentsStartBlockNumber) + 1, + toBlock: Number(this.agentsStartBlockNumber) + count, + }), + ); + } + + startBlockNumber += count; + diff = parseInt(await this.queryLatestBlock().then(b => b.number.toString())) - startBlockNumber; + } + this.contractEventsEmitter.setBlockLogsMode(false); + } + } + + private async initAgents() { + let lowBlockNumber; + for (const agent of this.getAgents()) { + let dataSource; + // TODO: Add support for different agents. Now if there are multiple agents, the tags linked to the latest one. + updateSentryScope( + this.getName(), + this.getFlashbotsRpc(), + agent.address, + agent.getKeyAddress(), + agent.dataSourceType, + agent.subgraphUrl, + ); + if (agent.dataSourceType === 'subgraph') { + dataSource = this.getAgentSubgraphDataSource(agent); + } else if (agent.dataSourceType === 'blockchain') { + dataSource = this.getAgentBlockchainDataSource(agent); + } else { + throw new Error(`App: missing dataSource for agent ${agent.address}`); + } + const syncBlockNumber = await agent.init(this, dataSource); + lowBlockNumber = !lowBlockNumber || syncBlockNumber < lowBlockNumber ? syncBlockNumber : lowBlockNumber; + } + this.agentsStartBlockNumber = lowBlockNumber; + } + + public getAgentSubgraphDataSource(agent) { + return new SubgraphSource(this, agent, agent.subgraphUrl); + } + + public getAgentBlockchainDataSource(agent) { + return new BlockchainSource(this, agent); } public stop() { @@ -351,20 +452,14 @@ export class Network { this.clog('error', `⚠️ Block not found (number=${blockNumber},before=${before},nowMs=${this.nowMs()})`); return null; } - const fetchBlockDelay = this.nowMs() - before; - if (process.env.NODE_ENV !== 'test') { - this.clog( - 'info', - `🧱 New block: (number=${blockNumber},timestamp=${block.timestamp},hash=${block.hash},txCount=${block.transactions.length},baseFee=${block.baseFeePerGas},fetchDelayMs=${fetchBlockDelay})`, - ); - } - this.latestBaseFee = BigInt(block.baseFeePerGas.toString()); - this.latestBlockTimestamp = BigInt(block.timestamp.toString()); - this.currentBlockDelay = this.nowS() - parseInt(block.timestamp.toString()); - this.newBlockEventEmitter.emit('newBlock', block.timestamp, blockNumber); + this._handleNewBlock(block, before); + this._walkThroughTheJobs(block.number, block.timestamp); - this.walkThroughTheJobs(blockNumber, block.timestamp); + if (this.contractEventsEmitter.blockLogsMode) { + const fromBlock = bigintToHex(blockNumber); + this.contractEventsEmitter.emitByBlockLogs(await this.provider.getLogs({ fromBlock, toBlock: fromBlock })); + } setTimeout(async () => { if (this.latestBlockNumber > blockNumber) { @@ -384,12 +479,22 @@ export class Network { } while (block); }, this.maxNewBlockDelay * 1000); - if (this.contractEventsEmitter.blockLogsMode) { - const fromBlock = bigintToHex(blockNumber); - this.contractEventsEmitter.emitByBlockLogs(await this.provider.getLogs({ fromBlock, toBlock: fromBlock })); + return block; + } + + private _handleNewBlock(block, before) { + const fetchBlockDelay = this.nowMs() - before; + if (process.env.NODE_ENV !== 'test') { + this.clog( + 'info', + `🧱 New block: (number=${block.number},timestamp=${block.timestamp},hash=${block.hash},txCount=${block.transactions.length},baseFee=${block.baseFeePerGas},fetchDelayMs=${fetchBlockDelay})`, + ); } + this.latestBaseFee = BigInt(block.baseFeePerGas.toString()); + this.latestBlockTimestamp = BigInt(block.timestamp.toString()); + this.currentBlockDelay = this.nowS() - parseInt(block.timestamp.toString()); - return block; + this.newBlockEventEmitter.emit('newBlock', block.timestamp, block.number); } public isBlockDelayAboveMax() { @@ -412,7 +517,7 @@ export class Network { return this.contractEventsEmitter.contractEmitter(contract); } - private walkThroughTheJobs(blockNumber: number, blockTimestamp: number) { + private _walkThroughTheJobs(blockNumber: number, blockTimestamp: number) { this.triggerIntervalCallbacks(blockNumber, blockTimestamp); this.callResolversAndTriggerCallbacks(blockNumber); } @@ -486,12 +591,6 @@ export class Network { } } - private _validateKeyInMap(key: string, map: { [key: string]: object }, type: string): void { - if (!map[key]) { - throw this.err(`Callback key already exists: type=${type},key=${key}`); - } - } - public registerTimeout(key: string, triggerCallbackAfter: number, callback: (blockTimestamp: number) => void) { this._validateKeyLength(key, 'interval'); this._validateKeyNotInMap(key, this.timeoutData, 'interval'); diff --git a/app/Types.ts b/app/Types.ts index ce652b3..724faa3 100644 --- a/app/Types.ts +++ b/app/Types.ts @@ -360,12 +360,22 @@ export interface IRandaoAgent extends IAgent { export interface IDataSource { getType(): string; - getBlocksDelay(): Promise; - getRegisteredJobs(_context): Promise>; - getOwnersBalances(context, jobOwnersSet: Set): Promise>; + getBlocksDelay(): Promise<{ diff: bigint; nodeBlockNumber: bigint; sourceBlockNumber: bigint }>; + getRegisteredJobs(_context): Promise<{ data: Map; meta: SourceMetadata }>; + getOwnersBalances( + context, + jobOwnersSet: Set, + ): Promise<{ data: Map; meta: SourceMetadata }>; addLensFieldsToOneJob(newJobs: RandaoJob | LightJob): void; } +export interface SourceMetadata { + isSynced: boolean; + diff: bigint; + nodeBlockNumber: bigint; + sourceBlockNumber: bigint; +} + export interface IAgent { readonly executorType: ExecutorType; readonly address: string; diff --git a/app/agents/AbstractAgent.ts b/app/agents/AbstractAgent.ts index cc22b87..b64ea3b 100644 --- a/app/agents/AbstractAgent.ts +++ b/app/agents/AbstractAgent.ts @@ -10,13 +10,14 @@ import { IAgent, IDataSource, Resolver, + SourceMetadata, TxEnvelope, TxGasUpdate, UnsignedTransaction, } from '../Types.js'; import { BigNumber, ethers, Wallet } from 'ethers'; import { getEncryptedJson } from '../services/KeyService.js'; -import { AVERAGE_BLOCK_TIME_SECONDS, BN_ZERO, DEFAULT_SYNC_FROM_CHAINS } from '../Constants.js'; +import { BN_ZERO, DEFAULT_SYNC_FROM_CHAINS } from '../Constants.js'; import { filterFunctionResultObject, numberToBigInt, toChecksummedAddress, weiValueToEth } from '../Utils.js'; import { FlashbotsExecutor } from '../executors/FlashbotsExecutor.js'; import { PGAExecutor } from '../executors/PGAExecutor.js'; @@ -167,7 +168,7 @@ export abstract class AbstractAgent implements IAgent { this.clog('debug', 'Sync from', this.fullSyncFrom); } - public async init(network: Network, dataSource: IDataSource) { + public async init(network: Network, dataSource: IDataSource): Promise { this.network = network; this.dataSource = dataSource; @@ -273,6 +274,7 @@ export abstract class AbstractAgent implements IAgent { await this._afterInit(); this.clog('info', '✅ Agent initialization done!'); + return upTo; } private async initKeeperWorkerKey() { @@ -426,13 +428,14 @@ export abstract class AbstractAgent implements IAgent { * 4. Handle SetJobResolver events * @private */ - private async resyncAllJobs(skipRepeat = false): Promise { + private async resyncAllJobs(): Promise { this.clog('info', 'resyncAllJobs start'); - const latestBock = this.network.getLatestBlockNumber(); + let latestBock = this.network.getLatestBlockNumber(); // 1. init jobs - let newJobs = new Map(); + let newJobs = new Map(), + sourceMeta: SourceMetadata = null; //TODO: handle timeout error on getting all jobs from blockchain - newJobs = await this.dataSource.getRegisteredJobs(this); + ({ data: newJobs, meta: sourceMeta } = await this.dataSource.getRegisteredJobs(this)); // 2. set owners const jobOwnersSet = new Set(); @@ -450,18 +453,13 @@ export abstract class AbstractAgent implements IAgent { } // 3. Load job owner balances - this.ownerBalances = await this.dataSource.getOwnersBalances(this, jobOwnersSet); + this.ownerBalances = await this.dataSource.getOwnersBalances(this, jobOwnersSet).then(r => r.data); this.jobs = newJobs; await this.startAllJobs(); - if (!skipRepeat && this.dataSource.getType() === 'subgraph' && this.networkName !== 'testnet') { - const blockDelay = await this.dataSource.getBlocksDelay().catch(() => null); - if (blockDelay > 10) { - setTimeout(() => { - this.resyncAllJobs(true); - }, parseInt(blockDelay.toString()) * AVERAGE_BLOCK_TIME_SECONDS[this.networkName]); - } + if (this.dataSource.getType() === 'subgraph' && this.networkName !== 'testnet' && sourceMeta.diff > 10) { + latestBock = sourceMeta.sourceBlockNumber; } this.clog('info', `resyncAllJobs end (${Array.from(this.jobs.keys()).length} synced)`); diff --git a/app/dataSources/AbstractSource.ts b/app/dataSources/AbstractSource.ts index 58d0061..78cf604 100644 --- a/app/dataSources/AbstractSource.ts +++ b/app/dataSources/AbstractSource.ts @@ -1,5 +1,5 @@ import { Network } from '../Network'; -import { IAgent, IDataSource } from '../Types'; +import { IAgent, IDataSource, SourceMetadata } from '../Types'; import { RandaoJob } from '../jobs/RandaoJob'; import { LightJob } from '../jobs/LightJob'; import { BigNumber } from 'ethers'; @@ -31,8 +31,11 @@ export abstract class AbstractSource implements IDataSource { return new Error(`AbstractDataSourceError${this.toString()}: ${args.join(' ')}`); } - abstract getRegisteredJobs(_context): Promise>; - abstract getOwnersBalances(context, jobOwnersSet: Set): Promise>; + abstract getRegisteredJobs(_context): Promise<{ data: Map; meta: SourceMetadata }>; + abstract getOwnersBalances( + context, + jobOwnersSet: Set, + ): Promise<{ data: Map; meta: SourceMetadata }>; abstract addLensFieldsToOneJob(newJobs: RandaoJob | LightJob): void; /** @@ -53,7 +56,7 @@ export abstract class AbstractSource implements IDataSource { return this.type; } - async getBlocksDelay(): Promise { + async getBlocksDelay(): Promise<{ diff: bigint; nodeBlockNumber: bigint; sourceBlockNumber: bigint }> { return null; } } diff --git a/app/dataSources/BlockchainSource.ts b/app/dataSources/BlockchainSource.ts index 83d33d5..07f2352 100644 --- a/app/dataSources/BlockchainSource.ts +++ b/app/dataSources/BlockchainSource.ts @@ -2,7 +2,7 @@ import { AbstractSource } from './AbstractSource.js'; import { RandaoJob } from '../jobs/RandaoJob'; import { LightJob } from '../jobs/LightJob'; import { Network } from '../Network'; -import { IAgent } from '../Types'; +import { IAgent, SourceMetadata } from '../Types'; import { BigNumber } from 'ethers'; import { chunkArray, flattenArray, parseConfig } from '../Utils.js'; import pIteration from 'p-iteration'; @@ -24,7 +24,7 @@ export class BlockchainSource extends AbstractSource { * * @return Promise> */ - async getRegisteredJobs(context): Promise> { + async getRegisteredJobs(context): Promise<{ data: Map; meta: SourceMetadata }> { const latestBock = this.network.getLatestBlockNumber(); // TODO: check latestBlock not null const registerLogs = await this.agent.queryPastEvents('RegisterJob', context.fullSyncFrom, Number(latestBock)); @@ -35,7 +35,7 @@ export class BlockchainSource extends AbstractSource { // fetching additional fields from lens await this.addLensFieldsToJobs(newJobs); - return newJobs; + return { data: newJobs, meta: null }; } /** @@ -43,7 +43,10 @@ export class BlockchainSource extends AbstractSource { * @param context - agent context * @param jobOwnersSet - array of jobOwners addresses */ - async getOwnersBalances(context, jobOwnersSet: Set): Promise> { + async getOwnersBalances( + context, + jobOwnersSet: Set, + ): Promise<{ data: Map; meta: SourceMetadata }> { const jobOwnersArray = Array.from(jobOwnersSet); const res = await this.network.queryLensOwnerBalances(context.address, jobOwnersArray); const jobOwnerBalances: Array = res.results; @@ -51,7 +54,7 @@ export class BlockchainSource extends AbstractSource { for (let i = 0; i < jobOwnersArray.length; i++) { result.set(jobOwnersArray[i], jobOwnerBalances[i]); } - return result; + return { data: result, meta: null }; } /** diff --git a/app/dataSources/SubgraphSource.ts b/app/dataSources/SubgraphSource.ts index 931774b..5d3859c 100644 --- a/app/dataSources/SubgraphSource.ts +++ b/app/dataSources/SubgraphSource.ts @@ -4,7 +4,7 @@ import { BlockchainSource } from './BlockchainSource.js'; import { RandaoJob } from '../jobs/RandaoJob'; import { LightJob } from '../jobs/LightJob'; import { Network } from '../Network'; -import { IAgent } from '../Types'; +import { IAgent, SourceMetadata } from '../Types'; import { BigNumber, utils } from 'ethers'; import { toChecksummedAddress } from '../Utils.js'; import logger from '../services/Logger.js'; @@ -104,26 +104,30 @@ export class SubgraphSource extends AbstractSource { /** * Checking if our graph is existing and synced */ - async isGraphOk(): Promise { + async getGraphStatus(): Promise { try { - const diff = await this.getBlocksDelay(); + const { diff, nodeBlockNumber, sourceBlockNumber } = await this.getBlocksDelay(); const isSynced = diff <= getMaxBlocksSubgraphDelay(this.network.getName()); // Our graph is desynced if its behind for more than 10 blocks if (!isSynced) { this.clog('error', `Subgraph is ${diff} blocks behind.`); } - return isSynced; + return { isSynced, diff, nodeBlockNumber, sourceBlockNumber }; } catch (e) { this.clog('error', 'Graph meta query error:', e); - return false; + return { isSynced: false, diff: null, nodeBlockNumber: null, sourceBlockNumber: null }; } } - async getBlocksDelay(): Promise { + async getBlocksDelay(): Promise<{ diff: bigint; nodeBlockNumber: bigint; sourceBlockNumber: bigint }> { const [latestBock, { _meta }] = await Promise.all([ this.network.getLatestBlockNumber(), this.query(this.subgraphUrl, QUERY_META), ]); - return latestBock - BigInt(_meta.block.number); + return { + diff: latestBock - BigInt(_meta.block.number), + nodeBlockNumber: latestBock, + sourceBlockNumber: latestBock, + }; } /** @@ -134,13 +138,13 @@ export class SubgraphSource extends AbstractSource { * * @return Promise> */ - async getRegisteredJobs(context): Promise> { + async getRegisteredJobs(context): Promise<{ data: Map; meta: SourceMetadata }> { let newJobs = new Map(); - const isSynced = await this.isGraphOk(); - if (!isSynced) { + const graphStatus = await this.getGraphStatus(); + if (!graphStatus.isSynced) { this.clog('warn', 'Subgraph is not ok, falling back to the blockchain datasource.'); - newJobs = await this.blockchainSource.getRegisteredJobs(context); - return newJobs; + ({ data: newJobs } = await this.blockchainSource.getRegisteredJobs(context)); + return { data: newJobs, meta: graphStatus }; } try { const res = await this.query(this.subgraphUrl, QUERY_ALL_JOBS); @@ -165,7 +169,7 @@ export class SubgraphSource extends AbstractSource { } catch (e) { throw this.err(e); } - return newJobs; + return { data: newJobs, meta: graphStatus }; } /** @@ -226,14 +230,18 @@ export class SubgraphSource extends AbstractSource { * @param context - agent context * @param jobOwnersSet - array of jobOwners addresses */ - public async getOwnersBalances(context, jobOwnersSet: Set): Promise> { - let result = new Map(); + public async getOwnersBalances( + context, + jobOwnersSet: Set, + ): Promise<{ data: Map; meta: SourceMetadata }> { + let result = new Map(), + graphStatus; try { - const isSynced = await this.isGraphOk(); - if (!isSynced) { + graphStatus = await this.getGraphStatus(); + if (!graphStatus.isSynced) { this.clog('warn', 'Subgraph is not ok, falling back to the blockchain datasource.'); - result = await this.blockchainSource.getOwnersBalances(context, jobOwnersSet); - return result; + ({ data: result } = await this.blockchainSource.getOwnersBalances(context, jobOwnersSet)); + return { data: result, meta: graphStatus }; } const { jobOwners } = await this.query(this.subgraphUrl, QUERY_JOB_OWNERS); @@ -243,6 +251,6 @@ export class SubgraphSource extends AbstractSource { } catch (e) { throw this.err(e); } - return result; + return { data: result, meta: graphStatus }; } } diff --git a/app/services/Logger.ts b/app/services/Logger.ts index 40287c8..946d7b9 100644 --- a/app/services/Logger.ts +++ b/app/services/Logger.ts @@ -14,7 +14,6 @@ const isTest = process.env.NODE_ENV === 'test'; let useSentry = false; -// @ts-ignore class SentryTransport extends Transport { constructor(opts?: TransportStreamOptions) { super(opts); @@ -39,6 +38,38 @@ class SentryTransport extends Transport { callback(); } + + eventNames(): Array { + return undefined; + } + + getMaxListeners(): number { + return 0; + } + + listenerCount(_: string | symbol, __?: any): number { + return 0; + } + + listeners(_: string | symbol): any[] { + return []; + } + + off(_: string | symbol, __: (...args: any[]) => void): this { + return undefined; + } + + rawListeners(_: string | symbol): any[] { + return []; + } + + removeAllListeners(_?: string | symbol): this { + return undefined; + } + + setMaxListeners(_: number): this { + return undefined; + } } const consoleFormat = winston.format.combine( diff --git a/tests/e2e/RandaoAgent.test.ts b/tests/e2e/RandaoAgent.test.ts index 0c238a3..35d1830 100644 --- a/tests/e2e/RandaoAgent.test.ts +++ b/tests/e2e/RandaoAgent.test.ts @@ -1,13 +1,11 @@ // @ts-ignore import sinon from 'sinon'; -import { AgentRandao_2_3_0 } from '../../app/agents/Agent.2.3.0.randao.js'; import { Network } from '../../app/Network.js'; import { App } from '../../app/App.js'; import { assert } from 'chai'; -import { AGENT_ADDRESS, AGENT_CONFIG, APP_CONFIG, NETWORK_CONFIG } from '../constants.js'; +import { APP_CONFIG, NETWORK_CONFIG } from '../constants.js'; import { stubAgent, stubNetwork } from '../stubs.js'; import { QUERY_ALL_JOBS, QUERY_JOB_OWNERS, QUERY_META, SubgraphSource } from '../../app/dataSources/SubgraphSource.js'; -import { BlockchainSource } from '../../app/dataSources/BlockchainSource.js'; import { toChecksummedAddress } from '../../app/Utils.js'; import { GOOD_RESOLVER_JOB_KEY, @@ -22,46 +20,46 @@ describe('AgentRandao_2_3_0', () => { it('should initialize empty agent correctly', async () => { const app = new App(APP_CONFIG); - const agent = new AgentRandao_2_3_0(AGENT_ADDRESS, AGENT_CONFIG, 'testnet'); - stubAgent(agent); - - const network = new Network('testnet', NETWORK_CONFIG, app, [agent]); + const network = new Network('testnet', NETWORK_CONFIG, app); stubNetwork(network); + const [agent] = network.getAgents(); + stubAgent(agent); await network.init(); - const dataSource = new BlockchainSource(network, agent); - await agent.init(network, dataSource); assert.equal(agent.getJobsCount().total, 0); assert.equal(agent.getCfg(), 0); }); let app, network, agent; - async function loadJobs(jobsResponse: any) { app = new App(APP_CONFIG); - agent = new AgentRandao_2_3_0(AGENT_ADDRESS, AGENT_CONFIG, 'testnet'); - stubAgent(agent); - - network = new Network('testnet', NETWORK_CONFIG, app, [agent]); + network = new Network('testnet', NETWORK_CONFIG, app); stubNetwork(network); - const subgraphUrl = 'stubSubgraphUrl'; - const dataSource = new SubgraphSource(network, agent, subgraphUrl); - await network.init(); + let mock; + sinon.stub(network, 'getAgentBlockchainDataSource').callsFake(function (a: any) { + const subgraphUrl = 'stubSubgraphUrl'; + stubAgent(a); + const dataSource = new SubgraphSource(network, a, subgraphUrl); + + mock = sinon.mock(dataSource); + mock + .expects('query') + .twice() + .withArgs(subgraphUrl, QUERY_META) + .returns({ + _meta: { block: { number: 4000530 } }, + }); + mock.expects('query').once().withArgs(subgraphUrl, QUERY_ALL_JOBS).returns(jobsResponse); + mock.expects('query').once().withArgs(subgraphUrl, QUERY_JOB_OWNERS).returns(JOB_OWNERS_RESPONSE); + + return dataSource; + }); - const mock = sinon.mock(dataSource); - mock - .expects('query') - .twice() - .withArgs(subgraphUrl, QUERY_META) - .returns({ - _meta: { block: { number: 4000530 } }, - }); - mock.expects('query').once().withArgs(subgraphUrl, QUERY_ALL_JOBS).returns(jobsResponse); - mock.expects('query').once().withArgs(subgraphUrl, QUERY_JOB_OWNERS).returns(JOB_OWNERS_RESPONSE); - await agent.init(network, dataSource); + await network.init(); + [agent] = network.getAgents(); mock.verify(); }