diff --git a/package.json b/package.json index 5f0a33edf..9f5f4fbd5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@paraswap/dex-lib", - "version": "4.0.8", + "version": "4.0.6-rfq-pub-sub.0", "main": "build/index.js", "types": "build/index.d.ts", "repository": "https://github.com/paraswap/paraswap-dex-lib", diff --git a/src/abi/fluid-dex/fluid-dex.abi.json b/src/abi/fluid-dex/fluid-dex.abi.json index cdd5e0f1a..b884a89b6 100644 --- a/src/abi/fluid-dex/fluid-dex.abi.json +++ b/src/abi/fluid-dex/fluid-dex.abi.json @@ -381,15 +381,13 @@ }, { "anonymous": false, - "inputs": [ - ], + "inputs": [], "name": "LogPauseSwapAndArbitrage", "type": "event" }, { "anonymous": false, - "inputs": [ - ], + "inputs": [], "name": "LogUnpauseSwapAndArbitrage", "type": "event" }, diff --git a/src/dex-helper/dummy-dex-helper.ts b/src/dex-helper/dummy-dex-helper.ts index c147c4525..5b922e14f 100644 --- a/src/dex-helper/dummy-dex-helper.ts +++ b/src/dex-helper/dummy-dex-helper.ts @@ -43,6 +43,23 @@ class DummyCache implements ICache { return null; } + async keys( + dexKey: string, + network: number, + cacheKey: string, + ): Promise { + return []; + } + + async ttl( + dexKey: string, + network: number, + cacheKey: string, + ): Promise { + const key = `${network}_${dexKey}_${cacheKey}`.toLowerCase(); + return this.storage[key] ? 1 : -1; + } + async rawget(key: string): Promise { return this.storage[key] ? this.storage[key] : null; return null; @@ -139,6 +156,10 @@ class DummyCache implements ICache { return set.has(key); } + async smembers(setKey: string): Promise { + return Array.from(this.setMap[setKey] ?? []); + } + async hset(mapKey: string, key: string, value: string): Promise { if (!this.hashStorage[mapKey]) this.hashStorage[mapKey] = {}; this.hashStorage[mapKey][key] = value; diff --git a/src/dex-helper/icache.ts b/src/dex-helper/icache.ts index 90bcf313b..bc2d4b821 100644 --- a/src/dex-helper/icache.ts +++ b/src/dex-helper/icache.ts @@ -5,6 +5,10 @@ export interface ICache { cacheKey: string, ): Promise; + ttl(dexKey: string, network: number, cacheKey: string): Promise; + + keys(dexKey: string, network: number, cacheKey: string): Promise; + rawget(key: string): Promise; rawset(key: string, value: string, ttl: number): Promise; @@ -52,6 +56,8 @@ export interface ICache { sismember(setKey: string, key: string): Promise; + smembers(setKey: string): Promise; + hset(mapKey: string, key: string, value: string): Promise; hdel(mapKey: string, keys: string[]): Promise; diff --git a/src/dex/generic-rfq/generic-rfq.ts b/src/dex/generic-rfq/generic-rfq.ts index d358fffe8..5dfe85006 100644 --- a/src/dex/generic-rfq/generic-rfq.ts +++ b/src/dex/generic-rfq/generic-rfq.ts @@ -447,12 +447,7 @@ export class GenericRFQ extends ParaSwapLimitOrders { } async setBlacklist(userAddress: string): Promise { - await this.dexHelper.cache.hset( - this.rateFetcher.blackListCacheKey, - userAddress.toLowerCase(), - 'true', - ); - return true; + return this.rateFetcher.setBlacklist(userAddress); } releaseResources(): void { diff --git a/src/dex/generic-rfq/rate-fetcher.ts b/src/dex/generic-rfq/rate-fetcher.ts index ae5ef30cb..46db952e6 100644 --- a/src/dex/generic-rfq/rate-fetcher.ts +++ b/src/dex/generic-rfq/rate-fetcher.ts @@ -37,6 +37,7 @@ import { ERC1271Contract, } from '../../lib/erc1271-utils'; import { isContractAddress } from '../../utils'; +import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub'; const GET_FIRM_RATE_TIMEOUT_MS = 2000; export const reversePrice = (price: PriceAndAmountBigNumber) => @@ -55,6 +56,9 @@ export class RateFetcher { private addressToTokenMap: Record = {}; private pairs: PairMap = {}; + private pricesPubSub: ExpKeyValuePubSub; + private blacklistPubSub?: NonExpSetPubSub; + private firmRateAuth?: (options: RequestConfig) => void; public blackListCacheKey: string; @@ -123,6 +127,13 @@ export class RateFetcher { logger, ); + this.pricesPubSub = new ExpKeyValuePubSub( + this.dexHelper, + this.dexKey, + 'prices', + ); + + this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`; if (config.blacklistConfig) { this.blackListFetcher = new Fetcher( dexHelper.httpRequest, @@ -142,9 +153,14 @@ export class RateFetcher { config.blacklistConfig.intervalMs, logger, ); + + this.blacklistPubSub = new NonExpSetPubSub( + this.dexHelper, + this.dexKey, + 'blacklist', + ); } - this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`; if (this.config.firmRateConfig.secret) { this.firmRateAuth = this.authHttp(this.config.firmRateConfig.secret); } @@ -161,6 +177,16 @@ export class RateFetcher { this.config.maker, ); } + + if (this.dexHelper.config.isSlave) { + this.pricesPubSub.subscribe(); + if (this.blacklistPubSub) { + const initSet = await this.dexHelper.cache.smembers( + this.blackListCacheKey, + ); + this.blacklistPubSub.initializeAndSubscribe(initSet); + } + } } start() { @@ -214,16 +240,22 @@ export class RateFetcher { for (const address of resp.blacklist) { this.dexHelper.cache.sadd(this.blackListCacheKey, address.toLowerCase()); } + + if (this.blacklistPubSub) { + this.blacklistPubSub.publish(resp.blacklist); + } } public isBlackListed(userAddress: string) { - return this.dexHelper.cache.sismember( - this.blackListCacheKey, - userAddress.toLowerCase(), - ); + if (this.blacklistPubSub) { + return this.blacklistPubSub.has(userAddress.toLowerCase()); + } + return false; } private handleRatesResponse(resp: RatesResponse) { + const pubSubData: Record = {}; + const ttl = this.config.rateConfig.dataTTLS; const pairs = this.pairs; if (isEmpty(pairs)) return; @@ -252,37 +284,51 @@ export class RateFetcher { } if (prices.bids.length) { + const key = `${baseToken.address}_${quoteToken.address}_bids`; + const value = prices.bids; + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `${baseToken.address}_${quoteToken.address}_bids`, - this.config.rateConfig.dataTTLS, - JSON.stringify(prices.bids), + key, + ttl, + JSON.stringify(value), ); currentPricePairs.add(`${baseToken.address}_${quoteToken.address}`); } if (prices.asks.length) { + const key = `${baseToken.address}_${quoteToken.address}_asks`; + const value = prices.asks; + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `${baseToken.address}_${quoteToken.address}_asks`, - this.config.rateConfig.dataTTLS, - JSON.stringify(prices.asks), + key, + ttl, + JSON.stringify(value), ); currentPricePairs.add(`${quoteToken.address}_${baseToken.address}`); } }); if (currentPricePairs.size > 0) { + const key = `pairs`; + const value = Array.from(currentPricePairs); + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `pairs`, - this.config.rateConfig.dataTTLS, - JSON.stringify(Array.from(currentPricePairs)), + key, + ttl, + JSON.stringify(value), ); } + + this.pricesPubSub.publish(pubSubData, ttl); } checkHealth(): boolean { @@ -322,17 +368,13 @@ export class RateFetcher { } public async getAvailablePairs(): Promise { - const pairs = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, - `pairs`, - ); + const pairs = await this.pricesPubSub.getAndCache(`pairs`); if (!pairs) { return []; } - return JSON.parse(pairs) as string[]; + return pairs; } public async getOrderPrice( @@ -342,49 +384,36 @@ export class RateFetcher { ): Promise { let reversed = false; - let pricesAsString: string | null = null; + let prices: PriceAndAmount[] | null = null; if (side === SwapSide.SELL) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + prices = await this.pricesPubSub.getAndCache( `${srcToken.address}_${destToken.address}_bids`, ); - if (!pricesAsString) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + if (!prices) { + prices = await this.pricesPubSub.getAndCache( `${destToken.address}_${srcToken.address}_asks`, ); reversed = true; } } else { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + prices = await this.pricesPubSub.getAndCache( `${destToken.address}_${srcToken.address}_asks`, ); - if (!pricesAsString) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + if (!prices) { + prices = await this.pricesPubSub.getAndCache( `${srcToken.address}_${destToken.address}_bids`, ); reversed = true; } } - if (!pricesAsString) { + if (!prices) { return null; } - const orderPricesAsString: PriceAndAmount[] = JSON.parse(pricesAsString); - if (!orderPricesAsString) { - return null; - } - - let orderPrices = orderPricesAsString.map(price => [ + let orderPrices = prices.map(price => [ new BigNumber(price[0]), new BigNumber(price[1]), ]); @@ -483,4 +512,16 @@ export class RateFetcher { throw e; } } + + async setBlacklist(userAddress: string): Promise { + await this.dexHelper.cache.hset( + this.blackListCacheKey, + userAddress.toLowerCase(), + 'true', + ); + if (this.blacklistPubSub) { + this.blacklistPubSub.publish([userAddress.toLowerCase()]); + } + return true; + } } diff --git a/src/lib/pub-sub.ts b/src/lib/pub-sub.ts new file mode 100644 index 000000000..3684d6f0f --- /dev/null +++ b/src/lib/pub-sub.ts @@ -0,0 +1,147 @@ +import NodeCache from 'node-cache'; +import { Network } from '../constants'; +import { IDexHelper } from '../dex-helper'; +import { Logger } from '../types'; + +type KeyValuePubSubMsg = { + expiresAt: number; + data: Record; +}; + +type SetPubSubMsg = string[]; + +export class ExpKeyValuePubSub { + channel: string; + network: Network; + localCache: NodeCache = new NodeCache(); + + logger: Logger; + + constructor( + private dexHelper: IDexHelper, + private dexKey: string, + channel: string, + private defaultValue?: any, + private defaultTTL?: number, + ) { + this.network = this.dexHelper.config.data.network; + this.channel = `${this.network}_${this.dexKey}_${channel}`; + + this.logger = this.dexHelper.getLogger(`ExpKeyValuePubSub_${this.channel}`); + } + + subscribe() { + this.logger.info(`Subscribing to ${this.channel}`); + + this.dexHelper.cache.subscribe(this.channel, (_, msg) => { + const decodedMsg = JSON.parse(msg) as KeyValuePubSubMsg; + this.handleSubscription(decodedMsg); + }); + } + + publish(data: Record, ttl: number) { + const expiresAt = Math.round(Date.now() / 1000) + ttl; + this.dexHelper.cache.publish( + this.channel, + JSON.stringify({ expiresAt, data }), + ); + } + + handleSubscription(msg: KeyValuePubSubMsg) { + const { expiresAt, data } = msg; + + const now = Math.round(Date.now() / 1000); + // calculating ttl as message might come with the delay + const ttl = expiresAt - now; + + if (ttl > 0) { + const keys = Object.keys(data); + for (const key of keys) { + this.localCache.set(key, data[key], ttl); + } + } else { + this.logger.error('Message has expired', { + now, + expiresAt, + diffInSeconds: now - expiresAt, + keys: Object.keys(data), + }); + } + } + + async getAndCache(key: string): Promise { + const localValue = this.localCache.get(key); + + if (localValue) { + return localValue; + } + + const [value, ttl] = await Promise.all([ + this.dexHelper.cache.get(this.dexKey, this.network, key), + this.dexHelper.cache.ttl(this.dexKey, this.network, key), + ]); + + if (value && ttl > 0) { + const parsedValue = JSON.parse(value); + this.localCache.set(key, parsedValue, ttl); + return parsedValue; + } + + if (this.defaultValue && this.defaultTTL && this.defaultTTL > 0) { + this.localCache.set(key, this.defaultValue, this.defaultTTL); + return this.defaultValue; + } + + return null; + } +} + +export class NonExpSetPubSub { + channel: string; + network: Network; + set = new Set(); + + logger: Logger; + + constructor( + private dexHelper: IDexHelper, + private dexKey: string, + channel: string, + ) { + this.network = this.dexHelper.config.data.network; + this.channel = `${this.network}_${this.dexKey}_${channel}`; + + this.logger = this.dexHelper.getLogger(`NonExpSetPubSub_${this.channel}`); + } + + async initializeAndSubscribe(initialSet: string[]) { + for (const member of initialSet) { + this.set.add(member); + } + + this.subscribe(); + } + + subscribe() { + this.logger.info(`Subscribing to ${this.channel}`); + + this.dexHelper.cache.subscribe(this.channel, (_, msg) => { + const decodedMsg = JSON.parse(msg) as SetPubSubMsg; + this.handleSubscription(decodedMsg); + }); + } + + publish(msg: SetPubSubMsg) { + this.dexHelper.cache.publish(this.channel, JSON.stringify(msg)); + } + + handleSubscription(set: SetPubSubMsg) { + for (const key of set) { + this.set.add(key); + } + } + + async has(key: string) { + return this.set.has(key); + } +}