Skip to content

Commit

Permalink
Merge pull request #854 from paraswap/fix/rpc-fallback-univ3
Browse files Browse the repository at this point in the history
New logs on UniV3 setState
  • Loading branch information
KanievskyiDanylo authored Dec 10, 2024
2 parents 6d0b9d5 + 377561d commit f709b7a
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 16 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@paraswap/dex-lib",
"version": "3.11.9",
"version": "3.11.10",
"main": "build/index.js",
"types": "build/index.d.ts",
"repository": "https://github.com/paraswap/paraswap-dex-lib",
Expand Down
27 changes: 27 additions & 0 deletions src/dex/pancakeswap-v3/pancakeswap-v3-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,33 @@ export class PancakeSwapV3EventPool extends StatefulEventSubscriber<PoolState> {
return TICK_BITMAP_TO_USE + TICK_BITMAP_BUFFER;
}

async checkState(
blockNumber: number,
): Promise<DeepReadonly<PoolState> | null> {
const state = this.getState(blockNumber);
if (state) {
return state;
}

this.logger.error(
`PancakeV3: No state found for ${this.name} ${this.addressesSubscribed[0]} for bn: ${blockNumber}`,
);
return null;
}

_setState(state: any, blockNumber: number, reason?: string): void {
if (this.parentName === 'PancakeswapV3') {
this.logger.info(
`PancakeV3: Setting state: ${!!state ? 'non-empty' : 'empty'} for '${
this.name
}' for bn: '${blockNumber}' due to reason: '${
reason ?? 'outside_of_event_subscriber'
}'`,
);
}
super._setState(state, blockNumber);
}

async generateState(blockNumber: number): Promise<Readonly<PoolState>> {
const callData = this._getStateRequestCallData();

Expand Down
10 changes: 10 additions & 0 deletions src/dex/pancakeswap-v3/pancakeswap-v3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,10 @@ export class PancakeswapV3

if (selectedPools.length === 0) return null;

await Promise.all(
selectedPools.map(pool => pool.checkState(blockNumber)),
);

const poolsToUse = selectedPools.reduce(
(acc, pool) => {
let state = pool.getState(blockNumber);
Expand All @@ -632,6 +636,12 @@ export class PancakeswapV3
},
);

poolsToUse.poolWithoutState.forEach(pool => {
this.logger.warn(
`PancakeV3: Pool ${pool.name} on ${this.dexKey} has no state. Fallback to rpc`,
);
});

const rpcResultsPromise = this.getPricingFromRpc(
_srcToken,
_destToken,
Expand Down
27 changes: 27 additions & 0 deletions src/dex/uniswap-v3/uniswap-v3-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,33 @@ export class UniswapV3EventPool extends StatefulEventSubscriber<PoolState> {
return TICK_BITMAP_TO_USE + TICK_BITMAP_BUFFER;
}

async checkState(
blockNumber: number,
): Promise<DeepReadonly<PoolState> | null> {
const state = this.getState(blockNumber);
if (state) {
return state;
}

this.logger.error(
`UniV3: No state found for ${this.name} ${this.addressesSubscribed[0]} for bn: ${blockNumber}`,
);
return null;
}

_setState(state: any, blockNumber: number, reason?: string): void {
if (this.parentName === 'UniswapV3') {
this.logger.info(
`UniV3: Setting state: '${!!state ? 'non-empty' : 'empty'}' for '${
this.name
}' for bn: '${blockNumber}' due to reason: '${
reason ?? 'outside_of_event_subscriber'
}'`,
);
}
super._setState(state, blockNumber);
}

async generateState(blockNumber: number): Promise<Readonly<PoolState>> {
const callData = this._getStateRequestCallData();

Expand Down
11 changes: 11 additions & 0 deletions src/dex/uniswap-v3/uniswap-v3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -715,13 +715,18 @@ export class UniswapV3

if (selectedPools.length === 0) return null;

await Promise.all(
selectedPools.map(pool => pool.checkState(blockNumber)),
);

const poolsToUse = selectedPools.reduce(
(acc, pool) => {
let state = pool.getState(blockNumber);
if (state === null) {
this.logger.trace(
`${this.dexKey}: State === null. Fallback to rpc ${pool.name}`,
);
// as we generate state (if nullified) in previous Promise.all, here should only be pools with failed initialization
acc.poolWithoutState.push(pool);
} else {
acc.poolWithState.push(pool);
Expand All @@ -734,6 +739,12 @@ export class UniswapV3
},
);

poolsToUse.poolWithoutState.forEach(pool => {
this.logger.warn(
`UniV3: Pool ${pool.name} on ${this.dexKey} has no state. Fallback to rpc`,
);
});

const states = poolsToUse.poolWithState.map(
p => p.getState(blockNumber)!,
);
Expand Down
38 changes: 23 additions & 15 deletions src/stateful-event-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ export abstract class StatefulEventSubscriber<State>
) {
let masterBn: undefined | number = undefined;
if (options && options.state) {
this.setState(options.state, blockNumber);
this.setState(options.state, blockNumber, 'initialize_1');
} else if (options && options.forceRegenerate) {
// ZkEVM forces to always regenerate state when it is old
this.logger.debug(
`${this.parentName}: ${this.name}: forced to regenerate state`,
);
const state = await this.generateState(blockNumber);
this.setState(state, blockNumber);
this.setState(state, blockNumber, 'initialize_2');
} else {
if (this.dexHelper.config.isSlave && this.masterPoolNeeded) {
let stateAsString = await this.dexHelper.cache.hget(
Expand Down Expand Up @@ -135,14 +135,14 @@ export abstract class StatefulEventSubscriber<State>
}
// set state and the according blockNumber. state.bn can be smaller, greater or equal
// to blockNumber
this.setState(state.state, blockNumber);
this.setState(state.state, blockNumber, 'initialize_3');
} else {
// if no state found in cache generate new state using rpc
this.logger.info(
`${this.parentName}: ${this.name}: did not found state on cache generating new one`,
);
const state = await this.generateState(blockNumber);
this.setState(state, blockNumber);
this.setState(state, blockNumber, 'initialize_4');

// we should publish only if generateState succeeded
const data = this.getPoolIdentifierData();
Expand All @@ -161,7 +161,7 @@ export abstract class StatefulEventSubscriber<State>
`${this.parentName}: ${this.name}: cache generating state`,
);
const state = await this.generateState(blockNumber);
this.setState(state, blockNumber);
this.setState(state, blockNumber, 'initialize_5');
}
}

Expand Down Expand Up @@ -235,7 +235,7 @@ export abstract class StatefulEventSubscriber<State>
this.logger.info(
`StatefulEventSubscriber_1 restart, bn: ${blockNumber}, state_bn: ${this.stateBlockNumber}: ${this.parentName}: ${this.name}`,
);
this._setState(null, blockNumber);
this._setState(null, blockNumber, 'restart');
}
}

Expand Down Expand Up @@ -275,7 +275,7 @@ export abstract class StatefulEventSubscriber<State>
}
if (!this.state) {
const freshState = await this.generateState(blockNumber);
this.setState(freshState, blockNumber);
this.setState(freshState, blockNumber, 'update_1');
}
//Find the last state before the blockNumber of the logs
let stateBeforeLog: DeepReadonly<State> | undefined;
Expand All @@ -291,7 +291,7 @@ export abstract class StatefulEventSubscriber<State>
logs.slice(index, indexBlockEnd),
blockHeader,
);
if (nextState) this.setState(nextState, blockNumber);
if (nextState) this.setState(nextState, blockNumber, 'update_2');
}
lastBlockNumber = blockNumber;
index = indexBlockEnd;
Expand All @@ -315,7 +315,7 @@ export abstract class StatefulEventSubscriber<State>
);
try {
const state = await this.generateState(latestBlockNumber);
this.setState(state, latestBlockNumber);
this.setState(state, latestBlockNumber, 'update_3');
return true;
} catch (e) {
this.logger.error(
Expand Down Expand Up @@ -346,12 +346,12 @@ export abstract class StatefulEventSubscriber<State>
}

if (lastBn) {
this._setState(this.stateHistory[lastBn], lastBn);
this._setState(this.stateHistory[lastBn], lastBn, 'rollback_1');
} else {
this.logger.info(
`StatefulEventSubscriber_1 rollback, bn: ${blockNumber}: ${this.parentName}: ${this.name}`,
);
this._setState(null, blockNumber);
this._setState(null, blockNumber, 'rollback_2');
}
} else {
//Keep the current state in this.state and in the history
Expand Down Expand Up @@ -390,7 +390,11 @@ export abstract class StatefulEventSubscriber<State>
return this.state;
}

_setState(state: DeepReadonly<State> | null, blockNumber: number) {
_setState(
state: DeepReadonly<State> | null,
blockNumber: number,
reason?: string,
) {
if (
this.dexHelper.config.isSlave &&
this.masterPoolNeeded &&
Expand Down Expand Up @@ -418,7 +422,7 @@ export abstract class StatefulEventSubscriber<State>
`${this.parentName}: received state from a scheduled job`,
'info',
);
this.setState(state.state, state.bn);
this.setState(state.state, state.bn, 'addBatchHGet');
return true;
},
);
Expand Down Expand Up @@ -481,14 +485,18 @@ export abstract class StatefulEventSubscriber<State>
//no longer needed. If the blockNumber is greater than or equal to the
//current state, then the current state will be updated and the invalid flag
//can be reset.
setState(state: DeepReadonly<State>, blockNumber: number): void {
setState(
state: DeepReadonly<State>,
blockNumber: number,
reason?: string,
): void {
if (!blockNumber) {
this.logger.error('setState() with blockNumber', blockNumber);
return;
}
this.stateHistory[blockNumber] = state;
if (!this.state || blockNumber >= this.stateBlockNumber) {
this._setState(state, blockNumber);
this._setState(state, blockNumber, reason);
this.invalid = false;
}
const minBlockNumberToKeep = this.stateBlockNumber - MAX_BLOCKS_HISTORY;
Expand Down

0 comments on commit f709b7a

Please sign in to comment.