Skip to content

Commit

Permalink
feat: removed develpment logs & helpers from pub-sub
Browse files Browse the repository at this point in the history
  • Loading branch information
KanievskyiDanylo committed Dec 18, 2024
1 parent a0930f1 commit 57dc00f
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 73 deletions.
7 changes: 1 addition & 6 deletions src/dex/cables/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,7 @@ export class CablesRateFetcher {
logger,
);

this.blacklistPubSub = new NonExpSetPubSub(
dexHelper,
dexKey,
'blacklist',
'',
);
this.blacklistPubSub = new NonExpSetPubSub(dexHelper, dexKey, 'blacklist');
this.blacklistFetcher = new Fetcher<CablesBlacklistResponse>(
dexHelper.httpRequest,
{
Expand Down
8 changes: 1 addition & 7 deletions src/dex/dexalot/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,7 @@ export class RateFetcher {
logger,
);

this.blacklistPubSub = new NonExpSetPubSub(
dexHelper,
dexKey,
'blacklist',
this.blacklistCacheKey,
);

this.blacklistPubSub = new NonExpSetPubSub(dexHelper, dexKey, 'blacklist');
this.blacklistFetcher = new Fetcher<DexalotBlacklistResponse>(
dexHelper.httpRequest,
{
Expand Down
1 change: 0 additions & 1 deletion src/dex/generic-rfq/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ export class RateFetcher {
this.dexHelper,
this.dexKey,
'blacklist',
this.blackListCacheKey,
);
}

Expand Down
2 changes: 0 additions & 2 deletions src/dex/hashflow/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ export class RateFetcher {
dexHelper,
dexKey,
'blacklisted',
// TODO-rfq-ps: temporary for validation local with cache
'',
);
}

Expand Down
1 change: 0 additions & 1 deletion src/dex/swaap-v2/rate-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ export class RateFetcher {
this.dexHelper,
this.dexKey,
'blacklist',
'',
);
}

Expand Down
62 changes: 6 additions & 56 deletions src/lib/pub-sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ import NodeCache from 'node-cache';
import { Network } from '../constants';
import { IDexHelper } from '../dex-helper';
import { Logger } from '../types';
import _ from 'lodash';
import jsonDiff from 'json-diff';
import hash from 'object-hash';

type KeyValuePubSubMsg = {
expiresAt: number;
hash: string;
data: Record<string, unknown>;
};

Expand All @@ -19,7 +15,6 @@ export class ExpKeyValuePubSub {
network: Network;
localCache: NodeCache = new NodeCache();

// TODO-rfq-ps: temporary logger
logger: Logger;

constructor(
Expand All @@ -45,20 +40,15 @@ export class ExpKeyValuePubSub {
}

publish(data: Record<string, unknown>, ttl: number) {
const hashedData = hash(data);
this.logger.info(`Publishing to ${this.channel} with hash ${hashedData}`);

const expiresAt = Math.round(Date.now() / 1000) + ttl;
this.dexHelper.cache.publish(
this.channel,
JSON.stringify({ expiresAt, data, hash: hashedData }),
JSON.stringify({ expiresAt, data }),
);
}

handleSubscription(msg: KeyValuePubSubMsg) {
const { expiresAt, data, hash } = msg;

this.logger.info(`Received message from ${this.channel} with hash ${hash}`);
const { expiresAt, data } = msg;

const now = Math.round(Date.now() / 1000);
// calculating ttl as message might come with the delay
Expand All @@ -82,31 +72,16 @@ export class ExpKeyValuePubSub {
async getAndCache<T>(key: string): Promise<T | null> {
const localValue = this.localCache.get<T>(key);

// if (localValue) {
// return localValue;
// }
if (localValue) {
return localValue;
}

const [value, ttl] = await Promise.all([
this.dexHelper.cache.get(this.dexKey, this.network, key),
this.dexHelper.cache.ttl(this.dexKey, this.network, key),
]);

// TODO-rfq-ps: compare local and cache value
const isEqual = _.isEqual(
localValue ?? null,
value ? JSON.parse(value) : null,
);
if (!isEqual) {
this.logger.info(
`Values are not equal for the key ${key}, local: ${JSON.stringify(
localValue,
)}, cache: ${value}, diff: ${jsonDiff.diffString(localValue, value)}`,
);
}

if (value && ttl > 0) {
// setting ttl same as in cache
// TODO-ps: check if ttl is not null
const parsedValue = JSON.parse(value);
this.localCache.set(key, parsedValue, ttl);
return parsedValue;
Expand All @@ -126,14 +101,12 @@ export class NonExpSetPubSub {
network: Network;
set = new Set<string>();

// TODO-rfq-ps: temporary logger
logger: Logger;

constructor(
private dexHelper: IDexHelper,
private dexKey: string,
channel: string,
private blackListCacheKey: string,
) {
this.network = this.dexHelper.config.data.network;
this.channel = `${this.network}_${this.dexKey}_${channel}`;
Expand All @@ -142,7 +115,6 @@ export class NonExpSetPubSub {
}

async initializeAndSubscribe(initialSet: string[]) {
// as there's no lazy load, we need to initialize the set
for (const member of initialSet) {
this.set.add(member);
}
Expand All @@ -160,38 +132,16 @@ export class NonExpSetPubSub {
}

publish(msg: SetPubSubMsg) {
this.logger.info(`Publishing to ${this.channel}`);

// should not be a problem, as we also subscribe to the channel on the same instance
// // as there's no lazy load, also store locally
// for (const key of set) {
// this.set.add(key);
// }
this.dexHelper.cache.publish(this.channel, JSON.stringify(msg));
}

handleSubscription(set: SetPubSubMsg) {
this.logger.info(`Received message from ${this.channel}`);
for (const key of set) {
this.set.add(key);
}
}

async has(key: string) {
const localValue = this.set.has(key);

const value = await this.dexHelper.cache.sismember(
this.blackListCacheKey,
key,
);

// TODO-rfq-ps: compare local and cache value
const isEqual = localValue === value;

if (!isEqual) {
this.logger.error('Values are not equal', { localValue, value });
}

return value;
return this.set.has(key);
}
}

0 comments on commit 57dc00f

Please sign in to comment.