Skip to content

Commit

Permalink
feat: rename JsonPubSub to ExpKeyValuePubSub and SetPubSub to NonExpS…
Browse files Browse the repository at this point in the history
…etPubSub
  • Loading branch information
KanievskyiDanylo committed Dec 18, 2024
1 parent 0a6613a commit a0930f1
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 47 deletions.
10 changes: 5 additions & 5 deletions src/dex/bebop/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { BebopPricingUpdate, tokensResponseValidator } from './validators';
import { WebSocketFetcher } from '../../lib/fetcher/wsFetcher';
import { utils } from 'ethers';
import { BEBOP_RESTRICT_TTL_S, BEBOP_RESTRICTED_CACHE_KEY } from './constants';
import { JsonPubSub } from '../../lib/pub-sub';
import { ExpKeyValuePubSub } from '../../lib/pub-sub';

export function levels_from_flat_array(values: number[]): BebopLevel[] {
const levels: BebopLevel[] = [];
Expand All @@ -26,7 +26,7 @@ export function levels_from_flat_array(values: number[]): BebopLevel[] {
}

export class RateFetcher {
private tokensPricesPubSub: JsonPubSub;
private tokensPricesPubSub: ExpKeyValuePubSub;

private pricesFetcher: WebSocketFetcher<BebopPricingResponse>;
private pricesCacheKey: string;
Expand All @@ -36,7 +36,7 @@ export class RateFetcher {
private tokensAddrCacheKey: string;
private tokensCacheTTL: number;

private restrictPubSub: JsonPubSub;
private restrictPubSub: ExpKeyValuePubSub;

constructor(
private dexHelper: IDexHelper,
Expand All @@ -48,7 +48,7 @@ export class RateFetcher {
this.pricesCacheKey = config.rateConfig.pricesCacheKey;
this.pricesCacheTTL = config.rateConfig.pricesCacheTTLSecs;

this.tokensPricesPubSub = new JsonPubSub(
this.tokensPricesPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'tokensPrices',
Expand Down Expand Up @@ -97,7 +97,7 @@ export class RateFetcher {
logger,
);

this.restrictPubSub = new JsonPubSub(
this.restrictPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'restrict',
Expand Down
21 changes: 13 additions & 8 deletions src/dex/cables/rate-fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Network } from '../../constants';
import { IDexHelper } from '../../dex-helper';
import { Fetcher } from '../../lib/fetcher/fetcher';
import { JsonPubSub, SetPubSub } from '../../lib/pub-sub';
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';
import { validateAndCast } from '../../lib/validators';
import { Address, Logger, Token } from '../../types';
import { Address, Logger } from '../../types';
import { PairData } from '../cables/types';
import {
CABLES_RESTRICT_TTL_S,
Expand All @@ -24,7 +24,7 @@ import {
} from './validators';

export class CablesRateFetcher {
private tokensPairsPricesPubSub: JsonPubSub;
private tokensPairsPricesPubSub: ExpKeyValuePubSub;

public tokensFetcher: Fetcher<CablesTokensResponse>;
public tokensCacheKey: string;
Expand All @@ -39,11 +39,11 @@ export class CablesRateFetcher {
public pricesCacheTTL: number;

public blacklistFetcher: Fetcher<CablesBlacklistResponse>;
private blacklistPubSub: SetPubSub;
private blacklistPubSub: NonExpSetPubSub;
public blacklistCacheKey: string;
public blacklistCacheTTL: number;

private restrictPubSub: JsonPubSub;
private restrictPubSub: ExpKeyValuePubSub;

constructor(
private dexHelper: IDexHelper,
Expand All @@ -64,7 +64,7 @@ export class CablesRateFetcher {
this.blacklistCacheKey = config.rateConfig.blacklistCacheKey;
this.blacklistCacheTTL = config.rateConfig.blacklistCacheTTLSecs;

this.tokensPairsPricesPubSub = new JsonPubSub(
this.tokensPairsPricesPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'tokensPairsPrices',
Expand Down Expand Up @@ -106,7 +106,12 @@ export class CablesRateFetcher {
logger,
);

this.blacklistPubSub = new SetPubSub(dexHelper, dexKey, 'blacklist', '');
this.blacklistPubSub = new NonExpSetPubSub(
dexHelper,
dexKey,
'blacklist',
'',
);
this.blacklistFetcher = new Fetcher<CablesBlacklistResponse>(
dexHelper.httpRequest,
{
Expand Down Expand Up @@ -143,7 +148,7 @@ export class CablesRateFetcher {
logger,
);

this.restrictPubSub = new JsonPubSub(
this.restrictPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'restrict',
Expand Down
18 changes: 11 additions & 7 deletions src/dex/dexalot/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import {
DEXALOT_RESTRICT_TTL_S,
DEXALOT_RESTRICTED_CACHE_KEY,
} from './constants';
import { JsonPubSub, SetPubSub } from '../../lib/pub-sub';
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';

export class RateFetcher {
private pairsFetcher: Fetcher<DexalotPairsResponse>;
private pairsCacheKey: string;

private rateFetcher: Fetcher<DexalotPricesResponse>;
private rateTokensPubSub: JsonPubSub;
private rateTokensPubSub: ExpKeyValuePubSub;
private pricesCacheKey: string;
private pricesCacheTTL: number;

Expand All @@ -41,11 +41,11 @@ export class RateFetcher {
private tokensCacheTTL: number;

private blacklistFetcher: Fetcher<DexalotBlacklistResponse>;
private blacklistPubSub: SetPubSub;
private blacklistPubSub: NonExpSetPubSub;
private blacklistCacheKey: string;
private blacklistCacheTTL: number;

private restrictedPoolPubSub: JsonPubSub;
private restrictedPoolPubSub: ExpKeyValuePubSub;

constructor(
private dexHelper: IDexHelper,
Expand All @@ -63,7 +63,11 @@ export class RateFetcher {
this.blacklistCacheKey = config.rateConfig.blacklistCacheKey;
this.blacklistCacheTTL = config.rateConfig.blacklistCacheTTLSecs;

this.rateTokensPubSub = new JsonPubSub(dexHelper, dexKey, 'rateTokens');
this.rateTokensPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'rateTokens',
);

this.pairsFetcher = new Fetcher<DexalotPairsResponse>(
dexHelper.httpRequest,
Expand Down Expand Up @@ -101,7 +105,7 @@ export class RateFetcher {
logger,
);

this.blacklistPubSub = new SetPubSub(
this.blacklistPubSub = new NonExpSetPubSub(
dexHelper,
dexKey,
'blacklist',
Expand All @@ -126,7 +130,7 @@ export class RateFetcher {
logger,
);

this.restrictedPoolPubSub = new JsonPubSub(
this.restrictedPoolPubSub = new ExpKeyValuePubSub(
dexHelper,
dexKey,
'restricted-pool',
Expand Down
14 changes: 9 additions & 5 deletions src/dex/generic-rfq/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
ERC1271Contract,
} from '../../lib/erc1271-utils';
import { isContractAddress } from '../../utils';
import { JsonPubSub, SetPubSub } from '../../lib/pub-sub';
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';

const GET_FIRM_RATE_TIMEOUT_MS = 2000;
export const reversePrice = (price: PriceAndAmountBigNumber) =>
Expand All @@ -56,8 +56,8 @@ export class RateFetcher {
private addressToTokenMap: Record<string, TokenWithInfo> = {};
private pairs: PairMap = {};

private pricesPubSub: JsonPubSub;
private blacklistPubSub?: SetPubSub;
private pricesPubSub: ExpKeyValuePubSub;
private blacklistPubSub?: NonExpSetPubSub;

private firmRateAuth?: (options: RequestConfig) => void;

Expand Down Expand Up @@ -127,7 +127,11 @@ export class RateFetcher {
logger,
);

this.pricesPubSub = new JsonPubSub(this.dexHelper, this.dexKey, 'prices');
this.pricesPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'prices',
);

this.blackListCacheKey = `${this.dexHelper.config.data.network}_${this.dexKey}_blacklist`;
if (config.blacklistConfig) {
Expand All @@ -150,7 +154,7 @@ export class RateFetcher {
logger,
);

this.blacklistPubSub = new SetPubSub(
this.blacklistPubSub = new NonExpSetPubSub(
this.dexHelper,
this.dexKey,
'blacklist',
Expand Down
10 changes: 5 additions & 5 deletions src/dex/hashflow/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ import {
} from './types';
import { marketMakersValidator, pricesResponseValidator } from './validators';
import { HASHFLOW_BLACKLIST_TTL_S } from './constants';
import { JsonPubSub, SetPubSub } from '../../lib/pub-sub';
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';

export class RateFetcher {
private rateFetcher: Fetcher<HashflowRatesResponse>;
private ratePubSub: JsonPubSub;
private ratePubSub: ExpKeyValuePubSub;
private pricesCacheKey: string;
private pricesCacheTTL: number;

private marketMakersFetcher: Fetcher<HashflowMarketMakersResponse>;
private marketMakersCacheKey: string;
private marketMakersCacheTTL: number;

private blacklistedPubSub: SetPubSub;
private blacklistedPubSub: NonExpSetPubSub;

constructor(
private dexHelper: IDexHelper,
Expand Down Expand Up @@ -57,7 +57,7 @@ export class RateFetcher {
logger,
);

this.ratePubSub = new JsonPubSub(dexHelper, dexKey, 'rates');
this.ratePubSub = new ExpKeyValuePubSub(dexHelper, dexKey, 'rates');

this.rateFetcher = new Fetcher<HashflowRatesResponse>(
dexHelper.httpRequest,
Expand Down Expand Up @@ -98,7 +98,7 @@ export class RateFetcher {
logger,
);

this.blacklistedPubSub = new SetPubSub(
this.blacklistedPubSub = new NonExpSetPubSub(
dexHelper,
dexKey,
'blacklisted',
Expand Down
14 changes: 7 additions & 7 deletions src/dex/swaap-v2/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from './constants';
import { RequestConfig } from '../../dex-helper/irequest-wrapper';
import { Network } from '../../constants';
import { JsonPubSub, SetPubSub } from '../../lib/pub-sub';
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';

const BLACKLISTED = 'blacklisted';

Expand All @@ -44,9 +44,9 @@ export class RateFetcher {
private tokenCacheKey: string;
private pricesCacheKey: string;

private rateTokensPubSub: JsonPubSub;
private restrictedPubSub: JsonPubSub;
private blacklistPubSub: SetPubSub;
private rateTokensPubSub: ExpKeyValuePubSub;
private restrictedPubSub: ExpKeyValuePubSub;
private blacklistPubSub: NonExpSetPubSub;

constructor(
private dexHelper: IDexHelper,
Expand All @@ -60,7 +60,7 @@ export class RateFetcher {
this.pricesCacheKey = config.rateConfig.pricesCacheKey;
this.tokenCacheKey = config.tokensConfig.tokensCacheKey;

this.rateTokensPubSub = new JsonPubSub(
this.rateTokensPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'rateTokens',
Expand Down Expand Up @@ -102,15 +102,15 @@ export class RateFetcher {
logger,
);

this.restrictedPubSub = new JsonPubSub(
this.restrictedPubSub = new ExpKeyValuePubSub(
this.dexHelper,
this.dexKey,
'restricted',
'pool_is_not_restricted',
SWAAP_POOL_RESTRICT_TTL_S,
);

this.blacklistPubSub = new SetPubSub(
this.blacklistPubSub = new NonExpSetPubSub(
this.dexHelper,
this.dexKey,
'blacklist',
Expand Down
20 changes: 10 additions & 10 deletions src/lib/pub-sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import _ from 'lodash';
import jsonDiff from 'json-diff';
import hash from 'object-hash';

type JsonPubSubMsg = {
type KeyValuePubSubMsg = {
expiresAt: number;
hash: string;
data: Record<string, unknown>;
};

type SetPubSubMsg = string[];

export class JsonPubSub {
export class ExpKeyValuePubSub {
channel: string;
network: Network;
localCache: NodeCache = new NodeCache();
Expand All @@ -32,14 +32,14 @@ export class JsonPubSub {
this.network = this.dexHelper.config.data.network;
this.channel = `${this.network}_${this.dexKey}_${channel}`;

this.logger = this.dexHelper.getLogger(`JsonPubSub_${this.channel}`);
this.logger = this.dexHelper.getLogger(`ExpKeyValuePubSub_${this.channel}`);
}

subscribe() {
this.logger.info(`Subscribing to ${this.channel}`);

this.dexHelper.cache.subscribe(this.channel, (_, msg) => {
const decodedMsg = JSON.parse(msg) as JsonPubSubMsg;
const decodedMsg = JSON.parse(msg) as KeyValuePubSubMsg;
this.handleSubscription(decodedMsg);
});
}
Expand All @@ -55,8 +55,8 @@ export class JsonPubSub {
);
}

handleSubscription(json: JsonPubSubMsg) {
const { expiresAt, data, hash } = json;
handleSubscription(msg: KeyValuePubSubMsg) {
const { expiresAt, data, hash } = msg;

this.logger.info(`Received message from ${this.channel} with hash ${hash}`);

Expand Down Expand Up @@ -121,7 +121,7 @@ export class JsonPubSub {
}
}

export class SetPubSub {
export class NonExpSetPubSub {
channel: string;
network: Network;
set = new Set<string>();
Expand All @@ -138,7 +138,7 @@ export class SetPubSub {
this.network = this.dexHelper.config.data.network;
this.channel = `${this.network}_${this.dexKey}_${channel}`;

this.logger = this.dexHelper.getLogger(`SetPubSub_${this.channel}`);
this.logger = this.dexHelper.getLogger(`NonExpSetPubSub_${this.channel}`);
}

async initializeAndSubscribe(initialSet: string[]) {
Expand All @@ -159,15 +159,15 @@ export class SetPubSub {
});
}

publish(set: SetPubSubMsg) {
publish(msg: SetPubSubMsg) {
this.logger.info(`Publishing to ${this.channel}`);

// should not be a problem, as we also subscribe to the channel on the same instance
// // as there's no lazy load, also store locally
// for (const key of set) {
// this.set.add(key);
// }
this.dexHelper.cache.publish(this.channel, JSON.stringify(set));
this.dexHelper.cache.publish(this.channel, JSON.stringify(msg));
}

handleSubscription(set: SetPubSubMsg) {
Expand Down

0 comments on commit a0930f1

Please sign in to comment.