Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: Core & generic-rfq #864

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@paraswap/dex-lib",
"version": "4.0.8",
"version": "4.0.6-rfq-pub-sub.0",
"main": "build/index.js",
"types": "build/index.d.ts",
"repository": "https://github.com/paraswap/paraswap-dex-lib",
Expand Down
6 changes: 2 additions & 4 deletions src/abi/fluid-dex/fluid-dex.abi.json
Original file line number Diff line number Diff line change
Expand Up @@ -381,15 +381,13 @@
},
{
"anonymous": false,
"inputs": [
],
"inputs": [],
"name": "LogPauseSwapAndArbitrage",
"type": "event"
},
{
"anonymous": false,
"inputs": [
],
"inputs": [],
"name": "LogUnpauseSwapAndArbitrage",
"type": "event"
},
Expand Down
21 changes: 21 additions & 0 deletions src/dex-helper/dummy-dex-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ class DummyCache implements ICache {
return null;
}

async keys(
dexKey: string,
network: number,
cacheKey: string,
): Promise<string[]> {
return [];
}

async ttl(
dexKey: string,
network: number,
cacheKey: string,
): Promise<number> {
const key = `${network}_${dexKey}_${cacheKey}`.toLowerCase();
return this.storage[key] ? 1 : -1;
}

async rawget(key: string): Promise<string | null> {
return this.storage[key] ? this.storage[key] : null;
return null;
Expand Down Expand Up @@ -139,6 +156,10 @@ class DummyCache implements ICache {
return set.has(key);
}

async smembers(setKey: string): Promise<string[]> {
return Array.from(this.setMap[setKey] ?? []);
}

async hset(mapKey: string, key: string, value: string): Promise<void> {
if (!this.hashStorage[mapKey]) this.hashStorage[mapKey] = {};
this.hashStorage[mapKey][key] = value;
Expand Down
6 changes: 6 additions & 0 deletions src/dex-helper/icache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export interface ICache {
cacheKey: string,
): Promise<string | null>;

ttl(dexKey: string, network: number, cacheKey: string): Promise<number>;

keys(dexKey: string, network: number, cacheKey: string): Promise<string[]>;

rawget(key: string): Promise<string | null>;

rawset(key: string, value: string, ttl: number): Promise<string | null>;
Expand Down Expand Up @@ -52,6 +56,8 @@ export interface ICache {

sismember(setKey: string, key: string): Promise<boolean>;

smembers(setKey: string): Promise<string[]>;

hset(mapKey: string, key: string, value: string): Promise<void>;

hdel(mapKey: string, keys: string[]): Promise<number>;
Expand Down
7 changes: 1 addition & 6 deletions src/dex/generic-rfq/generic-rfq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,12 +447,7 @@ export class GenericRFQ extends ParaSwapLimitOrders {
}

async setBlacklist(userAddress: string): Promise<boolean> {
await this.dexHelper.cache.hset(
this.rateFetcher.blackListCacheKey,
userAddress.toLowerCase(),
'true',
);
return true;
return this.rateFetcher.setBlacklist(userAddress);
}

releaseResources(): void {
Expand Down
125 changes: 83 additions & 42 deletions src/dex/generic-rfq/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
ERC1271Contract,
} from '../../lib/erc1271-utils';
import { isContractAddress } from '../../utils';
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';

const GET_FIRM_RATE_TIMEOUT_MS = 2000;
export const reversePrice = (price: PriceAndAmountBigNumber) =>
Expand All @@ -55,6 +56,9 @@ export class RateFetcher {
private addressToTokenMap: Record<string, TokenWithInfo> = {};
private pairs: PairMap = {};

private pricesPubSub: ExpKeyValuePubSub;
private blacklistPubSub?: NonExpSetPubSub;

private firmRateAuth?: (options: RequestConfig) => void;

public blackListCacheKey: string;
Expand Down Expand Up @@ -123,6 +127,13 @@ export class RateFetcher {
logger,
);

this.pricesPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'prices',
);

this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`;
if (config.blacklistConfig) {
this.blackListFetcher = new Fetcher<BlackListResponse>(
dexHelper.httpRequest,
Expand All @@ -142,9 +153,14 @@ export class RateFetcher {
config.blacklistConfig.intervalMs,
logger,
);

this.blacklistPubSub = new NonExpSetPubSub(
this.dexHelper,
this.dexKey,
'blacklist',
);
}

this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`;
if (this.config.firmRateConfig.secret) {
this.firmRateAuth = this.authHttp(this.config.firmRateConfig.secret);
}
Expand All @@ -161,6 +177,16 @@ export class RateFetcher {
this.config.maker,
);
}

if (this.dexHelper.config.isSlave) {
this.pricesPubSub.subscribe();
if (this.blacklistPubSub) {
const initSet = await this.dexHelper.cache.smembers(
this.blackListCacheKey,
);
this.blacklistPubSub.initializeAndSubscribe(initSet);
}
}
}

start() {
Expand Down Expand Up @@ -214,16 +240,22 @@ export class RateFetcher {
for (const address of resp.blacklist) {
this.dexHelper.cache.sadd(this.blackListCacheKey, address.toLowerCase());
}

if (this.blacklistPubSub) {
this.blacklistPubSub.publish(resp.blacklist);
}
}

public isBlackListed(userAddress: string) {
return this.dexHelper.cache.sismember(
this.blackListCacheKey,
userAddress.toLowerCase(),
);
if (this.blacklistPubSub) {
return this.blacklistPubSub.has(userAddress.toLowerCase());
}
return false;
}

private handleRatesResponse(resp: RatesResponse) {
const pubSubData: Record<string, unknown> = {};
const ttl = this.config.rateConfig.dataTTLS;
const pairs = this.pairs;

if (isEmpty(pairs)) return;
Expand Down Expand Up @@ -252,37 +284,51 @@ export class RateFetcher {
}

if (prices.bids.length) {
const key = `${baseToken.address}_${quoteToken.address}_bids`;
const value = prices.bids;
pubSubData[key] = value;

this.dexHelper.cache.setex(
this.dexKey,
this.dexHelper.config.data.network,
`${baseToken.address}_${quoteToken.address}_bids`,
this.config.rateConfig.dataTTLS,
JSON.stringify(prices.bids),
key,
ttl,
JSON.stringify(value),
);
currentPricePairs.add(`${baseToken.address}_${quoteToken.address}`);
}

if (prices.asks.length) {
const key = `${baseToken.address}_${quoteToken.address}_asks`;
const value = prices.asks;
pubSubData[key] = value;

this.dexHelper.cache.setex(
this.dexKey,
this.dexHelper.config.data.network,
`${baseToken.address}_${quoteToken.address}_asks`,
this.config.rateConfig.dataTTLS,
JSON.stringify(prices.asks),
key,
ttl,
JSON.stringify(value),
);
currentPricePairs.add(`${quoteToken.address}_${baseToken.address}`);
}
});

if (currentPricePairs.size > 0) {
const key = `pairs`;
const value = Array.from(currentPricePairs);
pubSubData[key] = value;

this.dexHelper.cache.setex(
this.dexKey,
this.dexHelper.config.data.network,
`pairs`,
this.config.rateConfig.dataTTLS,
JSON.stringify(Array.from(currentPricePairs)),
key,
ttl,
JSON.stringify(value),
);
}

this.pricesPubSub.publish(pubSubData, ttl);
}

checkHealth(): boolean {
Expand Down Expand Up @@ -322,17 +368,13 @@ export class RateFetcher {
}

public async getAvailablePairs(): Promise<string[]> {
const pairs = await this.dexHelper.cache.get(
this.dexKey,
this.dexHelper.config.data.network,
`pairs`,
);
const pairs = await this.pricesPubSub.getAndCache<string[]>(`pairs`);

if (!pairs) {
return [];
}

return JSON.parse(pairs) as string[];
return pairs;
}

public async getOrderPrice(
Expand All @@ -342,49 +384,36 @@ export class RateFetcher {
): Promise<PriceAndAmountBigNumber[] | null> {
let reversed = false;

let pricesAsString: string | null = null;
let prices: PriceAndAmount[] | null = null;
if (side === SwapSide.SELL) {
pricesAsString = await this.dexHelper.cache.get(
this.dexKey,
this.dexHelper.config.data.network,
prices = await this.pricesPubSub.getAndCache(
`${srcToken.address}_${destToken.address}_bids`,
);

if (!pricesAsString) {
pricesAsString = await this.dexHelper.cache.get(
this.dexKey,
this.dexHelper.config.data.network,
if (!prices) {
prices = await this.pricesPubSub.getAndCache(
`${destToken.address}_${srcToken.address}_asks`,
);
reversed = true;
}
} else {
pricesAsString = await this.dexHelper.cache.get(
this.dexKey,
this.dexHelper.config.data.network,
prices = await this.pricesPubSub.getAndCache(
`${destToken.address}_${srcToken.address}_asks`,
);

if (!pricesAsString) {
pricesAsString = await this.dexHelper.cache.get(
this.dexKey,
this.dexHelper.config.data.network,
if (!prices) {
prices = await this.pricesPubSub.getAndCache(
`${srcToken.address}_${destToken.address}_bids`,
);
reversed = true;
}
}

if (!pricesAsString) {
if (!prices) {
return null;
}

const orderPricesAsString: PriceAndAmount[] = JSON.parse(pricesAsString);
if (!orderPricesAsString) {
return null;
}

let orderPrices = orderPricesAsString.map(price => [
let orderPrices = prices.map(price => [
new BigNumber(price[0]),
new BigNumber(price[1]),
]);
Expand Down Expand Up @@ -483,4 +512,16 @@ export class RateFetcher {
throw e;
}
}

async setBlacklist(userAddress: string): Promise<boolean> {
await this.dexHelper.cache.hset(
this.blackListCacheKey,
userAddress.toLowerCase(),
'true',
);
if (this.blacklistPubSub) {
this.blacklistPubSub.publish([userAddress.toLowerCase()]);
}
return true;
}
}
Loading
Loading