From baf51507e125a8eb0f257b60c684acc0fedd93e5 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Fri, 13 Dec 2024 11:59:31 +0200 Subject: [PATCH 1/9] chore: move setBlacklist method to rate-fetcher --- src/dex/generic-rfq/generic-rfq.ts | 7 +------ src/dex/generic-rfq/rate-fetcher.ts | 9 +++++++++ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/dex/generic-rfq/generic-rfq.ts b/src/dex/generic-rfq/generic-rfq.ts index d358fffe8..5dfe85006 100644 --- a/src/dex/generic-rfq/generic-rfq.ts +++ b/src/dex/generic-rfq/generic-rfq.ts @@ -447,12 +447,7 @@ export class GenericRFQ extends ParaSwapLimitOrders { } async setBlacklist(userAddress: string): Promise { - await this.dexHelper.cache.hset( - this.rateFetcher.blackListCacheKey, - userAddress.toLowerCase(), - 'true', - ); - return true; + return this.rateFetcher.setBlacklist(userAddress); } releaseResources(): void { diff --git a/src/dex/generic-rfq/rate-fetcher.ts b/src/dex/generic-rfq/rate-fetcher.ts index ae5ef30cb..dfe6316fa 100644 --- a/src/dex/generic-rfq/rate-fetcher.ts +++ b/src/dex/generic-rfq/rate-fetcher.ts @@ -483,4 +483,13 @@ export class RateFetcher { throw e; } } + + async setBlacklist(userAddress: string): Promise { + await this.dexHelper.cache.hset( + this.blackListCacheKey, + userAddress.toLowerCase(), + 'true', + ); + return true; + } } From f688e964bf3ec8158f050f9de1f88b3d91f23a28 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Fri, 13 Dec 2024 17:25:42 +0200 Subject: [PATCH 2/9] feat: add pub-sub interfaces --- src/dex-helper/dummy-dex-helper.ts | 12 +++ src/dex-helper/icache.ts | 8 ++ src/lib/pub-sub.ts | 130 +++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+) create mode 100644 src/lib/pub-sub.ts diff --git a/src/dex-helper/dummy-dex-helper.ts b/src/dex-helper/dummy-dex-helper.ts index c147c4525..86111f477 100644 --- a/src/dex-helper/dummy-dex-helper.ts +++ b/src/dex-helper/dummy-dex-helper.ts @@ -43,6 +43,14 @@ class DummyCache implements ICache { return null; } + async ttl( + dexKey: string, + network: number, + cacheKey: string, + ): Promise { + return '1'; + } + async rawget(key: string): Promise { return this.storage[key] ? this.storage[key] : null; return null; @@ -139,6 +147,10 @@ class DummyCache implements ICache { return set.has(key); } + async smembers(setKey: string): Promise { + return Array.from(this.setMap[setKey] ?? []); + } + async hset(mapKey: string, key: string, value: string): Promise { if (!this.hashStorage[mapKey]) this.hashStorage[mapKey] = {}; this.hashStorage[mapKey][key] = value; diff --git a/src/dex-helper/icache.ts b/src/dex-helper/icache.ts index 90bcf313b..cbad46110 100644 --- a/src/dex-helper/icache.ts +++ b/src/dex-helper/icache.ts @@ -5,6 +5,12 @@ export interface ICache { cacheKey: string, ): Promise; + ttl( + dexKey: string, + network: number, + cacheKey: string, + ): Promise; + rawget(key: string): Promise; rawset(key: string, value: string, ttl: number): Promise; @@ -52,6 +58,8 @@ export interface ICache { sismember(setKey: string, key: string): Promise; + smembers(setKey: string): Promise; + hset(mapKey: string, key: string, value: string): Promise; hdel(mapKey: string, keys: string[]): Promise; diff --git a/src/lib/pub-sub.ts b/src/lib/pub-sub.ts new file mode 100644 index 000000000..f7677acc4 --- /dev/null +++ b/src/lib/pub-sub.ts @@ -0,0 +1,130 @@ +import NodeCache from 'node-cache'; +import { Network } from '../constants'; +import { IDexHelper } from '../dex-helper'; + +type JsonPubSubMsg = { + expiresAt: number; + data: Record; +}; + +type SetPubSubMsg = string[]; + +export class JsonPubSub { + channel: string; + network: Network; + localCache: NodeCache = new NodeCache(); + + constructor( + private dexHelper: IDexHelper, + private dexKey: string, + channel: string, + ) { + this.network = this.dexHelper.config.data.network; + this.channel = `${this.network}_${this.dexKey}_${channel}`; + } + + initialize() { + this.subscribe(); + } + + subscribe() { + this.dexHelper.cache.subscribe(this.channel, (_, msg) => { + const decodedMsg = JSON.parse(msg) as JsonPubSubMsg; + this.handleSubscription(decodedMsg); + }); + } + + publish(data: Record, ttl: number) { + const expiresAt = Math.round(Date.now() / 1000) + ttl; + this.dexHelper.cache.publish( + this.channel, + JSON.stringify({ expiresAt, data }), + ); + } + + handleSubscription(json: JsonPubSubMsg) { + const { expiresAt, data } = json; + + const now = Math.round(Date.now() / 1000); + // calculating ttl as message might come with the delay + const ttl = expiresAt - now; + + const keys = Object.keys(data); + for (const key of keys) { + this.localCache.set(key, data[key], ttl); + } + } + + async getAndCache(key: string): Promise { + const localValue = this.localCache.get(key); + + if (localValue) { + return localValue; + } + + const [value, ttl] = await Promise.all([ + this.dexHelper.cache.get(this.dexKey, this.network, key), + this.dexHelper.cache.ttl(this.dexKey, this.network, key), + ]); + + if (value) { + // setting ttl same as in cache + // TODO-ps: check if ttl is not null + const parsedValue = JSON.parse(value); + this.localCache.set(key, parsedValue, Number(ttl)); + return parsedValue; + } + + return null; + } +} + +export class SetPubSub { + channel: string; + network: Network; + set = new Set(); + + constructor( + private dexHelper: IDexHelper, + private dexKey: string, + channel: string, + ) { + this.network = this.dexHelper.config.data.network; + this.channel = `${this.network}_${this.dexKey}_${channel}`; + } + + async initialize(key: string) { + // as there's no lazy load, we need to initialize the set + const initSet = await this.dexHelper.cache.smembers(key); + for (const member of initSet) { + this.set.add(member); + } + + this.subscribe(); + } + + subscribe() { + this.dexHelper.cache.subscribe(this.channel, (_, msg) => { + const decodedMsg = JSON.parse(msg) as SetPubSubMsg; + this.handleSubscription(decodedMsg); + }); + } + + publish(set: SetPubSubMsg) { + // as there's no lazy load, also store locally + for (const key of set) { + this.set.add(key); + } + this.dexHelper.cache.publish(this.channel, JSON.stringify(set)); + } + + handleSubscription(set: SetPubSubMsg) { + for (const key of set) { + this.set.add(key); + } + } + + async has(key: string) { + return this.set.has(key); + } +} From a211f9834c1d280d5317437e73dabab05b014698 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Fri, 13 Dec 2024 17:41:39 +0200 Subject: [PATCH 3/9] feat: integrate pub-sub with generic-rfq --- src/dex/generic-rfq/rate-fetcher.ts | 107 +++++++++++++++++----------- 1 file changed, 66 insertions(+), 41 deletions(-) diff --git a/src/dex/generic-rfq/rate-fetcher.ts b/src/dex/generic-rfq/rate-fetcher.ts index dfe6316fa..658a31ba5 100644 --- a/src/dex/generic-rfq/rate-fetcher.ts +++ b/src/dex/generic-rfq/rate-fetcher.ts @@ -37,6 +37,7 @@ import { ERC1271Contract, } from '../../lib/erc1271-utils'; import { isContractAddress } from '../../utils'; +import { JsonPubSub, SetPubSub } from '../../lib/pub-sub'; const GET_FIRM_RATE_TIMEOUT_MS = 2000; export const reversePrice = (price: PriceAndAmountBigNumber) => @@ -55,6 +56,9 @@ export class RateFetcher { private addressToTokenMap: Record = {}; private pairs: PairMap = {}; + private pricesPubSub: JsonPubSub; + private blacklistPubSub?: SetPubSub; + private firmRateAuth?: (options: RequestConfig) => void; public blackListCacheKey: string; @@ -123,6 +127,8 @@ export class RateFetcher { logger, ); + this.pricesPubSub = new JsonPubSub(this.dexHelper, this.dexKey, 'prices'); + if (config.blacklistConfig) { this.blackListFetcher = new Fetcher( dexHelper.httpRequest, @@ -142,6 +148,12 @@ export class RateFetcher { config.blacklistConfig.intervalMs, logger, ); + + this.blacklistPubSub = new SetPubSub( + this.dexHelper, + this.dexKey, + 'blacklist', + ); } this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`; @@ -161,6 +173,13 @@ export class RateFetcher { this.config.maker, ); } + + if (this.dexHelper.config.isSlave) { + this.pricesPubSub.initialize(); + if (this.blacklistPubSub) { + this.blacklistPubSub.initialize(this.blackListCacheKey); + } + } } start() { @@ -214,16 +233,22 @@ export class RateFetcher { for (const address of resp.blacklist) { this.dexHelper.cache.sadd(this.blackListCacheKey, address.toLowerCase()); } + + if (this.blacklistPubSub) { + this.blacklistPubSub.publish(resp.blacklist); + } } public isBlackListed(userAddress: string) { - return this.dexHelper.cache.sismember( - this.blackListCacheKey, - userAddress.toLowerCase(), - ); + if (this.blacklistPubSub) { + return this.blacklistPubSub.has(userAddress.toLowerCase()); + } + return false; } private handleRatesResponse(resp: RatesResponse) { + const pubSubData: Record = {}; + const ttl = this.config.rateConfig.dataTTLS; const pairs = this.pairs; if (isEmpty(pairs)) return; @@ -252,37 +277,51 @@ export class RateFetcher { } if (prices.bids.length) { + const key = `${baseToken.address}_${quoteToken.address}_bids`; + const value = prices.bids; + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `${baseToken.address}_${quoteToken.address}_bids`, - this.config.rateConfig.dataTTLS, - JSON.stringify(prices.bids), + key, + ttl, + JSON.stringify(value), ); currentPricePairs.add(`${baseToken.address}_${quoteToken.address}`); } if (prices.asks.length) { + const key = `${baseToken.address}_${quoteToken.address}_asks`; + const value = prices.asks; + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `${baseToken.address}_${quoteToken.address}_asks`, - this.config.rateConfig.dataTTLS, - JSON.stringify(prices.asks), + key, + ttl, + JSON.stringify(value), ); currentPricePairs.add(`${quoteToken.address}_${baseToken.address}`); } }); if (currentPricePairs.size > 0) { + const key = `pairs`; + const value = Array.from(currentPricePairs); + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `pairs`, - this.config.rateConfig.dataTTLS, - JSON.stringify(Array.from(currentPricePairs)), + key, + ttl, + JSON.stringify(value), ); } + + this.pricesPubSub.publish(pubSubData, ttl); } checkHealth(): boolean { @@ -322,17 +361,13 @@ export class RateFetcher { } public async getAvailablePairs(): Promise { - const pairs = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, - `pairs`, - ); + const pairs = await this.pricesPubSub.getAndCache(`pairs`); if (!pairs) { return []; } - return JSON.parse(pairs) as string[]; + return pairs; } public async getOrderPrice( @@ -342,49 +377,36 @@ export class RateFetcher { ): Promise { let reversed = false; - let pricesAsString: string | null = null; + let prices: PriceAndAmount[] | null = null; if (side === SwapSide.SELL) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + prices = await this.pricesPubSub.getAndCache( `${srcToken.address}_${destToken.address}_bids`, ); - if (!pricesAsString) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + if (!prices) { + prices = await this.pricesPubSub.getAndCache( `${destToken.address}_${srcToken.address}_asks`, ); reversed = true; } } else { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + prices = await this.pricesPubSub.getAndCache( `${destToken.address}_${srcToken.address}_asks`, ); - if (!pricesAsString) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + if (!prices) { + prices = await this.pricesPubSub.getAndCache( `${srcToken.address}_${destToken.address}_bids`, ); reversed = true; } } - if (!pricesAsString) { + if (!prices) { return null; } - const orderPricesAsString: PriceAndAmount[] = JSON.parse(pricesAsString); - if (!orderPricesAsString) { - return null; - } - - let orderPrices = orderPricesAsString.map(price => [ + let orderPrices = prices.map(price => [ new BigNumber(price[0]), new BigNumber(price[1]), ]); @@ -490,6 +512,9 @@ export class RateFetcher { userAddress.toLowerCase(), 'true', ); + if (this.blacklistPubSub) { + this.blacklistPubSub.publish([userAddress.toLowerCase()]); + } return true; } } From df7995cbb9f570cd8832cd0fc1556f75e43a5d97 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Mon, 16 Dec 2024 16:46:57 +0200 Subject: [PATCH 4/9] 4.0.6-rfq-pub-sub.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 0d8297852..9f5f4fbd5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@paraswap/dex-lib", - "version": "4.0.5", + "version": "4.0.6-rfq-pub-sub.0", "main": "build/index.js", "types": "build/index.d.ts", "repository": "https://github.com/paraswap/paraswap-dex-lib", From 3fab8855c29d9df4ac1ff5a317dfdd2f47286858 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Wed, 18 Dec 2024 19:11:35 +0200 Subject: [PATCH 5/9] Revert "chore: move setBlacklist method to rate-fetcher" This reverts commit baf51507e125a8eb0f257b60c684acc0fedd93e5. --- src/dex/generic-rfq/generic-rfq.ts | 7 ++++++- src/dex/generic-rfq/rate-fetcher.ts | 12 ------------ 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/dex/generic-rfq/generic-rfq.ts b/src/dex/generic-rfq/generic-rfq.ts index 5dfe85006..d358fffe8 100644 --- a/src/dex/generic-rfq/generic-rfq.ts +++ b/src/dex/generic-rfq/generic-rfq.ts @@ -447,7 +447,12 @@ export class GenericRFQ extends ParaSwapLimitOrders { } async setBlacklist(userAddress: string): Promise { - return this.rateFetcher.setBlacklist(userAddress); + await this.dexHelper.cache.hset( + this.rateFetcher.blackListCacheKey, + userAddress.toLowerCase(), + 'true', + ); + return true; } releaseResources(): void { diff --git a/src/dex/generic-rfq/rate-fetcher.ts b/src/dex/generic-rfq/rate-fetcher.ts index 658a31ba5..d8b5eff30 100644 --- a/src/dex/generic-rfq/rate-fetcher.ts +++ b/src/dex/generic-rfq/rate-fetcher.ts @@ -505,16 +505,4 @@ export class RateFetcher { throw e; } } - - async setBlacklist(userAddress: string): Promise { - await this.dexHelper.cache.hset( - this.blackListCacheKey, - userAddress.toLowerCase(), - 'true', - ); - if (this.blacklistPubSub) { - this.blacklistPubSub.publish([userAddress.toLowerCase()]); - } - return true; - } } From a0320cde25b514d5e6939d1737f5d2dac7e0e530 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Wed, 18 Dec 2024 19:12:00 +0200 Subject: [PATCH 6/9] Revert "feat: add pub-sub interfaces" This reverts commit f688e964bf3ec8158f050f9de1f88b3d91f23a28. --- src/dex-helper/dummy-dex-helper.ts | 12 --- src/dex-helper/icache.ts | 8 -- src/lib/pub-sub.ts | 130 ----------------------------- 3 files changed, 150 deletions(-) delete mode 100644 src/lib/pub-sub.ts diff --git a/src/dex-helper/dummy-dex-helper.ts b/src/dex-helper/dummy-dex-helper.ts index 86111f477..c147c4525 100644 --- a/src/dex-helper/dummy-dex-helper.ts +++ b/src/dex-helper/dummy-dex-helper.ts @@ -43,14 +43,6 @@ class DummyCache implements ICache { return null; } - async ttl( - dexKey: string, - network: number, - cacheKey: string, - ): Promise { - return '1'; - } - async rawget(key: string): Promise { return this.storage[key] ? this.storage[key] : null; return null; @@ -147,10 +139,6 @@ class DummyCache implements ICache { return set.has(key); } - async smembers(setKey: string): Promise { - return Array.from(this.setMap[setKey] ?? []); - } - async hset(mapKey: string, key: string, value: string): Promise { if (!this.hashStorage[mapKey]) this.hashStorage[mapKey] = {}; this.hashStorage[mapKey][key] = value; diff --git a/src/dex-helper/icache.ts b/src/dex-helper/icache.ts index cbad46110..90bcf313b 100644 --- a/src/dex-helper/icache.ts +++ b/src/dex-helper/icache.ts @@ -5,12 +5,6 @@ export interface ICache { cacheKey: string, ): Promise; - ttl( - dexKey: string, - network: number, - cacheKey: string, - ): Promise; - rawget(key: string): Promise; rawset(key: string, value: string, ttl: number): Promise; @@ -58,8 +52,6 @@ export interface ICache { sismember(setKey: string, key: string): Promise; - smembers(setKey: string): Promise; - hset(mapKey: string, key: string, value: string): Promise; hdel(mapKey: string, keys: string[]): Promise; diff --git a/src/lib/pub-sub.ts b/src/lib/pub-sub.ts deleted file mode 100644 index f7677acc4..000000000 --- a/src/lib/pub-sub.ts +++ /dev/null @@ -1,130 +0,0 @@ -import NodeCache from 'node-cache'; -import { Network } from '../constants'; -import { IDexHelper } from '../dex-helper'; - -type JsonPubSubMsg = { - expiresAt: number; - data: Record; -}; - -type SetPubSubMsg = string[]; - -export class JsonPubSub { - channel: string; - network: Network; - localCache: NodeCache = new NodeCache(); - - constructor( - private dexHelper: IDexHelper, - private dexKey: string, - channel: string, - ) { - this.network = this.dexHelper.config.data.network; - this.channel = `${this.network}_${this.dexKey}_${channel}`; - } - - initialize() { - this.subscribe(); - } - - subscribe() { - this.dexHelper.cache.subscribe(this.channel, (_, msg) => { - const decodedMsg = JSON.parse(msg) as JsonPubSubMsg; - this.handleSubscription(decodedMsg); - }); - } - - publish(data: Record, ttl: number) { - const expiresAt = Math.round(Date.now() / 1000) + ttl; - this.dexHelper.cache.publish( - this.channel, - JSON.stringify({ expiresAt, data }), - ); - } - - handleSubscription(json: JsonPubSubMsg) { - const { expiresAt, data } = json; - - const now = Math.round(Date.now() / 1000); - // calculating ttl as message might come with the delay - const ttl = expiresAt - now; - - const keys = Object.keys(data); - for (const key of keys) { - this.localCache.set(key, data[key], ttl); - } - } - - async getAndCache(key: string): Promise { - const localValue = this.localCache.get(key); - - if (localValue) { - return localValue; - } - - const [value, ttl] = await Promise.all([ - this.dexHelper.cache.get(this.dexKey, this.network, key), - this.dexHelper.cache.ttl(this.dexKey, this.network, key), - ]); - - if (value) { - // setting ttl same as in cache - // TODO-ps: check if ttl is not null - const parsedValue = JSON.parse(value); - this.localCache.set(key, parsedValue, Number(ttl)); - return parsedValue; - } - - return null; - } -} - -export class SetPubSub { - channel: string; - network: Network; - set = new Set(); - - constructor( - private dexHelper: IDexHelper, - private dexKey: string, - channel: string, - ) { - this.network = this.dexHelper.config.data.network; - this.channel = `${this.network}_${this.dexKey}_${channel}`; - } - - async initialize(key: string) { - // as there's no lazy load, we need to initialize the set - const initSet = await this.dexHelper.cache.smembers(key); - for (const member of initSet) { - this.set.add(member); - } - - this.subscribe(); - } - - subscribe() { - this.dexHelper.cache.subscribe(this.channel, (_, msg) => { - const decodedMsg = JSON.parse(msg) as SetPubSubMsg; - this.handleSubscription(decodedMsg); - }); - } - - publish(set: SetPubSubMsg) { - // as there's no lazy load, also store locally - for (const key of set) { - this.set.add(key); - } - this.dexHelper.cache.publish(this.channel, JSON.stringify(set)); - } - - handleSubscription(set: SetPubSubMsg) { - for (const key of set) { - this.set.add(key); - } - } - - async has(key: string) { - return this.set.has(key); - } -} From 1d04fd99466b5bad40d6b1d8fdb9d7de01e09346 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Wed, 18 Dec 2024 19:12:32 +0200 Subject: [PATCH 7/9] Revert "feat: integrate pub-sub with generic-rfq" This reverts commit a211f9834c1d280d5317437e73dabab05b014698. --- src/dex/generic-rfq/rate-fetcher.ts | 104 +++++++++++----------------- 1 file changed, 41 insertions(+), 63 deletions(-) diff --git a/src/dex/generic-rfq/rate-fetcher.ts b/src/dex/generic-rfq/rate-fetcher.ts index d8b5eff30..ae5ef30cb 100644 --- a/src/dex/generic-rfq/rate-fetcher.ts +++ b/src/dex/generic-rfq/rate-fetcher.ts @@ -37,7 +37,6 @@ import { ERC1271Contract, } from '../../lib/erc1271-utils'; import { isContractAddress } from '../../utils'; -import { JsonPubSub, SetPubSub } from '../../lib/pub-sub'; const GET_FIRM_RATE_TIMEOUT_MS = 2000; export const reversePrice = (price: PriceAndAmountBigNumber) => @@ -56,9 +55,6 @@ export class RateFetcher { private addressToTokenMap: Record = {}; private pairs: PairMap = {}; - private pricesPubSub: JsonPubSub; - private blacklistPubSub?: SetPubSub; - private firmRateAuth?: (options: RequestConfig) => void; public blackListCacheKey: string; @@ -127,8 +123,6 @@ export class RateFetcher { logger, ); - this.pricesPubSub = new JsonPubSub(this.dexHelper, this.dexKey, 'prices'); - if (config.blacklistConfig) { this.blackListFetcher = new Fetcher( dexHelper.httpRequest, @@ -148,12 +142,6 @@ export class RateFetcher { config.blacklistConfig.intervalMs, logger, ); - - this.blacklistPubSub = new SetPubSub( - this.dexHelper, - this.dexKey, - 'blacklist', - ); } this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`; @@ -173,13 +161,6 @@ export class RateFetcher { this.config.maker, ); } - - if (this.dexHelper.config.isSlave) { - this.pricesPubSub.initialize(); - if (this.blacklistPubSub) { - this.blacklistPubSub.initialize(this.blackListCacheKey); - } - } } start() { @@ -233,22 +214,16 @@ export class RateFetcher { for (const address of resp.blacklist) { this.dexHelper.cache.sadd(this.blackListCacheKey, address.toLowerCase()); } - - if (this.blacklistPubSub) { - this.blacklistPubSub.publish(resp.blacklist); - } } public isBlackListed(userAddress: string) { - if (this.blacklistPubSub) { - return this.blacklistPubSub.has(userAddress.toLowerCase()); - } - return false; + return this.dexHelper.cache.sismember( + this.blackListCacheKey, + userAddress.toLowerCase(), + ); } private handleRatesResponse(resp: RatesResponse) { - const pubSubData: Record = {}; - const ttl = this.config.rateConfig.dataTTLS; const pairs = this.pairs; if (isEmpty(pairs)) return; @@ -277,51 +252,37 @@ export class RateFetcher { } if (prices.bids.length) { - const key = `${baseToken.address}_${quoteToken.address}_bids`; - const value = prices.bids; - pubSubData[key] = value; - this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - key, - ttl, - JSON.stringify(value), + `${baseToken.address}_${quoteToken.address}_bids`, + this.config.rateConfig.dataTTLS, + JSON.stringify(prices.bids), ); currentPricePairs.add(`${baseToken.address}_${quoteToken.address}`); } if (prices.asks.length) { - const key = `${baseToken.address}_${quoteToken.address}_asks`; - const value = prices.asks; - pubSubData[key] = value; - this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - key, - ttl, - JSON.stringify(value), + `${baseToken.address}_${quoteToken.address}_asks`, + this.config.rateConfig.dataTTLS, + JSON.stringify(prices.asks), ); currentPricePairs.add(`${quoteToken.address}_${baseToken.address}`); } }); if (currentPricePairs.size > 0) { - const key = `pairs`; - const value = Array.from(currentPricePairs); - pubSubData[key] = value; - this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - key, - ttl, - JSON.stringify(value), + `pairs`, + this.config.rateConfig.dataTTLS, + JSON.stringify(Array.from(currentPricePairs)), ); } - - this.pricesPubSub.publish(pubSubData, ttl); } checkHealth(): boolean { @@ -361,13 +322,17 @@ export class RateFetcher { } public async getAvailablePairs(): Promise { - const pairs = await this.pricesPubSub.getAndCache(`pairs`); + const pairs = await this.dexHelper.cache.get( + this.dexKey, + this.dexHelper.config.data.network, + `pairs`, + ); if (!pairs) { return []; } - return pairs; + return JSON.parse(pairs) as string[]; } public async getOrderPrice( @@ -377,36 +342,49 @@ export class RateFetcher { ): Promise { let reversed = false; - let prices: PriceAndAmount[] | null = null; + let pricesAsString: string | null = null; if (side === SwapSide.SELL) { - prices = await this.pricesPubSub.getAndCache( + pricesAsString = await this.dexHelper.cache.get( + this.dexKey, + this.dexHelper.config.data.network, `${srcToken.address}_${destToken.address}_bids`, ); - if (!prices) { - prices = await this.pricesPubSub.getAndCache( + if (!pricesAsString) { + pricesAsString = await this.dexHelper.cache.get( + this.dexKey, + this.dexHelper.config.data.network, `${destToken.address}_${srcToken.address}_asks`, ); reversed = true; } } else { - prices = await this.pricesPubSub.getAndCache( + pricesAsString = await this.dexHelper.cache.get( + this.dexKey, + this.dexHelper.config.data.network, `${destToken.address}_${srcToken.address}_asks`, ); - if (!prices) { - prices = await this.pricesPubSub.getAndCache( + if (!pricesAsString) { + pricesAsString = await this.dexHelper.cache.get( + this.dexKey, + this.dexHelper.config.data.network, `${srcToken.address}_${destToken.address}_bids`, ); reversed = true; } } - if (!prices) { + if (!pricesAsString) { + return null; + } + + const orderPricesAsString: PriceAndAmount[] = JSON.parse(pricesAsString); + if (!orderPricesAsString) { return null; } - let orderPrices = prices.map(price => [ + let orderPrices = orderPricesAsString.map(price => [ new BigNumber(price[0]), new BigNumber(price[1]), ]); From bc4713208ffa42403535beb253e70958936ce0ec Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Wed, 18 Dec 2024 19:18:13 +0200 Subject: [PATCH 8/9] feat: implement pub-sub --- src/dex-helper/dummy-dex-helper.ts | 21 +++++ src/dex-helper/icache.ts | 6 ++ src/lib/pub-sub.ts | 147 +++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+) create mode 100644 src/lib/pub-sub.ts diff --git a/src/dex-helper/dummy-dex-helper.ts b/src/dex-helper/dummy-dex-helper.ts index c147c4525..5b922e14f 100644 --- a/src/dex-helper/dummy-dex-helper.ts +++ b/src/dex-helper/dummy-dex-helper.ts @@ -43,6 +43,23 @@ class DummyCache implements ICache { return null; } + async keys( + dexKey: string, + network: number, + cacheKey: string, + ): Promise { + return []; + } + + async ttl( + dexKey: string, + network: number, + cacheKey: string, + ): Promise { + const key = `${network}_${dexKey}_${cacheKey}`.toLowerCase(); + return this.storage[key] ? 1 : -1; + } + async rawget(key: string): Promise { return this.storage[key] ? this.storage[key] : null; return null; @@ -139,6 +156,10 @@ class DummyCache implements ICache { return set.has(key); } + async smembers(setKey: string): Promise { + return Array.from(this.setMap[setKey] ?? []); + } + async hset(mapKey: string, key: string, value: string): Promise { if (!this.hashStorage[mapKey]) this.hashStorage[mapKey] = {}; this.hashStorage[mapKey][key] = value; diff --git a/src/dex-helper/icache.ts b/src/dex-helper/icache.ts index 90bcf313b..bc2d4b821 100644 --- a/src/dex-helper/icache.ts +++ b/src/dex-helper/icache.ts @@ -5,6 +5,10 @@ export interface ICache { cacheKey: string, ): Promise; + ttl(dexKey: string, network: number, cacheKey: string): Promise; + + keys(dexKey: string, network: number, cacheKey: string): Promise; + rawget(key: string): Promise; rawset(key: string, value: string, ttl: number): Promise; @@ -52,6 +56,8 @@ export interface ICache { sismember(setKey: string, key: string): Promise; + smembers(setKey: string): Promise; + hset(mapKey: string, key: string, value: string): Promise; hdel(mapKey: string, keys: string[]): Promise; diff --git a/src/lib/pub-sub.ts b/src/lib/pub-sub.ts new file mode 100644 index 000000000..3684d6f0f --- /dev/null +++ b/src/lib/pub-sub.ts @@ -0,0 +1,147 @@ +import NodeCache from 'node-cache'; +import { Network } from '../constants'; +import { IDexHelper } from '../dex-helper'; +import { Logger } from '../types'; + +type KeyValuePubSubMsg = { + expiresAt: number; + data: Record; +}; + +type SetPubSubMsg = string[]; + +export class ExpKeyValuePubSub { + channel: string; + network: Network; + localCache: NodeCache = new NodeCache(); + + logger: Logger; + + constructor( + private dexHelper: IDexHelper, + private dexKey: string, + channel: string, + private defaultValue?: any, + private defaultTTL?: number, + ) { + this.network = this.dexHelper.config.data.network; + this.channel = `${this.network}_${this.dexKey}_${channel}`; + + this.logger = this.dexHelper.getLogger(`ExpKeyValuePubSub_${this.channel}`); + } + + subscribe() { + this.logger.info(`Subscribing to ${this.channel}`); + + this.dexHelper.cache.subscribe(this.channel, (_, msg) => { + const decodedMsg = JSON.parse(msg) as KeyValuePubSubMsg; + this.handleSubscription(decodedMsg); + }); + } + + publish(data: Record, ttl: number) { + const expiresAt = Math.round(Date.now() / 1000) + ttl; + this.dexHelper.cache.publish( + this.channel, + JSON.stringify({ expiresAt, data }), + ); + } + + handleSubscription(msg: KeyValuePubSubMsg) { + const { expiresAt, data } = msg; + + const now = Math.round(Date.now() / 1000); + // calculating ttl as message might come with the delay + const ttl = expiresAt - now; + + if (ttl > 0) { + const keys = Object.keys(data); + for (const key of keys) { + this.localCache.set(key, data[key], ttl); + } + } else { + this.logger.error('Message has expired', { + now, + expiresAt, + diffInSeconds: now - expiresAt, + keys: Object.keys(data), + }); + } + } + + async getAndCache(key: string): Promise { + const localValue = this.localCache.get(key); + + if (localValue) { + return localValue; + } + + const [value, ttl] = await Promise.all([ + this.dexHelper.cache.get(this.dexKey, this.network, key), + this.dexHelper.cache.ttl(this.dexKey, this.network, key), + ]); + + if (value && ttl > 0) { + const parsedValue = JSON.parse(value); + this.localCache.set(key, parsedValue, ttl); + return parsedValue; + } + + if (this.defaultValue && this.defaultTTL && this.defaultTTL > 0) { + this.localCache.set(key, this.defaultValue, this.defaultTTL); + return this.defaultValue; + } + + return null; + } +} + +export class NonExpSetPubSub { + channel: string; + network: Network; + set = new Set(); + + logger: Logger; + + constructor( + private dexHelper: IDexHelper, + private dexKey: string, + channel: string, + ) { + this.network = this.dexHelper.config.data.network; + this.channel = `${this.network}_${this.dexKey}_${channel}`; + + this.logger = this.dexHelper.getLogger(`NonExpSetPubSub_${this.channel}`); + } + + async initializeAndSubscribe(initialSet: string[]) { + for (const member of initialSet) { + this.set.add(member); + } + + this.subscribe(); + } + + subscribe() { + this.logger.info(`Subscribing to ${this.channel}`); + + this.dexHelper.cache.subscribe(this.channel, (_, msg) => { + const decodedMsg = JSON.parse(msg) as SetPubSubMsg; + this.handleSubscription(decodedMsg); + }); + } + + publish(msg: SetPubSubMsg) { + this.dexHelper.cache.publish(this.channel, JSON.stringify(msg)); + } + + handleSubscription(set: SetPubSubMsg) { + for (const key of set) { + this.set.add(key); + } + } + + async has(key: string) { + return this.set.has(key); + } +} From f80c28863e48a67b5ae0a814a575cc7c3cc670f9 Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Wed, 18 Dec 2024 19:27:11 +0200 Subject: [PATCH 9/9] feat: pub-sub for `generic-rfq` --- src/dex/generic-rfq/generic-rfq.ts | 7 +- src/dex/generic-rfq/rate-fetcher.ts | 125 ++++++++++++++++++---------- 2 files changed, 84 insertions(+), 48 deletions(-) diff --git a/src/dex/generic-rfq/generic-rfq.ts b/src/dex/generic-rfq/generic-rfq.ts index d358fffe8..5dfe85006 100644 --- a/src/dex/generic-rfq/generic-rfq.ts +++ b/src/dex/generic-rfq/generic-rfq.ts @@ -447,12 +447,7 @@ export class GenericRFQ extends ParaSwapLimitOrders { } async setBlacklist(userAddress: string): Promise { - await this.dexHelper.cache.hset( - this.rateFetcher.blackListCacheKey, - userAddress.toLowerCase(), - 'true', - ); - return true; + return this.rateFetcher.setBlacklist(userAddress); } releaseResources(): void { diff --git a/src/dex/generic-rfq/rate-fetcher.ts b/src/dex/generic-rfq/rate-fetcher.ts index ae5ef30cb..46db952e6 100644 --- a/src/dex/generic-rfq/rate-fetcher.ts +++ b/src/dex/generic-rfq/rate-fetcher.ts @@ -37,6 +37,7 @@ import { ERC1271Contract, } from '../../lib/erc1271-utils'; import { isContractAddress } from '../../utils'; +import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub'; const GET_FIRM_RATE_TIMEOUT_MS = 2000; export const reversePrice = (price: PriceAndAmountBigNumber) => @@ -55,6 +56,9 @@ export class RateFetcher { private addressToTokenMap: Record = {}; private pairs: PairMap = {}; + private pricesPubSub: ExpKeyValuePubSub; + private blacklistPubSub?: NonExpSetPubSub; + private firmRateAuth?: (options: RequestConfig) => void; public blackListCacheKey: string; @@ -123,6 +127,13 @@ export class RateFetcher { logger, ); + this.pricesPubSub = new ExpKeyValuePubSub( + this.dexHelper, + this.dexKey, + 'prices', + ); + + this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`; if (config.blacklistConfig) { this.blackListFetcher = new Fetcher( dexHelper.httpRequest, @@ -142,9 +153,14 @@ export class RateFetcher { config.blacklistConfig.intervalMs, logger, ); + + this.blacklistPubSub = new NonExpSetPubSub( + this.dexHelper, + this.dexKey, + 'blacklist', + ); } - this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`; if (this.config.firmRateConfig.secret) { this.firmRateAuth = this.authHttp(this.config.firmRateConfig.secret); } @@ -161,6 +177,16 @@ export class RateFetcher { this.config.maker, ); } + + if (this.dexHelper.config.isSlave) { + this.pricesPubSub.subscribe(); + if (this.blacklistPubSub) { + const initSet = await this.dexHelper.cache.smembers( + this.blackListCacheKey, + ); + this.blacklistPubSub.initializeAndSubscribe(initSet); + } + } } start() { @@ -214,16 +240,22 @@ export class RateFetcher { for (const address of resp.blacklist) { this.dexHelper.cache.sadd(this.blackListCacheKey, address.toLowerCase()); } + + if (this.blacklistPubSub) { + this.blacklistPubSub.publish(resp.blacklist); + } } public isBlackListed(userAddress: string) { - return this.dexHelper.cache.sismember( - this.blackListCacheKey, - userAddress.toLowerCase(), - ); + if (this.blacklistPubSub) { + return this.blacklistPubSub.has(userAddress.toLowerCase()); + } + return false; } private handleRatesResponse(resp: RatesResponse) { + const pubSubData: Record = {}; + const ttl = this.config.rateConfig.dataTTLS; const pairs = this.pairs; if (isEmpty(pairs)) return; @@ -252,37 +284,51 @@ export class RateFetcher { } if (prices.bids.length) { + const key = `${baseToken.address}_${quoteToken.address}_bids`; + const value = prices.bids; + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `${baseToken.address}_${quoteToken.address}_bids`, - this.config.rateConfig.dataTTLS, - JSON.stringify(prices.bids), + key, + ttl, + JSON.stringify(value), ); currentPricePairs.add(`${baseToken.address}_${quoteToken.address}`); } if (prices.asks.length) { + const key = `${baseToken.address}_${quoteToken.address}_asks`; + const value = prices.asks; + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `${baseToken.address}_${quoteToken.address}_asks`, - this.config.rateConfig.dataTTLS, - JSON.stringify(prices.asks), + key, + ttl, + JSON.stringify(value), ); currentPricePairs.add(`${quoteToken.address}_${baseToken.address}`); } }); if (currentPricePairs.size > 0) { + const key = `pairs`; + const value = Array.from(currentPricePairs); + pubSubData[key] = value; + this.dexHelper.cache.setex( this.dexKey, this.dexHelper.config.data.network, - `pairs`, - this.config.rateConfig.dataTTLS, - JSON.stringify(Array.from(currentPricePairs)), + key, + ttl, + JSON.stringify(value), ); } + + this.pricesPubSub.publish(pubSubData, ttl); } checkHealth(): boolean { @@ -322,17 +368,13 @@ export class RateFetcher { } public async getAvailablePairs(): Promise { - const pairs = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, - `pairs`, - ); + const pairs = await this.pricesPubSub.getAndCache(`pairs`); if (!pairs) { return []; } - return JSON.parse(pairs) as string[]; + return pairs; } public async getOrderPrice( @@ -342,49 +384,36 @@ export class RateFetcher { ): Promise { let reversed = false; - let pricesAsString: string | null = null; + let prices: PriceAndAmount[] | null = null; if (side === SwapSide.SELL) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + prices = await this.pricesPubSub.getAndCache( `${srcToken.address}_${destToken.address}_bids`, ); - if (!pricesAsString) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + if (!prices) { + prices = await this.pricesPubSub.getAndCache( `${destToken.address}_${srcToken.address}_asks`, ); reversed = true; } } else { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + prices = await this.pricesPubSub.getAndCache( `${destToken.address}_${srcToken.address}_asks`, ); - if (!pricesAsString) { - pricesAsString = await this.dexHelper.cache.get( - this.dexKey, - this.dexHelper.config.data.network, + if (!prices) { + prices = await this.pricesPubSub.getAndCache( `${srcToken.address}_${destToken.address}_bids`, ); reversed = true; } } - if (!pricesAsString) { + if (!prices) { return null; } - const orderPricesAsString: PriceAndAmount[] = JSON.parse(pricesAsString); - if (!orderPricesAsString) { - return null; - } - - let orderPrices = orderPricesAsString.map(price => [ + let orderPrices = prices.map(price => [ new BigNumber(price[0]), new BigNumber(price[1]), ]); @@ -483,4 +512,16 @@ export class RateFetcher { throw e; } } + + async setBlacklist(userAddress: string): Promise { + await this.dexHelper.cache.hset( + this.blackListCacheKey, + userAddress.toLowerCase(), + 'true', + ); + if (this.blacklistPubSub) { + this.blacklistPubSub.publish([userAddress.toLowerCase()]); + } + return true; + } }