Skip to content

Commit

Permalink
Reconnect in the stats server (#277)
Browse files Browse the repository at this point in the history
* reconnect in the stats server

* log
  • Loading branch information
brittcyr authored Nov 12, 2024
1 parent 47f5f2c commit db688d4
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions debug-ui/scripts/stats-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ const depth: promClient.Gauge<'depth_bps' | 'market' | 'trader'> =
*/
export class ManifestStatsServer {
private connection: Connection;
private ws: WebSocket;
private ws: WebSocket | null = null;
// Base and quote volume
private baseVolumeAtomsSinceLastCheckpoint: Map<string, number> = new Map();
private quoteVolumeAtomsSinceLastCheckpoint: Map<string, number> = new Map();
Expand All @@ -84,29 +84,33 @@ export class ManifestStatsServer {

constructor() {
this.connection = new Connection(RPC_URL!);
this.ws = new WebSocket('wss://mfx-feed-mainnet.fly.dev');
this.resetWebsocket();
}

private resetWebsocket() {
// Allow old one to timeout.
if (this.ws != null) {
try {
this.ws.close();
} catch (err) {}
}

this.ws = new WebSocket('wss://mfx-feed-mainnet.fly.dev');

this.ws.on('open', () => {});
this.ws.onopen = () => {};

this.ws.on('close', () => {
// Rely on the next iteration to force a reset.
console.log('Disconnected. Reconnecting');
this.ws.onclose = () => {
// Rely on the next iteration to force a reconnect. This happens without a
// keep-alive.
reconnects.inc();
});
this.ws.on('error', () => {
// Rely on the next iteration to force a reset.
console.log('Error. Reconnecting');
};
this.ws.onerror = () => {
// Rely on the next iteration to force a reconnect.
reconnects.inc();
});
};

this.ws.on('message', async (message) => {
const fill: FillLogResult = JSON.parse(message.toString());
this.ws.onmessage = async (message) => {
const fill: FillLogResult = JSON.parse(message.data.toString());
const { market, baseAtoms, quoteAtoms, price, slot } = fill;

// Do not accept old spurious messages.
Expand All @@ -116,6 +120,7 @@ export class ManifestStatsServer {
this.lastFillSlot = slot;

fills.inc({ market });
console.log('Got fill', fill);

if (this.markets.get(market) == undefined) {
this.baseVolumeAtomsSinceLastCheckpoint.set(market, 0);
Expand Down Expand Up @@ -156,7 +161,7 @@ export class ManifestStatsServer {
this.quoteVolumeAtomsSinceLastCheckpoint.get(market)! +
Number(quoteAtoms),
);
});
};
}

/**
Expand Down Expand Up @@ -205,6 +210,12 @@ export class ManifestStatsServer {
this.resetWebsocket();

this.markets.forEach((value: Market, market: string) => {
console.log(
'Saving checkpoints for market',
market,
'base since last',
this.baseVolumeAtomsSinceLastCheckpoint.get(market),
);
this.baseVolumeAtomsCheckpoints.set(market, [
...this.baseVolumeAtomsCheckpoints.get(market)!.slice(1),
this.baseVolumeAtomsSinceLastCheckpoint.get(market)!,
Expand Down

0 comments on commit db688d4

Please sign in to comment.