Skip to content

Commit

Permalink
feat: add setState logs on UniV3 & forks
Browse files Browse the repository at this point in the history
  • Loading branch information
KanievskyiDanylo committed Dec 10, 2024
1 parent 1b553bf commit 78ab510
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
11 changes: 11 additions & 0 deletions src/dex/pancakeswap-v3/pancakeswap-v3-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,17 @@ export class PancakeSwapV3EventPool extends StatefulEventSubscriber<PoolState> {
return null;
}

_setState(state: any, blockNumber: number, reason?: string): void {
this.logger.info(
`PancakeV3: Setting state: ${!!state ? 'non-empty' : 'empty'} for ${
this.name
} ${
this.addressesSubscribed[0]
} for bn: '${blockNumber}' due to reason: '${
reason ?? 'outside_of_event_subscriber'
}'`,
);
super._setState(state, blockNumber);
}

async generateState(blockNumber: number): Promise<Readonly<PoolState>> {
Expand Down
12 changes: 12 additions & 0 deletions src/dex/uniswap-v3/uniswap-v3-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,18 @@ export class UniswapV3EventPool extends StatefulEventSubscriber<PoolState> {
);
return null;
}

_setState(state: any, blockNumber: number, reason?: string): void {
this.logger.info(
`UniV3: Setting state: '${!!state ? 'non-empty' : 'empty'}' for ${
this.name
} ${
this.addressesSubscribed[0]
} for bn: '${blockNumber}' due to reason: '${
reason ?? 'outside_of_event_subscriber'
}'`,
);
super._setState(state, blockNumber);
}

async generateState(blockNumber: number): Promise<Readonly<PoolState>> {
Expand Down
38 changes: 23 additions & 15 deletions src/stateful-event-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ export abstract class StatefulEventSubscriber<State>
) {
let masterBn: undefined | number = undefined;
if (options && options.state) {
this.setState(options.state, blockNumber);
this.setState(options.state, blockNumber, 'initialize_1');
} else if (options && options.forceRegenerate) {
// ZkEVM forces to always regenerate state when it is old
this.logger.debug(
`${this.parentName}: ${this.name}: forced to regenerate state`,
);
const state = await this.generateState(blockNumber);
this.setState(state, blockNumber);
this.setState(state, blockNumber, 'initialize_2');
} else {
if (this.dexHelper.config.isSlave && this.masterPoolNeeded) {
let stateAsString = await this.dexHelper.cache.hget(
Expand Down Expand Up @@ -135,14 +135,14 @@ export abstract class StatefulEventSubscriber<State>
}
// set state and the according blockNumber. state.bn can be smaller, greater or equal
// to blockNumber
this.setState(state.state, blockNumber);
this.setState(state.state, blockNumber, 'initialize_3');
} else {
// if no state found in cache generate new state using rpc
this.logger.info(
`${this.parentName}: ${this.name}: did not found state on cache generating new one`,
);
const state = await this.generateState(blockNumber);
this.setState(state, blockNumber);
this.setState(state, blockNumber, 'initialize_4');

// we should publish only if generateState succeeded
const data = this.getPoolIdentifierData();
Expand All @@ -161,7 +161,7 @@ export abstract class StatefulEventSubscriber<State>
`${this.parentName}: ${this.name}: cache generating state`,
);
const state = await this.generateState(blockNumber);
this.setState(state, blockNumber);
this.setState(state, blockNumber, 'initialize_5');
}
}

Expand Down Expand Up @@ -235,7 +235,7 @@ export abstract class StatefulEventSubscriber<State>
this.logger.info(
`StatefulEventSubscriber_1 restart, bn: ${blockNumber}, state_bn: ${this.stateBlockNumber}: ${this.parentName}: ${this.name}`,
);
this._setState(null, blockNumber);
this._setState(null, blockNumber, 'restart');
}
}

Expand Down Expand Up @@ -275,7 +275,7 @@ export abstract class StatefulEventSubscriber<State>
}
if (!this.state) {
const freshState = await this.generateState(blockNumber);
this.setState(freshState, blockNumber);
this.setState(freshState, blockNumber, 'update_1');
}
//Find the last state before the blockNumber of the logs
let stateBeforeLog: DeepReadonly<State> | undefined;
Expand All @@ -291,7 +291,7 @@ export abstract class StatefulEventSubscriber<State>
logs.slice(index, indexBlockEnd),
blockHeader,
);
if (nextState) this.setState(nextState, blockNumber);
if (nextState) this.setState(nextState, blockNumber, 'update_2');
}
lastBlockNumber = blockNumber;
index = indexBlockEnd;
Expand All @@ -315,7 +315,7 @@ export abstract class StatefulEventSubscriber<State>
);
try {
const state = await this.generateState(latestBlockNumber);
this.setState(state, latestBlockNumber);
this.setState(state, latestBlockNumber, 'update_3');
return true;
} catch (e) {
this.logger.error(
Expand Down Expand Up @@ -346,12 +346,12 @@ export abstract class StatefulEventSubscriber<State>
}

if (lastBn) {
this._setState(this.stateHistory[lastBn], lastBn);
this._setState(this.stateHistory[lastBn], lastBn, 'rollback_1');
} else {
this.logger.info(
`StatefulEventSubscriber_1 rollback, bn: ${blockNumber}: ${this.parentName}: ${this.name}`,
);
this._setState(null, blockNumber);
this._setState(null, blockNumber, 'rollback_2');
}
} else {
//Keep the current state in this.state and in the history
Expand Down Expand Up @@ -390,7 +390,11 @@ export abstract class StatefulEventSubscriber<State>
return this.state;
}

_setState(state: DeepReadonly<State> | null, blockNumber: number) {
_setState(
state: DeepReadonly<State> | null,
blockNumber: number,
reason?: string,
) {
if (
this.dexHelper.config.isSlave &&
this.masterPoolNeeded &&
Expand Down Expand Up @@ -418,7 +422,7 @@ export abstract class StatefulEventSubscriber<State>
`${this.parentName}: received state from a scheduled job`,
'info',
);
this.setState(state.state, state.bn);
this.setState(state.state, state.bn, 'addBatchHGet');
return true;
},
);
Expand Down Expand Up @@ -481,14 +485,18 @@ export abstract class StatefulEventSubscriber<State>
//no longer needed. If the blockNumber is greater than or equal to the
//current state, then the current state will be updated and the invalid flag
//can be reset.
setState(state: DeepReadonly<State>, blockNumber: number): void {
setState(
state: DeepReadonly<State>,
blockNumber: number,
reason?: string,
): void {
if (!blockNumber) {
this.logger.error('setState() with blockNumber', blockNumber);
return;
}
this.stateHistory[blockNumber] = state;
if (!this.state || blockNumber >= this.stateBlockNumber) {
this._setState(state, blockNumber);
this._setState(state, blockNumber, reason);
this.invalid = false;
}
const minBlockNumberToKeep = this.stateBlockNumber - MAX_BLOCKS_HISTORY;
Expand Down

0 comments on commit 78ab510

Please sign in to comment.