From 43ae44a6c729eed3049778082a9c33385cfb2223 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Wed, 18 Dec 2024 19:27:47 +0200 Subject: [PATCH] feat: pub-sub for `dexalot` --- src/dex/dexalot/constants.ts | 6 +- src/dex/dexalot/dexalot.ts | 207 ++++------------------------- src/dex/dexalot/rate-fetcher.ts | 228 ++++++++++++++++++++++++++++++-- src/dex/dexalot/types.ts | 1 - 4 files changed, 245 insertions(+), 197 deletions(-) diff --git a/src/dex/dexalot/constants.ts b/src/dex/dexalot/constants.ts index 4d1dc589f..a14f3a944 100644 --- a/src/dex/dexalot/constants.ts +++ b/src/dex/dexalot/constants.ts @@ -1,14 +1,12 @@ import BigNumber from 'bignumber.js'; -export const DEXALOT_PRICES_CACHES_TTL_S = 3; - -export const DEXALOT_PAIRS_CACHES_TTL_S = 21 * 60; // 21 mins +export const DEXALOT_PRICES_CACHES_TTL_S = 5; export const DEXALOT_TOKENS_CACHES_TTL_S = 21 * 60; // 21 mins export const DEXALOT_BLACKLIST_CACHES_TTL_S = 180 * 60; // 3 hours -export const DEXALOT_API_PRICES_POLLING_INTERVAL_MS = 1000; +export const DEXALOT_API_PRICES_POLLING_INTERVAL_MS = 2000; export const DEXALOT_API_PAIRS_POLLING_INTERVAL_MS = 1000 * 60 * 10; // 10 mins diff --git a/src/dex/dexalot/dexalot.ts b/src/dex/dexalot/dexalot.ts index 6fda285df..4f75e829e 100644 --- a/src/dex/dexalot/dexalot.ts +++ b/src/dex/dexalot/dexalot.ts @@ -28,13 +28,10 @@ import { ClobSide, DexalotData, PairData, - PairDataMap, - PriceDataMap, DexalotRfqError, DexalotAPIParameters, RFQResponse, RFQResponseError, - TokenAddrDataMap, TokenDataMap, } from './types'; import { @@ -52,15 +49,10 @@ import { DEXALOT_API_PRICES_POLLING_INTERVAL_MS, DEXALOT_PRICES_CACHES_TTL_S, DEXALOT_GAS_COST, - DEXALOT_PAIRS_CACHES_TTL_S, DEXALOT_API_PAIRS_POLLING_INTERVAL_MS, DEXALOT_TOKENS_CACHES_TTL_S, DEXALOT_API_BLACKLIST_POLLING_INTERVAL_MS, - DEXALOT_RATE_LIMITED_TTL_S, DEXALOT_MIN_SLIPPAGE_FACTOR_THRESHOLD_FOR_RESTRICTION, - DEXALOT_RESTRICTED_CACHE_KEY, - DEXALOT_RESTRICT_TTL_S, - DEXALOT_RATELIMIT_CACHE_VALUE, DEXALOT_BLACKLIST_CACHES_TTL_S, DEXALOT_FIRM_QUOTE_TIMEOUT_MS, } from './constants'; @@ -130,7 +122,6 @@ export class Dexalot extends SimpleExchange implements IDex { pricesReqParams: this.getAPIReqParams('api/rfq/prices', 'GET'), blacklistReqParams: this.getAPIReqParams('api/rfq/blacklist', 'GET'), pairsCacheKey: this.pairsCacheKey, - pairsCacheTTLSecs: DEXALOT_PAIRS_CACHES_TTL_S, pricesCacheKey: this.pricesCacheKey, pricesCacheTTLSecs: DEXALOT_PRICES_CACHES_TTL_S, tokensAddrCacheKey: this.tokensAddrCacheKey, @@ -144,10 +135,7 @@ export class Dexalot extends SimpleExchange implements IDex { } async initializePricing(blockNumber: number): Promise { - if (!this.dexHelper.config.isSlave) { - this.rateFetcher.start(); - } - + this.rateFetcher.start(); return; } @@ -169,7 +157,7 @@ export class Dexalot extends SimpleExchange implements IDex { return null; } - const cachedTokens = (await this.getCachedTokens()) || {}; + const cachedTokens = (await this.rateFetcher.getCachedTokens()) || {}; if ( !(normalizedSrcToken.address in cachedTokens) || !(normalizedDestToken.address in cachedTokens) @@ -180,7 +168,7 @@ export class Dexalot extends SimpleExchange implements IDex { normalizedDestToken.symbol = cachedTokens[normalizedDestToken.address].symbol; - const cachedPairs = (await this.getCachedPairs()) || {}; + const cachedPairs = (await this.rateFetcher.getCachedPairs()) || {}; const potentialPairs = [ { @@ -228,7 +216,7 @@ export class Dexalot extends SimpleExchange implements IDex { return []; } - const tokensAddr = (await this.getCachedTokensAddr()) || {}; + const tokensAddr = (await this.rateFetcher.getCachedTokensAddr()) || {}; return [ this.getIdentifier( @@ -238,62 +226,6 @@ export class Dexalot extends SimpleExchange implements IDex { ]; } - async getCachedPairs(): Promise { - const cachedPairs = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.pairsCacheKey, - ); - - if (cachedPairs) { - return JSON.parse(cachedPairs) as PairDataMap; - } - - return null; - } - - async getCachedPrices(): Promise { - const cachedPrices = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.pricesCacheKey, - ); - - if (cachedPrices) { - return JSON.parse(cachedPrices) as PriceDataMap; - } - - return null; - } - - async getCachedTokensAddr(): Promise { - const cachedTokensAddr = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.tokensAddrCacheKey, - ); - - if (cachedTokensAddr) { - return JSON.parse(cachedTokensAddr) as TokenAddrDataMap; - } - - return null; - } - - async getCachedTokens(): Promise { - const cachedTokens = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.tokensCacheKey, - ); - - if (cachedTokens) { - return JSON.parse(cachedTokens) as TokenDataMap; - } - - return null; - } - normalizeAddress(address: string): string { return address.toLowerCase() === ETHER_ADDRESS ? NULL_ADDRESS @@ -439,7 +371,7 @@ export class Dexalot extends SimpleExchange implements IDex { const normalizedSrcToken = this.normalizeToken(srcToken); const normalizedDestToken = this.normalizeToken(destToken); - this.tokensMap = (await this.getCachedTokens()) || {}; + this.tokensMap = (await this.rateFetcher.getCachedTokens()) || {}; if (normalizedSrcToken.address === normalizedDestToken.address) { return null; } @@ -456,7 +388,7 @@ export class Dexalot extends SimpleExchange implements IDex { : await this.getPoolIdentifiers(srcToken, destToken, side, blockNumber); pools = await Promise.all( - pools.map(async p => !(await this.isRestrictedPool(p))), + pools.map(async p => !(await this.rateFetcher.isRestrictedPool(p))), ).then(res => pools.filter((_v, i) => res[i])); if (pools.length === 0) { return null; @@ -470,12 +402,12 @@ export class Dexalot extends SimpleExchange implements IDex { return null; } - const priceMap = await this.getCachedPrices(); + const priceMap = await this.rateFetcher.getCachedPrices(); if (!priceMap) { return null; } - const tokensAddr = (await this.getCachedTokensAddr()) || {}; + const tokensAddr = (await this.rateFetcher.getCachedTokensAddr()) || {}; const pairKey = `${pairData.base}/${pairData.quote}`.toLowerCase(); if ( !(pairKey in priceMap) || @@ -558,7 +490,7 @@ export class Dexalot extends SimpleExchange implements IDex { side: SwapSide, options: PreprocessTransactionOptions, ): Promise<[OptimalSwapExchange, ExchangeTxInfo]> { - if (await this.isBlacklisted(options.txOrigin)) { + if (await this.rateFetcher.isBlacklisted(options.txOrigin)) { this.logger.warn( `${this.dexKey}-${this.network}: blacklisted TX Origin address '${options.txOrigin}' trying to build a transaction. Bailing...`, ); @@ -728,9 +660,12 @@ export class Dexalot extends SimpleExchange implements IDex { this.logger.warn( `${this.dexKey}-${this.network}: Encountered rate limited user=${options.userAddress}. Adding to local rate limit cache`, ); - await this.setRateLimited(options.userAddress, errorData.RetryAfter); + await this.rateFetcher.setRateLimited( + options.userAddress, + errorData.RetryAfter, + ); } else { - await this.setBlacklist(options.userAddress); + await this.rateFetcher.setBlacklist(options.userAddress); this.logger.error( `${this.dexKey}-${this.network}: Failed to fetch RFQ for ${swapIdentifier}: ${errorData.Reason}`, ); @@ -751,7 +686,9 @@ export class Dexalot extends SimpleExchange implements IDex { `${this.dexKey}-${this.network}: protocol is restricted for pools ${poolIdentifiers} due to swap: ${swapIdentifier}`, ); await Promise.all( - poolIdentifiers.map(async p => await this.restrictPool(p)), + poolIdentifiers.map( + async p => await this.rateFetcher.restrictPool(p), + ), ); } } @@ -837,106 +774,6 @@ export class Dexalot extends SimpleExchange implements IDex { }; } - getRestrictedPoolKey(poolIdentifier: string): string { - return `${DEXALOT_RESTRICTED_CACHE_KEY}-${poolIdentifier}`; - } - - async restrictPool( - poolIdentifier: string, - ttl: number = DEXALOT_RESTRICT_TTL_S, - ): Promise { - await this.dexHelper.cache.setex( - this.dexKey, - this.network, - this.getRestrictedPoolKey(poolIdentifier), - ttl, - 'true', - ); - return true; - } - - async isRestrictedPool(poolIdentifier: string): Promise { - const result = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.getRestrictedPoolKey(poolIdentifier), - ); - - return result === 'true'; - } - - async setBlacklist( - txOrigin: Address, - ttl: number = DEXALOT_BLACKLIST_CACHES_TTL_S, - ): Promise { - const cachedBlacklist = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.blacklistCacheKey, - ); - - let blacklist: string[] = []; - if (cachedBlacklist) { - blacklist = JSON.parse(cachedBlacklist); - } - - blacklist.push(txOrigin.toLowerCase()); - - this.dexHelper.cache.setex( - this.dexKey, - this.network, - this.blacklistCacheKey, - ttl, - JSON.stringify(blacklist), - ); - - return true; - } - - async isBlacklisted(txOrigin: Address): Promise { - const cachedBlacklist = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.blacklistCacheKey, - ); - - if (cachedBlacklist) { - const blacklist = JSON.parse(cachedBlacklist) as string[]; - return blacklist.includes(txOrigin.toLowerCase()); - } - - // To not show pricing for rate limited users - if (await this.isRateLimited(txOrigin)) { - return true; - } - - return false; - } - - getRateLimitedKey(address: Address) { - return `rate_limited_${address}`.toLowerCase(); - } - - async isRateLimited(txOrigin: Address): Promise { - const result = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.getRateLimitedKey(txOrigin), - ); - return result === DEXALOT_RATELIMIT_CACHE_VALUE; - } - - async setRateLimited(txOrigin: Address, ttl = DEXALOT_RATE_LIMITED_TTL_S) { - await this.dexHelper.cache.setex( - this.dexKey, - this.network, - this.getRateLimitedKey(txOrigin), - ttl, - DEXALOT_RATELIMIT_CACHE_VALUE, - ); - return true; - } - async getSimpleParam( srcToken: string, destToken: string, @@ -1037,9 +874,9 @@ export class Dexalot extends SimpleExchange implements IDex { limit: number, ): Promise { const normalizedTokenAddress = this.normalizeAddress(tokenAddress); - const pairs = (await this.getCachedPairs()) || {}; - this.tokensMap = (await this.getCachedTokens()) || {}; - const tokensAddr = (await this.getCachedTokensAddr()) || {}; + const pairs = (await this.rateFetcher.getCachedPairs()) || {}; + this.tokensMap = (await this.rateFetcher.getCachedTokens()) || {}; + const tokensAddr = (await this.rateFetcher.getCachedTokensAddr()) || {}; const token = this.getTokenFromAddress(normalizedTokenAddress); if (!token) { return []; @@ -1104,4 +941,8 @@ export class Dexalot extends SimpleExchange implements IDex { this.rateFetcher.stop(); } } + + async isBlacklisted(userAddress: string): Promise { + return this.rateFetcher.isBlacklisted(userAddress); + } } diff --git a/src/dex/dexalot/rate-fetcher.ts b/src/dex/dexalot/rate-fetcher.ts index 558ee598b..1140f4492 100644 --- a/src/dex/dexalot/rate-fetcher.ts +++ b/src/dex/dexalot/rate-fetcher.ts @@ -1,7 +1,7 @@ import { IDexHelper } from '../../dex-helper'; import { Fetcher } from '../../lib/fetcher/fetcher'; import { validateAndCast } from '../../lib/validators'; -import { Logger, Token } from '../../types'; +import { Address, Logger, Token } from '../../types'; import { DexalotRateFetcherConfig, DexalotPairsResponse, @@ -9,6 +9,8 @@ import { DexalotPricesResponse, PriceDataMap, DexalotBlacklistResponse, + TokenDataMap, + TokenAddrDataMap, } from './types'; import { pricesResponseValidator, @@ -16,13 +18,21 @@ import { blacklistResponseValidator, } from './validators'; import { Network } from '../../constants'; +import { + DEXALOT_BLACKLIST_CACHES_TTL_S, + DEXALOT_RATE_LIMITED_TTL_S, + DEXALOT_RATELIMIT_CACHE_VALUE, + DEXALOT_RESTRICT_TTL_S, + DEXALOT_RESTRICTED_CACHE_KEY, +} from './constants'; +import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub'; export class RateFetcher { private pairsFetcher: Fetcher; private pairsCacheKey: string; - private pairsCacheTTL: number; private rateFetcher: Fetcher; + private rateTokensPubSub: ExpKeyValuePubSub; private pricesCacheKey: string; private pricesCacheTTL: number; @@ -31,9 +41,12 @@ export class RateFetcher { private tokensCacheTTL: number; private blacklistFetcher: Fetcher; + private blacklistPubSub: NonExpSetPubSub; private blacklistCacheKey: string; private blacklistCacheTTL: number; + private restrictedPoolPubSub: ExpKeyValuePubSub; + constructor( private dexHelper: IDexHelper, private dexKey: string, @@ -42,7 +55,6 @@ export class RateFetcher { config: DexalotRateFetcherConfig, ) { this.pairsCacheKey = config.rateConfig.pairsCacheKey; - this.pairsCacheTTL = config.rateConfig.pairsCacheTTLSecs; this.pricesCacheKey = config.rateConfig.pricesCacheKey; this.pricesCacheTTL = config.rateConfig.pricesCacheTTLSecs; this.tokensAddrCacheKey = config.rateConfig.tokensAddrCacheKey; @@ -51,6 +63,12 @@ export class RateFetcher { this.blacklistCacheKey = config.rateConfig.blacklistCacheKey; this.blacklistCacheTTL = config.rateConfig.blacklistCacheTTLSecs; + this.rateTokensPubSub = new ExpKeyValuePubSub( + dexHelper, + dexKey, + 'rateTokens', + ); + this.pairsFetcher = new Fetcher( dexHelper.httpRequest, { @@ -87,6 +105,7 @@ export class RateFetcher { logger, ); + this.blacklistPubSub = new NonExpSetPubSub(dexHelper, dexKey, 'blacklist'); this.blacklistFetcher = new Fetcher( dexHelper.httpRequest, { @@ -104,12 +123,30 @@ export class RateFetcher { config.rateConfig.blacklistIntervalMs, logger, ); + + this.restrictedPoolPubSub = new ExpKeyValuePubSub( + dexHelper, + dexKey, + 'restricted-pool', + // using default value, we can lazy load non-restricted pools + // and restrict them by subscribing to this channel + 'false', + DEXALOT_RESTRICT_TTL_S, + ); } - start() { - this.pairsFetcher.startPolling(); - this.rateFetcher.startPolling(); - this.blacklistFetcher.startPolling(); + async start() { + if (!this.dexHelper.config.isSlave) { + this.pairsFetcher.startPolling(); + this.rateFetcher.startPolling(); + this.blacklistFetcher.startPolling(); + } else { + this.rateTokensPubSub.subscribe(); + this.restrictedPoolPubSub.subscribe(); + + const initSet = await this.getAllBlacklisted(); + this.blacklistPubSub.initializeAndSubscribe(initSet); + } } stop() { @@ -145,7 +182,7 @@ export class RateFetcher { this.dexKey, this.network, this.pairsCacheKey, - this.pairsCacheTTL, + this.tokensCacheTTL, JSON.stringify(dexPairs), ); @@ -164,6 +201,15 @@ export class RateFetcher { this.tokensCacheTTL, JSON.stringify(tokenAddrMap), ); + + this.rateTokensPubSub.publish( + { + [this.pairsCacheKey]: dexPairs, + [this.tokensCacheKey]: tokenMap, + [this.tokensAddrCacheKey]: tokenAddrMap, + }, + this.tokensCacheTTL, + ); } private handleRatesResponse(resp: DexalotPricesResponse): void { @@ -180,18 +226,182 @@ export class RateFetcher { this.pricesCacheTTL, JSON.stringify(dexPrices), ); + + this.rateTokensPubSub.publish( + { [this.pricesCacheKey]: dexPrices }, + this.pricesCacheTTL, + ); } private async handleBlacklistResponse( resp: DexalotBlacklistResponse, ): Promise { const { blacklist } = resp; + const data = blacklist.map(item => item.toLowerCase()); this.dexHelper.cache.setex( this.dexKey, this.network, this.blacklistCacheKey, this.blacklistCacheTTL, - JSON.stringify(blacklist.map(item => item.toLowerCase())), + JSON.stringify(data), + ); + + this.blacklistPubSub.publish(data); + } + + async getCachedTokens(): Promise { + const cachedTokens = await this.rateTokensPubSub.getAndCache( + this.tokensCacheKey, + ); + + if (cachedTokens) { + return cachedTokens as TokenDataMap; + } + + return null; + } + + async getCachedPairs(): Promise { + const cachedPairs = await this.rateTokensPubSub.getAndCache( + this.pairsCacheKey, + ); + + if (cachedPairs) { + return cachedPairs as PairDataMap; + } + + return null; + } + + async getCachedTokensAddr(): Promise { + const cachedTokensAddr = await this.rateTokensPubSub.getAndCache( + this.tokensAddrCacheKey, + ); + + if (cachedTokensAddr) { + return cachedTokensAddr as TokenAddrDataMap; + } + + return null; + } + + async setBlacklist( + txOrigin: Address, + ttl: number = DEXALOT_BLACKLIST_CACHES_TTL_S, + ): Promise { + const blacklist = await this.getAllBlacklisted(); + + blacklist.push(txOrigin.toLowerCase()); + + this.dexHelper.cache.setex( + this.dexKey, + this.network, + this.blacklistCacheKey, + ttl, + JSON.stringify(blacklist), + ); + + this.blacklistPubSub.publish([txOrigin.toLowerCase()]); + + return true; + } + + async getAllBlacklisted(): Promise { + const cachedBlacklist = await this.dexHelper.cache.get( + this.dexKey, + this.network, + this.blacklistCacheKey, ); + + if (cachedBlacklist) { + return JSON.parse(cachedBlacklist); + } + + return []; + } + + async isBlacklisted(txOrigin: Address): Promise { + const blacklisted = await this.blacklistPubSub.has(txOrigin.toLowerCase()); + + return blacklisted; + + /* + rate-limit check was only if blacklist data set was not available, + in the current implementation it should not be the case, + so skip next check + */ + // To not show pricing for rate limited users + if (await this.isRateLimited(txOrigin)) { + return true; + } + + return false; + } + + async setRateLimited(txOrigin: Address, ttl = DEXALOT_RATE_LIMITED_TTL_S) { + await this.dexHelper.cache.setex( + this.dexKey, + this.network, + this.getRateLimitedKey(txOrigin), + ttl, + DEXALOT_RATELIMIT_CACHE_VALUE, + ); + return true; + } + + async isRateLimited(txOrigin: Address): Promise { + const result = await this.dexHelper.cache.get( + this.dexKey, + this.network, + this.getRateLimitedKey(txOrigin), + ); + return result === DEXALOT_RATELIMIT_CACHE_VALUE; + } + + getRateLimitedKey(address: Address) { + return `rate_limited_${address}`.toLowerCase(); + } + + async getCachedPrices(): Promise { + const cachedPrices = await this.rateTokensPubSub.getAndCache( + this.pricesCacheKey, + ); + + if (cachedPrices) { + return cachedPrices as PriceDataMap; + } + + return null; + } + + async restrictPool( + poolIdentifier: string, + ttl: number = DEXALOT_RESTRICT_TTL_S, + ): Promise { + await this.dexHelper.cache.setex( + this.dexKey, + this.network, + this.getRestrictedPoolKey(poolIdentifier), + ttl, + 'true', + ); + + await this.restrictedPoolPubSub.publish( + { [this.getRestrictedPoolKey(poolIdentifier)]: 'true' }, + ttl, + ); + return true; + } + + async isRestrictedPool(poolIdentifier: string): Promise { + const result = await this.restrictedPoolPubSub.getAndCache( + this.getRestrictedPoolKey(poolIdentifier), + ); + + return result === 'true'; + } + + getRestrictedPoolKey(poolIdentifier: string): string { + return `${DEXALOT_RESTRICTED_CACHE_KEY}-${poolIdentifier}`; } } diff --git a/src/dex/dexalot/types.ts b/src/dex/dexalot/types.ts index 41bc53257..9dd8cfc64 100644 --- a/src/dex/dexalot/types.ts +++ b/src/dex/dexalot/types.ts @@ -117,7 +117,6 @@ export type DexalotRateFetcherConfig = { tokensCacheKey: string; blacklistCacheKey: string; blacklistCacheTTLSecs: number; - pairsCacheTTLSecs: number; pricesCacheTTLSecs: number; tokensCacheTTLSecs: number; };