diff --git a/package.json b/package.json index 9f5f4fbd5..7de0ecffa 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@paraswap/dex-lib", - "version": "4.0.6-rfq-pub-sub.0", + "version": "4.0.6-rfq-pub-sub.19", "main": "build/index.js", "types": "build/index.d.ts", "repository": "https://github.com/paraswap/paraswap-dex-lib", diff --git a/src/dex/bebop/bebop.ts b/src/dex/bebop/bebop.ts index fe69a4f19..463bfda0f 100644 --- a/src/dex/bebop/bebop.ts +++ b/src/dex/bebop/bebop.ts @@ -73,7 +73,6 @@ export class Bebop extends SimpleExchange implements IDex { private tokensMap: TokenDataMap = {}; private pricesCacheKey: string; - private tokensCacheKey: string; private tokensAddrCacheKey: string; private bebopAuthToken: string; @@ -90,7 +89,6 @@ export class Bebop extends SimpleExchange implements IDex { ) { super(dexHelper, dexKey); this.logger = dexHelper.getLogger(dexKey); - this.tokensCacheKey = `tokens`; this.pricesCacheKey = `prices`; this.tokensAddrCacheKey = `tokens_addr`; const token = this.dexHelper.config.data.bebopAuthToken; @@ -109,7 +107,6 @@ export class Bebop extends SimpleExchange implements IDex { tokensIntervalMs: BEBOP_TOKENS_POLLING_INTERVAL_MS, pricesCacheKey: this.pricesCacheKey, pricesCacheTTLSecs: BEBOP_PRICES_CACHE_TTL, - tokensCacheKey: this.tokensCacheKey, tokensAddrCacheKey: this.tokensAddrCacheKey, tokensCacheTTLSecs: BEBOP_TOKENS_CACHE_TTL, tokensReqParams: { @@ -132,11 +129,8 @@ export class Bebop extends SimpleExchange implements IDex { } async initializePricing(blockNumber: number) { - if (!this.dexHelper.config.isSlave) { - this.rateFetcher.start(); - await sleep(BEBOP_INIT_TIMEOUT_MS); - } - + await this.rateFetcher.start(); + await sleep(BEBOP_INIT_TIMEOUT_MS); await this.setTokensMap(); } @@ -184,7 +178,7 @@ export class Bebop extends SimpleExchange implements IDex { return []; } - const prices = await this.getCachedPrices(); + const prices = await this.rateFetcher.getCachedPrices(); if (!prices) { throw new Error('No prices available'); } @@ -523,7 +517,7 @@ export class Bebop extends SimpleExchange implements IDex { } async setTokensMap() { - const tokens = await this.getCachedTokens(); + const tokens = await this.rateFetcher.getCachedTokens(); if (tokens) { this.tokensMap = tokens; @@ -542,7 +536,7 @@ export class Bebop extends SimpleExchange implements IDex { tokenAddress: Address, limit: number, ): Promise { - const prices = await this.getCachedPrices(); + const prices = await this.rateFetcher.getCachedPrices(); if (!prices) { return []; @@ -812,13 +806,7 @@ export class Bebop extends SimpleExchange implements IDex { errorsData.count + 1 } within ${BEBOP_RESTRICT_CHECK_INTERVAL_MS / 1000 / 60} minutes`, ); - await this.dexHelper.cache.setex( - this.dexKey, - this.network, - BEBOP_RESTRICTED_CACHE_KEY, - BEBOP_RESTRICT_TTL_S, - 'true', - ); + this.rateFetcher.restrict(); } else { this.logger.warn( `${this.dexKey}-${this.network}: Error count increased`, @@ -839,41 +827,7 @@ export class Bebop extends SimpleExchange implements IDex { } async isRestricted(): Promise { - const result = await this.dexHelper.cache.get( - this.dexKey, - this.network, - BEBOP_RESTRICTED_CACHE_KEY, - ); - - return result === 'true'; - } - - async getCachedPrices(): Promise { - const cachedPrices = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.pricesCacheKey, - ); - - if (cachedPrices) { - return JSON.parse(cachedPrices) as BebopPricingResponse; - } - - return null; - } - - async getCachedTokens(): Promise { - const cachedTokens = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.tokensAddrCacheKey, - ); - - if (cachedTokens) { - return JSON.parse(cachedTokens) as TokenDataMap; - } - - return null; + return this.rateFetcher.isRestricted(); } getTokenFromAddress(address: Address): Token { diff --git a/src/dex/bebop/rate-fetcher.ts b/src/dex/bebop/rate-fetcher.ts index 37de21969..8cff7f3db 100644 --- a/src/dex/bebop/rate-fetcher.ts +++ b/src/dex/bebop/rate-fetcher.ts @@ -9,14 +9,13 @@ import { BebopPricingResponse, BebopRateFetcherConfig, BebopTokensResponse, + TokenDataMap, } from './types'; -import { - BebopPricingUpdate, - pricesResponseValidator, - tokensResponseValidator, -} from './validators'; +import { BebopPricingUpdate, tokensResponseValidator } from './validators'; import { WebSocketFetcher } from '../../lib/fetcher/wsFetcher'; import { utils } from 'ethers'; +import { BEBOP_RESTRICT_TTL_S, BEBOP_RESTRICTED_CACHE_KEY } from './constants'; +import { ExpKeyValuePubSub } from '../../lib/pub-sub'; export function levels_from_flat_array(values: number[]): BebopLevel[] { const levels: BebopLevel[] = []; @@ -27,15 +26,18 @@ export function levels_from_flat_array(values: number[]): BebopLevel[] { } export class RateFetcher { + private tokensPricesPubSub: ExpKeyValuePubSub; + private pricesFetcher: WebSocketFetcher; private pricesCacheKey: string; private pricesCacheTTL: number; private tokensFetcher: Fetcher; private tokensAddrCacheKey: string; - private tokensCacheKey: string; private tokensCacheTTL: number; + private restrictPubSub: ExpKeyValuePubSub; + constructor( private dexHelper: IDexHelper, private dexKey: string, @@ -45,6 +47,13 @@ export class RateFetcher { ) { this.pricesCacheKey = config.rateConfig.pricesCacheKey; this.pricesCacheTTL = config.rateConfig.pricesCacheTTLSecs; + + this.tokensPricesPubSub = new ExpKeyValuePubSub( + this.dexHelper, + this.dexKey, + 'tokensPrices', + ); + this.pricesFetcher = new WebSocketFetcher( { info: { @@ -68,8 +77,8 @@ export class RateFetcher { ); this.tokensAddrCacheKey = config.rateConfig.tokensAddrCacheKey; - this.tokensCacheKey = config.rateConfig.tokensCacheKey; this.tokensCacheTTL = config.rateConfig.tokensCacheTTLSecs; + this.tokensFetcher = new Fetcher( dexHelper.httpRequest, { @@ -87,6 +96,14 @@ export class RateFetcher { config.rateConfig.tokensIntervalMs, logger, ); + + this.restrictPubSub = new ExpKeyValuePubSub( + dexHelper, + dexKey, + 'restrict', + 'not_restricted', + BEBOP_RESTRICT_TTL_S, + ); } parsePricingUpdate(updateObject: any): BebopPricingResponse { @@ -114,8 +131,13 @@ export class RateFetcher { } start() { - this.pricesFetcher.startPolling(); - this.tokensFetcher.startPolling(); + if (!this.dexHelper.config.isSlave) { + this.pricesFetcher.startPolling(); + this.tokensFetcher.startPolling(); + } else { + this.tokensPricesPubSub.subscribe(); + this.restrictPubSub.subscribe(); + } } stop() { @@ -141,17 +163,14 @@ export class RateFetcher { this.dexHelper.cache.setex( this.dexKey, this.network, - this.tokensCacheKey, + this.tokensAddrCacheKey, this.tokensCacheTTL, - JSON.stringify(tokenMap), + JSON.stringify(tokenAddrMap), ); - this.dexHelper.cache.setex( - this.dexKey, - this.network, - this.tokensAddrCacheKey, + this.tokensPricesPubSub.publish( + { [this.tokensAddrCacheKey]: tokenAddrMap }, this.tokensCacheTTL, - JSON.stringify(tokenAddrMap), ); } @@ -179,5 +198,57 @@ export class RateFetcher { this.pricesCacheTTL, JSON.stringify(normalizedPrices), ); + + this.tokensPricesPubSub.publish( + { [this.pricesCacheKey]: normalizedPrices }, + this.pricesCacheTTL, + ); + } + + async getCachedPrices(): Promise { + const cachedPrices = await this.tokensPricesPubSub.getAndCache( + this.pricesCacheKey, + ); + + if (cachedPrices) { + return cachedPrices as BebopPricingResponse; + } + + return null; + } + + async getCachedTokens(): Promise { + const cachedTokens = await this.tokensPricesPubSub.getAndCache( + this.tokensAddrCacheKey, + ); + + if (cachedTokens) { + return cachedTokens as TokenDataMap; + } + + return null; + } + + async isRestricted(): Promise { + const result = await this.restrictPubSub.getAndCache( + BEBOP_RESTRICTED_CACHE_KEY, + ); + + return result === 'true'; + } + + async restrict(): Promise { + await this.dexHelper.cache.setex( + this.dexKey, + this.network, + BEBOP_RESTRICTED_CACHE_KEY, + BEBOP_RESTRICT_TTL_S, + 'true', + ); + + this.restrictPubSub.publish( + { [BEBOP_RESTRICTED_CACHE_KEY]: 'true' }, + BEBOP_RESTRICT_TTL_S, + ); } } diff --git a/src/dex/bebop/types.ts b/src/dex/bebop/types.ts index a61aff2af..ee7a854df 100644 --- a/src/dex/bebop/types.ts +++ b/src/dex/bebop/types.ts @@ -16,7 +16,6 @@ export type BebopRateFetcherConfig = { tokensIntervalMs: number; pricesCacheKey: string; tokensAddrCacheKey: string; - tokensCacheKey: string; pricesCacheTTLSecs: number; tokensCacheTTLSecs: number; }; diff --git a/src/dex/cables/cables.ts b/src/dex/cables/cables.ts index eb14b3f8e..f32acb295 100644 --- a/src/dex/cables/cables.ts +++ b/src/dex/cables/cables.ts @@ -495,7 +495,7 @@ export class Cables extends SimpleExchange implements IDex { return null; } - const isRestricted = await this.isRestricted(); + const isRestricted = await this.rateFetcher.isRestricted(); if (isRestricted) { return null; } @@ -514,7 +514,7 @@ export class Cables extends SimpleExchange implements IDex { if (pools.length === 0) return null; // ---------- Prices ---------- - const priceMap = await this.getCachedPrices(); + const priceMap = await this.rateFetcher.getCachedPrices(); if (!priceMap) return null; @@ -596,9 +596,7 @@ export class Cables extends SimpleExchange implements IDex { } async initializePricing(blockNumber: number): Promise { - if (!this.dexHelper.config.isSlave) { - this.rateFetcher.start(); - } + this.rateFetcher.start(); return; } @@ -619,7 +617,7 @@ export class Cables extends SimpleExchange implements IDex { } async setTokensMap() { - const tokens = await this.getCachedTokens(); + const tokens = await this.rateFetcher.getCachedTokens(); if (tokens) { this.tokensMap = Object.keys(tokens).reduce((acc, key) => { @@ -637,41 +635,8 @@ export class Cables extends SimpleExchange implements IDex { return []; } - /** - * CACHED UTILS - */ - async getCachedTokens(): Promise { - const cachedTokens = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.rateFetcher.tokensCacheKey, - ); - - return cachedTokens ? JSON.parse(cachedTokens) : {}; - } - - async getCachedPairs(): Promise { - const cachedPairs = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.rateFetcher.pairsCacheKey, - ); - - return cachedPairs ? JSON.parse(cachedPairs) : {}; - } - - async getCachedPrices(): Promise { - const cachedPrices = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.rateFetcher.pricesCacheKey, - ); - - return cachedPrices ? JSON.parse(cachedPrices) : {}; - } - async getCachedTokensAddr(): Promise { - const tokens = await this.getCachedTokens(); + const tokens = await this.rateFetcher.getCachedTokens(); const tokensAddr: Record = {}; for (const key of Object.keys(tokens)) { tokensAddr[tokens[key].symbol.toLowerCase()] = tokens[key].address; @@ -701,12 +666,12 @@ export class Cables extends SimpleExchange implements IDex { return null; } - const cachedTokens = await this.getCachedTokens(); + const cachedTokens = await this.rateFetcher.getCachedTokens(); srcToken.symbol = this.findKeyByAddress(cachedTokens, srcToken.address); destToken.symbol = this.findKeyByAddress(cachedTokens, destToken.address); - const cachedPairs = await this.getCachedPairs(); + const cachedPairs = await this.rateFetcher.getCachedPairs(); const potentialPairs = [ { @@ -734,28 +699,7 @@ export class Cables extends SimpleExchange implements IDex { } async isBlacklisted(txOrigin: Address): Promise { - const cachedBlacklist = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.rateFetcher.blacklistCacheKey, - ); - - if (cachedBlacklist) { - const blacklist = JSON.parse(cachedBlacklist) as string[]; - return blacklist.includes(txOrigin.toLowerCase()); - } - - return false; - } - - async isRestricted(): Promise { - const result = await this.dexHelper.cache.get( - this.dexKey, - this.network, - CABLES_RESTRICTED_CACHE_KEY, - ); - - return result === 'true'; + return this.rateFetcher.isBlacklisted(txOrigin); } async restrict() { @@ -795,13 +739,7 @@ export class Cables extends SimpleExchange implements IDex { errorsData.count + 1 } within ${CABLES_RESTRICT_CHECK_INTERVAL_MS / 1000 / 60} minutes`, ); - await this.dexHelper.cache.setex( - this.dexKey, - this.network, - CABLES_RESTRICTED_CACHE_KEY, - CABLES_RESTRICT_TTL_S, - 'true', - ); + this.rateFetcher.restrict(); } else { this.logger.warn( `${this.dexKey}-${this.network}: Error count increased`, @@ -813,7 +751,7 @@ export class Cables extends SimpleExchange implements IDex { await this.dexHelper.cache.setex( this.dexKey, this.network, - CABLES_RESTRICTED_CACHE_KEY, + CABLES_ERRORS_CACHE_KEY, ERRORS_TTL_S, Utils.Serialize(data), ); diff --git a/src/dex/cables/rate-fetcher.ts b/src/dex/cables/rate-fetcher.ts index 3df0b0f83..6fd34f8d2 100644 --- a/src/dex/cables/rate-fetcher.ts +++ b/src/dex/cables/rate-fetcher.ts @@ -1,9 +1,14 @@ import { Network } from '../../constants'; import { IDexHelper } from '../../dex-helper'; import { Fetcher } from '../../lib/fetcher/fetcher'; +import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub'; import { validateAndCast } from '../../lib/validators'; -import { Logger, Token } from '../../types'; +import { Address, Logger } from '../../types'; import { PairData } from '../cables/types'; +import { + CABLES_RESTRICT_TTL_S, + CABLES_RESTRICTED_CACHE_KEY, +} from './constants'; import { CablesBlacklistResponse, CablesPairsResponse, @@ -19,6 +24,8 @@ import { } from './validators'; export class CablesRateFetcher { + private tokensPairsPricesPubSub: ExpKeyValuePubSub; + public tokensFetcher: Fetcher; public tokensCacheKey: string; public tokensCacheTTL: number; @@ -32,9 +39,12 @@ export class CablesRateFetcher { public pricesCacheTTL: number; public blacklistFetcher: Fetcher; + private blacklistPubSub: NonExpSetPubSub; public blacklistCacheKey: string; public blacklistCacheTTL: number; + private restrictPubSub: ExpKeyValuePubSub; + constructor( private dexHelper: IDexHelper, private dexKey: string, @@ -54,6 +64,12 @@ export class CablesRateFetcher { this.blacklistCacheKey = config.rateConfig.blacklistCacheKey; this.blacklistCacheTTL = config.rateConfig.blacklistCacheTTLSecs; + this.tokensPairsPricesPubSub = new ExpKeyValuePubSub( + dexHelper, + dexKey, + 'tokensPairsPrices', + ); + this.pairsFetcher = new Fetcher( dexHelper.httpRequest, { @@ -90,6 +106,7 @@ export class CablesRateFetcher { logger, ); + this.blacklistPubSub = new NonExpSetPubSub(dexHelper, dexKey, 'blacklist'); this.blacklistFetcher = new Fetcher( dexHelper.httpRequest, { @@ -125,16 +142,32 @@ export class CablesRateFetcher { config.rateConfig.tokensIntervalMs, logger, ); + + this.restrictPubSub = new ExpKeyValuePubSub( + dexHelper, + dexKey, + 'restrict', + 'not_restricted', + CABLES_RESTRICT_TTL_S, + ); } /** * Utils */ - start() { - this.pairsFetcher.startPolling(); - this.pricesFetcher.startPolling(); - this.blacklistFetcher.startPolling(); - this.tokensFetcher.startPolling(); + async start() { + if (!this.dexHelper.config.isSlave) { + this.pairsFetcher.startPolling(); + this.pricesFetcher.startPolling(); + this.blacklistFetcher.startPolling(); + this.tokensFetcher.startPolling(); + } else { + this.tokensPairsPricesPubSub.subscribe(); + this.restrictPubSub.subscribe(); + + const initBlacklisted = await this.getAllBlacklisted(); + this.blacklistPubSub.initializeAndSubscribe(initBlacklisted); + } } stop() { this.pairsFetcher.stopPolling(); @@ -159,6 +192,11 @@ export class CablesRateFetcher { this.pairsCacheTTL, JSON.stringify(normalized_pairs), ); + + this.tokensPairsPricesPubSub.publish( + { [this.pairsCacheKey]: normalized_pairs }, + this.pairsCacheTTL, + ); } private handlePricesResponse(res: CablesPricesResponse): void { @@ -172,17 +210,25 @@ export class CablesRateFetcher { this.pricesCacheTTL, JSON.stringify(prices), ); + + this.tokensPairsPricesPubSub.publish( + { [this.pricesCacheKey]: prices }, + this.pricesCacheTTL, + ); } private handleBlacklistResponse(res: CablesBlacklistResponse): void { const { blacklist } = res; + const list = blacklist.map(item => item.toLowerCase()); this.dexHelper.cache.setex( this.dexKey, this.network, this.blacklistCacheKey, this.blacklistCacheTTL, - JSON.stringify(blacklist.map(item => item.toLowerCase())), + JSON.stringify(list), ); + + this.blacklistPubSub.publish(list); } // Convert addresses to lowercase @@ -208,5 +254,72 @@ export class CablesRateFetcher { this.tokensCacheTTL, JSON.stringify(normalizedTokens), ); + + this.tokensPairsPricesPubSub.publish( + { [this.tokensCacheKey]: normalizedTokens }, + this.tokensCacheTTL, + ); + } + + /** + * CACHED UTILS + */ + async getCachedTokens(): Promise { + const cachedTokens = await this.tokensPairsPricesPubSub.getAndCache( + this.tokensCacheKey, + ); + return cachedTokens ?? {}; + } + + async getCachedPairs(): Promise { + const cachedPairs = await this.tokensPairsPricesPubSub.getAndCache( + this.pairsCacheKey, + ); + return cachedPairs ?? {}; + } + + async getCachedPrices(): Promise { + const cachedPrices = await this.tokensPairsPricesPubSub.getAndCache( + this.pricesCacheKey, + ); + + return cachedPrices ?? {}; + } + + async getAllBlacklisted(): Promise { + const cachedBlacklist = await this.dexHelper.cache.get( + this.dexKey, + this.network, + this.blacklistCacheKey, + ); + + return cachedBlacklist ? JSON.parse(cachedBlacklist) : []; + } + + async isBlacklisted(txOrigin: Address): Promise { + return this.blacklistPubSub.has(txOrigin.toLowerCase()); + } + + async isRestricted(): Promise { + const result = await this.restrictPubSub.getAndCache( + CABLES_RESTRICTED_CACHE_KEY, + ); + + return result === 'true'; + } + + async restrict(): Promise { + await this.dexHelper.cache.setex( + this.dexKey, + this.network, + CABLES_RESTRICTED_CACHE_KEY, + CABLES_RESTRICT_TTL_S, + 'true', + ); + + this.restrictPubSub.publish( + { [CABLES_RESTRICTED_CACHE_KEY]: 'true' }, + CABLES_RESTRICT_TTL_S, + ); } } diff --git a/src/dex/camelot/camelot.ts b/src/dex/camelot/camelot.ts index 9ad942f86..8f6b80db3 100644 --- a/src/dex/camelot/camelot.ts +++ b/src/dex/camelot/camelot.ts @@ -646,6 +646,7 @@ export class Camelot isFeeTokenInRoute: Object.values(transferFees).some(f => f !== 0), pools: [ { + stable: pairParam.stable, address: pairParam.exchange, fee: parseInt(pairParam.fee), direction: pairParam.direction, @@ -843,9 +844,17 @@ export class Camelot if (side === SwapSide.BUY) throw new Error('Buy not supported'); let exchangeDataTypes = ['bytes4', 'bytes32']; + const isStable = data.pools.some(pool => !!pool.stable); + const isStablePoolAndPoolCount = isStable + ? BigNumber.from(1) + .shl(255) + .or(BigNumber.from(data.pools.length)) + .toHexString() + : hexZeroPad(hexlify(data.pools.length), 32); + let exchangeDataToPack = [ hexZeroPad(hexlify(0), 4), - hexZeroPad(hexlify(data.pools.length), 32), + isStablePoolAndPoolCount, ]; const pools = encodePools(data.pools, this.feeFactor); diff --git a/src/dex/generic-rfq/rate-fetcher.ts b/src/dex/generic-rfq/rate-fetcher.ts index 46db952e6..cbf9cadd2 100644 --- a/src/dex/generic-rfq/rate-fetcher.ts +++ b/src/dex/generic-rfq/rate-fetcher.ts @@ -288,13 +288,6 @@ export class RateFetcher { const value = prices.bids; pubSubData[key] = value; - this.dexHelper.cache.setex( - this.dexKey, - this.dexHelper.config.data.network, - key, - ttl, - JSON.stringify(value), - ); currentPricePairs.add(`${baseToken.address}_${quoteToken.address}`); } @@ -303,13 +296,6 @@ export class RateFetcher { const value = prices.asks; pubSubData[key] = value; - this.dexHelper.cache.setex( - this.dexKey, - this.dexHelper.config.data.network, - key, - ttl, - JSON.stringify(value), - ); currentPricePairs.add(`${quoteToken.address}_${baseToken.address}`); } }); @@ -318,17 +304,22 @@ export class RateFetcher { const key = `pairs`; const value = Array.from(currentPricePairs); pubSubData[key] = value; - - this.dexHelper.cache.setex( - this.dexKey, - this.dexHelper.config.data.network, - key, - ttl, - JSON.stringify(value), - ); } + // publish in priority this.pricesPubSub.publish(pubSubData, ttl); + + if (Object.keys(pubSubData).length > 0) { + Object.keys(pubSubData).forEach(key => { + this.dexHelper.cache.setex( + this.dexKey, + this.dexHelper.config.data.network, + key, + ttl, + JSON.stringify(pubSubData[key]), + ); + }); + } } checkHealth(): boolean { diff --git a/src/dex/solidly/solidly.ts b/src/dex/solidly/solidly.ts index 6bd7fd853..369b2b21d 100644 --- a/src/dex/solidly/solidly.ts +++ b/src/dex/solidly/solidly.ts @@ -409,6 +409,7 @@ export class Solidly extends UniswapV2 { isFeeTokenInRoute: Object.values(transferFees).some(f => f !== 0), pools: [ { + stable: pairParam.stable, address: pairParam.exchange, fee: parseInt(pairParam.fee), direction: pairParam.direction, @@ -657,9 +658,17 @@ export class Solidly extends UniswapV2 { if (side === SwapSide.BUY) throw new Error(`Buy not supported`); let exchangeDataTypes = ['bytes4', 'bytes32']; + const isStable = data.pools.some(pool => !!pool.stable); + const isStablePoolAndPoolCount = isStable + ? BigNumber.from(1) + .shl(255) + .or(BigNumber.from(data.pools.length)) + .toHexString() + : hexZeroPad(hexlify(data.pools.length), 32); + let exchangeDataToPack = [ hexZeroPad(hexlify(0), 4), - hexZeroPad(hexlify(data.pools.length), 32), + isStablePoolAndPoolCount, ]; const pools = encodePools(data.pools, this.feeFactor); diff --git a/src/dex/solidly/types.ts b/src/dex/solidly/types.ts index 4db1c4b32..b0895dd5c 100644 --- a/src/dex/solidly/types.ts +++ b/src/dex/solidly/types.ts @@ -19,9 +19,12 @@ export interface SolidlyPoolOrderedParams extends UniswapV2PoolOrderedParams { stable: boolean; } -export type SolidlyData = UniswapV2Data & { isFeeTokenInRoute: boolean }; +export type SolidlyPool = UniswapPool & { stable: boolean }; -export type SolidlyPool = UniswapPool; +export type SolidlyData = { + isFeeTokenInRoute: boolean; + pools: SolidlyPool[]; +} & UniswapV2Data; export interface DexParams extends Omit { feeCode: number; diff --git a/src/dex/swaap-v2/constants.ts b/src/dex/swaap-v2/constants.ts index 8c6684abf..d528df744 100644 --- a/src/dex/swaap-v2/constants.ts +++ b/src/dex/swaap-v2/constants.ts @@ -1,10 +1,10 @@ import BigNumber from 'bignumber.js'; -export const SWAAP_RFQ_PRICES_CACHES_TTL_S = 3; +export const SWAAP_RFQ_PRICES_CACHES_TTL_S = 5; export const SWAAP_RFQ_QUOTE_TIMEOUT_MS = 2000; -export const SWAAP_RFQ_API_PRICES_POLLING_INTERVAL_MS = 1000; +export const SWAAP_RFQ_API_PRICES_POLLING_INTERVAL_MS = 2000; export const SWAAP_RFQ_API_TOKENS_POLLING_INTERVAL_MS = 1000 * 60 * 60; // 1 hour diff --git a/src/dex/swaap-v2/rate-fetcher.ts b/src/dex/swaap-v2/rate-fetcher.ts index a7303e7d4..593d3fa13 100644 --- a/src/dex/swaap-v2/rate-fetcher.ts +++ b/src/dex/swaap-v2/rate-fetcher.ts @@ -16,7 +16,6 @@ import { } from './types'; import { priceLevelsResponseValidator, - getQuoteResponseValidator, getTokensResponseValidator, notifyResponseValidator, getQuoteResponseWithRecipientValidator, @@ -26,8 +25,16 @@ import { SWAAP_RFQ_QUOTE_TIMEOUT_MS, SWAAP_NOTIFY_TIMEOUT_MS, SWAAP_NOTIFICATION_ORIGIN, + SWAAP_TOKENS_CACHE_KEY, + SWAAP_PRICES_CACHE_KEY, + SWAAP_403_TTL_S, + SWAAP_POOL_RESTRICT_TTL_S, } from './constants'; import { RequestConfig } from '../../dex-helper/irequest-wrapper'; +import { Network } from '../../constants'; +import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub'; + +const BLACKLISTED = 'blacklisted'; export class RateFetcher { private rateFetcher: Fetcher; @@ -37,8 +44,13 @@ export class RateFetcher { private tokenCacheKey: string; private pricesCacheKey: string; + private rateTokensPubSub: ExpKeyValuePubSub; + private restrictedPubSub: ExpKeyValuePubSub; + private blacklistPubSub: NonExpSetPubSub; + constructor( private dexHelper: IDexHelper, + private network: Network, private dexKey: string, private logger: Logger, config: SwaapV2RateFetcherConfig, @@ -48,6 +60,12 @@ export class RateFetcher { this.pricesCacheKey = config.rateConfig.pricesCacheKey; this.tokenCacheKey = config.tokensConfig.tokensCacheKey; + this.rateTokensPubSub = new ExpKeyValuePubSub( + this.dexHelper, + this.dexKey, + 'rateTokens', + ); + this.rateFetcher = new Fetcher( dexHelper.httpRequest, { @@ -83,11 +101,33 @@ export class RateFetcher { config.tokensConfig.tokensIntervalMs, logger, ); + + this.restrictedPubSub = new ExpKeyValuePubSub( + this.dexHelper, + this.dexKey, + 'restricted', + 'pool_is_not_restricted', + SWAAP_POOL_RESTRICT_TTL_S, + ); + + this.blacklistPubSub = new NonExpSetPubSub( + this.dexHelper, + this.dexKey, + 'blacklist', + ); } - start() { - this.rateFetcher.startPolling(); - this.tokensFetcher.startPolling(); + async start() { + if (!this.dexHelper.config.isSlave) { + this.rateFetcher.startPolling(); + this.tokensFetcher.startPolling(); + } else { + this.rateTokensPubSub.subscribe(); + this.restrictedPubSub.subscribe(); + + const initSet = await this.getAllBlacklisted(); + this.blacklistPubSub.initializeAndSubscribe(initSet); + } } stop() { @@ -112,6 +152,11 @@ export class RateFetcher { this.tokensCacheTTL, JSON.stringify(tokensMap), ); + + this.rateTokensPubSub.publish( + { [this.tokenCacheKey]: tokensMap }, + this.tokensCacheTTL, + ); } private handleRatesResponse(resp: SwaapV2PriceLevelsResponse): void { @@ -149,6 +194,11 @@ export class RateFetcher { this.pricesCacheTTL, JSON.stringify(levels), ); + + this.rateTokensPubSub.publish( + { [this.pricesCacheKey]: levels }, + this.pricesCacheTTL, + ); } async getQuote( @@ -264,4 +314,86 @@ export class RateFetcher { throw e; } } + + async getCachedTokens(): Promise { + const cachedTokens = await this.rateTokensPubSub.getAndCache( + this.tokenCacheKey, + ); + + if (cachedTokens) { + return cachedTokens as TokensMap; + } + + return null; + } + + async getCachedLevels(): Promise | null> { + const cachedLevels = await this.rateTokensPubSub.getAndCache( + this.pricesCacheKey, + ); + + if (cachedLevels) { + return cachedLevels as Record; + } + + return null; + } + + async isBlacklisted(txOrigin: Address): Promise { + return this.blacklistPubSub.has(txOrigin.toLowerCase()); + } + + getBlackListKey(address: Address) { + return `blacklist_${address}`.toLowerCase(); + } + + async setBlacklist( + txOrigin: Address, + ttl: number = SWAAP_403_TTL_S, + ): Promise { + await this.dexHelper.cache.setex( + this.dexKey, + this.network, + this.getBlackListKey(txOrigin), + ttl, + BLACKLISTED, + ); + + this.blacklistPubSub.publish([txOrigin.toLowerCase()]); + return true; + } + + async getAllBlacklisted(): Promise { + const defaultKey = this.getBlackListKey(''); + const pattern = `${defaultKey}*`; + const allBlacklisted = await this.dexHelper.cache.keys( + this.dexKey, + this.network, + pattern, + ); + + return allBlacklisted.map(t => this.getAddressFromBlackListKey(t)); + } + + getAddressFromBlackListKey(key: Address) { + return (key.split('blacklist_')[1] ?? '').toLowerCase(); + } + + async restrictPool(poolIdentifier: string): Promise { + await this.restrictedPubSub.publish( + { [this.getRestrictPoolKey(poolIdentifier)]: 'restricted' }, + SWAAP_POOL_RESTRICT_TTL_S, + ); + } + + async isRestrictedPool(poolIdentifier: string): Promise { + const restricted = await this.restrictedPubSub.getAndCache( + this.getRestrictPoolKey(poolIdentifier), + ); + return restricted === 'restricted'; + } + + getRestrictPoolKey(poolIdentifier: string): string { + return `restricted_mms_${poolIdentifier}`; + } } diff --git a/src/dex/swaap-v2/swaap-v2.ts b/src/dex/swaap-v2/swaap-v2.ts index 8140cbbdd..398d43b9e 100644 --- a/src/dex/swaap-v2/swaap-v2.ts +++ b/src/dex/swaap-v2/swaap-v2.ts @@ -79,8 +79,6 @@ import { BI_MAX_UINT256 } from '../../bigint-constants'; import { SpecialDex } from '../../executor/types'; import { extractReturnAmountPosition } from '../../executor/utils'; -const BLACKLISTED = 'blacklisted'; - export class SwaapV2 extends SimpleExchange implements IDex { readonly isStatePollingDex = true; readonly hasConstantPriceLargeAmounts = false; @@ -89,7 +87,6 @@ export class SwaapV2 extends SimpleExchange implements IDex { private rateFetcher: RateFetcher; private swaapV2AuthToken: string; private tokensMap: TokensMap = {}; - private runtimeMMsRestrictHashMapKey: string; public static dexKeysWithNetwork: { key: string; networks: Network[] }[] = getDexKeysWithNetwork(SwaapV2Config); @@ -115,6 +112,7 @@ export class SwaapV2 extends SimpleExchange implements IDex { this.rateFetcher = new RateFetcher( this.dexHelper, + this.network, this.dexKey, this.logger, { @@ -132,15 +130,10 @@ export class SwaapV2 extends SimpleExchange implements IDex { }, }, ); - - this.runtimeMMsRestrictHashMapKey = - `${CACHE_PREFIX}_${this.dexKey}_${this.network}_restricted_mms`.toLowerCase(); } async initializePricing(blockNumber: number): Promise { - if (!this.dexHelper.config.isSlave) { - await this.rateFetcher.start(); - } + await this.rateFetcher.start(); return; } @@ -168,7 +161,7 @@ export class SwaapV2 extends SimpleExchange implements IDex { normalizedDestToken.address, ); - const levels = await this.getCachedLevels(); + const levels = await this.rateFetcher.getCachedLevels(); if (levels === null) { return []; } @@ -277,34 +270,6 @@ export class SwaapV2 extends SimpleExchange implements IDex { }); } - async getCachedTokens(): Promise { - const cachedTokens = await this.dexHelper.cache.get( - this.dexKey, - this.network, - SWAAP_TOKENS_CACHE_KEY, - ); - - if (cachedTokens) { - return JSON.parse(cachedTokens) as TokensMap; - } - - return null; - } - - async getCachedLevels(): Promise | null> { - const cachedLevels = await this.dexHelper.cache.get( - this.dexKey, - this.network, - SWAAP_PRICES_CACHE_KEY, - ); - - if (cachedLevels) { - return JSON.parse(cachedLevels) as Record; - } - - return null; - } - normalizeToken(token: Token): Token { return { address: normalizeTokenAddress(token.address), @@ -330,11 +295,11 @@ export class SwaapV2 extends SimpleExchange implements IDex { ); try { - if (await this.isRestrictedPool(requestedPoolIdentifier)) { + if (await this.rateFetcher.isRestrictedPool(requestedPoolIdentifier)) { return null; } - this.tokensMap = (await this.getCachedTokens()) || {}; + this.tokensMap = (await this.rateFetcher.getCachedTokens()) || {}; if (normalizedSrcToken.address === normalizedDestToken.address) { return null; @@ -362,7 +327,7 @@ export class SwaapV2 extends SimpleExchange implements IDex { return null; } - const levels = await this.getCachedLevels(); + const levels = await this.rateFetcher.getCachedLevels(); if (levels === null) { return null; } @@ -636,12 +601,18 @@ export class SwaapV2 extends SimpleExchange implements IDex { ]; } catch (e) { if (isAxiosError(e) && e.response?.status === 403) { - await this.setBlacklist(options.userAddress, SWAAP_403_TTL_S); + await this.rateFetcher.setBlacklist( + options.userAddress, + SWAAP_403_TTL_S, + ); this.logger.warn( `${this.dexKey}-${this.network}: Encountered blacklisted user=${options.userAddress}. Adding to local blacklist cache`, ); } else if (isAxiosError(e) && e.response?.status === 429) { - await this.setBlacklist(options.userAddress, SWAAP_429_TTL_S); + await this.rateFetcher.setBlacklist( + options.userAddress, + SWAAP_429_TTL_S, + ); this.logger.warn( `${this.dexKey}-${this.network}: Encountered restricted user=${options.userAddress}. Adding to local blacklist cache`, ); @@ -677,11 +648,7 @@ export class SwaapV2 extends SimpleExchange implements IDex { ); // We use timestamp for creation date to later discern if it already expired or not - await this.dexHelper.cache.hset( - this.runtimeMMsRestrictHashMapKey, - poolIdentifier, - Date.now().toString(), - ); + await this.rateFetcher.restrictPool(poolIdentifier); this.rateFetcher .notify(SWAAP_BANNED_CODE, message, this.getNotifyReqParams()) @@ -698,45 +665,8 @@ export class SwaapV2 extends SimpleExchange implements IDex { }); } - async isRestrictedPool(poolIdentifier: string): Promise { - const expirationThreshold = Date.now() - SWAAP_POOL_RESTRICT_TTL_S * 1000; - const createdAt = await this.dexHelper.cache.hget( - this.runtimeMMsRestrictHashMapKey, - poolIdentifier, - ); - const wasNotRestricted = createdAt === null; - if (wasNotRestricted) { - return false; - } - const restrictionExpired = +createdAt < expirationThreshold; - return !restrictionExpired; - } - - async isBlacklisted(txOrigin: Address): Promise { - const result = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.getBlackListKey(txOrigin), - ); - return result === BLACKLISTED; - } - - getBlackListKey(address: Address) { - return `blacklist_${address}`.toLowerCase(); - } - - async setBlacklist( - txOrigin: Address, - ttl: number = SWAAP_403_TTL_S, - ): Promise { - await this.dexHelper.cache.setex( - this.dexKey, - this.network, - this.getBlackListKey(txOrigin), - ttl, - BLACKLISTED, - ); - return true; + isBlacklisted(userAddress: Address): AsyncOrSync { + return this.rateFetcher.isBlacklisted(userAddress); } // Returns estimated gas cost of calldata for this DEX in multiSwap @@ -971,10 +901,10 @@ export class SwaapV2 extends SimpleExchange implements IDex { tokenAddress: Address, limit: number, ): Promise { - this.tokensMap = (await this.getCachedTokens()) || {}; + this.tokensMap = (await this.rateFetcher.getCachedTokens()) || {}; const normalizedTokenAddress = normalizeTokenAddress(tokenAddress); - const pLevels = await this.getCachedLevels(); + const pLevels = await this.rateFetcher.getCachedLevels(); if (pLevels === null) { return []; @@ -992,7 +922,9 @@ export class SwaapV2 extends SimpleExchange implements IDex { base, quote, ); - const isRestrictedPool = await this.isRestrictedPool(poolIdentifier); + const isRestrictedPool = await this.rateFetcher.isRestrictedPool( + poolIdentifier, + ); return { pair, diff --git a/src/lib/pub-sub.ts b/src/lib/pub-sub.ts index 3684d6f0f..0ecb1dd38 100644 --- a/src/lib/pub-sub.ts +++ b/src/lib/pub-sub.ts @@ -31,7 +31,7 @@ export class ExpKeyValuePubSub { } subscribe() { - this.logger.info(`Subscribing to ${this.channel}`); + this.logger.info(`Subscribing`); this.dexHelper.cache.subscribe(this.channel, (_, msg) => { const decodedMsg = JSON.parse(msg) as KeyValuePubSubMsg; @@ -40,15 +40,26 @@ export class ExpKeyValuePubSub { } publish(data: Record, ttl: number) { - const expiresAt = Math.round(Date.now() / 1000) + ttl; - this.dexHelper.cache.publish( - this.channel, - JSON.stringify({ expiresAt, data }), - ); + if (Object.keys(data).length > 0) { + const expiresAt = Math.round(Date.now() / 1000) + ttl; + this.logger.info( + `Publishing keys: '${Object.keys(data)}', expiresAt: '${expiresAt}'`, + ); + + this.dexHelper.cache.publish( + this.channel, + JSON.stringify({ expiresAt, data }), + ); + } } handleSubscription(msg: KeyValuePubSubMsg) { const { expiresAt, data } = msg; + this.logger.info( + `Received subscription, keys: '${Object.keys( + data, + )}', expiresAt: '${expiresAt}'`, + ); const now = Math.round(Date.now() / 1000); // calculating ttl as message might come with the delay @@ -73,9 +84,12 @@ export class ExpKeyValuePubSub { const localValue = this.localCache.get(key); if (localValue) { + this.logger.info(`Returning from local cache: '${key}'`); return localValue; } + this.logger.info(`Returning from external cache: '${key}'`); + const [value, ttl] = await Promise.all([ this.dexHelper.cache.get(this.dexKey, this.network, key), this.dexHelper.cache.ttl(this.dexKey, this.network, key), @@ -115,6 +129,8 @@ export class NonExpSetPubSub { } async initializeAndSubscribe(initialSet: string[]) { + this.logger.info(`initializeAndSubscribe with ${initialSet}`); + for (const member of initialSet) { this.set.add(member); } @@ -123,7 +139,7 @@ export class NonExpSetPubSub { } subscribe() { - this.logger.info(`Subscribing to ${this.channel}`); + this.logger.info(`Subscribing`); this.dexHelper.cache.subscribe(this.channel, (_, msg) => { const decodedMsg = JSON.parse(msg) as SetPubSubMsg; @@ -132,10 +148,15 @@ export class NonExpSetPubSub { } publish(msg: SetPubSubMsg) { - this.dexHelper.cache.publish(this.channel, JSON.stringify(msg)); + if (msg.length > 0) { + this.logger.info(`Publishing msg: '${msg}'`); + this.dexHelper.cache.publish(this.channel, JSON.stringify(msg)); + } } handleSubscription(set: SetPubSubMsg) { + this.logger.info(`Received subscription msg: '${set}'`); + for (const key of set) { this.set.add(key); }