Skip to content

Commit

Permalink
feat: pub-sub for swaap-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
KanievskyiDanylo committed Dec 18, 2024
1 parent eeafb69 commit 6353b26
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 96 deletions.
4 changes: 2 additions & 2 deletions src/dex/swaap-v2/constants.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import BigNumber from 'bignumber.js';

export const SWAAP_RFQ_PRICES_CACHES_TTL_S = 3;
export const SWAAP_RFQ_PRICES_CACHES_TTL_S = 5;

export const SWAAP_RFQ_QUOTE_TIMEOUT_MS = 2000;

export const SWAAP_RFQ_API_PRICES_POLLING_INTERVAL_MS = 1000;
export const SWAAP_RFQ_API_PRICES_POLLING_INTERVAL_MS = 2000;

export const SWAAP_RFQ_API_TOKENS_POLLING_INTERVAL_MS = 1000 * 60 * 60; // 1 hour

Expand Down
140 changes: 136 additions & 4 deletions src/dex/swaap-v2/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {
} from './types';
import {
priceLevelsResponseValidator,
getQuoteResponseValidator,
getTokensResponseValidator,
notifyResponseValidator,
getQuoteResponseWithRecipientValidator,
Expand All @@ -26,8 +25,16 @@ import {
SWAAP_RFQ_QUOTE_TIMEOUT_MS,
SWAAP_NOTIFY_TIMEOUT_MS,
SWAAP_NOTIFICATION_ORIGIN,
SWAAP_TOKENS_CACHE_KEY,
SWAAP_PRICES_CACHE_KEY,
SWAAP_403_TTL_S,
SWAAP_POOL_RESTRICT_TTL_S,
} from './constants';
import { RequestConfig } from '../../dex-helper/irequest-wrapper';
import { Network } from '../../constants';
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';

const BLACKLISTED = 'blacklisted';

export class RateFetcher {
private rateFetcher: Fetcher<SwaapV2PriceLevelsResponse>;
Expand All @@ -37,8 +44,13 @@ export class RateFetcher {
private tokenCacheKey: string;
private pricesCacheKey: string;

private rateTokensPubSub: ExpKeyValuePubSub;
private restrictedPubSub: ExpKeyValuePubSub;
private blacklistPubSub: NonExpSetPubSub;

constructor(
private dexHelper: IDexHelper,
private network: Network,
private dexKey: string,
private logger: Logger,
config: SwaapV2RateFetcherConfig,
Expand All @@ -48,6 +60,12 @@ export class RateFetcher {
this.pricesCacheKey = config.rateConfig.pricesCacheKey;
this.tokenCacheKey = config.tokensConfig.tokensCacheKey;

this.rateTokensPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'rateTokens',
);

this.rateFetcher = new Fetcher<SwaapV2PriceLevelsResponse>(
dexHelper.httpRequest,
{
Expand Down Expand Up @@ -83,11 +101,33 @@ export class RateFetcher {
config.tokensConfig.tokensIntervalMs,
logger,
);

this.restrictedPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'restricted',
'pool_is_not_restricted',
SWAAP_POOL_RESTRICT_TTL_S,
);

this.blacklistPubSub = new NonExpSetPubSub(
this.dexHelper,
this.dexKey,
'blacklist',
);
}

start() {
this.rateFetcher.startPolling();
this.tokensFetcher.startPolling();
async start() {
if (!this.dexHelper.config.isSlave) {
this.rateFetcher.startPolling();
this.tokensFetcher.startPolling();
} else {
this.rateTokensPubSub.subscribe();
this.restrictedPubSub.subscribe();

const initSet = await this.getAllBlacklisted();
this.blacklistPubSub.initializeAndSubscribe(initSet);
}
}

stop() {
Expand All @@ -112,6 +152,11 @@ export class RateFetcher {
this.tokensCacheTTL,
JSON.stringify(tokensMap),
);

this.rateTokensPubSub.publish(
{ [this.tokenCacheKey]: tokensMap },
this.tokensCacheTTL,
);
}

private handleRatesResponse(resp: SwaapV2PriceLevelsResponse): void {
Expand Down Expand Up @@ -149,6 +194,11 @@ export class RateFetcher {
this.pricesCacheTTL,
JSON.stringify(levels),
);

this.rateTokensPubSub.publish(
{ [this.pricesCacheKey]: levels },
this.pricesCacheTTL,
);
}

async getQuote(
Expand Down Expand Up @@ -264,4 +314,86 @@ export class RateFetcher {
throw e;
}
}

async getCachedTokens(): Promise<TokensMap | null> {
const cachedTokens = await this.rateTokensPubSub.getAndCache(
this.tokenCacheKey,
);

if (cachedTokens) {
return cachedTokens as TokensMap;
}

return null;
}

async getCachedLevels(): Promise<Record<string, SwaapV2PriceLevels> | null> {
const cachedLevels = await this.rateTokensPubSub.getAndCache(
this.pricesCacheKey,
);

if (cachedLevels) {
return cachedLevels as Record<string, SwaapV2PriceLevels>;
}

return null;
}

async isBlacklisted(txOrigin: Address): Promise<boolean> {
return this.blacklistPubSub.has(txOrigin.toLowerCase());
}

getBlackListKey(address: Address) {
return `blacklist_${address}`.toLowerCase();
}

async setBlacklist(
txOrigin: Address,
ttl: number = SWAAP_403_TTL_S,
): Promise<boolean> {
await this.dexHelper.cache.setex(
this.dexKey,
this.network,
this.getBlackListKey(txOrigin),
ttl,
BLACKLISTED,
);

this.blacklistPubSub.publish([txOrigin.toLowerCase()]);
return true;
}

async getAllBlacklisted(): Promise<Address[]> {
const defaultKey = this.getBlackListKey('');
const pattern = `${defaultKey}*`;
const allBlacklisted = await this.dexHelper.cache.keys(
this.dexKey,
this.network,
pattern,
);

return allBlacklisted.map(t => this.getAddressFromBlackListKey(t));
}

getAddressFromBlackListKey(key: Address) {
return (key.split('blacklist_')[1] ?? '').toLowerCase();
}

async restrictPool(poolIdentifier: string): Promise<void> {
await this.restrictedPubSub.publish(
{ [this.getRestrictPoolKey(poolIdentifier)]: 'restricted' },
SWAAP_POOL_RESTRICT_TTL_S,
);
}

async isRestrictedPool(poolIdentifier: string): Promise<boolean> {
const restricted = await this.restrictedPubSub.getAndCache(
this.getRestrictPoolKey(poolIdentifier),
);
return restricted === 'restricted';
}

getRestrictPoolKey(poolIdentifier: string): string {
return `restricted_mms_${poolIdentifier}`;
}
}
Loading

0 comments on commit 6353b26

Please sign in to comment.