diff --git a/src/dex/swaap-v2/constants.ts b/src/dex/swaap-v2/constants.ts index 8c6684abf..d528df744 100644 --- a/src/dex/swaap-v2/constants.ts +++ b/src/dex/swaap-v2/constants.ts @@ -1,10 +1,10 @@ import BigNumber from 'bignumber.js'; -export const SWAAP_RFQ_PRICES_CACHES_TTL_S = 3; +export const SWAAP_RFQ_PRICES_CACHES_TTL_S = 5; export const SWAAP_RFQ_QUOTE_TIMEOUT_MS = 2000; -export const SWAAP_RFQ_API_PRICES_POLLING_INTERVAL_MS = 1000; +export const SWAAP_RFQ_API_PRICES_POLLING_INTERVAL_MS = 2000; export const SWAAP_RFQ_API_TOKENS_POLLING_INTERVAL_MS = 1000 * 60 * 60; // 1 hour diff --git a/src/dex/swaap-v2/rate-fetcher.ts b/src/dex/swaap-v2/rate-fetcher.ts index a7303e7d4..593d3fa13 100644 --- a/src/dex/swaap-v2/rate-fetcher.ts +++ b/src/dex/swaap-v2/rate-fetcher.ts @@ -16,7 +16,6 @@ import { } from './types'; import { priceLevelsResponseValidator, - getQuoteResponseValidator, getTokensResponseValidator, notifyResponseValidator, getQuoteResponseWithRecipientValidator, @@ -26,8 +25,16 @@ import { SWAAP_RFQ_QUOTE_TIMEOUT_MS, SWAAP_NOTIFY_TIMEOUT_MS, SWAAP_NOTIFICATION_ORIGIN, + SWAAP_TOKENS_CACHE_KEY, + SWAAP_PRICES_CACHE_KEY, + SWAAP_403_TTL_S, + SWAAP_POOL_RESTRICT_TTL_S, } from './constants'; import { RequestConfig } from '../../dex-helper/irequest-wrapper'; +import { Network } from '../../constants'; +import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub'; + +const BLACKLISTED = 'blacklisted'; export class RateFetcher { private rateFetcher: Fetcher; @@ -37,8 +44,13 @@ export class RateFetcher { private tokenCacheKey: string; private pricesCacheKey: string; + private rateTokensPubSub: ExpKeyValuePubSub; + private restrictedPubSub: ExpKeyValuePubSub; + private blacklistPubSub: NonExpSetPubSub; + constructor( private dexHelper: IDexHelper, + private network: Network, private dexKey: string, private logger: Logger, config: SwaapV2RateFetcherConfig, @@ -48,6 +60,12 @@ export class RateFetcher { this.pricesCacheKey = config.rateConfig.pricesCacheKey; this.tokenCacheKey = config.tokensConfig.tokensCacheKey; + this.rateTokensPubSub = new ExpKeyValuePubSub( + this.dexHelper, + this.dexKey, + 'rateTokens', + ); + this.rateFetcher = new Fetcher( dexHelper.httpRequest, { @@ -83,11 +101,33 @@ export class RateFetcher { config.tokensConfig.tokensIntervalMs, logger, ); + + this.restrictedPubSub = new ExpKeyValuePubSub( + this.dexHelper, + this.dexKey, + 'restricted', + 'pool_is_not_restricted', + SWAAP_POOL_RESTRICT_TTL_S, + ); + + this.blacklistPubSub = new NonExpSetPubSub( + this.dexHelper, + this.dexKey, + 'blacklist', + ); } - start() { - this.rateFetcher.startPolling(); - this.tokensFetcher.startPolling(); + async start() { + if (!this.dexHelper.config.isSlave) { + this.rateFetcher.startPolling(); + this.tokensFetcher.startPolling(); + } else { + this.rateTokensPubSub.subscribe(); + this.restrictedPubSub.subscribe(); + + const initSet = await this.getAllBlacklisted(); + this.blacklistPubSub.initializeAndSubscribe(initSet); + } } stop() { @@ -112,6 +152,11 @@ export class RateFetcher { this.tokensCacheTTL, JSON.stringify(tokensMap), ); + + this.rateTokensPubSub.publish( + { [this.tokenCacheKey]: tokensMap }, + this.tokensCacheTTL, + ); } private handleRatesResponse(resp: SwaapV2PriceLevelsResponse): void { @@ -149,6 +194,11 @@ export class RateFetcher { this.pricesCacheTTL, JSON.stringify(levels), ); + + this.rateTokensPubSub.publish( + { [this.pricesCacheKey]: levels }, + this.pricesCacheTTL, + ); } async getQuote( @@ -264,4 +314,86 @@ export class RateFetcher { throw e; } } + + async getCachedTokens(): Promise { + const cachedTokens = await this.rateTokensPubSub.getAndCache( + this.tokenCacheKey, + ); + + if (cachedTokens) { + return cachedTokens as TokensMap; + } + + return null; + } + + async getCachedLevels(): Promise | null> { + const cachedLevels = await this.rateTokensPubSub.getAndCache( + this.pricesCacheKey, + ); + + if (cachedLevels) { + return cachedLevels as Record; + } + + return null; + } + + async isBlacklisted(txOrigin: Address): Promise { + return this.blacklistPubSub.has(txOrigin.toLowerCase()); + } + + getBlackListKey(address: Address) { + return `blacklist_${address}`.toLowerCase(); + } + + async setBlacklist( + txOrigin: Address, + ttl: number = SWAAP_403_TTL_S, + ): Promise { + await this.dexHelper.cache.setex( + this.dexKey, + this.network, + this.getBlackListKey(txOrigin), + ttl, + BLACKLISTED, + ); + + this.blacklistPubSub.publish([txOrigin.toLowerCase()]); + return true; + } + + async getAllBlacklisted(): Promise { + const defaultKey = this.getBlackListKey(''); + const pattern = `${defaultKey}*`; + const allBlacklisted = await this.dexHelper.cache.keys( + this.dexKey, + this.network, + pattern, + ); + + return allBlacklisted.map(t => this.getAddressFromBlackListKey(t)); + } + + getAddressFromBlackListKey(key: Address) { + return (key.split('blacklist_')[1] ?? '').toLowerCase(); + } + + async restrictPool(poolIdentifier: string): Promise { + await this.restrictedPubSub.publish( + { [this.getRestrictPoolKey(poolIdentifier)]: 'restricted' }, + SWAAP_POOL_RESTRICT_TTL_S, + ); + } + + async isRestrictedPool(poolIdentifier: string): Promise { + const restricted = await this.restrictedPubSub.getAndCache( + this.getRestrictPoolKey(poolIdentifier), + ); + return restricted === 'restricted'; + } + + getRestrictPoolKey(poolIdentifier: string): string { + return `restricted_mms_${poolIdentifier}`; + } } diff --git a/src/dex/swaap-v2/swaap-v2.ts b/src/dex/swaap-v2/swaap-v2.ts index 8140cbbdd..398d43b9e 100644 --- a/src/dex/swaap-v2/swaap-v2.ts +++ b/src/dex/swaap-v2/swaap-v2.ts @@ -79,8 +79,6 @@ import { BI_MAX_UINT256 } from '../../bigint-constants'; import { SpecialDex } from '../../executor/types'; import { extractReturnAmountPosition } from '../../executor/utils'; -const BLACKLISTED = 'blacklisted'; - export class SwaapV2 extends SimpleExchange implements IDex { readonly isStatePollingDex = true; readonly hasConstantPriceLargeAmounts = false; @@ -89,7 +87,6 @@ export class SwaapV2 extends SimpleExchange implements IDex { private rateFetcher: RateFetcher; private swaapV2AuthToken: string; private tokensMap: TokensMap = {}; - private runtimeMMsRestrictHashMapKey: string; public static dexKeysWithNetwork: { key: string; networks: Network[] }[] = getDexKeysWithNetwork(SwaapV2Config); @@ -115,6 +112,7 @@ export class SwaapV2 extends SimpleExchange implements IDex { this.rateFetcher = new RateFetcher( this.dexHelper, + this.network, this.dexKey, this.logger, { @@ -132,15 +130,10 @@ export class SwaapV2 extends SimpleExchange implements IDex { }, }, ); - - this.runtimeMMsRestrictHashMapKey = - `${CACHE_PREFIX}_${this.dexKey}_${this.network}_restricted_mms`.toLowerCase(); } async initializePricing(blockNumber: number): Promise { - if (!this.dexHelper.config.isSlave) { - await this.rateFetcher.start(); - } + await this.rateFetcher.start(); return; } @@ -168,7 +161,7 @@ export class SwaapV2 extends SimpleExchange implements IDex { normalizedDestToken.address, ); - const levels = await this.getCachedLevels(); + const levels = await this.rateFetcher.getCachedLevels(); if (levels === null) { return []; } @@ -277,34 +270,6 @@ export class SwaapV2 extends SimpleExchange implements IDex { }); } - async getCachedTokens(): Promise { - const cachedTokens = await this.dexHelper.cache.get( - this.dexKey, - this.network, - SWAAP_TOKENS_CACHE_KEY, - ); - - if (cachedTokens) { - return JSON.parse(cachedTokens) as TokensMap; - } - - return null; - } - - async getCachedLevels(): Promise | null> { - const cachedLevels = await this.dexHelper.cache.get( - this.dexKey, - this.network, - SWAAP_PRICES_CACHE_KEY, - ); - - if (cachedLevels) { - return JSON.parse(cachedLevels) as Record; - } - - return null; - } - normalizeToken(token: Token): Token { return { address: normalizeTokenAddress(token.address), @@ -330,11 +295,11 @@ export class SwaapV2 extends SimpleExchange implements IDex { ); try { - if (await this.isRestrictedPool(requestedPoolIdentifier)) { + if (await this.rateFetcher.isRestrictedPool(requestedPoolIdentifier)) { return null; } - this.tokensMap = (await this.getCachedTokens()) || {}; + this.tokensMap = (await this.rateFetcher.getCachedTokens()) || {}; if (normalizedSrcToken.address === normalizedDestToken.address) { return null; @@ -362,7 +327,7 @@ export class SwaapV2 extends SimpleExchange implements IDex { return null; } - const levels = await this.getCachedLevels(); + const levels = await this.rateFetcher.getCachedLevels(); if (levels === null) { return null; } @@ -636,12 +601,18 @@ export class SwaapV2 extends SimpleExchange implements IDex { ]; } catch (e) { if (isAxiosError(e) && e.response?.status === 403) { - await this.setBlacklist(options.userAddress, SWAAP_403_TTL_S); + await this.rateFetcher.setBlacklist( + options.userAddress, + SWAAP_403_TTL_S, + ); this.logger.warn( `${this.dexKey}-${this.network}: Encountered blacklisted user=${options.userAddress}. Adding to local blacklist cache`, ); } else if (isAxiosError(e) && e.response?.status === 429) { - await this.setBlacklist(options.userAddress, SWAAP_429_TTL_S); + await this.rateFetcher.setBlacklist( + options.userAddress, + SWAAP_429_TTL_S, + ); this.logger.warn( `${this.dexKey}-${this.network}: Encountered restricted user=${options.userAddress}. Adding to local blacklist cache`, ); @@ -677,11 +648,7 @@ export class SwaapV2 extends SimpleExchange implements IDex { ); // We use timestamp for creation date to later discern if it already expired or not - await this.dexHelper.cache.hset( - this.runtimeMMsRestrictHashMapKey, - poolIdentifier, - Date.now().toString(), - ); + await this.rateFetcher.restrictPool(poolIdentifier); this.rateFetcher .notify(SWAAP_BANNED_CODE, message, this.getNotifyReqParams()) @@ -698,45 +665,8 @@ export class SwaapV2 extends SimpleExchange implements IDex { }); } - async isRestrictedPool(poolIdentifier: string): Promise { - const expirationThreshold = Date.now() - SWAAP_POOL_RESTRICT_TTL_S * 1000; - const createdAt = await this.dexHelper.cache.hget( - this.runtimeMMsRestrictHashMapKey, - poolIdentifier, - ); - const wasNotRestricted = createdAt === null; - if (wasNotRestricted) { - return false; - } - const restrictionExpired = +createdAt < expirationThreshold; - return !restrictionExpired; - } - - async isBlacklisted(txOrigin: Address): Promise { - const result = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.getBlackListKey(txOrigin), - ); - return result === BLACKLISTED; - } - - getBlackListKey(address: Address) { - return `blacklist_${address}`.toLowerCase(); - } - - async setBlacklist( - txOrigin: Address, - ttl: number = SWAAP_403_TTL_S, - ): Promise { - await this.dexHelper.cache.setex( - this.dexKey, - this.network, - this.getBlackListKey(txOrigin), - ttl, - BLACKLISTED, - ); - return true; + isBlacklisted(userAddress: Address): AsyncOrSync { + return this.rateFetcher.isBlacklisted(userAddress); } // Returns estimated gas cost of calldata for this DEX in multiSwap @@ -971,10 +901,10 @@ export class SwaapV2 extends SimpleExchange implements IDex { tokenAddress: Address, limit: number, ): Promise { - this.tokensMap = (await this.getCachedTokens()) || {}; + this.tokensMap = (await this.rateFetcher.getCachedTokens()) || {}; const normalizedTokenAddress = normalizeTokenAddress(tokenAddress); - const pLevels = await this.getCachedLevels(); + const pLevels = await this.rateFetcher.getCachedLevels(); if (pLevels === null) { return []; @@ -992,7 +922,9 @@ export class SwaapV2 extends SimpleExchange implements IDex { base, quote, ); - const isRestrictedPool = await this.isRestrictedPool(poolIdentifier); + const isRestrictedPool = await this.rateFetcher.isRestrictedPool( + poolIdentifier, + ); return { pair,