From 156c1e209dd1090f62f6338e0e9452e6cd72e754 Mon Sep 17 00:00:00 2001 From: eitelvolkerts Date: Thu, 4 Jan 2024 18:29:18 +0300 Subject: [PATCH] Refactoring Oleg's contributions; this one is for the addition of a subquery datasource and the necessary infrastructural changes --- app/Network.ts | 7 + app/Types.ts | 2 +- app/agents/AbstractAgent.ts | 8 + app/dataSources/SubgraphSource.ts | 2 +- app/dataSources/SubquerySource.ts | 262 ++++++++++++++++++++++++++++++ 5 files changed, 279 insertions(+), 2 deletions(-) create mode 100644 app/dataSources/SubquerySource.ts diff --git a/app/Network.ts b/app/Network.ts index db5b72c..24e001d 100644 --- a/app/Network.ts +++ b/app/Network.ts @@ -24,6 +24,7 @@ import { App } from './App.js'; import logger, { updateSentryScope } from './services/Logger.js'; import ContractEventsEmitter from './services/ContractEventsEmitter.js'; import { SubgraphSource } from './dataSources/SubgraphSource.js'; +import { SubquerySource } from './dataSources/SubquerySource.js'; import { BlockchainSource } from './dataSources/BlockchainSource.js'; import { AgentRandao_2_3_0 } from './agents/Agent.2.3.0.randao.js'; import { AgentLight_2_2_0 } from './agents/Agent.2.2.0.light.js'; @@ -408,6 +409,8 @@ export class Network { ); if (agent.dataSourceType === 'subgraph') { dataSource = this.getAgentSubgraphDataSource(agent); + } else if (agent.dataSourceType === 'subquery') { //allows subquery source alongside the subgraph one + dataSource = this.getAgentSubqueryDataSource(agent); } else if (agent.dataSourceType === 'blockchain') { dataSource = this.getAgentBlockchainDataSource(agent); } else { @@ -423,6 +426,10 @@ export class Network { return new SubgraphSource(this, agent, agent.subgraphUrl); } + public getAgentSubqueryDataSource(agent) { + return new SubquerySource(this, agent, agent.subgraphUrl); + } + public getAgentBlockchainDataSource(agent) { return new BlockchainSource(this, agent); } diff --git a/app/Types.ts b/app/Types.ts index 724faa3..6118a61 100644 --- a/app/Types.ts +++ b/app/Types.ts @@ -11,7 +11,7 @@ import { AbstractJob } from './jobs/AbstractJob'; export type AvailableNetworkNames = 'mainnet' | 'bsc' | 'polygon' | 'goerli'; export type ExecutorType = 'flashbots' | 'pga'; export type Strategy = 'randao' | 'light'; -export type DataSourceType = 'blockchain' | 'subgraph'; +export type DataSourceType = 'blockchain' | 'subgraph' | 'subquery'; export enum CALLDATA_SOURCE { SELECTOR, diff --git a/app/agents/AbstractAgent.ts b/app/agents/AbstractAgent.ts index b64ea3b..6841709 100644 --- a/app/agents/AbstractAgent.ts +++ b/app/agents/AbstractAgent.ts @@ -114,6 +114,14 @@ export abstract class AbstractAgent implements IAgent { } this.dataSourceType = 'subgraph'; this.subgraphUrl = agentConfig.subgraph_url; + } else if (agentConfig.data_source === 'subquery') { + if (!agentConfig.subgraph_url) { + throw new Error( + "Please set 'subgraph_url' if you want to proceed with {'data_source': 'subquery'}. Notice that 'graph_url' is deprecated so please change it to 'subgraph_url'.", + ); + } + this.dataSourceType = 'subquery'; + this.subgraphUrl = agentConfig.subgraph_url; } else if (agentConfig.data_source === 'blockchain') { this.dataSourceType = 'blockchain'; } else { diff --git a/app/dataSources/SubgraphSource.ts b/app/dataSources/SubgraphSource.ts index 5d3859c..893d166 100644 --- a/app/dataSources/SubgraphSource.ts +++ b/app/dataSources/SubgraphSource.ts @@ -253,4 +253,4 @@ export class SubgraphSource extends AbstractSource { } return { data: result, meta: graphStatus }; } -} +} \ No newline at end of file diff --git a/app/dataSources/SubquerySource.ts b/app/dataSources/SubquerySource.ts new file mode 100644 index 0000000..98641e3 --- /dev/null +++ b/app/dataSources/SubquerySource.ts @@ -0,0 +1,262 @@ +import axios from 'axios'; +import { AbstractSource } from './AbstractSource.js'; +import { BlockchainSource } from './BlockchainSource.js'; +import { RandaoJob } from '../jobs/RandaoJob'; +import { LightJob } from '../jobs/LightJob'; +import { Network } from '../Network'; +import { IAgent, SourceMetadata } from '../Types'; +import { BigNumber, utils } from 'ethers'; +import { toChecksummedAddress } from '../Utils.js'; +import logger from '../services/Logger.js'; +import { getMaxBlocksSubgraphDelay } from '../ConfigGetters.js'; + +//the different structure mirrors the structure of the subquery database, made this way due to a lack of priori documentation on subgraph +//everything else mirrors the subgraph exactly +export const QUERY_ALL_JOBS = `{ + jobs(first: 1000) { + nodes { + id + active + jobAddress + jobId + assertResolverSelector + credits + calldataSource + fixedReward + jobSelector + lastExecutionAt + minKeeperCVP + preDefinedCalldata + intervalSeconds + resolverAddress + resolverCalldata + useJobOwnerCredits + owner { + id + } + pendingOwner { + id + } + jobCreatedAt + jobNextKeeperId + jobReservedSlasherId + jobSlashingPossibleAfter + } + } +}`; + +export const QUERY_META = `{ + _metadata { + lastProcessedHeight + } +}`; + +export const QUERY_JOB_OWNERS = `{ + jobOwners { + nodes { + id + credits + } + } +}`; + +/** + * This class used for fetching data from subgraph + */ +export class SubquerySource extends AbstractSource { + private blockchainSource: BlockchainSource; + private readonly subgraphUrl: string; + + private toString(): string { + return `(url: ${this.subgraphUrl})`; + } + + private clog(level: string, ...args: unknown[]) { + logger.log(level, `SubgraphDataSource${this.toString()}: ${args.join(' ')}`); + } + + private err(...args: unknown[]): Error { + return new Error(`SubgraphDataSourceError${this.toString()}: ${args.join(' ')}`); + } + + constructor(network: Network, agent: IAgent, graphUrl: string) { + super(network, agent); + this.type = 'subgraph'; + this.subgraphUrl = graphUrl; + + this.blockchainSource = new BlockchainSource(network, agent); + } + + /** + * A query builder for requests. + * It's using axios because "fetch" is bad at error handling (all error codes except network error is 200). + * @param endpoint + * @param query + */ + async query(endpoint, query) { + const res = await axios.post(endpoint, { query }); + if (res.data.errors) { + let locations = ''; + if ('locations' in res.data.errors[0]) { + locations = `Locations: ${JSON.stringify(res.data.errors[0].locations)}. `; + } + throw new Error(`Subgraph query error: ${res.data.errors[0].message}. ${locations}Executed query:\n${query}\n`); + } + return res.data.data; + } + + /** + * Checking if our graph is existing and synced + */ + async getGraphStatus(): Promise { + try { + const { diff, nodeBlockNumber, sourceBlockNumber } = await this.getBlocksDelay(); + const isSynced = diff <= getMaxBlocksSubgraphDelay(this.network.getName()); // Our graph is desynced if its behind for more than 10 blocks + if (!isSynced) { + this.clog('error', `Subgraph is ${diff} blocks behind.`); + } + return { isSynced, diff, nodeBlockNumber, sourceBlockNumber }; + } catch (e) { + this.clog('error', 'Graph meta query error:', e); + return { isSynced: false, diff: null, nodeBlockNumber: null, sourceBlockNumber: null }; + } + } + + async getBlocksDelay(): Promise<{ diff: bigint; nodeBlockNumber: bigint; sourceBlockNumber: bigint }> { + const [latestBock, { _metadata }] = await Promise.all([ + this.network.getLatestBlockNumber(), + this.query(this.subgraphUrl, QUERY_META), + ]); + return { + diff: latestBock - BigInt(_metadata.lastProcessedHeight), + nodeBlockNumber: latestBock, + sourceBlockNumber: latestBock, + }; + } + + /** + * Getting a list of jobs from subgraph and initialise job. + * Returns Map structure which key is jobKey and value is instance of RandaoJob or LightJob. Await is required. + * + * @param context - agent caller context. This can be Agent.2.2.0.light or Agent.2.3.0.randao + * + * @return Promise> + */ + async getRegisteredJobs(context): Promise<{ data: Map; meta: SourceMetadata }> { + let newJobs = new Map(); + const graphStatus = await this.getGraphStatus(); + if (!graphStatus.isSynced) { + this.clog('warn', 'Subgraph is not ok, falling back to the blockchain datasource.'); + ({ data: newJobs } = await this.blockchainSource.getRegisteredJobs(context)); + return { data: newJobs, meta: graphStatus }; + } + try { + const res = await this.query(this.subgraphUrl, QUERY_ALL_JOBS); + const { jobs } = res; + const { nodes } = jobs; + nodes.forEach(job => { + const newJob = context._buildNewJob({ + name: 'RegisterJob', + args: { + jobAddress: job.jobAddress, + jobId: BigNumber.from(job.jobId), + jobKey: job.id, + }, + }); + const lensJob = this.addLensFieldsToJobs(job); + newJob.applyJob({ + ...lensJob, + owner: lensJob.owner, + config: lensJob.config, + }); + newJobs.set(job.id, newJob); + }); + } catch (e) { + throw this.err(e); + } + return { data: newJobs, meta: graphStatus }; + } + + /** + * here we can populate job with full graph data, as if we made a request to getJobs lens method. + * But we already hale all the data + * @param graphData + */ + private addLensFieldsToJobs(graphData) { + const lensFields: any = {}; + // setting an owner + lensFields.owner = utils.getAddress(this._checkNullAddress(graphData.owner, true, 'id')); + // if job is about to get transferred setting future owner address. Otherwise, null address + lensFields.pendingTransfer = this._checkNullAddress(graphData.pendingOwner, true, 'id'); + // transfer min cvp into bigNumber as it's returned in big number when getting data from blockchain. Data consistency. + lensFields.jobLevelMinKeeperCvp = BigNumber.from(graphData.minKeeperCVP); + // From graph zero predefinedcalldata is returned as null, but from blockchain its 0x + lensFields.preDefinedCalldata = this._checkNullAddress(graphData.preDefinedCalldata); + + // setting a resolver field + lensFields.resolver = { + resolverCalldata: this._checkNullAddress(graphData.resolverCalldata), + resolverAddress: this._checkNullAddress(graphData.resolverAddress, true), + }; + // setting randao data + lensFields.randaoData = { + jobNextKeeperId: BigNumber.from(graphData.jobNextKeeperId), + jobReservedSlasherId: BigNumber.from(graphData.jobReservedSlasherId), + jobSlashingPossibleAfter: BigNumber.from(graphData.jobSlashingPossibleAfter), + jobCreatedAt: BigNumber.from(graphData.jobCreatedAt), + }; + // setting details + lensFields.details = { + selector: graphData.jobSelector, + credits: BigNumber.from(graphData.credits), + maxBaseFeeGwei: parseInt(graphData.maxBaseFeeGwei), + rewardPct: parseInt(graphData.rewardPct), + fixedReward: parseInt(graphData.fixedReward), + calldataSource: parseInt(graphData.calldataSource), + intervalSeconds: parseInt(graphData.intervalSeconds), + lastExecutionAt: parseInt(graphData.lastExecutionAt), + }; + // with SubquerySource you don't need to use parseConfig. You can create config field right here. + lensFields.config = { + isActive: graphData.active, + useJobOwnerCredits: graphData.useJobOwnerCredits, + assertResolverSelector: graphData.assertResolverSelector, + checkKeeperMinCvpDeposit: +graphData.minKeeperCVP > 0, + }; + return lensFields; + } + + public async addLensFieldsToOneJob(newJob: LightJob | RandaoJob) { + return this.blockchainSource.addLensFieldsToOneJob(newJob); + } + + /** + * Gets job owner's balances from subgraph + * @param context - agent context + * @param jobOwnersSet - array of jobOwners addresses + */ + public async getOwnersBalances( + context, + jobOwnersSet: Set, + ): Promise<{ data: Map; meta: SourceMetadata }> { + let result = new Map(), + graphStatus; + try { + graphStatus = await this.getGraphStatus(); + if (!graphStatus.isSynced) { + this.clog('warn', 'Subgraph is not ok, falling back to the blockchain datasource.'); + ({ data: result } = await this.blockchainSource.getOwnersBalances(context, jobOwnersSet)); + return { data: result, meta: graphStatus }; + } + + const { jobOwners } = await this.query(this.subgraphUrl, QUERY_JOB_OWNERS); + const { nodes } = jobOwners; + nodes.forEach(JobOwner => { + result.set(toChecksummedAddress(JobOwner.id), BigNumber.from(JobOwner.credits)); + }); + } catch (e) { + throw this.err(e); + } + return { data: result, meta: graphStatus }; + } +}