Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/paginate priority logs #288

Open
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"@nestjs/schematics": "^10.0.0",
"@nestjs/testing": "^10.0.0",
"@types/cookie-parser": "^1.4.7",
"@types/eventsource": "^1.1.15",
"@types/express": "^4.17.17",
"@types/jest": "^29.5.2",
"@types/node": "^20.3.1",
Expand Down
10 changes: 3 additions & 7 deletions backend/src/activity/activity.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { ActivityService } from './activity.service';
import { SessionGuard } from '../session.guard';
import { Request, Response } from 'express';
import { KEEP_ALIVE_MESSAGE, SSE_HEADER } from '../../../src/constants/sse';

@Controller('activity')
@UseGuards(SessionGuard)
Expand Down Expand Up @@ -42,19 +43,14 @@ export class ActivityController {

@Get('stream')
sse(@Req() req: Request, @Res() res: Response) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});
res.writeHead(200, SSE_HEADER);

res.flushHeaders();

this.activityService.addClient(res);

const heartbeatInterval = setInterval(() => {
res.write(': keep-alive\n\n');
res.write(KEEP_ALIVE_MESSAGE);
}, 10000);

req.on('close', () => {
Expand Down
10 changes: 5 additions & 5 deletions backend/src/activity/activity.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Activity } from './entities/activity.entity';
import { ActivityType } from '../../../src/types';
import { UpdateOptions, Op } from 'sequelize';
import { Response } from 'express';
import { ClientManager } from '../utils/client-manager';

@Injectable()
export class ActivityService {
Expand All @@ -12,19 +13,18 @@ export class ActivityService {
private activityRepository: typeof Activity,
) {}

private clients: Response[] = [];
private clientManager = new ClientManager();

public addClient(client: Response) {
this.clients.push(client);
this.clientManager.addClient(client);
}

public removeClient(client: Response) {
this.clients = this.clients.filter((c) => c !== client);
this.clientManager.removeClient(client);
}

public sendMessageToClients(data: any) {
const message = `data: ${JSON.stringify(data)}\n\n`;
this.clients.forEach((client) => client.write(message));
this.clientManager.sendMessageToClients(data);
}

public async storeActivity(data: string, pubKey: string, type: ActivityType) {
Expand Down
21 changes: 20 additions & 1 deletion backend/src/logs/logs.controller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// src/logs/logs.controller.ts
import { Controller, Get, Res, Req, Param, UseGuards } from '@nestjs/common';
import { Request, Response } from 'express';
import { LogsService } from './logs.service';
import { SessionGuard } from '../session.guard';
import { KEEP_ALIVE_MESSAGE, SSE_HEADER } from '../../../src/constants/sse';

@Controller('logs')
@UseGuards(SessionGuard)
Expand All @@ -26,6 +26,25 @@ export class LogsController {
return this.logsService.readLogMetrics();
}

@Get('priority-log-stream')
sse(@Req() req: Request, @Res() res: Response) {
res.writeHead(200, SSE_HEADER);

res.flushHeaders();

this.logsService.addClient(res);

const heartbeatInterval = setInterval(() => {
res.write(KEEP_ALIVE_MESSAGE);
}, 10000);

req.on('close', () => {
clearInterval(heartbeatInterval);
this.logsService.removeClient(res);
res.end();
});
}

@Get('dismiss/:index')
dismissLogAlert(@Param('index') index: string) {
return this.logsService.dismissLog(index);
Expand Down
24 changes: 21 additions & 3 deletions backend/src/logs/logs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { LogLevels, LogType, SSELog } from '../../../src/types';
import { InjectModel } from '@nestjs/sequelize';
import { Log } from './entities/log.entity';
import { Op } from 'sequelize';
import { ClientManager } from '../utils/client-manager';

@Injectable()
export class LogsService {
Expand All @@ -20,14 +21,28 @@ export class LogsService {

private sseStreams: Map<string, Subject<any>> = new Map();

private clientManager = new ClientManager();

public addClient(client: Response) {
this.clientManager.addClient(client);
}

public removeClient(client: Response) {
this.clientManager.removeClient(client);
}

public sendMessageToClients(data: any) {
this.clientManager.sendMessageToClients(data);
}

public async startSse(url: string, type: LogType) {
console.log(`starting sse ${url}, ${type}...`);
const eventSource = new EventSource(url);

const sseStream: Subject<any> = new Subject();
this.sseStreams.set(url, sseStream);

eventSource.onmessage = (event) => {
eventSource.onmessage = async (event) => {
let newData;

try {
Expand All @@ -39,10 +54,13 @@ export class LogsService {
const { level } = newData;

if (level !== LogLevels.INFO) {
this.logRepository.create(
const result = (await this.logRepository.create(
{ type, level, data: JSON.stringify(newData), isHidden: false },
{ ignoreDuplicates: true },
);
)) as any;

this.sendMessageToClients(result.dataValues);

if (this.isDebug) {
console.log(
newData,
Expand Down
18 changes: 18 additions & 0 deletions backend/src/utils/client-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Response } from 'express';

export class ClientManager {
private clients: Response[] = [];

public addClient(client: Response): void {
this.clients.push(client);
}

public removeClient(client: Response): void {
this.clients = this.clients.filter((c) => c !== client);
}

public sendMessageToClients(data: any): void {
const message = `data: ${JSON.stringify(data)}\n\n`;
this.clients.forEach((client) => client.write(message));
}
}
6 changes: 6 additions & 0 deletions backend/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,11 @@
resolved "https://registry.yarnpkg.com/@types/estree/-/estree-1.0.5.tgz#a6ce3e556e00fd9895dd872dd172ad0d4bd687f4"
integrity sha512-/kYRxGDLWzHOB7q+wtSUQlFrtcdUccpfy+X+9iMBpHK8QLLhx2wIPYuS5DYtR9Wa/YlZAbIovy7qVdB1Aq6Lyw==

"@types/eventsource@^1.1.15":
version "1.1.15"
resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.15.tgz#949383d3482e20557cbecbf3b038368d94b6be27"
integrity sha512-XQmGcbnxUNa06HR3VBVkc9+A2Vpi9ZyLJcdS5dwaQQ/4ZMWFO+5c90FnMUpbtMZwB/FChoYHwuVg8TvkECacTA==

"@types/express-serve-static-core@^4.17.33":
version "4.19.0"
resolved "https://registry.yarnpkg.com/@types/express-serve-static-core/-/express-serve-static-core-4.19.0.tgz#3ae8ab3767d98d0b682cda063c3339e1e86ccfaa"
Expand Down Expand Up @@ -6049,6 +6054,7 @@ wkx@^0.5.0:
"@types/node" "*"

"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0", wrap-ansi@^7.0.0:
name wrap-ansi-cjs
version "7.0.0"
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43"
integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==
Expand Down
3 changes: 3 additions & 0 deletions siren.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ const handleSSe = (res, req, url) => {
app.prepare().then(() => {
const server = express()
server.get('/activity-stream', (req, res) => handleSSe(res, req, `${backendUrl}/activity/stream`))
server.get('/priority-log-stream', (req, res) =>
handleSSe(res, req, `${backendUrl}/logs/priority-log-stream`),
)
server.get('/validator-logs', (req, res) => handleSSe(res, req, `${backendUrl}/logs/validator`))
server.get('/beacon-logs', (req, res) => handleSSe(res, req, `${backendUrl}/logs/beacon`))

Expand Down
9 changes: 9 additions & 0 deletions src/components/AlertInfo/AlertInfo.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import sortAlertMessagesBySeverity from '../../../utilities/sortAlerts'
import useDiagnosticAlerts from '../../hooks/useDiagnosticAlerts'
import useDivDimensions from '../../hooks/useDivDimensions'
import useMediaQuery from '../../hooks/useMediaQuery'
import useSSEData from '../../hooks/useSSEData'
import { proposerDuties } from '../../recoil/atoms'
import { LogLevels, StatusColor } from '../../types'
import AlertCard from '../AlertCard/AlertCard'
Expand All @@ -24,6 +25,14 @@ const AlertInfo: FC<AlertInfoProps> = ({ metrics, ...props }) => {
const [filter, setFilter] = useState('all')
const duties = useRecoilValue(proposerDuties)

const { data: streamedData } = useSSEData({
url: '/priority-log-stream',
isReady: true,
isStateStore: true,
})

console.log(streamedData)

const priorityLogAlerts = useMemo(() => {
return Object.values(metrics)
.flat()
Expand Down
8 changes: 8 additions & 0 deletions src/constants/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export const SSE_HEADER = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
}

export const KEEP_ALIVE_MESSAGE = ': keep-alive\n\n'