Skip to content

Commit

Permalink
improve data fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
defi-dev committed Dec 14, 2023
1 parent 6ef222a commit 4a1d8f0
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 169 deletions.
72 changes: 8 additions & 64 deletions app/App.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import { Config, AllNetworksConfig, IAgent, AgentConfig } from './Types';
import { Config, AllNetworksConfig } from './Types';
import { Network } from './Network.js';
import { toChecksummedAddress } from './Utils.js';
import { initApi } from './Api.js';
import { getAgentVersionAndType } from './ConfigGetters.js';
import { AgentRandao_2_3_0 } from './agents/Agent.2.3.0.randao.js';
import { AgentLight_2_2_0 } from './agents/Agent.2.2.0.light.js';
import { SubgraphSource } from './dataSources/SubgraphSource.js';
import { BlockchainSource } from './dataSources/BlockchainSource.js';
import logger, { updateSentryScope } from './services/Logger.js';
import logger from './services/Logger.js';

export class App {
private networks: { [key: string]: Network };
Expand Down Expand Up @@ -87,8 +81,7 @@ export class App {

for (const [netName, netConfig] of Object.entries(allNetworkConfigs.details)) {
if (allNetworkConfigs.enabled.includes(netName)) {
const agents = this.buildAgents(netName, netConfig.agents);
networks[netName] = new Network(netName, netConfig, this, agents);
networks[netName] = new Network(netName, netConfig, this);
} else {
logger.debug(`App: Skipping ${netName} network...`);
}
Expand All @@ -97,65 +90,16 @@ export class App {
return networks;
}

public buildAgents(networkName: string, agentsConfig: { [key: string]: AgentConfig }): IAgent[] {
const agents = [];
// TODO: get type & AgentConfig
for (const [address, agentConfig] of Object.entries(agentsConfig)) {
const checksummedAddress = toChecksummedAddress(address);
let { version, strategy } = agentConfig;
if (!version || !strategy) {
[version, strategy] = getAgentVersionAndType(checksummedAddress, networkName);
}
let agent;

if (version.startsWith('2.') && strategy === 'randao') {
agent = new AgentRandao_2_3_0(checksummedAddress, agentConfig, networkName);
} else if (version.startsWith('2.') && strategy === 'light') {
agent = new AgentLight_2_2_0(checksummedAddress, agentConfig, networkName);
} else {
throw new Error(
`App: Not supported agent version/strategy: network=${networkName},version=${version},strategy=${strategy}`,
);
}

agents.push(agent);
}
return agents;
}

public async initNetworks(networks: { [netName: string]: Network }) {
this.networks = networks;
logger.debug('App: Network initialization start...');
const inits = [];
for (const network of Object.values(this.networks)) {
inits.push(network.init());

for (const agent of network.getAgents()) {
let dataSource;
// TODO: Add support for different agents. Now if there are multiple agents, the tags linked to the latest one.
updateSentryScope(
network.getName(),
network.getFlashbotsRpc(),
agent.address,
agent.getKeyAddress(),
agent.dataSourceType,
agent.subgraphUrl,
);
if (agent.dataSourceType === 'subgraph') {
dataSource = new SubgraphSource(network, agent, agent.subgraphUrl);
} else if (agent.dataSourceType === 'blockchain') {
dataSource = new BlockchainSource(network, agent);
} else {
throw new Error(`App: missing dataSource for agent ${agent.address}`);
}
await agent.init(network, dataSource);
}
await network.init().catch(e => {
logger.error(`App: Network ${network.getName()} initialization failed ${e}`);
});
}
logger.debug('App: Waiting for all networks to be initialized...');
try {
await Promise.all(inits);
} catch (e) {
logger.error(`App: Networks initialization failed ${e}`);
if (!Object.values(this.networks).some(n => !!n.getChainId())) {
logger.error('App: Networks initialization failed');
process.exit(1);
}
logger.info('App: Networks initialization done!');
Expand Down
2 changes: 1 addition & 1 deletion app/ConfigGetters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export function getDefaultNetworkConfig(name) {
export function getMaxBlocksSubgraphDelay(networkName) {
return (
{
arbitrumOne: 1000,
arbitrumOne: 10000,
}[networkName] || 10
);
}
Expand Down
157 changes: 128 additions & 29 deletions app/Network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import {
NetworkConfig,
Resolver,
} from './Types.js';
import { bigintToHex } from './Utils.js';
import { bigintToHex, toChecksummedAddress } from './Utils.js';
import pIteration from 'p-iteration';
import { ethers } from 'ethers';
import EventEmitter from 'events';
import {
getAgentVersionAndType,
getAverageBlockTime,
getDefaultNetworkConfig,
getExternalLensAddress,
Expand All @@ -17,10 +20,13 @@ import {
} from './ConfigGetters.js';
import { getExternalLensAbi, getMulticall2Abi } from './services/AbiService.js';
import { EthersContractWrapperFactory } from './clients/EthersContractWrapperFactory.js';
import EventEmitter from 'events';
import { App } from './App';
import logger from './services/Logger.js';
import { App } from './App.js';
import logger, { updateSentryScope } from './services/Logger.js';
import ContractEventsEmitter from './services/ContractEventsEmitter.js';
import { SubgraphSource } from './dataSources/SubgraphSource.js';
import { BlockchainSource } from './dataSources/BlockchainSource.js';
import { AgentRandao_2_3_0 } from './agents/Agent.2.3.0.randao.js';
import { AgentLight_2_2_0 } from './agents/Agent.2.2.0.light.js';

interface ResolverJobWithCallback {
lastSuccessBlock?: bigint;
Expand Down Expand Up @@ -61,6 +67,7 @@ export class Network {

private currentBlockDelay: number;
private latestBaseFee: bigint;
private agentsStartBlockNumber: bigint;
private latestBlockNumber: bigint;
private latestBlockTimestamp: bigint;

Expand All @@ -76,7 +83,7 @@ export class Network {
return new Error(`NetworkError${this.toString()}: ${args.join(' ')}`);
}

constructor(name: string, networkConfig: NetworkConfig, app: App, agents: IAgent[]) {
constructor(name: string, networkConfig: NetworkConfig, app: App) {
this.initialized = false;
this.app = app;
this.name = name;
Expand All @@ -85,7 +92,7 @@ export class Network {
this.maxBlockDelay = networkConfig.max_block_delay;
this.maxNewBlockDelay = networkConfig.max_new_block_delay;
this.networkConfig = networkConfig;
this.agents = agents;
this.agents = this.buildAgents();

this.flashbotsRpc = networkConfig?.flashbots?.rpc;
this.flashbotsAddress = networkConfig?.flashbots?.address;
Expand Down Expand Up @@ -231,6 +238,32 @@ export class Network {
};
}

private buildAgents(): IAgent[] {
const agents = [];
// TODO: get type & AgentConfig
for (const [address, agentConfig] of Object.entries(this.networkConfig.agents)) {
const checksummedAddress = toChecksummedAddress(address);
let { version, strategy } = agentConfig;
if (!version || !strategy) {
[version, strategy] = getAgentVersionAndType(checksummedAddress, this.name);
}
let agent;

if (version.startsWith('2.') && strategy === 'randao') {
agent = new AgentRandao_2_3_0(checksummedAddress, agentConfig, this.name);
} else if (version.startsWith('2.') && strategy === 'light') {
agent = new AgentLight_2_2_0(checksummedAddress, agentConfig, this.name);
} else {
throw new Error(
`App: Not supported agent version/strategy: network=${this.name},version=${version},strategy=${strategy}`,
);
}

agents.push(agent);
}
return agents;
}

private initProvider() {
this.provider = new ethers.providers.WebSocketProvider(this.rpc);
this.fixProvider(this.provider);
Expand Down Expand Up @@ -323,6 +356,74 @@ export class Network {
'info',
`The network '${this.getName()}' has been initialized. The last block number: ${this.latestBlockNumber}`,
);

await this.initAgents();

if (this.agentsStartBlockNumber < this.latestBlockNumber) {
let startBlockNumber = Number(this.agentsStartBlockNumber);
let diff = Number(this.latestBlockNumber) - startBlockNumber;
this.latestBlockNumber = this.agentsStartBlockNumber;
this.contractEventsEmitter.setBlockLogsMode(true);
this.clog('info', `Sync diff between sources: ${diff}. Start fetching this blocks manually...`);

const step = 10;
while (diff > step) {
const count = diff > step ? step : diff;
const before = this.nowMs();
const blocks = await pIteration.map(Array.from(Array(Number(count)).keys()), n => {
return this.queryBlock(+n + 1);
});

blocks.forEach(block => this._handleNewBlock(block, before));

if (this.contractEventsEmitter.blockLogsMode) {
this.contractEventsEmitter.emitByBlockLogs(
await this.provider.getLogs({
fromBlock: Number(this.agentsStartBlockNumber) + 1,
toBlock: Number(this.agentsStartBlockNumber) + count,
}),
);
}

startBlockNumber += count;
diff = parseInt(await this.queryLatestBlock().then(b => b.number.toString())) - startBlockNumber;
}
this.contractEventsEmitter.setBlockLogsMode(false);
}
}

private async initAgents() {
let lowBlockNumber;
for (const agent of this.getAgents()) {
let dataSource;
// TODO: Add support for different agents. Now if there are multiple agents, the tags linked to the latest one.
updateSentryScope(
this.getName(),
this.getFlashbotsRpc(),
agent.address,
agent.getKeyAddress(),
agent.dataSourceType,
agent.subgraphUrl,
);
if (agent.dataSourceType === 'subgraph') {
dataSource = this.getAgentSubgraphDataSource(agent);
} else if (agent.dataSourceType === 'blockchain') {
dataSource = this.getAgentBlockchainDataSource(agent);
} else {
throw new Error(`App: missing dataSource for agent ${agent.address}`);
}
const syncBlockNumber = await agent.init(this, dataSource);
lowBlockNumber = !lowBlockNumber || syncBlockNumber < lowBlockNumber ? syncBlockNumber : lowBlockNumber;
}
this.agentsStartBlockNumber = lowBlockNumber;
}

public getAgentSubgraphDataSource(agent) {
return new SubgraphSource(this, agent, agent.subgraphUrl);
}

public getAgentBlockchainDataSource(agent) {
return new BlockchainSource(this, agent);
}

public stop() {
Expand Down Expand Up @@ -351,20 +452,14 @@ export class Network {
this.clog('error', `⚠️ Block not found (number=${blockNumber},before=${before},nowMs=${this.nowMs()})`);
return null;
}
const fetchBlockDelay = this.nowMs() - before;
if (process.env.NODE_ENV !== 'test') {
this.clog(
'info',
`🧱 New block: (number=${blockNumber},timestamp=${block.timestamp},hash=${block.hash},txCount=${block.transactions.length},baseFee=${block.baseFeePerGas},fetchDelayMs=${fetchBlockDelay})`,
);
}
this.latestBaseFee = BigInt(block.baseFeePerGas.toString());
this.latestBlockTimestamp = BigInt(block.timestamp.toString());
this.currentBlockDelay = this.nowS() - parseInt(block.timestamp.toString());

this.newBlockEventEmitter.emit('newBlock', block.timestamp, blockNumber);
this._handleNewBlock(block, before);
this._walkThroughTheJobs(block.number, block.timestamp);

this.walkThroughTheJobs(blockNumber, block.timestamp);
if (this.contractEventsEmitter.blockLogsMode) {
const fromBlock = bigintToHex(blockNumber);
this.contractEventsEmitter.emitByBlockLogs(await this.provider.getLogs({ fromBlock, toBlock: fromBlock }));
}

setTimeout(async () => {
if (this.latestBlockNumber > blockNumber) {
Expand All @@ -384,12 +479,22 @@ export class Network {
} while (block);
}, this.maxNewBlockDelay * 1000);

if (this.contractEventsEmitter.blockLogsMode) {
const fromBlock = bigintToHex(blockNumber);
this.contractEventsEmitter.emitByBlockLogs(await this.provider.getLogs({ fromBlock, toBlock: fromBlock }));
return block;
}

private _handleNewBlock(block, before) {
const fetchBlockDelay = this.nowMs() - before;
if (process.env.NODE_ENV !== 'test') {
this.clog(
'info',
`🧱 New block: (number=${block.number},timestamp=${block.timestamp},hash=${block.hash},txCount=${block.transactions.length},baseFee=${block.baseFeePerGas},fetchDelayMs=${fetchBlockDelay})`,
);
}
this.latestBaseFee = BigInt(block.baseFeePerGas.toString());
this.latestBlockTimestamp = BigInt(block.timestamp.toString());
this.currentBlockDelay = this.nowS() - parseInt(block.timestamp.toString());

return block;
this.newBlockEventEmitter.emit('newBlock', block.timestamp, block.number);
}

public isBlockDelayAboveMax() {
Expand All @@ -412,7 +517,7 @@ export class Network {
return this.contractEventsEmitter.contractEmitter(contract);
}

private walkThroughTheJobs(blockNumber: number, blockTimestamp: number) {
private _walkThroughTheJobs(blockNumber: number, blockTimestamp: number) {
this.triggerIntervalCallbacks(blockNumber, blockTimestamp);
this.callResolversAndTriggerCallbacks(blockNumber);
}
Expand Down Expand Up @@ -486,12 +591,6 @@ export class Network {
}
}

private _validateKeyInMap(key: string, map: { [key: string]: object }, type: string): void {
if (!map[key]) {
throw this.err(`Callback key already exists: type=${type},key=${key}`);
}
}

public registerTimeout(key: string, triggerCallbackAfter: number, callback: (blockTimestamp: number) => void) {
this._validateKeyLength(key, 'interval');
this._validateKeyNotInMap(key, this.timeoutData, 'interval');
Expand Down
16 changes: 13 additions & 3 deletions app/Types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,22 @@ export interface IRandaoAgent extends IAgent {

export interface IDataSource {
getType(): string;
getBlocksDelay(): Promise<bigint>;
getRegisteredJobs(_context): Promise<Map<string, RandaoJob | LightJob>>;
getOwnersBalances(context, jobOwnersSet: Set<string>): Promise<Map<string, BigNumber>>;
getBlocksDelay(): Promise<{ diff: bigint; nodeBlockNumber: bigint; sourceBlockNumber: bigint }>;
getRegisteredJobs(_context): Promise<{ data: Map<string, RandaoJob | LightJob>; meta: SourceMetadata }>;
getOwnersBalances(
context,
jobOwnersSet: Set<string>,
): Promise<{ data: Map<string, BigNumber>; meta: SourceMetadata }>;
addLensFieldsToOneJob(newJobs: RandaoJob | LightJob): void;
}

export interface SourceMetadata {
isSynced: boolean;
diff: bigint;
nodeBlockNumber: bigint;
sourceBlockNumber: bigint;
}

export interface IAgent {
readonly executorType: ExecutorType;
readonly address: string;
Expand Down
Loading

0 comments on commit 4a1d8f0

Please sign in to comment.