Skip to content

Commit

Permalink
feat: pub-sub for cables
Browse files Browse the repository at this point in the history
  • Loading branch information
KanievskyiDanylo committed Dec 18, 2024
1 parent 67af728 commit 544893b
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 76 deletions.
78 changes: 9 additions & 69 deletions src/dex/cables/cables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ export class Cables extends SimpleExchange implements IDex<any> {
return null;
}

const isRestricted = await this.isRestricted();
const isRestricted = await this.rateFetcher.isRestricted();
if (isRestricted) {
return null;
}
Expand All @@ -514,7 +514,7 @@ export class Cables extends SimpleExchange implements IDex<any> {
if (pools.length === 0) return null;

// ---------- Prices ----------
const priceMap = await this.getCachedPrices();
const priceMap = await this.rateFetcher.getCachedPrices();

if (!priceMap) return null;

Expand Down Expand Up @@ -619,7 +619,7 @@ export class Cables extends SimpleExchange implements IDex<any> {
}

async setTokensMap() {
const tokens = await this.getCachedTokens();
const tokens = await this.rateFetcher.getCachedTokens();

if (tokens) {
this.tokensMap = Object.keys(tokens).reduce((acc, key) => {
Expand All @@ -637,41 +637,8 @@ export class Cables extends SimpleExchange implements IDex<any> {
return [];
}

/**
* CACHED UTILS
*/
async getCachedTokens(): Promise<any> {
const cachedTokens = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.rateFetcher.tokensCacheKey,
);

return cachedTokens ? JSON.parse(cachedTokens) : {};
}

async getCachedPairs(): Promise<any> {
const cachedPairs = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.rateFetcher.pairsCacheKey,
);

return cachedPairs ? JSON.parse(cachedPairs) : {};
}

async getCachedPrices(): Promise<any> {
const cachedPrices = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.rateFetcher.pricesCacheKey,
);

return cachedPrices ? JSON.parse(cachedPrices) : {};
}

async getCachedTokensAddr(): Promise<any> {
const tokens = await this.getCachedTokens();
const tokens = await this.rateFetcher.getCachedTokens();
const tokensAddr: Record<string, Address> = {};
for (const key of Object.keys(tokens)) {
tokensAddr[tokens[key].symbol.toLowerCase()] = tokens[key].address;
Expand Down Expand Up @@ -701,12 +668,12 @@ export class Cables extends SimpleExchange implements IDex<any> {
return null;
}

const cachedTokens = await this.getCachedTokens();
const cachedTokens = await this.rateFetcher.getCachedTokens();

srcToken.symbol = this.findKeyByAddress(cachedTokens, srcToken.address);
destToken.symbol = this.findKeyByAddress(cachedTokens, destToken.address);

const cachedPairs = await this.getCachedPairs();
const cachedPairs = await this.rateFetcher.getCachedPairs();

const potentialPairs = [
{
Expand Down Expand Up @@ -734,28 +701,7 @@ export class Cables extends SimpleExchange implements IDex<any> {
}

async isBlacklisted(txOrigin: Address): Promise<boolean> {
const cachedBlacklist = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.rateFetcher.blacklistCacheKey,
);

if (cachedBlacklist) {
const blacklist = JSON.parse(cachedBlacklist) as string[];
return blacklist.includes(txOrigin.toLowerCase());
}

return false;
}

async isRestricted(): Promise<boolean> {
const result = await this.dexHelper.cache.get(
this.dexKey,
this.network,
CABLES_RESTRICTED_CACHE_KEY,
);

return result === 'true';
return this.rateFetcher.isBlacklisted(txOrigin);
}

async restrict() {
Expand Down Expand Up @@ -795,13 +741,7 @@ export class Cables extends SimpleExchange implements IDex<any> {
errorsData.count + 1
} within ${CABLES_RESTRICT_CHECK_INTERVAL_MS / 1000 / 60} minutes`,
);
await this.dexHelper.cache.setex(
this.dexKey,
this.network,
CABLES_RESTRICTED_CACHE_KEY,
CABLES_RESTRICT_TTL_S,
'true',
);
this.rateFetcher.restrict();
} else {
this.logger.warn(
`${this.dexKey}-${this.network}: Error count increased`,
Expand All @@ -813,7 +753,7 @@ export class Cables extends SimpleExchange implements IDex<any> {
await this.dexHelper.cache.setex(
this.dexKey,
this.network,
CABLES_RESTRICTED_CACHE_KEY,
CABLES_ERRORS_CACHE_KEY,
ERRORS_TTL_S,
Utils.Serialize(data),
);
Expand Down
127 changes: 120 additions & 7 deletions src/dex/cables/rate-fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { Network } from '../../constants';
import { IDexHelper } from '../../dex-helper';
import { Fetcher } from '../../lib/fetcher/fetcher';
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';
import { validateAndCast } from '../../lib/validators';
import { Logger, Token } from '../../types';
import { Address, Logger } from '../../types';
import { PairData } from '../cables/types';
import {
CABLES_RESTRICT_TTL_S,
CABLES_RESTRICTED_CACHE_KEY,
} from './constants';
import {
CablesBlacklistResponse,
CablesPairsResponse,
Expand All @@ -19,6 +24,8 @@ import {
} from './validators';

export class CablesRateFetcher {
private tokensPairsPricesPubSub: ExpKeyValuePubSub;

public tokensFetcher: Fetcher<CablesTokensResponse>;
public tokensCacheKey: string;
public tokensCacheTTL: number;
Expand All @@ -32,9 +39,12 @@ export class CablesRateFetcher {
public pricesCacheTTL: number;

public blacklistFetcher: Fetcher<CablesBlacklistResponse>;
private blacklistPubSub: NonExpSetPubSub;
public blacklistCacheKey: string;
public blacklistCacheTTL: number;

private restrictPubSub: ExpKeyValuePubSub;

constructor(
private dexHelper: IDexHelper,
private dexKey: string,
Expand All @@ -54,6 +64,12 @@ export class CablesRateFetcher {
this.blacklistCacheKey = config.rateConfig.blacklistCacheKey;
this.blacklistCacheTTL = config.rateConfig.blacklistCacheTTLSecs;

this.tokensPairsPricesPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'tokensPairsPrices',
);

this.pairsFetcher = new Fetcher<CablesPairsResponse>(
dexHelper.httpRequest,
{
Expand Down Expand Up @@ -90,6 +106,7 @@ export class CablesRateFetcher {
logger,
);

this.blacklistPubSub = new NonExpSetPubSub(dexHelper, dexKey, 'blacklist');
this.blacklistFetcher = new Fetcher<CablesBlacklistResponse>(
dexHelper.httpRequest,
{
Expand Down Expand Up @@ -125,16 +142,32 @@ export class CablesRateFetcher {
config.rateConfig.tokensIntervalMs,
logger,
);

this.restrictPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'restrict',
'not_restricted',
CABLES_RESTRICT_TTL_S,
);
}

/**
* Utils
*/
start() {
this.pairsFetcher.startPolling();
this.pricesFetcher.startPolling();
this.blacklistFetcher.startPolling();
this.tokensFetcher.startPolling();
async start() {
if (!this.dexHelper.config.isSlave) {
this.pairsFetcher.startPolling();
this.pricesFetcher.startPolling();
this.blacklistFetcher.startPolling();
this.tokensFetcher.startPolling();
} else {
this.tokensPairsPricesPubSub.subscribe();
this.restrictPubSub.subscribe();

const initBlacklisted = await this.getAllBlacklisted();
this.blacklistPubSub.initializeAndSubscribe(initBlacklisted);
}
}
stop() {
this.pairsFetcher.stopPolling();
Expand All @@ -159,6 +192,11 @@ export class CablesRateFetcher {
this.pairsCacheTTL,
JSON.stringify(normalized_pairs),
);

this.tokensPairsPricesPubSub.publish(
{ [this.pairsCacheKey]: normalized_pairs },
this.pairsCacheTTL,
);
}

private handlePricesResponse(res: CablesPricesResponse): void {
Expand All @@ -172,17 +210,25 @@ export class CablesRateFetcher {
this.pricesCacheTTL,
JSON.stringify(prices),
);

this.tokensPairsPricesPubSub.publish(
{ [this.pricesCacheKey]: prices },
this.pricesCacheTTL,
);
}

private handleBlacklistResponse(res: CablesBlacklistResponse): void {
const { blacklist } = res;
const list = blacklist.map(item => item.toLowerCase());
this.dexHelper.cache.setex(
this.dexKey,
this.network,
this.blacklistCacheKey,
this.blacklistCacheTTL,
JSON.stringify(blacklist.map(item => item.toLowerCase())),
JSON.stringify(list),
);

this.blacklistPubSub.publish(list);
}

// Convert addresses to lowercase
Expand All @@ -208,5 +254,72 @@ export class CablesRateFetcher {
this.tokensCacheTTL,
JSON.stringify(normalizedTokens),
);

this.tokensPairsPricesPubSub.publish(
{ [this.tokensCacheKey]: normalizedTokens },
this.tokensCacheTTL,
);
}

/**
* CACHED UTILS
*/
async getCachedTokens(): Promise<any> {
const cachedTokens = await this.tokensPairsPricesPubSub.getAndCache(
this.tokensCacheKey,
);
return cachedTokens ?? {};
}

async getCachedPairs(): Promise<any> {
const cachedPairs = await this.tokensPairsPricesPubSub.getAndCache(
this.pairsCacheKey,
);
return cachedPairs ?? {};
}

async getCachedPrices(): Promise<any> {
const cachedPrices = await this.tokensPairsPricesPubSub.getAndCache(
this.pricesCacheKey,
);

return cachedPrices ?? {};
}

async getAllBlacklisted(): Promise<string[]> {
const cachedBlacklist = await this.dexHelper.cache.get(
this.dexKey,
this.network,
this.blacklistCacheKey,
);

return cachedBlacklist ? JSON.parse(cachedBlacklist) : [];
}

async isBlacklisted(txOrigin: Address): Promise<boolean> {
return this.blacklistPubSub.has(txOrigin.toLowerCase());
}

async isRestricted(): Promise<boolean> {
const result = await this.restrictPubSub.getAndCache(
CABLES_RESTRICTED_CACHE_KEY,
);

return result === 'true';
}

async restrict(): Promise<void> {
await this.dexHelper.cache.setex(
this.dexKey,
this.network,
CABLES_RESTRICTED_CACHE_KEY,
CABLES_RESTRICT_TTL_S,
'true',
);

this.restrictPubSub.publish(
{ [CABLES_RESTRICTED_CACHE_KEY]: 'true' },
CABLES_RESTRICT_TTL_S,
);
}
}

0 comments on commit 544893b

Please sign in to comment.