Skip to content

Commit

Permalink
fix: add state to all factories
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieJoo committed Oct 2, 2023
1 parent 674ef92 commit 26a8afa
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 69 deletions.
29 changes: 18 additions & 11 deletions src/dex/algebra/algebra-factory.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import { Interface } from '@ethersproject/abi';
import { AsyncOrSync, DeepReadonly } from 'ts-essentials';
import { DeepReadonly } from 'ts-essentials';
import FactoryABI from '../../abi/algebra/AlgebraFactory-v1_1.abi.json';
import { IDexHelper } from '../../dex-helper/idex-helper';
import { StatefulEventSubscriber } from '../../stateful-event-subscriber';
import { Address, Log, Logger } from '../../types';
import { LogDescription } from 'ethers/lib/utils';
import { FactoryState } from './types';

export type OnPoolCreatedCallback = ({
token0,
token1,
}: {
token0: string;
token1: string;
}) => AsyncOrSync<void>;
}) => FactoryState | null;

/*
* Stateless event subscriber in order to capture "Pool" event on new pools created.
* "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created.
* State is present, but it's a placeholder to actually make the events reach handlers (if there's no previous state - `processBlockLogs` is not called)
*/
export class AlgebraFactory extends StatefulEventSubscriber<void> {
export class AlgebraFactory extends StatefulEventSubscriber<FactoryState> {
handlers: {
[event: string]: (event: any) => DeepReadonly<void> | null;
[event: string]: (event: any) => DeepReadonly<FactoryState> | null;
} = {};

logDecoder: (log: Log) => any;
Expand All @@ -34,7 +36,7 @@ export class AlgebraFactory extends StatefulEventSubscriber<void> {
protected readonly onPoolCreated: OnPoolCreatedCallback,
mapKey: string = '',
) {
super(parentName, `factory`, dexHelper, logger, true, mapKey);
super(parentName, `${parentName} Factory`, dexHelper, logger, true, mapKey);

this.addressesSubscribed = [factoryAddress];

Expand All @@ -43,15 +45,20 @@ export class AlgebraFactory extends StatefulEventSubscriber<void> {
this.handlers['Pool'] = this.handleNewPool.bind(this);
}

generateState(): AsyncOrSync<void> {}
generateState(): FactoryState {
return {
token0: '',
token1: '',
};
}

protected processLog(
_: DeepReadonly<void>,
_: DeepReadonly<FactoryState>,
log: Readonly<Log>,
): DeepReadonly<void> | null {
): DeepReadonly<FactoryState> | null {
const event = this.logDecoder(log);
if (event.name in this.handlers) {
this.handlers[event.name](event);
return this.handlers[event.name](event);
}

return null;
Expand All @@ -61,6 +68,6 @@ export class AlgebraFactory extends StatefulEventSubscriber<void> {
const token0 = event.args.token0;
const token1 = event.args.token1;

this.onPoolCreated({ token0, token1 });
return this.onPoolCreated({ token0, token1 });
}
}
25 changes: 13 additions & 12 deletions src/dex/algebra/algebra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ export class Algebra extends SimpleExchange implements IDex<AlgebraData> {

/*
* When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls
* Once the pool is created, it gets immediately flagged
* Once the pool is created, it gets immediately flagged
*/
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({
token0,
token1,
}) => {
Expand All @@ -171,18 +171,19 @@ export class Algebra extends SimpleExchange implements IDex<AlgebraData> {

// consider doing it only from master pool for less calls to distant cache

try {
this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set
await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);
} catch (e) {
this.logger.error(`${logPrefix} ERROR deleting pool: ${poolKey}`);
}

// delete entry locally to let local instance discover the pool
delete this.eventPools[this.getPoolIdentifier(_token0, _token1)];

this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set, not waiting for result
this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);

return {
token0: _token0,
token1: _token1,
};
};

async getPool(
Expand Down
5 changes: 5 additions & 0 deletions src/dex/algebra/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ export type PoolState_v1_9 = {
areTicksCompressed: boolean;
};

export type FactoryState = {
token0: string;
token1: string;
};

export type AlgebraData = {
path: {
tokenIn: Address;
Expand Down
30 changes: 19 additions & 11 deletions src/dex/pancakeswap-v3/pancakeswap-v3-factory.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Interface } from '@ethersproject/abi';
import { AsyncOrSync, DeepReadonly } from 'ts-essentials';
import { DeepReadonly } from 'ts-essentials';
import FactoryABI from '../../abi/pancakeswap-v3/PancakeswapV3Factory.abi.json';
import { IDexHelper } from '../../dex-helper/idex-helper';
import { StatefulEventSubscriber } from '../../stateful-event-subscriber';
import { Address, Log, Logger } from '../../types';
import { LogDescription } from 'ethers/lib/utils';
import { FactoryState } from '../uniswap-v3/types';

export type OnPoolCreatedCallback = ({
token0,
Expand All @@ -14,14 +15,15 @@ export type OnPoolCreatedCallback = ({
token0: string;
token1: string;
fee: bigint;
}) => AsyncOrSync<void>;
}) => FactoryState;

/*
* Stateless event subscriber in order to capture "Pool" event on new pools created.
* "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created.
* State is present, but it's a placeholder to actually make the events reach handlers (if there's no previous state - `processBlockLogs` is not called)
*/
export class PancakeswapV3Factory extends StatefulEventSubscriber<void> {
export class PancakeswapV3Factory extends StatefulEventSubscriber<FactoryState> {
handlers: {
[event: string]: (event: any) => DeepReadonly<void> | null;
[event: string]: (event: any) => DeepReadonly<FactoryState> | null;
} = {};

logDecoder: (log: Log) => any;
Expand All @@ -36,7 +38,7 @@ export class PancakeswapV3Factory extends StatefulEventSubscriber<void> {
protected readonly onPoolCreated: OnPoolCreatedCallback,
mapKey: string = '',
) {
super(parentName, `factory`, dexHelper, logger, true, mapKey);
super(parentName, `${parentName} Factory`, dexHelper, logger, true, mapKey);

this.addressesSubscribed = [factoryAddress];

Expand All @@ -45,15 +47,21 @@ export class PancakeswapV3Factory extends StatefulEventSubscriber<void> {
this.handlers['PoolCreated'] = this.handleNewPool.bind(this);
}

generateState(): AsyncOrSync<void> {}
generateState(): FactoryState {
return {
token0: '',
token1: '',
fee: 0n,
};
}

protected processLog(
_: DeepReadonly<void>,
_: DeepReadonly<FactoryState>,
log: Readonly<Log>,
): DeepReadonly<void> | null {
): DeepReadonly<FactoryState> | null {
const event = this.logDecoder(log);
if (event.name in this.handlers) {
this.handlers[event.name](event);
return this.handlers[event.name](event);
}

return null;
Expand All @@ -64,6 +72,6 @@ export class PancakeswapV3Factory extends StatefulEventSubscriber<void> {
const token1 = event.args.token1;
const fee = event.args.fee;

this.onPoolCreated({ token0, token1, fee });
return this.onPoolCreated({ token0, token1, fee });
}
}
26 changes: 14 additions & 12 deletions src/dex/pancakeswap-v3/pancakeswap-v3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ export class PancakeswapV3

/*
* When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls
* Once the pool is created, it gets immediately flagged
* Once the pool is created, it gets immediately flagged
*/
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({
token0,
token1,
fee,
Expand All @@ -182,18 +182,20 @@ export class PancakeswapV3

// consider doing it only from master pool for less calls to distant cache

try {
this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set
await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);
} catch (e) {
this.logger.error(`${logPrefix} ERROR deleting pool: ${poolKey}`);
}

// delete entry locally to let local instance discover the pool
delete this.eventPools[this.getPoolIdentifier(_token0, _token1, fee)];

this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set, not waiting for result
this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);

return {
token0: _token0,
token1: _token1,
fee,
};
};

async getPool(
Expand Down
6 changes: 6 additions & 0 deletions src/dex/uniswap-v3/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ export type PoolState = {
balance1: bigint;
};

export type FactoryState = {
token0: string;
token1: string;
fee: bigint;
};

export type UniswapV3Data = {
path: {
tokenIn: Address;
Expand Down
30 changes: 19 additions & 11 deletions src/dex/uniswap-v3/uniswap-v3-factory.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Interface } from '@ethersproject/abi';
import { AsyncOrSync, DeepReadonly } from 'ts-essentials';
import { DeepReadonly } from 'ts-essentials';
import FactoryABI from '../../abi/uniswap-v3/UniswapV3Factory.abi.json';
import { IDexHelper } from '../../dex-helper/idex-helper';
import { StatefulEventSubscriber } from '../../stateful-event-subscriber';
import { Address, Log, Logger } from '../../types';
import { LogDescription } from 'ethers/lib/utils';
import { FactoryState } from './types';

export type OnPoolCreatedCallback = ({
token0,
Expand All @@ -14,14 +15,15 @@ export type OnPoolCreatedCallback = ({
token0: string;
token1: string;
fee: bigint;
}) => AsyncOrSync<void>;
}) => FactoryState | null;

/*
* Stateless event subscriber in order to capture "Pool" event on new pools created.
* "Stateless" event subscriber in order to capture "PoolCreated" event on new pools created.
* State is present, but it's a placeholder to actually make the events reach handlers (if there's no previous state - `processBlockLogs` is not called)
*/
export class UniswapV3Factory extends StatefulEventSubscriber<void> {
export class UniswapV3Factory extends StatefulEventSubscriber<FactoryState> {
handlers: {
[event: string]: (event: any) => DeepReadonly<void> | null;
[event: string]: (event: any) => DeepReadonly<FactoryState> | null;
} = {};

logDecoder: (log: Log) => any;
Expand All @@ -36,7 +38,7 @@ export class UniswapV3Factory extends StatefulEventSubscriber<void> {
protected readonly onPoolCreated: OnPoolCreatedCallback,
mapKey: string = '',
) {
super(parentName, `factory`, dexHelper, logger, true, mapKey);
super(parentName, `${parentName} Factory`, dexHelper, logger, true, mapKey);

this.addressesSubscribed = [factoryAddress];

Expand All @@ -45,15 +47,21 @@ export class UniswapV3Factory extends StatefulEventSubscriber<void> {
this.handlers['PoolCreated'] = this.handleNewPool.bind(this);
}

generateState(): AsyncOrSync<void> {}
generateState(): FactoryState {
return {
token0: '',
token1: '',
fee: 0n,
};
}

protected processLog(
_: DeepReadonly<void>,
_: DeepReadonly<FactoryState>,
log: Readonly<Log>,
): DeepReadonly<void> | null {
): DeepReadonly<FactoryState> | null {
const event = this.logDecoder(log);
if (event.name in this.handlers) {
this.handlers[event.name](event);
return this.handlers[event.name](event);
}

return null;
Expand All @@ -64,6 +72,6 @@ export class UniswapV3Factory extends StatefulEventSubscriber<void> {
const token1 = event.args.token1;
const fee = event.args.fee;

this.onPoolCreated({ token0, token1, fee });
return this.onPoolCreated({ token0, token1, fee });
}
}
26 changes: 14 additions & 12 deletions src/dex/uniswap-v3/uniswap-v3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ export class UniswapV3

/*
* When a non existing pool is queried, it's blacklisted for an arbitrary long period in order to prevent issuing too many rpc calls
* Once the pool is created, it gets immediately flagged
* Once the pool is created, it gets immediately flagged
*/
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = async ({
onPoolCreatedDeleteFromNonExistingSet: OnPoolCreatedCallback = ({
token0,
token1,
fee,
Expand All @@ -212,18 +212,20 @@ export class UniswapV3

// consider doing it only from master pool for less calls to distant cache

try {
this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set
await this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);
} catch (e) {
this.logger.error(`${logPrefix} ERROR deleting pool: ${poolKey}`);
}

// delete entry locally to let local instance discover the pool
delete this.eventPools[this.getPoolIdentifier(_token0, _token1, fee)];

this.logger.info(
`${logPrefix} delete pool from not existing set: ${poolKey}`,
);
// delete pool record from set, not waiting for result
this.dexHelper.cache.zrem(this.notExistingPoolSetKey, [poolKey]);

return {
token0: _token0,
token1: _token1,
fee,
};
};

async getPool(
Expand Down

0 comments on commit 26a8afa

Please sign in to comment.