Skip to content

Commit

Permalink
feat: pub-sub for bebop
Browse files Browse the repository at this point in the history
  • Loading branch information
KanievskyiDanylo committed Dec 18, 2024
1 parent 6353b26 commit 67af728
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 70 deletions.
60 changes: 7 additions & 53 deletions src/dex/bebop/bebop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
private tokensMap: TokenDataMap = {};

private pricesCacheKey: string;
private tokensCacheKey: string;
private tokensAddrCacheKey: string;

private bebopAuthToken: string;
Expand All @@ -90,7 +89,6 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
) {
super(dexHelper, dexKey);
this.logger = dexHelper.getLogger(dexKey);
this.tokensCacheKey = `tokens`;
this.pricesCacheKey = `prices`;
this.tokensAddrCacheKey = `tokens_addr`;
const token = this.dexHelper.config.data.bebopAuthToken;
Expand All @@ -109,7 +107,6 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
tokensIntervalMs: BEBOP_TOKENS_POLLING_INTERVAL_MS,
pricesCacheKey: this.pricesCacheKey,
pricesCacheTTLSecs: BEBOP_PRICES_CACHE_TTL,
tokensCacheKey: this.tokensCacheKey,
tokensAddrCacheKey: this.tokensAddrCacheKey,
tokensCacheTTLSecs: BEBOP_TOKENS_CACHE_TTL,
tokensReqParams: {
Expand All @@ -132,11 +129,8 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
}

async initializePricing(blockNumber: number) {
if (!this.dexHelper.config.isSlave) {
this.rateFetcher.start();
await sleep(BEBOP_INIT_TIMEOUT_MS);
}

await this.rateFetcher.start();
await sleep(BEBOP_INIT_TIMEOUT_MS);
await this.setTokensMap();
}

Expand Down Expand Up @@ -184,7 +178,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
return [];
}

const prices = await this.getCachedPrices();
const prices = await this.rateFetcher.getCachedPrices();
if (!prices) {
throw new Error('No prices available');
}
Expand Down Expand Up @@ -523,7 +517,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
}

async setTokensMap() {
const tokens = await this.getCachedTokens();
const tokens = await this.rateFetcher.getCachedTokens();

if (tokens) {
this.tokensMap = tokens;
Expand All @@ -542,7 +536,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
tokenAddress: Address,
limit: number,
): Promise<PoolLiquidity[]> {
const prices = await this.getCachedPrices();
const prices = await this.rateFetcher.getCachedPrices();

if (!prices) {
return [];
Expand Down Expand Up @@ -812,13 +806,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
errorsData.count + 1
} within ${BEBOP_RESTRICT_CHECK_INTERVAL_MS / 1000 / 60} minutes`,
);
await this.dexHelper.cache.setex(
this.dexKey,
this.network,
BEBOP_RESTRICTED_CACHE_KEY,
BEBOP_RESTRICT_TTL_S,
'true',
);
this.rateFetcher.restrict();
} else {
this.logger.warn(
`${this.dexKey}-${this.network}: Error count increased`,
Expand All @@ -839,41 +827,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
}

async isRestricted(): Promise<boolean> {
const result = await this.dexHelper.cache.get(
this.dexKey,
this.network,
BEBOP_RESTRICTED_CACHE_KEY,
);

return result === 'true';
}

async getCachedPrices(): Promise<BebopPricingResponse | null> {
const cachedPrices = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.pricesCacheKey,
);

if (cachedPrices) {
return JSON.parse(cachedPrices) as BebopPricingResponse;
}

return null;
}

async getCachedTokens(): Promise<TokenDataMap | null> {
const cachedTokens = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.tokensAddrCacheKey,
);

if (cachedTokens) {
return JSON.parse(cachedTokens) as TokenDataMap;
}

return null;
return this.rateFetcher.isRestricted();
}

getTokenFromAddress(address: Address): Token {
Expand Down
103 changes: 87 additions & 16 deletions src/dex/bebop/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import {
BebopPricingResponse,
BebopRateFetcherConfig,
BebopTokensResponse,
TokenDataMap,
} from './types';
import {
BebopPricingUpdate,
pricesResponseValidator,
tokensResponseValidator,
} from './validators';
import { BebopPricingUpdate, tokensResponseValidator } from './validators';
import { WebSocketFetcher } from '../../lib/fetcher/wsFetcher';
import { utils } from 'ethers';
import { BEBOP_RESTRICT_TTL_S, BEBOP_RESTRICTED_CACHE_KEY } from './constants';
import { ExpKeyValuePubSub } from '../../lib/pub-sub';

export function levels_from_flat_array(values: number[]): BebopLevel[] {
const levels: BebopLevel[] = [];
Expand All @@ -27,15 +26,18 @@ export function levels_from_flat_array(values: number[]): BebopLevel[] {
}

export class RateFetcher {
private tokensPricesPubSub: ExpKeyValuePubSub;

private pricesFetcher: WebSocketFetcher<BebopPricingResponse>;
private pricesCacheKey: string;
private pricesCacheTTL: number;

private tokensFetcher: Fetcher<BebopTokensResponse>;
private tokensAddrCacheKey: string;
private tokensCacheKey: string;
private tokensCacheTTL: number;

private restrictPubSub: ExpKeyValuePubSub;

constructor(
private dexHelper: IDexHelper,
private dexKey: string,
Expand All @@ -45,6 +47,13 @@ export class RateFetcher {
) {
this.pricesCacheKey = config.rateConfig.pricesCacheKey;
this.pricesCacheTTL = config.rateConfig.pricesCacheTTLSecs;

this.tokensPricesPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'tokensPrices',
);

this.pricesFetcher = new WebSocketFetcher<BebopPricingResponse>(
{
info: {
Expand All @@ -68,8 +77,8 @@ export class RateFetcher {
);

this.tokensAddrCacheKey = config.rateConfig.tokensAddrCacheKey;
this.tokensCacheKey = config.rateConfig.tokensCacheKey;
this.tokensCacheTTL = config.rateConfig.tokensCacheTTLSecs;

this.tokensFetcher = new Fetcher<BebopTokensResponse>(
dexHelper.httpRequest,
{
Expand All @@ -87,6 +96,14 @@ export class RateFetcher {
config.rateConfig.tokensIntervalMs,
logger,
);

this.restrictPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'restrict',
'not_restricted',
BEBOP_RESTRICT_TTL_S,
);
}

parsePricingUpdate(updateObject: any): BebopPricingResponse {
Expand Down Expand Up @@ -114,8 +131,13 @@ export class RateFetcher {
}

start() {
this.pricesFetcher.startPolling();
this.tokensFetcher.startPolling();
if (!this.dexHelper.config.isSlave) {
this.pricesFetcher.startPolling();
this.tokensFetcher.startPolling();
} else {
this.tokensPricesPubSub.subscribe();
this.restrictPubSub.subscribe();
}
}

stop() {
Expand All @@ -141,17 +163,14 @@ export class RateFetcher {
this.dexHelper.cache.setex(
this.dexKey,
this.network,
this.tokensCacheKey,
this.tokensAddrCacheKey,
this.tokensCacheTTL,
JSON.stringify(tokenMap),
JSON.stringify(tokenAddrMap),
);

this.dexHelper.cache.setex(
this.dexKey,
this.network,
this.tokensAddrCacheKey,
this.tokensPricesPubSub.publish(
{ [this.tokensAddrCacheKey]: tokenAddrMap },
this.tokensCacheTTL,
JSON.stringify(tokenAddrMap),
);
}

Expand Down Expand Up @@ -179,5 +198,57 @@ export class RateFetcher {
this.pricesCacheTTL,
JSON.stringify(normalizedPrices),
);

this.tokensPricesPubSub.publish(
{ [this.pricesCacheKey]: normalizedPrices },
this.pricesCacheTTL,
);
}

async getCachedPrices(): Promise<BebopPricingResponse | null> {
const cachedPrices = await this.tokensPricesPubSub.getAndCache(
this.pricesCacheKey,
);

if (cachedPrices) {
return cachedPrices as BebopPricingResponse;
}

return null;
}

async getCachedTokens(): Promise<TokenDataMap | null> {
const cachedTokens = await this.tokensPricesPubSub.getAndCache(
this.tokensAddrCacheKey,
);

if (cachedTokens) {
return cachedTokens as TokenDataMap;
}

return null;
}

async isRestricted(): Promise<boolean> {
const result = await this.restrictPubSub.getAndCache(
BEBOP_RESTRICTED_CACHE_KEY,
);

return result === 'true';
}

async restrict(): Promise<void> {
await this.dexHelper.cache.setex(
this.dexKey,
this.network,
BEBOP_RESTRICTED_CACHE_KEY,
BEBOP_RESTRICT_TTL_S,
'true',
);

this.restrictPubSub.publish(
{ [BEBOP_RESTRICTED_CACHE_KEY]: 'true' },
BEBOP_RESTRICT_TTL_S,
);
}
}
1 change: 0 additions & 1 deletion src/dex/bebop/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ export type BebopRateFetcherConfig = {
tokensIntervalMs: number;
pricesCacheKey: string;
tokensAddrCacheKey: string;
tokensCacheKey: string;
pricesCacheTTLSecs: number;
tokensCacheTTLSecs: number;
};
Expand Down

0 comments on commit 67af728

Please sign in to comment.