Skip to content

Commit

Permalink
fix: make state empty obj & add logging to cache clearing
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieJoo committed Oct 3, 2023
1 parent 26a8afa commit 732319c
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 82 deletions.
21 changes: 9 additions & 12 deletions src/dex/algebra/algebra-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ export type OnPoolCreatedCallback = ({
}: {
token0: string;
token1: string;
}) => FactoryState | null;
}) => Promise<void>;

/*
* "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created.
* State is present, but it's a placeholder to actually make the events reach handlers (if there's no previous state - `processBlockLogs` is not called)
*/
export class AlgebraFactory extends StatefulEventSubscriber<FactoryState> {
handlers: {
[event: string]: (event: any) => DeepReadonly<FactoryState> | null;
[event: string]: (event: any) => Promise<void>;
} = {};

logDecoder: (log: Log) => any;
Expand All @@ -46,28 +46,25 @@ export class AlgebraFactory extends StatefulEventSubscriber<FactoryState> {
}

generateState(): FactoryState {
return {
token0: '',
token1: '',
};
return {};
}

protected processLog(
protected async processLog(
_: DeepReadonly<FactoryState>,
log: Readonly<Log>,
): DeepReadonly<FactoryState> | null {
): Promise<FactoryState> {
const event = this.logDecoder(log);
if (event.name in this.handlers) {
return this.handlers[event.name](event);
await this.handlers[event.name](event);
}

return null;
return {};
}

handleNewPool(event: LogDescription) {
async handleNewPool(event: LogDescription) {
const token0 = event.args.token0;
const token1 = event.args.token1;

return this.onPoolCreated({ token0, token1 });
await this.onPoolCreated({ token0, token1 });
}
}
24 changes: 13 additions & 11 deletions src/dex/algebra/algebra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export class Algebra extends SimpleExchange implements IDex<AlgebraData> {
* When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls
* Once the pool is created, it gets immediately flagged
*/
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({
token0,
token1,
}) => {
Expand All @@ -174,16 +174,18 @@ export class Algebra extends SimpleExchange implements IDex<AlgebraData> {
// delete entry locally to let local instance discover the pool
delete this.eventPools[this.getPoolIdentifier(_token0, _token1)];

this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set, not waiting for result
this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);

return {
token0: _token0,
token1: _token1,
};
try {
this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set
await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);
} catch (error) {
this.logger.error(
`${logPrefix} failed to delete pool from set: ${poolKey}`,
error,
);
}
};

async getPool(
Expand Down
5 changes: 1 addition & 4 deletions src/dex/algebra/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ export type PoolState_v1_9 = {
areTicksCompressed: boolean;
};

export type FactoryState = {
token0: string;
token1: string;
};
export type FactoryState = Record<string, never>;

export type AlgebraData = {
path: {
Expand Down
22 changes: 9 additions & 13 deletions src/dex/pancakeswap-v3/pancakeswap-v3-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ export type OnPoolCreatedCallback = ({
token0: string;
token1: string;
fee: bigint;
}) => FactoryState;
}) => Promise<void>;

/*
* "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created.
* State is present, but it's a placeholder to actually make the events reach handlers (if there's no previous state - `processBlockLogs` is not called)
*/
export class PancakeswapV3Factory extends StatefulEventSubscriber<FactoryState> {
handlers: {
[event: string]: (event: any) => DeepReadonly<FactoryState> | null;
[event: string]: (event: any) => Promise<void>;
} = {};

logDecoder: (log: Log) => any;
Expand All @@ -48,30 +48,26 @@ export class PancakeswapV3Factory extends StatefulEventSubscriber<FactoryState>
}

generateState(): FactoryState {
return {
token0: '',
token1: '',
fee: 0n,
};
return {};
}

protected processLog(
protected async processLog(
_: DeepReadonly<FactoryState>,
log: Readonly<Log>,
): DeepReadonly<FactoryState> | null {
): Promise<FactoryState> {
const event = this.logDecoder(log);
if (event.name in this.handlers) {
return this.handlers[event.name](event);
await this.handlers[event.name](event);
}

return null;
return {};
}

handleNewPool(event: LogDescription) {
async handleNewPool(event: LogDescription) {
const token0 = event.args.token0;
const token1 = event.args.token1;
const fee = event.args.fee;

return this.onPoolCreated({ token0, token1, fee });
await this.onPoolCreated({ token0, token1, fee });
}
}
25 changes: 13 additions & 12 deletions src/dex/pancakeswap-v3/pancakeswap-v3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export class PancakeswapV3
* When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls
* Once the pool is created, it gets immediately flagged
*/
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({
token0,
token1,
fee,
Expand All @@ -185,17 +185,18 @@ export class PancakeswapV3
// delete entry locally to let local instance discover the pool
delete this.eventPools[this.getPoolIdentifier(_token0, _token1, fee)];

this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set, not waiting for result
this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);

return {
token0: _token0,
token1: _token1,
fee,
};
try {
this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set
await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);
} catch (error) {
this.logger.error(
`${logPrefix} failed to delete pool from set :${poolKey}`,
error,
);
}
};

async getPool(
Expand Down
6 changes: 1 addition & 5 deletions src/dex/uniswap-v3/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ export type PoolState = {
balance1: bigint;
};

export type FactoryState = {
token0: string;
token1: string;
fee: bigint;
};
export type FactoryState = Record<string, never>;

export type UniswapV3Data = {
path: {
Expand Down
22 changes: 9 additions & 13 deletions src/dex/uniswap-v3/uniswap-v3-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ export type OnPoolCreatedCallback = ({
token0: string;
token1: string;
fee: bigint;
}) => FactoryState | null;
}) => Promise<void>;

/*
* "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created.
* State is present, but it's a placeholder to actually make the events reach handlers (if there's no previous state - `processBlockLogs` is not called)
*/
export class UniswapV3Factory extends StatefulEventSubscriber<FactoryState> {
handlers: {
[event: string]: (event: any) => DeepReadonly<FactoryState> | null;
[event: string]: (event: any) => Promise<void>;
} = {};

logDecoder: (log: Log) => any;
Expand All @@ -48,30 +48,26 @@ export class UniswapV3Factory extends StatefulEventSubscriber<FactoryState> {
}

generateState(): FactoryState {
return {
token0: '',
token1: '',
fee: 0n,
};
return {};
}

protected processLog(
protected async processLog(
_: DeepReadonly<FactoryState>,
log: Readonly<Log>,
): DeepReadonly<FactoryState> | null {
): Promise<FactoryState> {
const event = this.logDecoder(log);
if (event.name in this.handlers) {
return this.handlers[event.name](event);
await this.handlers[event.name](event);
}

return null;
return {};
}

handleNewPool(event: LogDescription) {
async handleNewPool(event: LogDescription) {
const token0 = event.args.token0;
const token1 = event.args.token1;
const fee = event.args.fee;

return this.onPoolCreated({ token0, token1, fee });
await this.onPoolCreated({ token0, token1, fee });
}
}
25 changes: 13 additions & 12 deletions src/dex/uniswap-v3/uniswap-v3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ export class UniswapV3
* When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls
* Once the pool is created, it gets immediately flagged
*/
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({
token0,
token1,
fee,
Expand All @@ -215,17 +215,18 @@ export class UniswapV3
// delete entry locally to let local instance discover the pool
delete this.eventPools[this.getPoolIdentifier(_token0, _token1, fee)];

this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set, not waiting for result
this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);

return {
token0: _token0,
token1: _token1,
fee,
};
try {
this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set
await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);
} catch (error) {
this.logger.error(
`${logPrefix} failed to delete pool from set: ${poolKey}`,
error,
);
}
};

async getPool(
Expand Down

0 comments on commit 732319c

Please sign in to comment.