From eeafb6930c89bc59178f77fbc17f2458a16b317b Mon Sep 17 00:00:00 2001 From: Danylo Kanievskyi Date: Wed, 18 Dec 2024 19:28:19 +0200 Subject: [PATCH] feat: pub-sub for `hashflow` --- src/dex/hashflow/constants.ts | 4 +- src/dex/hashflow/hashflow.ts | 78 +++------------------ src/dex/hashflow/rate-fetcher.ts | 114 ++++++++++++++++++++++++++++--- src/dex/hashflow/types.ts | 1 - 4 files changed, 119 insertions(+), 78 deletions(-) diff --git a/src/dex/hashflow/constants.ts b/src/dex/hashflow/constants.ts index 0d3993b0e..4802a07ef 100644 --- a/src/dex/hashflow/constants.ts +++ b/src/dex/hashflow/constants.ts @@ -4,11 +4,11 @@ export const HASHFLOW_BLACKLIST_TTL_S = 60 * 60 * 24 * 7; // 7 days export const HASHFLOW_MM_RESTRICT_TTL_S = 60 * 60; -export const HASHFLOW_PRICES_CACHES_TTL_S = 3; +export const HASHFLOW_PRICES_CACHES_TTL_S = 5; export const HASHFLOW_MARKET_MAKERS_CACHES_TTL_S = 30; -export const HASHFLOW_API_PRICES_POLLING_INTERVAL_MS = 1000; +export const HASHFLOW_API_PRICES_POLLING_INTERVAL_MS = 2000; export const HASHFLOW_API_MARKET_MAKERS_POLLING_INTERVAL_MS = 28 * 1000; // 28 secs diff --git a/src/dex/hashflow/hashflow.ts b/src/dex/hashflow/hashflow.ts index 6b36e94c4..c17dc4275 100644 --- a/src/dex/hashflow/hashflow.ts +++ b/src/dex/hashflow/hashflow.ts @@ -1,10 +1,6 @@ import { ChainId } from '@hashflow/sdk'; import { Chain, ChainType, HashflowApi } from '@hashflow/taker-js'; -import { - MarketMakersResponse, - PriceLevelsResponse, - RfqResponse, -} from '@hashflow/taker-js/dist/types/rest'; +import { RfqResponse } from '@hashflow/taker-js/dist/types/rest'; import BigNumber from 'bignumber.js'; import { Interface } from 'ethers/lib/utils'; import { assert } from 'ts-essentials'; @@ -145,7 +141,6 @@ export class Hashflow extends SimpleExchange implements IDex { }, headers: { Authorization: this.hashFlowAuthToken }, }, - getCachedMarketMakers: this.getCachedMarketMakers.bind(this), filterMarketMakers: this.getFilteredMarketMakers.bind(this), pricesCacheKey: this.pricesCacheKey, pricesCacheTTLSecs: HASHFLOW_PRICES_CACHES_TTL_S, @@ -157,9 +152,7 @@ export class Hashflow extends SimpleExchange implements IDex { } async initializePricing(blockNumber: number): Promise { - if (!this.dexHelper.config.isSlave) { - this.rateFetcher.start(); - } + this.rateFetcher.start(); return; } @@ -198,7 +191,7 @@ export class Hashflow extends SimpleExchange implements IDex { return []; } - const levels = (await this.getCachedLevels()) || {}; + const levels = (await this.rateFetcher.getCachedLevels()) || {}; const makers = Object.keys(levels); return makers @@ -381,32 +374,6 @@ export class Hashflow extends SimpleExchange implements IDex { return undefined; } - async getCachedMarketMakers(): Promise< - MarketMakersResponse['marketMakers'] | null - > { - const cachedMarketMakers = await this.dexHelper.cache.rawget( - this.marketMakersCacheKey, - ); - - if (cachedMarketMakers) { - return JSON.parse( - cachedMarketMakers, - ) as MarketMakersResponse['marketMakers']; - } - - return null; - } - - async getCachedLevels(): Promise { - const cachedLevels = await this.dexHelper.cache.rawget(this.pricesCacheKey); - - if (cachedLevels) { - return JSON.parse(cachedLevels) as PriceLevelsResponse['levels']; - } - - return null; - } - async getPricesVolume( srcToken: Token, destToken: Token, @@ -434,7 +401,7 @@ export class Hashflow extends SimpleExchange implements IDex { const marketMakersToUse = pools.map(p => p.split(`${prefix}_`).pop()); - const levelsMap = (await this.getCachedLevels()) || {}; + const levelsMap = (await this.rateFetcher.getCachedLevels()) || {}; Object.keys(levelsMap).forEach(mmKey => { if (!marketMakersToUse.includes(mmKey)) { @@ -715,7 +682,7 @@ export class Hashflow extends SimpleExchange implements IDex { this.logger.warn( `${this.dexKey}-${this.network}: Encountered restricted user=${options.userAddress}. Adding to local blacklist cache`, ); - await this.setBlacklist(options.userAddress); + await this.rateFetcher.setBlacklist(options.userAddress); } else { if (e instanceof TooStrictSlippageCheckError) { this.logger.warn( @@ -911,33 +878,6 @@ export class Hashflow extends SimpleExchange implements IDex { }; } - getBlackListKey(address: Address) { - return `blacklist_${address}`.toLowerCase(); - } - - async isBlacklisted(txOrigin: Address): Promise { - const result = await this.dexHelper.cache.get( - this.dexKey, - this.network, - this.getBlackListKey(txOrigin), - ); - return result === 'blacklisted'; - } - - async setBlacklist( - txOrigin: Address, - ttl: number = HASHFLOW_BLACKLIST_TTL_S, - ) { - await this.dexHelper.cache.setex( - this.dexKey, - this.network, - this.getBlackListKey(txOrigin), - ttl, - 'blacklisted', - ); - return true; - } - async getSimpleParam( srcToken: string, destToken: string, @@ -1058,9 +998,9 @@ export class Hashflow extends SimpleExchange implements IDex { .wrapETH(tokenAddress) .toLowerCase(); - const makers = (await this.getCachedMarketMakers()) || []; + const makers = (await this.rateFetcher.getCachedMarketMakers()) || []; const filteredMakers = await this.getFilteredMarketMakers(makers); - const pLevels = (await this.getCachedLevels()) || {}; + const pLevels = (await this.rateFetcher.getCachedLevels()) || {}; let baseToken: Token | undefined = undefined; // TODO: Improve efficiency of this part. Quite inefficient way to determine @@ -1120,4 +1060,8 @@ export class Hashflow extends SimpleExchange implements IDex { this.rateFetcher.stop(); } } + + async isBlacklisted(txOrigin: Address): Promise { + return this.rateFetcher.isBlacklisted(txOrigin); + } } diff --git a/src/dex/hashflow/rate-fetcher.ts b/src/dex/hashflow/rate-fetcher.ts index 2df301a41..8f3b5c7ba 100644 --- a/src/dex/hashflow/rate-fetcher.ts +++ b/src/dex/hashflow/rate-fetcher.ts @@ -1,17 +1,24 @@ +import { + MarketMakersResponse, + PriceLevelsResponse, +} from '@hashflow/taker-js/dist/types/rest'; import { Network } from '../../constants'; import { IDexHelper } from '../../dex-helper'; import { Fetcher, SkippingRequest } from '../../lib/fetcher/fetcher'; import { validateAndCast } from '../../lib/validators'; -import { Logger } from '../../types'; +import { Address, Logger } from '../../types'; import { HashflowMarketMakersResponse, HashflowRateFetcherConfig, HashflowRatesResponse, } from './types'; import { marketMakersValidator, pricesResponseValidator } from './validators'; +import { HASHFLOW_BLACKLIST_TTL_S } from './constants'; +import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub'; export class RateFetcher { private rateFetcher: Fetcher; + private ratePubSub: ExpKeyValuePubSub; private pricesCacheKey: string; private pricesCacheTTL: number; @@ -19,6 +26,8 @@ export class RateFetcher { private marketMakersCacheKey: string; private marketMakersCacheTTL: number; + private blacklistedPubSub: NonExpSetPubSub; + constructor( private dexHelper: IDexHelper, private dexKey: string, @@ -48,16 +57,18 @@ export class RateFetcher { logger, ); + this.ratePubSub = new ExpKeyValuePubSub(dexHelper, dexKey, 'rates'); + this.rateFetcher = new Fetcher( dexHelper.httpRequest, { info: { requestOptions: config.rateConfig.pricesReqParams, requestFunc: async options => { - const { filterMarketMakers, getCachedMarketMakers } = - config.rateConfig; + const { filterMarketMakers } = config.rateConfig; - const cachedMarketMakers = (await getCachedMarketMakers()) || []; + const cachedMarketMakers = + (await this.getCachedMarketMakers()) || []; const filteredMarketMakers = await filterMarketMakers( cachedMarketMakers, ); @@ -86,11 +97,24 @@ export class RateFetcher { config.rateConfig.pricesIntervalMs, logger, ); + + this.blacklistedPubSub = new NonExpSetPubSub( + dexHelper, + dexKey, + 'blacklisted', + ); } - start() { - this.marketMakersFetcher.startPolling(); - this.rateFetcher.startPolling(); + async start() { + if (!this.dexHelper.config.isSlave) { + this.marketMakersFetcher.startPolling(); + this.rateFetcher.startPolling(); + } else { + this.ratePubSub.subscribe(); + + const allBlacklisted = await this.getAllBlacklisted(); + this.blacklistedPubSub.initializeAndSubscribe(allBlacklisted); + } } stop() { @@ -109,10 +133,84 @@ export class RateFetcher { private handleRatesResponse(resp: HashflowRatesResponse): void { const { levels } = resp; - this.dexHelper.cache.rawset( + this.dexHelper.cache.setex( + this.dexKey, + this.network, this.pricesCacheKey, + this.pricesCacheTTL, JSON.stringify(levels), + ); + + this.ratePubSub.publish( + { [this.pricesCacheKey]: levels }, this.pricesCacheTTL, ); } + + async getCachedMarketMakers(): Promise< + MarketMakersResponse['marketMakers'] | null + > { + const cachedMarketMakers = await this.dexHelper.cache.rawget( + this.marketMakersCacheKey, + ); + + if (cachedMarketMakers) { + return JSON.parse( + cachedMarketMakers, + ) as MarketMakersResponse['marketMakers']; + } + + return null; + } + + async getCachedLevels(): Promise { + const cachedLevels = await this.ratePubSub.getAndCache(this.pricesCacheKey); + + if (cachedLevels) { + return cachedLevels as PriceLevelsResponse['levels']; + } + + return null; + } + + async isBlacklisted(txOrigin: Address): Promise { + return this.blacklistedPubSub.has(txOrigin.toLowerCase()); + } + + async setBlacklist( + txOrigin: Address, + ttl: number = HASHFLOW_BLACKLIST_TTL_S, + ) { + await this.dexHelper.cache.setex( + this.dexKey, + this.network, + this.getBlackListKey(txOrigin), + ttl, + 'blacklisted', + ); + + this.blacklistedPubSub.publish([txOrigin.toLowerCase()]); + + return true; + } + + async getAllBlacklisted(): Promise { + const defaultKey = this.getBlackListKey(''); + const pattern = `${defaultKey}*`; + const allBlacklisted = await this.dexHelper.cache.keys( + this.dexKey, + this.network, + pattern, + ); + + return allBlacklisted.map(t => this.getAddressFromBlackListKey(t)); + } + + getBlackListKey(address: Address) { + return `blacklist_${address}`.toLowerCase(); + } + + getAddressFromBlackListKey(key: Address) { + return (key.split('blacklist_')[1] ?? '').toLowerCase(); + } } diff --git a/src/dex/hashflow/types.ts b/src/dex/hashflow/types.ts index 6f6334528..b8ca326b9 100644 --- a/src/dex/hashflow/types.ts +++ b/src/dex/hashflow/types.ts @@ -64,7 +64,6 @@ export type HashflowRateFetcherConfig = { }; pricesIntervalMs: number; markerMakersIntervalMs: number; - getCachedMarketMakers: () => Promise; filterMarketMakers: (makers: string[]) => Promise; pricesCacheKey: string; marketMakersCacheKey: string;