Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub3: SwaapV2 & Bebop & Cables #866

Open
wants to merge 9 commits into
base: feat/pub-sub-hashflow-dexalot
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@paraswap/dex-lib",
"version": "4.0.6-rfq-pub-sub.0",
"version": "4.0.6-rfq-pub-sub.16",
"main": "build/index.js",
"types": "build/index.d.ts",
"repository": "https://github.com/paraswap/paraswap-dex-lib",
Expand Down
60 changes: 7 additions & 53 deletions src/dex/bebop/bebop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
private tokensMap: TokenDataMap = {};

private pricesCacheKey: string;
private tokensCacheKey: string;
private tokensAddrCacheKey: string;

private bebopAuthToken: string;
Expand All @@ -90,7 +89,6 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
) {
super(dexHelper, dexKey);
this.logger = dexHelper.getLogger(dexKey);
this.tokensCacheKey = `tokens`;
this.pricesCacheKey = `prices`;
this.tokensAddrCacheKey = `tokens_addr`;
const token = this.dexHelper.config.data.bebopAuthToken;
Expand All @@ -109,7 +107,6 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
tokensIntervalMs: BEBOP_TOKENS_POLLING_INTERVAL_MS,
pricesCacheKey: this.pricesCacheKey,
pricesCacheTTLSecs: BEBOP_PRICES_CACHE_TTL,
tokensCacheKey: this.tokensCacheKey,
tokensAddrCacheKey: this.tokensAddrCacheKey,
tokensCacheTTLSecs: BEBOP_TOKENS_CACHE_TTL,
tokensReqParams: {
Expand All @@ -132,11 +129,8 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
}

async initializePricing(blockNumber: number) {
if (!this.dexHelper.config.isSlave) {
this.rateFetcher.start();
await sleep(BEBOP_INIT_TIMEOUT_MS);
}

await this.rateFetcher.start();
await sleep(BEBOP_INIT_TIMEOUT_MS);
await this.setTokensMap();
}

Expand Down Expand Up @@ -184,7 +178,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
return [];
}

const prices = await this.getCachedPrices();
const prices = await this.rateFetcher.getCachedPrices();
if (!prices) {
throw new Error('No prices available');
}
Expand Down Expand Up @@ -523,7 +517,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
}

async setTokensMap() {
const tokens = await this.getCachedTokens();
const tokens = await this.rateFetcher.getCachedTokens();

if (tokens) {
this.tokensMap = tokens;
Expand All @@ -542,7 +536,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
tokenAddress: Address,
limit: number,
): Promise<PoolLiquidity[]> {
const prices = await this.getCachedPrices();
const prices = await this.rateFetcher.getCachedPrices();

if (!prices) {
return [];
Expand Down Expand Up @@ -812,13 +806,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
errorsData.count + 1
} within ${BEBOP_RESTRICT_CHECK_INTERVAL_MS / 1000 / 60} minutes`,
);
await this.dexHelper.cache.setex(
this.dexKey,
this.network,
BEBOP_RESTRICTED_CACHE_KEY,
BEBOP_RESTRICT_TTL_S,
'true',
);
this.rateFetcher.restrict();
} else {
this.logger.warn(
`${this.dexKey}-${this.network}: Error count increased`,
Expand All @@ -839,41 +827,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
}

async isRestricted(): Promise<boolean> {
const result = await this.dexHelper.cache.get(
this.dexKey,
this.network,
BEBOP_RESTRICTED_CACHE_KEY,
);

return result === 'true';
}

async getCachedPrices(): Promise<BebopPricingResponse | null> {
const cachedPrices = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.pricesCacheKey,
);

if (cachedPrices) {
return JSON.parse(cachedPrices) as BebopPricingResponse;
}

return null;
}

async getCachedTokens(): Promise<TokenDataMap | null> {
const cachedTokens = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.tokensAddrCacheKey,
);

if (cachedTokens) {
return JSON.parse(cachedTokens) as TokenDataMap;
}

return null;
return this.rateFetcher.isRestricted();
}

getTokenFromAddress(address: Address): Token {
Expand Down
103 changes: 87 additions & 16 deletions src/dex/bebop/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import {
BebopPricingResponse,
BebopRateFetcherConfig,
BebopTokensResponse,
TokenDataMap,
} from './types';
import {
BebopPricingUpdate,
pricesResponseValidator,
tokensResponseValidator,
} from './validators';
import { BebopPricingUpdate, tokensResponseValidator } from './validators';
import { WebSocketFetcher } from '../../lib/fetcher/wsFetcher';
import { utils } from 'ethers';
import { BEBOP_RESTRICT_TTL_S, BEBOP_RESTRICTED_CACHE_KEY } from './constants';
import { ExpKeyValuePubSub } from '../../lib/pub-sub';

export function levels_from_flat_array(values: number[]): BebopLevel[] {
const levels: BebopLevel[] = [];
Expand All @@ -27,15 +26,18 @@ export function levels_from_flat_array(values: number[]): BebopLevel[] {
}

export class RateFetcher {
private tokensPricesPubSub: ExpKeyValuePubSub;

private pricesFetcher: WebSocketFetcher<BebopPricingResponse>;
private pricesCacheKey: string;
private pricesCacheTTL: number;

private tokensFetcher: Fetcher<BebopTokensResponse>;
private tokensAddrCacheKey: string;
private tokensCacheKey: string;
private tokensCacheTTL: number;

private restrictPubSub: ExpKeyValuePubSub;

constructor(
private dexHelper: IDexHelper,
private dexKey: string,
Expand All @@ -45,6 +47,13 @@ export class RateFetcher {
) {
this.pricesCacheKey = config.rateConfig.pricesCacheKey;
this.pricesCacheTTL = config.rateConfig.pricesCacheTTLSecs;

this.tokensPricesPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'tokensPrices',
);

this.pricesFetcher = new WebSocketFetcher<BebopPricingResponse>(
{
info: {
Expand All @@ -68,8 +77,8 @@ export class RateFetcher {
);

this.tokensAddrCacheKey = config.rateConfig.tokensAddrCacheKey;
this.tokensCacheKey = config.rateConfig.tokensCacheKey;
this.tokensCacheTTL = config.rateConfig.tokensCacheTTLSecs;

this.tokensFetcher = new Fetcher<BebopTokensResponse>(
dexHelper.httpRequest,
{
Expand All @@ -87,6 +96,14 @@ export class RateFetcher {
config.rateConfig.tokensIntervalMs,
logger,
);

this.restrictPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'restrict',
'not_restricted',
BEBOP_RESTRICT_TTL_S,
);
}

parsePricingUpdate(updateObject: any): BebopPricingResponse {
Expand Down Expand Up @@ -114,8 +131,13 @@ export class RateFetcher {
}

start() {
this.pricesFetcher.startPolling();
this.tokensFetcher.startPolling();
if (!this.dexHelper.config.isSlave) {
this.pricesFetcher.startPolling();
this.tokensFetcher.startPolling();
} else {
this.tokensPricesPubSub.subscribe();
this.restrictPubSub.subscribe();
}
}

stop() {
Expand All @@ -141,17 +163,14 @@ export class RateFetcher {
this.dexHelper.cache.setex(
this.dexKey,
this.network,
this.tokensCacheKey,
this.tokensAddrCacheKey,
this.tokensCacheTTL,
JSON.stringify(tokenMap),
JSON.stringify(tokenAddrMap),
);

this.dexHelper.cache.setex(
this.dexKey,
this.network,
this.tokensAddrCacheKey,
this.tokensPricesPubSub.publish(
{ [this.tokensAddrCacheKey]: tokenAddrMap },
this.tokensCacheTTL,
JSON.stringify(tokenAddrMap),
);
}

Expand Down Expand Up @@ -179,5 +198,57 @@ export class RateFetcher {
this.pricesCacheTTL,
JSON.stringify(normalizedPrices),
);

this.tokensPricesPubSub.publish(
{ [this.pricesCacheKey]: normalizedPrices },
this.pricesCacheTTL,
);
}

async getCachedPrices(): Promise<BebopPricingResponse | null> {
const cachedPrices = await this.tokensPricesPubSub.getAndCache(
this.pricesCacheKey,
);

if (cachedPrices) {
return cachedPrices as BebopPricingResponse;
}

return null;
}

async getCachedTokens(): Promise<TokenDataMap | null> {
const cachedTokens = await this.tokensPricesPubSub.getAndCache(
this.tokensAddrCacheKey,
);

if (cachedTokens) {
return cachedTokens as TokenDataMap;
}

return null;
}

async isRestricted(): Promise<boolean> {
const result = await this.restrictPubSub.getAndCache(
BEBOP_RESTRICTED_CACHE_KEY,
);

return result === 'true';
}

async restrict(): Promise<void> {
await this.dexHelper.cache.setex(
this.dexKey,
this.network,
BEBOP_RESTRICTED_CACHE_KEY,
BEBOP_RESTRICT_TTL_S,
'true',
);

this.restrictPubSub.publish(
{ [BEBOP_RESTRICTED_CACHE_KEY]: 'true' },
BEBOP_RESTRICT_TTL_S,
);
}
}
1 change: 0 additions & 1 deletion src/dex/bebop/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ export type BebopRateFetcherConfig = {
tokensIntervalMs: number;
pricesCacheKey: string;
tokensAddrCacheKey: string;
tokensCacheKey: string;
pricesCacheTTLSecs: number;
tokensCacheTTLSecs: number;
};
Expand Down
Loading
Loading