From 544893b65f5dd39a16f0592c141b62b1cd45595e Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Wed, 18 Dec 2024 19:30:00 +0200 Subject: [PATCH] feat: pub-sub for `cables` --- src/dex/cables/cables.ts | 78 +++----------------- src/dex/cables/rate-fetcher.ts | 127 +++++++++++++++++++++++++++++++-- 2 files changed, 129 insertions(+), 76 deletions(-) diff --git a/src/dex/cables/cables.ts b/src/dex/cables/cables.ts index eb14b3f8e..8ece2d92f 100644 --- a/src/dex/cables/cables.ts +++ b/src/dex/cables/cables.ts @@ -495,7 +495,7 @@ export class Cables extends SimpleExchange implements IDex { return null; } - const isRestricted = await this.isRestricted(); + const isRestricted = await this.rateFetcher.isRestricted(); if (isRestricted) { return null; } @@ -514,7 +514,7 @@ export class Cables extends SimpleExchange implements IDex { if (pools.length === 0) return null; // ---------- Prices ---------- - const priceMap = await this.getCachedPrices(); + const priceMap = await this.rateFetcher.getCachedPrices(); if (!priceMap) return null; @@ -619,7 +619,7 @@ export class Cables extends SimpleExchange implements IDex { } async setTokensMap() { - const tokens = await this.getCachedTokens(); + const tokens = await this.rateFetcher.getCachedTokens(); if (tokens) { this.tokensMap = Object.keys(tokens).reduce((acc, key) => { @@ -637,41 +637,8 @@ export class Cables extends SimpleExchange implements IDex { return []; } - /** - * CACHED UTILS - */ - async getCachedTokens(): Promise { - const cachedTokens = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.rateFetcher.tokensCacheKey, - ); - - return cachedTokens ? JSON.parse(cachedTokens) : {}; - } - - async getCachedPairs(): Promise { - const cachedPairs = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.rateFetcher.pairsCacheKey, - ); - - return cachedPairs ? JSON.parse(cachedPairs) : {}; - } - - async getCachedPrices(): Promise { - const cachedPrices = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.rateFetcher.pricesCacheKey, - ); - - return cachedPrices ? JSON.parse(cachedPrices) : {}; - } - async getCachedTokensAddr(): Promise { - const tokens = await this.getCachedTokens(); + const tokens = await this.rateFetcher.getCachedTokens(); const tokensAddr: Record = {}; for (const key of Object.keys(tokens)) { tokensAddr[tokens[key].symbol.toLowerCase()] = tokens[key].address; @@ -701,12 +668,12 @@ export class Cables extends SimpleExchange implements IDex { return null; } - const cachedTokens = await this.getCachedTokens(); + const cachedTokens = await this.rateFetcher.getCachedTokens(); srcToken.symbol = this.findKeyByAddress(cachedTokens, srcToken.address); destToken.symbol = this.findKeyByAddress(cachedTokens, destToken.address); - const cachedPairs = await this.getCachedPairs(); + const cachedPairs = await this.rateFetcher.getCachedPairs(); const potentialPairs = [ { @@ -734,28 +701,7 @@ export class Cables extends SimpleExchange implements IDex { } async isBlacklisted(txOrigin: Address): Promise { - const cachedBlacklist = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.rateFetcher.blacklistCacheKey, - ); - - if (cachedBlacklist) { - const blacklist = JSON.parse(cachedBlacklist) as string[]; - return blacklist.includes(txOrigin.toLowerCase()); - } - - return false; - } - - async isRestricted(): Promise { - const result = await this.dexHelper.cache.get( - this.dexKey, - this.network, - CABLES_RESTRICTED_CACHE_KEY, - ); - - return result === 'true'; + return this.rateFetcher.isBlacklisted(txOrigin); } async restrict() { @@ -795,13 +741,7 @@ export class Cables extends SimpleExchange implements IDex { errorsData.count + 1 } within ${CABLES_RESTRICT_CHECK_INTERVAL_MS / 1000 / 60} minutes`, ); - await this.dexHelper.cache.setex( - this.dexKey, - this.network, - CABLES_RESTRICTED_CACHE_KEY, - CABLES_RESTRICT_TTL_S, - 'true', - ); + this.rateFetcher.restrict(); } else { this.logger.warn( `${this.dexKey}-${this.network}: Error count increased`, @@ -813,7 +753,7 @@ export class Cables extends SimpleExchange implements IDex { await this.dexHelper.cache.setex( this.dexKey, this.network, - CABLES_RESTRICTED_CACHE_KEY, + CABLES_ERRORS_CACHE_KEY, ERRORS_TTL_S, Utils.Serialize(data), ); diff --git a/src/dex/cables/rate-fetcher.ts b/src/dex/cables/rate-fetcher.ts index 3df0b0f83..6fd34f8d2 100644 --- a/src/dex/cables/rate-fetcher.ts +++ b/src/dex/cables/rate-fetcher.ts @@ -1,9 +1,14 @@ import { Network } from '../../constants'; import { IDexHelper } from '../../dex-helper'; import { Fetcher } from '../../lib/fetcher/fetcher'; +import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub'; import { validateAndCast } from '../../lib/validators'; -import { Logger, Token } from '../../types'; +import { Address, Logger } from '../../types'; import { PairData } from '../cables/types'; +import { + CABLES_RESTRICT_TTL_S, + CABLES_RESTRICTED_CACHE_KEY, +} from './constants'; import { CablesBlacklistResponse, CablesPairsResponse, @@ -19,6 +24,8 @@ import { } from './validators'; export class CablesRateFetcher { + private tokensPairsPricesPubSub: ExpKeyValuePubSub; + public tokensFetcher: Fetcher; public tokensCacheKey: string; public tokensCacheTTL: number; @@ -32,9 +39,12 @@ export class CablesRateFetcher { public pricesCacheTTL: number; public blacklistFetcher: Fetcher; + private blacklistPubSub: NonExpSetPubSub; public blacklistCacheKey: string; public blacklistCacheTTL: number; + private restrictPubSub: ExpKeyValuePubSub; + constructor( private dexHelper: IDexHelper, private dexKey: string, @@ -54,6 +64,12 @@ export class CablesRateFetcher { this.blacklistCacheKey = config.rateConfig.blacklistCacheKey; this.blacklistCacheTTL = config.rateConfig.blacklistCacheTTLSecs; + this.tokensPairsPricesPubSub = new ExpKeyValuePubSub( + dexHelper, + dexKey, + 'tokensPairsPrices', + ); + this.pairsFetcher = new Fetcher( dexHelper.httpRequest, { @@ -90,6 +106,7 @@ export class CablesRateFetcher { logger, ); + this.blacklistPubSub = new NonExpSetPubSub(dexHelper, dexKey, 'blacklist'); this.blacklistFetcher = new Fetcher( dexHelper.httpRequest, { @@ -125,16 +142,32 @@ export class CablesRateFetcher { config.rateConfig.tokensIntervalMs, logger, ); + + this.restrictPubSub = new ExpKeyValuePubSub( + dexHelper, + dexKey, + 'restrict', + 'not_restricted', + CABLES_RESTRICT_TTL_S, + ); } /** * Utils */ - start() { - this.pairsFetcher.startPolling(); - this.pricesFetcher.startPolling(); - this.blacklistFetcher.startPolling(); - this.tokensFetcher.startPolling(); + async start() { + if (!this.dexHelper.config.isSlave) { + this.pairsFetcher.startPolling(); + this.pricesFetcher.startPolling(); + this.blacklistFetcher.startPolling(); + this.tokensFetcher.startPolling(); + } else { + this.tokensPairsPricesPubSub.subscribe(); + this.restrictPubSub.subscribe(); + + const initBlacklisted = await this.getAllBlacklisted(); + this.blacklistPubSub.initializeAndSubscribe(initBlacklisted); + } } stop() { this.pairsFetcher.stopPolling(); @@ -159,6 +192,11 @@ export class CablesRateFetcher { this.pairsCacheTTL, JSON.stringify(normalized_pairs), ); + + this.tokensPairsPricesPubSub.publish( + { [this.pairsCacheKey]: normalized_pairs }, + this.pairsCacheTTL, + ); } private handlePricesResponse(res: CablesPricesResponse): void { @@ -172,17 +210,25 @@ export class CablesRateFetcher { this.pricesCacheTTL, JSON.stringify(prices), ); + + this.tokensPairsPricesPubSub.publish( + { [this.pricesCacheKey]: prices }, + this.pricesCacheTTL, + ); } private handleBlacklistResponse(res: CablesBlacklistResponse): void { const { blacklist } = res; + const list = blacklist.map(item => item.toLowerCase()); this.dexHelper.cache.setex( this.dexKey, this.network, this.blacklistCacheKey, this.blacklistCacheTTL, - JSON.stringify(blacklist.map(item => item.toLowerCase())), + JSON.stringify(list), ); + + this.blacklistPubSub.publish(list); } // Convert addresses to lowercase @@ -208,5 +254,72 @@ export class CablesRateFetcher { this.tokensCacheTTL, JSON.stringify(normalizedTokens), ); + + this.tokensPairsPricesPubSub.publish( + { [this.tokensCacheKey]: normalizedTokens }, + this.tokensCacheTTL, + ); + } + + /** + * CACHED UTILS + */ + async getCachedTokens(): Promise { + const cachedTokens = await this.tokensPairsPricesPubSub.getAndCache( + this.tokensCacheKey, + ); + return cachedTokens ?? {}; + } + + async getCachedPairs(): Promise { + const cachedPairs = await this.tokensPairsPricesPubSub.getAndCache( + this.pairsCacheKey, + ); + return cachedPairs ?? {}; + } + + async getCachedPrices(): Promise { + const cachedPrices = await this.tokensPairsPricesPubSub.getAndCache( + this.pricesCacheKey, + ); + + return cachedPrices ?? {}; + } + + async getAllBlacklisted(): Promise { + const cachedBlacklist = await this.dexHelper.cache.get( + this.dexKey, + this.network, + this.blacklistCacheKey, + ); + + return cachedBlacklist ? JSON.parse(cachedBlacklist) : []; + } + + async isBlacklisted(txOrigin: Address): Promise { + return this.blacklistPubSub.has(txOrigin.toLowerCase()); + } + + async isRestricted(): Promise { + const result = await this.restrictPubSub.getAndCache( + CABLES_RESTRICTED_CACHE_KEY, + ); + + return result === 'true'; + } + + async restrict(): Promise { + await this.dexHelper.cache.setex( + this.dexKey, + this.network, + CABLES_RESTRICTED_CACHE_KEY, + CABLES_RESTRICT_TTL_S, + 'true', + ); + + this.restrictPubSub.publish( + { [CABLES_RESTRICTED_CACHE_KEY]: 'true' }, + CABLES_RESTRICT_TTL_S, + ); } }