From 732319cbf5d088955170c1cc1030f7d661dcade2 Mon Sep 17 00:00:00 2001 From: Yevhen Shulha Date: Tue, 3 Oct 2023 17:08:23 +0300 Subject: [PATCH] fix: make state empty obj & add logging to cache clearing --- src/dex/algebra/algebra-factory.ts | 21 +++++++--------- src/dex/algebra/algebra.ts | 24 ++++++++++-------- src/dex/algebra/types.ts | 5 +--- .../pancakeswap-v3/pancakeswap-v3-factory.ts | 22 +++++++--------- src/dex/pancakeswap-v3/pancakeswap-v3.ts | 25 ++++++++++--------- src/dex/uniswap-v3/types.ts | 6 +---- src/dex/uniswap-v3/uniswap-v3-factory.ts | 22 +++++++--------- src/dex/uniswap-v3/uniswap-v3.ts | 25 ++++++++++--------- 8 files changed, 68 insertions(+), 82 deletions(-) diff --git a/src/dex/algebra/algebra-factory.ts b/src/dex/algebra/algebra-factory.ts index 7a34bf23d..1650878ad 100644 --- a/src/dex/algebra/algebra-factory.ts +++ b/src/dex/algebra/algebra-factory.ts @@ -13,7 +13,7 @@ export type OnPoolCreatedCallback = ({ }: { token0: string; token1: string; -}) => FactoryState | null; +}) => Promise; /* * "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created. @@ -21,7 +21,7 @@ export type OnPoolCreatedCallback = ({ */ export class AlgebraFactory extends StatefulEventSubscriber { handlers: { - [event: string]: (event: any) => DeepReadonly | null; + [event: string]: (event: any) => Promise; } = {}; logDecoder: (log: Log) => any; @@ -46,28 +46,25 @@ export class AlgebraFactory extends StatefulEventSubscriber { } generateState(): FactoryState { - return { - token0: '', - token1: '', - }; + return {}; } - protected processLog( + protected async processLog( _: DeepReadonly, log: Readonly, - ): DeepReadonly | null { + ): Promise { const event = this.logDecoder(log); if (event.name in this.handlers) { - return this.handlers[event.name](event); + await this.handlers[event.name](event); } - return null; + return {}; } - handleNewPool(event: LogDescription) { + async handleNewPool(event: LogDescription) { const token0 = event.args.token0; const token1 = event.args.token1; - return this.onPoolCreated({ token0, token1 }); + await this.onPoolCreated({ token0, token1 }); } } diff --git a/src/dex/algebra/algebra.ts b/src/dex/algebra/algebra.ts index 27c99c1d7..8856d02e6 100644 --- a/src/dex/algebra/algebra.ts +++ b/src/dex/algebra/algebra.ts @@ -161,7 +161,7 @@ export class Algebra extends SimpleExchange implements IDex { * When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls * Once the pool is created, it gets immediately flagged */ - onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({ + onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({ token0, token1, }) => { @@ -174,16 +174,18 @@ export class Algebra extends SimpleExchange implements IDex { // delete entry locally to let local instance discover the pool delete this.eventPools[this.getPoolIdentifier(_token0, _token1)]; - this.logger.info( - `${logPrefix} delete pool from not existing set: ${poolKey}`, - ); - // delete pool record from set, not waiting for result - this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]); - - return { - token0: _token0, - token1: _token1, - }; + try { + this.logger.info( + `${logPrefix} delete pool from not existing set: ${poolKey}`, + ); + // delete pool record from set + await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]); + } catch (error) { + this.logger.error( + `${logPrefix} failed to delete pool from set: ${poolKey}`, + error, + ); + } }; async getPool( diff --git a/src/dex/algebra/types.ts b/src/dex/algebra/types.ts index e8cb3684e..ec4227967 100644 --- a/src/dex/algebra/types.ts +++ b/src/dex/algebra/types.ts @@ -51,10 +51,7 @@ export type PoolState_v1_9 = { areTicksCompressed: boolean; }; -export type FactoryState = { - token0: string; - token1: string; -}; +export type FactoryState = Record; export type AlgebraData = { path: { diff --git a/src/dex/pancakeswap-v3/pancakeswap-v3-factory.ts b/src/dex/pancakeswap-v3/pancakeswap-v3-factory.ts index 4b0c161c6..8bfc7a46b 100644 --- a/src/dex/pancakeswap-v3/pancakeswap-v3-factory.ts +++ b/src/dex/pancakeswap-v3/pancakeswap-v3-factory.ts @@ -15,7 +15,7 @@ export type OnPoolCreatedCallback = ({ token0: string; token1: string; fee: bigint; -}) => FactoryState; +}) => Promise; /* * "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created. @@ -23,7 +23,7 @@ export type OnPoolCreatedCallback = ({ */ export class PancakeswapV3Factory extends StatefulEventSubscriber { handlers: { - [event: string]: (event: any) => DeepReadonly | null; + [event: string]: (event: any) => Promise; } = {}; logDecoder: (log: Log) => any; @@ -48,30 +48,26 @@ export class PancakeswapV3Factory extends StatefulEventSubscriber } generateState(): FactoryState { - return { - token0: '', - token1: '', - fee: 0n, - }; + return {}; } - protected processLog( + protected async processLog( _: DeepReadonly, log: Readonly, - ): DeepReadonly | null { + ): Promise { const event = this.logDecoder(log); if (event.name in this.handlers) { - return this.handlers[event.name](event); + await this.handlers[event.name](event); } - return null; + return {}; } - handleNewPool(event: LogDescription) { + async handleNewPool(event: LogDescription) { const token0 = event.args.token0; const token1 = event.args.token1; const fee = event.args.fee; - return this.onPoolCreated({ token0, token1, fee }); + await this.onPoolCreated({ token0, token1, fee }); } } diff --git a/src/dex/pancakeswap-v3/pancakeswap-v3.ts b/src/dex/pancakeswap-v3/pancakeswap-v3.ts index 66430b0e3..c892818dd 100644 --- a/src/dex/pancakeswap-v3/pancakeswap-v3.ts +++ b/src/dex/pancakeswap-v3/pancakeswap-v3.ts @@ -171,7 +171,7 @@ export class PancakeswapV3 * When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls * Once the pool is created, it gets immediately flagged */ - onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({ + onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({ token0, token1, fee, @@ -185,17 +185,18 @@ export class PancakeswapV3 // delete entry locally to let local instance discover the pool delete this.eventPools[this.getPoolIdentifier(_token0, _token1, fee)]; - this.logger.info( - `${logPrefix} delete pool from not existing set: ${poolKey}`, - ); - // delete pool record from set, not waiting for result - this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]); - - return { - token0: _token0, - token1: _token1, - fee, - }; + try { + this.logger.info( + `${logPrefix} delete pool from not existing set: ${poolKey}`, + ); + // delete pool record from set + await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]); + } catch (error) { + this.logger.error( + `${logPrefix} failed to delete pool from set :${poolKey}`, + error, + ); + } }; async getPool( diff --git a/src/dex/uniswap-v3/types.ts b/src/dex/uniswap-v3/types.ts index bdb396d19..7dd093292 100644 --- a/src/dex/uniswap-v3/types.ts +++ b/src/dex/uniswap-v3/types.ts @@ -53,11 +53,7 @@ export type PoolState = { balance1: bigint; }; -export type FactoryState = { - token0: string; - token1: string; - fee: bigint; -}; +export type FactoryState = Record; export type UniswapV3Data = { path: { diff --git a/src/dex/uniswap-v3/uniswap-v3-factory.ts b/src/dex/uniswap-v3/uniswap-v3-factory.ts index db8fc4e80..0f568e7ab 100644 --- a/src/dex/uniswap-v3/uniswap-v3-factory.ts +++ b/src/dex/uniswap-v3/uniswap-v3-factory.ts @@ -15,7 +15,7 @@ export type OnPoolCreatedCallback = ({ token0: string; token1: string; fee: bigint; -}) => FactoryState | null; +}) => Promise; /* * "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created. @@ -23,7 +23,7 @@ export type OnPoolCreatedCallback = ({ */ export class UniswapV3Factory extends StatefulEventSubscriber { handlers: { - [event: string]: (event: any) => DeepReadonly | null; + [event: string]: (event: any) => Promise; } = {}; logDecoder: (log: Log) => any; @@ -48,30 +48,26 @@ export class UniswapV3Factory extends StatefulEventSubscriber { } generateState(): FactoryState { - return { - token0: '', - token1: '', - fee: 0n, - }; + return {}; } - protected processLog( + protected async processLog( _: DeepReadonly, log: Readonly, - ): DeepReadonly | null { + ): Promise { const event = this.logDecoder(log); if (event.name in this.handlers) { - return this.handlers[event.name](event); + await this.handlers[event.name](event); } - return null; + return {}; } - handleNewPool(event: LogDescription) { + async handleNewPool(event: LogDescription) { const token0 = event.args.token0; const token1 = event.args.token1; const fee = event.args.fee; - return this.onPoolCreated({ token0, token1, fee }); + await this.onPoolCreated({ token0, token1, fee }); } } diff --git a/src/dex/uniswap-v3/uniswap-v3.ts b/src/dex/uniswap-v3/uniswap-v3.ts index 3c46c2193..1d1cb8029 100644 --- a/src/dex/uniswap-v3/uniswap-v3.ts +++ b/src/dex/uniswap-v3/uniswap-v3.ts @@ -201,7 +201,7 @@ export class UniswapV3 * When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls * Once the pool is created, it gets immediately flagged */ - onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({ + onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({ token0, token1, fee, @@ -215,17 +215,18 @@ export class UniswapV3 // delete entry locally to let local instance discover the pool delete this.eventPools[this.getPoolIdentifier(_token0, _token1, fee)]; - this.logger.info( - `${logPrefix} delete pool from not existing set: ${poolKey}`, - ); - // delete pool record from set, not waiting for result - this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]); - - return { - token0: _token0, - token1: _token1, - fee, - }; + try { + this.logger.info( + `${logPrefix} delete pool from not existing set: ${poolKey}`, + ); + // delete pool record from set + await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]); + } catch (error) { + this.logger.error( + `${logPrefix} failed to delete pool from set: ${poolKey}`, + error, + ); + } }; async getPool(