diff --git a/frontend/src/components/CeleryStateSync.tsx b/frontend/src/components/CeleryStateSync.tsx index 1fa7df8..75357c4 100644 --- a/frontend/src/components/CeleryStateSync.tsx +++ b/frontend/src/components/CeleryStateSync.tsx @@ -1,15 +1,10 @@ import { handleEvent, loadInitialState, useStateStore } from "@stores/useStateStore" +import { toWebSocketUri } from "@utils/webSocketUtils" import React, { useEffect } from "react" import useWebSocket from "react-use-websocket" -const toWsUri = (path: string): string => { - const location = window.location - const protocol = location.protocol === "https:" ? "wss:" : "ws:" - return `${protocol}//${location.host}/${path}` -} - const CeleryStateSync: React.FC = () => { - const { readyState } = useWebSocket(toWsUri("ws/events"), { + const { readyState } = useWebSocket(toWebSocketUri("ws/events"), { shouldReconnect: () => true, onError: (error) => console.error("Error connecting to websockets!", error), onReconnectStop: (numAttempts) => console.error(`Out of attempts to reconnected websockets (${numAttempts})`), diff --git a/frontend/src/layout/header/WSStatus.tsx b/frontend/src/components/common/WsStateIcon.tsx similarity index 79% rename from frontend/src/layout/header/WSStatus.tsx rename to frontend/src/components/common/WsStateIcon.tsx index 19b7916..db4b5d9 100644 --- a/frontend/src/layout/header/WSStatus.tsx +++ b/frontend/src/components/common/WsStateIcon.tsx @@ -7,8 +7,6 @@ import Slide from "@mui/material/Slide" import Stack from "@mui/material/Stack" import Tooltip from "@mui/material/Tooltip" import Typography from "@mui/material/Typography" -import useSettingsStore from "@stores/useSettingsStore" -import { useStateStore } from "@stores/useStateStore" import React, { useEffect, useState } from "react" import { ReadyState } from "react-use-websocket" @@ -40,23 +38,26 @@ const statusMeta: Record = { }, } -const WSStatus: React.FC = () => { - const isDemo = useSettingsStore((state) => state.demo) - const status = useStateStore((store) => store.status) - const meta: Meta = isDemo - ? { - description: "Demo Mode", - icon: , - } - : statusMeta[status] +const demoMeta: Meta = { + description: "Demo Mode", + icon: , +} + +interface WsStateIconProps { + state: ReadyState + isDemo?: boolean +} +const WsStateIcon: React.FC = ({ state, isDemo }) => { + const meta: Meta = isDemo ? demoMeta : statusMeta[state] const [isOpen, setOpen] = useState(true) useEffect(() => { setOpen(true) const token = setTimeout(() => setOpen(false), 1000 * 5) return () => clearTimeout(token) - }, [status, isDemo]) + }, [state, isDemo]) + return ( @@ -68,4 +69,4 @@ const WSStatus: React.FC = () => { ) } -export default WSStatus +export default WsStateIcon diff --git a/frontend/src/components/raw_events/LimitSelect.tsx b/frontend/src/components/raw_events/LimitSelect.tsx new file mode 100644 index 0000000..42ffc71 --- /dev/null +++ b/frontend/src/components/raw_events/LimitSelect.tsx @@ -0,0 +1,31 @@ +import FormControl from "@mui/material/FormControl" +import InputLabel from "@mui/material/InputLabel" +import MenuItem from "@mui/material/MenuItem" +import Select from "@mui/material/Select" +import React from "react" + +interface LimitSelectProps { + limit: number + setLimit: (limit: number) => void +} + +export const LimitSelect: React.FC = ({ limit, setLimit }) => { + return ( + + Limit + + + ) +} diff --git a/frontend/src/components/raw_events/RawEventRow.tsx b/frontend/src/components/raw_events/RawEventRow.tsx new file mode 100644 index 0000000..eee046c --- /dev/null +++ b/frontend/src/components/raw_events/RawEventRow.tsx @@ -0,0 +1,58 @@ +import TaskAvatar from "@components/task/TaskAvatar" +import { CeleryEvent } from "@hooks/useRawEvents" +import KeyboardArrowDownIcon from "@mui/icons-material/KeyboardArrowDown" +import KeyboardArrowUpIcon from "@mui/icons-material/KeyboardArrowUp" +import { useTheme } from "@mui/material" +import Collapse from "@mui/material/Collapse" +import IconButton from "@mui/material/IconButton" +import TableCell from "@mui/material/TableCell" +import TableRow from "@mui/material/TableRow" +import { JsonViewer } from "@textea/json-viewer" +import { format } from "date-fns" +import React from "react" + +interface RawEventRowProps { + event: CeleryEvent +} + +export const RawEventRow: React.FC = ({ event }) => { + const [open, setOpen] = React.useState(false) + + const theme = useTheme() + return ( + <> + *": { borderBottom: "unset" } }}> + + {event?.uuid ? ( + + ) : ( + + )} + + + {event?.timestamp ? format(event.timestamp as number, "hh:mm:ss.SSS") : "Unknown"} + + {(event?.type as string) || "Unknown"} + {(event?.name as string) || (event?.hostname as string) || "Unknown"} + + setOpen(!open)}> + {open ? : } + + + + + + + + + + + + ) +} diff --git a/frontend/src/components/raw_events/RawEventsTable.tsx b/frontend/src/components/raw_events/RawEventsTable.tsx new file mode 100644 index 0000000..992c500 --- /dev/null +++ b/frontend/src/components/raw_events/RawEventsTable.tsx @@ -0,0 +1,36 @@ +import { RawEventRow } from "@components/raw_events/RawEventRow" +import { CeleryEvent } from "@hooks/useRawEvents" +import Table from "@mui/material/Table" +import TableBody from "@mui/material/TableBody" +import TableCell from "@mui/material/TableCell" +import TableContainer from "@mui/material/TableContainer" +import TableHead from "@mui/material/TableHead" +import TableRow from "@mui/material/TableRow" +import React from "react" + +interface RawEventsTableProps { + events: CeleryEvent[] +} + +export const RawEventsTable: React.FC = ({ events }) => { + return ( + + + + + Task + Timestamp + Type + Name + Expand + + + + {events.map((event, index) => ( + + ))} + +
+
+ ) +} diff --git a/frontend/src/components/raw_events/ToggleConnect.tsx b/frontend/src/components/raw_events/ToggleConnect.tsx new file mode 100644 index 0000000..56b0e2a --- /dev/null +++ b/frontend/src/components/raw_events/ToggleConnect.tsx @@ -0,0 +1,21 @@ +import PauseIcon from "@mui/icons-material/Pause" +import PlayArrowIcon from "@mui/icons-material/PlayArrow" +import IconButton from "@mui/material/IconButton" +import Tooltip from "@mui/material/Tooltip" +import React from "react" + +interface ToggleConnectProps { + connect: boolean + setConnect: (connect: boolean) => void + disabled?: boolean +} + +export const ToggleConnect: React.FC = ({ connect, setConnect, disabled }) => { + return ( + + setConnect(!connect)} disabled={disabled}> + {connect ? : } + + + ) +} diff --git a/frontend/src/hooks/useRawEvents.ts b/frontend/src/hooks/useRawEvents.ts new file mode 100644 index 0000000..1bf205b --- /dev/null +++ b/frontend/src/hooks/useRawEvents.ts @@ -0,0 +1,25 @@ +import { toWebSocketUri } from "@utils/webSocketUtils" +import { useState } from "react" +import useWebSocket from "react-use-websocket" + +export type CeleryEvent = Record + +export const useRawEvents = (connect: boolean, limit: number) => { + const [events, setEvents] = useState([]) + const { readyState } = useWebSocket( + toWebSocketUri("ws/raw_events"), + { + shouldReconnect: () => connect, + share: true, + onError: (error) => console.error("Error connecting to websockets!", error), + onReconnectStop: (numAttempts) => + console.error(`Out of attempts to reconnected websockets (${numAttempts})`), + onMessage: (event) => { + const message = JSON.parse(event.data) + setEvents((state) => [message, ...state.slice(0, limit - 1)]) + }, + }, + connect, + ) + return { events, readyState } +} diff --git a/frontend/src/layout/explorer/ExplorerLayout.tsx b/frontend/src/layout/explorer/ExplorerLayout.tsx new file mode 100644 index 0000000..a92d7a1 --- /dev/null +++ b/frontend/src/layout/explorer/ExplorerLayout.tsx @@ -0,0 +1,55 @@ +import ArrowBackIosNewIcon from "@mui/icons-material/ArrowBackIosNew" +import ArrowForwardIosIcon from "@mui/icons-material/ArrowForwardIos" +import Box from "@mui/material/Box" +import Divider from "@mui/material/Divider" +import IconButton from "@mui/material/IconButton" +import Stack from "@mui/material/Stack" +import Toolbar from "@mui/material/Toolbar" +import Tooltip from "@mui/material/Tooltip" +import Typography from "@mui/material/Typography" +import React, { useState } from "react" + +interface ExplorerLayoutProps { + facets?: React.ReactNode + actions?: React.ReactNode + children?: React.ReactNode +} + +const FACET_WIDTH = 300 +export const ExplorerLayout: React.FC = ({ facets, actions, children }) => { + const [isFacetMenuOpen, setFacetMenuOpen] = useState(true) + + return ( + + theme.transitions.create("width"), overflow: "hidden" }} + > + + Facets + + + {facets} + + + + + setFacetMenuOpen(!isFacetMenuOpen)}> + {isFacetMenuOpen ? : } + + + + + {actions} + + + {children} + + + ) +} diff --git a/frontend/src/layout/header/Header.tsx b/frontend/src/layout/header/Header.tsx index 41a99fe..c211e18 100644 --- a/frontend/src/layout/header/Header.tsx +++ b/frontend/src/layout/header/Header.tsx @@ -1,7 +1,7 @@ +import WsStateIcon from "@components/common/WsStateIcon" import SearchBox from "@components/search/SearchBox" import NotificationBadge from "@layout/header/NotificationBadge" import ThemeSelector from "@layout/header/ThemeSelector" -import WSStatus from "@layout/header/WSStatus" import { DRAWER_WIDTH, DRAWER_WIDTH_COLLAPSED } from "@layout/menu/Menu" import GitHubIcon from "@mui/icons-material/GitHub" import AppBar from "@mui/material/AppBar" @@ -11,9 +11,17 @@ import Slide from "@mui/material/Slide" import Stack from "@mui/material/Stack" import Toolbar from "@mui/material/Toolbar" import useScrollTrigger from "@mui/material/useScrollTrigger" +import useSettingsStore from "@stores/useSettingsStore" import useSettings from "@stores/useSettingsStore" +import { useStateStore } from "@stores/useStateStore" import React from "react" +const StateWsStatusIcon: React.FC = () => { + const isDemo = useSettingsStore((state) => state.demo) + const status = useStateStore((store) => store.status) + return +} + const Header: React.FC = () => { const trigger = useScrollTrigger({ target: window }) const menuExpanded = useSettings((state) => state.menuExpanded) @@ -32,7 +40,7 @@ const Header: React.FC = () => { - + , + label: "Dashboard", + icon: , to: "/", external: false, }, @@ -61,6 +62,12 @@ const menuLinks: MenuLink[] = [ to: "/explorer", external: false, }, + { + label: "Live Events", + icon: , + to: "/raw_events", + external: false, + }, { label: "API Explorer", icon: , diff --git a/frontend/src/pages/ExplorerPage.tsx b/frontend/src/pages/ExplorerPage.tsx index a7b617c..09ab6bc 100644 --- a/frontend/src/pages/ExplorerPage.tsx +++ b/frontend/src/pages/ExplorerPage.tsx @@ -2,64 +2,39 @@ import CopyLinkButton from "@components/common/CopyLinkButton" import ExplorerGrid from "@components/explorer/ExplorerGrid" import FacetSet from "@components/explorer/FacetSet" import { useExplorerFilter } from "@hooks/explorer/useExplorerFilter" -import ArrowBackIosNewIcon from "@mui/icons-material/ArrowBackIosNew" -import ArrowForwardIosIcon from "@mui/icons-material/ArrowForwardIos" +import { ExplorerLayout } from "@layout/explorer/ExplorerLayout" import Box from "@mui/material/Box" -import Divider from "@mui/material/Divider" -import IconButton from "@mui/material/IconButton" -import Stack from "@mui/material/Stack" -import Toolbar from "@mui/material/Toolbar" import Typography from "@mui/material/Typography" import { useStateStore } from "@stores/useStateStore" import { useTourChangeStepOnLoad } from "@stores/useTourStore" import { StateTask } from "@utils/translateServerModels" -import React, { useState } from "react" +import React from "react" -const FACET_WIDTH = 300 const ExplorerPage: React.FC = () => { const tasks = useStateStore((state) => { const tasks: StateTask[] = [] state.tasks.forEach((task) => tasks.push(task)) return tasks }) - const [isFacetMenuOpen, setFacetMenuOpen] = useState(true) const [filters, setFilter] = useExplorerFilter() useTourChangeStepOnLoad(11) return ( - - - theme.transitions.create("width"), overflow: "hidden" }} - > - - Facets - - + - - - setFacetMenuOpen(!isFacetMenuOpen)}> - {isFacetMenuOpen ? : } - - - - - {tasks.length} Tasks found - - - - - - + } + actions={ + <> + + {tasks.length} Tasks found + + } + > + + ) } diff --git a/frontend/src/pages/RawEventsPage.tsx b/frontend/src/pages/RawEventsPage.tsx new file mode 100644 index 0000000..6899278 --- /dev/null +++ b/frontend/src/pages/RawEventsPage.tsx @@ -0,0 +1,80 @@ +import WsStateIcon from "@components/common/WsStateIcon" +import Facet from "@components/explorer/Facet" +import { LimitSelect } from "@components/raw_events/LimitSelect" +import { RawEventsTable } from "@components/raw_events/RawEventsTable" +import { ToggleConnect } from "@components/raw_events/ToggleConnect" +import { CeleryEvent, useRawEvents } from "@hooks/useRawEvents" +import { ExplorerLayout } from "@layout/explorer/ExplorerLayout" +import CircularProgress from "@mui/material/CircularProgress" +import Stack from "@mui/material/Stack" +import Typography from "@mui/material/Typography" +import useSettingsStore from "@stores/useSettingsStore" +import { countUniqueProperties } from "@utils/CountUniqueProperties" +import React, { useMemo, useState } from "react" + +interface PlaceholderProps { + text: React.ReactNode + progress?: boolean +} + +const Placeholder: React.FC = ({ text, progress }) => { + return ( + + {progress && } + {text} + + ) +} + +const filterEventTypes = (event: CeleryEvent, selectedTypes: string[]) => + selectedTypes.length == 0 || (event?.type && selectedTypes.includes(event?.type as string)) + +const RawEventsPage: React.FC = () => { + const isDemo = useSettingsStore((state) => state.demo) + const limit = useSettingsStore((state) => state.rawEventsLimit) + const [connect, setConnect] = useState(!isDemo) + const { events, readyState } = useRawEvents(connect, limit) + const [selectedTypes, setSelectedTypes] = useState([]) + const groups = useMemo(() => countUniqueProperties(events, ["type"]), [events]) + const filteredEvents = useMemo( + () => events.filter((event) => filterEventTypes(event, selectedTypes)), + [events, selectedTypes], + ) + + return ( + <> + + + + {events.length} Events + + + useSettingsStore.setState({ rawEventsLimit: newLimit })} + /> + + } + facets={ + setSelectedTypes([...values.values()])} + /> + } + > + {isDemo ? ( + + ) : events.length === 0 ? ( + + ) : ( + + )} + + + ) +} +export default RawEventsPage diff --git a/frontend/src/router.tsx b/frontend/src/router.tsx index cbcfad6..3ccd13a 100644 --- a/frontend/src/router.tsx +++ b/frontend/src/router.tsx @@ -1,6 +1,7 @@ import RootLayout from "@layout/RootLayout" import ErrorPage from "@pages/ErrorPage" import ExplorerPage from "@pages/ExplorerPage" +import RawEventsPage from "@pages/RawEventsPage" import SettingsPage from "@pages/SettingsPage" import TaskPage from "@pages/TaskPage" import WorkerPage from "@pages/WorkerPage" @@ -17,6 +18,7 @@ export const router = createBrowserRouter([ { index: true, element: , errorElement: }, { path: "/settings", element: }, { path: "/explorer", element: }, + { path: "/raw_events", element: }, { path: "tasks/:taskId", element: }, { path: "workers/:workerId", element: }, ], diff --git a/frontend/src/stores/useSettingsStore.ts b/frontend/src/stores/useSettingsStore.ts index ff1bfac..15de453 100644 --- a/frontend/src/stores/useSettingsStore.ts +++ b/frontend/src/stores/useSettingsStore.ts @@ -12,6 +12,7 @@ interface Settings { menuExpanded: boolean hideWelcomeBanner: boolean demo: boolean + rawEventsLimit: number } const defaultSettings: Settings = { @@ -19,6 +20,7 @@ const defaultSettings: Settings = { menuExpanded: true, hideWelcomeBanner: false, demo: Boolean(import.meta.env.VITE_DEMO_MODE), + rawEventsLimit: 100, } const useSettingsStore = create()( persist( diff --git a/frontend/src/utils/webSocketUtils.ts b/frontend/src/utils/webSocketUtils.ts new file mode 100644 index 0000000..cc7a606 --- /dev/null +++ b/frontend/src/utils/webSocketUtils.ts @@ -0,0 +1,5 @@ +export const toWebSocketUri = (path: string): string => { + const location = window.location + const protocol = location.protocol === "https:" ? "wss:" : "ws:" + return `${protocol}//${location.host}/${path}` +} diff --git a/server/events/broadcaster.py b/server/events/broadcaster.py index 0aced48..df2c4c0 100644 --- a/server/events/broadcaster.py +++ b/server/events/broadcaster.py @@ -1,37 +1,55 @@ +import asyncio +import json import logging from events.models import EventCategory, EventMessage, EventType +from events.exceptions import InconsistentStateStore, InvalidEvent from events.receiver import state from events.subscriber import QueueSubscriber from tasks.model import Task from workers.models import Worker -from ws.managers import events_manager +from ws.managers import events_manager, raw_events_manager logger = logging.getLogger(__name__) class EventBroadcaster(QueueSubscriber[dict]): async def handle_event(self, event: dict) -> None: + await asyncio.gather( + broadcast_raw_event(event), + broadcast_parsed_event(event), + ) + + +async def broadcast_raw_event(event: dict) -> None: + logger.debug(f"Broadcasting raw event of type {event.get('type', 'UNKNOWN')!r}") + try: + await raw_events_manager.broadcast(json.dumps(event)) + except Exception as e: + logger.exception(f"Failed to broadcast raw event: {e}") + + +async def broadcast_parsed_event(event: dict) -> None: + try: + message = parse_event(event) + except InvalidEvent as e: + logger.warning(f"Event object is invalid, failed to parse: {e}", exc_info=True) + except InconsistentStateStore as e: + logger.exception(f"Inconsistent event state store: {e}") + except Exception as e: + logger.exception(f"Failed to parse event message: {e}") + else: + logger.debug(f"Broadcasting event {message.type.value!r}") try: - message = parse_event(event) + await events_manager.broadcast(message.model_dump_json()) except Exception as e: - logger.exception(f"Failed to generate event message: {e}") - else: - if message is not None: - logger.debug(f"Broadcasting event {message.type.value!r}") - try: - await events_manager.broadcast(message.json()) - except Exception as e: - logger.exception(f"Failed to broadcast event: {e}") - else: - logger.warning("Ignored event as no message was specified") - - -def parse_event(event: dict) -> EventMessage | None: + logger.exception(f"Failed to broadcast event: {e}") + + +def parse_event(event: dict) -> EventMessage: event_type = event.get('type') if event_type is None: - logger.warning(f"Received event without type: {event}") - return None + raise InvalidEvent(f"Received event without type: {event}") event_category, _ = event_type.split("-", 1) state.event(event) @@ -40,19 +58,18 @@ def parse_event(event: dict) -> EventMessage | None: elif event_category == "worker": return parse_worker_event(event, event_type) else: - logger.error(f"Unknown event category {event_category!r}") - return None + raise InvalidEvent(f"Unknown event category {event_category!r}") def parse_worker_event(event: dict, event_type: str) -> EventMessage | None: worker_hostname = event.get("hostname") if worker_hostname is None: - logger.warning(f"Worker event {event_type!r} is missing hostname: {event}") - return None + raise InvalidEvent(f"Worker event {event_type!r} is missing hostname: {event}") + state_worker = state.workers.get(worker_hostname) if state_worker is None: - logger.warning(f"Could not find worker {worker_hostname!r} in state") - return None + raise InconsistentStateStore(f"Could not find worker {worker_hostname!r} in state") + worker = Worker.from_celery_worker(state_worker) return EventMessage( type=EventType(event_type), @@ -64,12 +81,12 @@ def parse_worker_event(event: dict, event_type: str) -> EventMessage | None: def parse_task_event(event: dict, event_type: str) -> EventMessage | None: task_id = event.get("uuid") if task_id is None: - logger.warning(f"Task event {event_type!r} is missing uuid: {event}") - return None + raise InvalidEvent(f"Task event {event_type!r} is missing uuid: {event}") + state_task = state.tasks.get(task_id) if state_task is None: - logger.warning(f"Could not find task {task_id!r} in state") - return None + raise InconsistentStateStore(f"Could not find task {task_id!r} in state") + task = Task.from_celery_task(state_task) return EventMessage( type=EventType(event_type), diff --git a/server/events/broadcaster_test.py b/server/events/broadcaster_test.py index 3c5ec9e..5f8fd4c 100644 --- a/server/events/broadcaster_test.py +++ b/server/events/broadcaster_test.py @@ -1,31 +1,33 @@ +import json from asyncio import Queue import pytest -from polyfactory import Ignore from polyfactory.factories.pydantic_factory import ModelFactory from pytest_mock import MockerFixture from events.broadcaster import EventBroadcaster, parse_event, parse_task_event, parse_worker_event +from events.exceptions import InconsistentStateStore, InvalidEvent from events.models import EventCategory, EventMessage, EventType from events.receiver import state from tasks.model import Task -from workers.models import Worker +from workers.models import CPULoad, Worker from ws.managers import events_manager -class EventMessageFactory(ModelFactory[EventMessage]): - __model__ = EventMessage - - class WorkerFactory(ModelFactory[Worker]): __model__ = Worker - cpu_load = Ignore() + cpu_load = CPULoad(0, 0, 0) class TaskFactory(ModelFactory[Task]): __model__ = Task +class EventMessageFactory(ModelFactory[EventMessage]): + __model__ = EventMessage + data = TaskFactory + + @pytest.fixture() def broadcaster(): return EventBroadcaster(Queue()) @@ -33,87 +35,83 @@ def broadcaster(): @pytest.mark.asyncio async def test_broadcasts_event(broadcaster, mocker: MockerFixture): - message = EventMessageFactory.build() - parse_event_mock = mocker.patch("events.broadcaster.parse_event", return_value=message) - broadcast_mock = mocker.patch.object(events_manager, "broadcast") event = {"type": "task-succeeded", "task_id": "1234", "result": "foo"} + raw_event_mock = mocker.patch("events.broadcaster.broadcast_raw_event") + parsed_event_mock = mocker.patch("events.broadcaster.broadcast_parsed_event") await broadcaster.handle_event(event) - parse_event_mock.assert_called_once_with(event) - broadcast_mock.assert_called_once_with(message.json()) + raw_event_mock.assert_called_once_with(event) + parsed_event_mock.assert_called_once_with(event) @pytest.mark.asyncio -async def test_event_parsing_failure(broadcaster, mocker: MockerFixture): - parse_event_mock = mocker.patch("events.broadcaster.parse_event", side_effect=Exception("Parsing failed")) - broadcast_mock = mocker.patch.object(events_manager, "broadcast") +async def test_broadcast_failure(broadcaster, mocker: MockerFixture): event = {"type": "task-succeeded", "task_id": "1234", "result": "foo"} + message = EventMessageFactory.build() + parse_event_mock = mocker.patch("events.broadcaster.parse_event", return_value=message) + broadcast_mock = mocker.patch("ws.websocket_manager.WebsocketManager.broadcast") + broadcast_mock.side_effect = Exception("Broadcast failed") await broadcaster.handle_event(event) + assert broadcast_mock.call_args_list == [mocker.call(json.dumps(event)), mocker.call(message.json())] parse_event_mock.assert_called_once_with(event) - broadcast_mock.assert_not_called() @pytest.mark.asyncio -async def test_no_message_specified(broadcaster, mocker: MockerFixture): - parse_event_mock = mocker.patch("events.broadcaster.parse_event", return_value=None) +async def test_broadcast_parsed_event(broadcaster, mocker: MockerFixture): + message = EventMessageFactory.build() + parse_event_mock = mocker.patch("events.broadcaster.parse_event", return_value=message) broadcast_mock = mocker.patch.object(events_manager, "broadcast") event = {"type": "task-succeeded", "task_id": "1234", "result": "foo"} await broadcaster.handle_event(event) parse_event_mock.assert_called_once_with(event) - broadcast_mock.assert_not_called() + broadcast_mock.assert_called_once_with(message.json()) @pytest.mark.asyncio -async def test_broadcast_failure(broadcaster, mocker: MockerFixture): - message = EventMessageFactory.build() - parse_event_mock = mocker.patch("events.broadcaster.parse_event", return_value=message) - broadcast_mock = mocker.patch.object(events_manager, "broadcast", side_effect=Exception("Broadcast failed")) +async def test_event_parsing_failure(broadcaster, mocker: MockerFixture): event = {"type": "task-succeeded", "task_id": "1234", "result": "foo"} + parse_event_mock = mocker.patch("events.broadcaster.parse_event", side_effect=Exception("Parsing failed")) + broadcast_mock = mocker.patch.object(events_manager, "broadcast") await broadcaster.handle_event(event) parse_event_mock.assert_called_once_with(event) - broadcast_mock.assert_called_once_with(message.json()) - - -def test_parse_event_no_type(): - event = {} - assert parse_event(event) is None - - -def test_parse_event_unknown_category(caplog: pytest.LogCaptureFixture): - event = {"type": "foo-bar"} - assert parse_event(event) is None - assert "Unknown event category 'foo'" in caplog.text - - -def test_parse_worker_event_missing_hostname(caplog: pytest.LogCaptureFixture): - event = {"type": "worker-started"} - assert parse_worker_event(event, "worker-started") is None - assert "Worker event 'worker-started' is missing hostname" in caplog.text + broadcast_mock.assert_not_called() -def test_parse_worker_event_missing_worker(caplog: pytest.LogCaptureFixture): - event = {"type": "worker-started", "hostname": "worker"} - assert parse_worker_event(event, "worker-started") is None - assert "Could not find worker 'worker' in state" in caplog.text +@pytest.mark.parametrize( + "event,match", [ + ({}, "Received event without type"), + ({"type": "foo-bar"}, "Unknown event category 'foo'"), + ({"type": "task-started"}, "Task event 'task-started' is missing uuid"), + ({"type": "worker-started"}, "Worker event 'worker-started' is missing hostname"), + ] + ) +def test_parse_invalid_event(event, match, mocker: MockerFixture): + mocker.patch.object(state, "event") + with pytest.raises(InvalidEvent, match=match): + parse_event(event) -def test_parse_task_event_missing_uuid(caplog: pytest.LogCaptureFixture): - event = {"type": "task-started"} - assert parse_task_event(event, "task-started") is None - assert "Task event 'task-started' is missing uuid" in caplog.text +@pytest.mark.parametrize( + "event,match", [ + ({"type": "worker-started", "hostname": "worker"}, "Could not find worker 'worker' in state"), + ({"type": "task-started", "uuid": "task"}, "Could not find task 'task' in state"), + ] + ) +def test_parse_event_missing_object(event, match, mocker: MockerFixture): + mocker.patch.object(state, "event") + mocker.patch.object(state.tasks, "get", return_value=None) + mocker.patch.object(state.workers, "get", return_value=None) -def test_parse_task_event_missing_task(caplog: pytest.LogCaptureFixture): - event = {"type": "task-started", "uuid": "task"} - assert parse_task_event(event, "task-started") is None - assert "Could not find task 'task' in state" in caplog.text + with pytest.raises(InconsistentStateStore, match=match): + parse_event(event) def test_parse_worker_event(mocker: MockerFixture): diff --git a/server/events/exceptions.py b/server/events/exceptions.py new file mode 100644 index 0000000..07df857 --- /dev/null +++ b/server/events/exceptions.py @@ -0,0 +1,6 @@ +class InvalidEvent(ValueError): + pass + + +class InconsistentStateStore(KeyError): + pass diff --git a/server/ws/managers.py b/server/ws/managers.py index 06ea4cf..6e4c0d7 100644 --- a/server/ws/managers.py +++ b/server/ws/managers.py @@ -1,3 +1,4 @@ from ws.websocket_manager import WebsocketManager events_manager = WebsocketManager("Events") +raw_events_manager = WebsocketManager("RawEvents") diff --git a/server/ws/router.py b/server/ws/router.py index 27c4040..31d69b6 100644 --- a/server/ws/router.py +++ b/server/ws/router.py @@ -3,19 +3,29 @@ from fastapi import APIRouter from starlette.websockets import WebSocket, WebSocketDisconnect, WebSocketState -from ws.managers import events_manager +from ws.managers import events_manager, raw_events_manager +from ws.websocket_manager import WebsocketManager logger = logging.getLogger(__name__) ws_router = APIRouter(prefix="/ws", tags=["websockets"]) -@ws_router.websocket("/events") -async def subscribe_events(websocket: WebSocket): +async def connect_to_manager(websocket: WebSocket, manager: WebsocketManager) -> None: await websocket.accept() - events_manager.subscribe(websocket) + manager.subscribe(websocket) while websocket.client_state is WebSocketState.CONNECTED: try: msg = await websocket.receive_text() - logger.warning(f"Client {websocket.client!r} sent to events ws: {msg}") + logger.warning(f"Client {websocket.client!r} sent message to {manager.name!r} manager: {msg}") except WebSocketDisconnect: - events_manager.unsubscribe(websocket) + manager.unsubscribe(websocket) + + +@ws_router.websocket("/events") +async def subscribe_events(websocket: WebSocket): + await connect_to_manager(websocket, events_manager) + + +@ws_router.websocket("/raw_events") +async def subscribe_events(websocket: WebSocket): + await connect_to_manager(websocket, raw_events_manager) diff --git a/server/ws/websocket_manager.py b/server/ws/websocket_manager.py index f34080a..5cfe32f 100644 --- a/server/ws/websocket_manager.py +++ b/server/ws/websocket_manager.py @@ -27,7 +27,7 @@ async def broadcast(self, message: str) -> None: connection.send_text(message) for connection in self.active_connections ], return_exceptions=True - ) + ) for result, connection in zip(results, self.active_connections, strict=True): if isinstance(result, Exception): logger.exception(f"Failed to send message to client {connection.client!r}: {result}", exc_info=result)