Skip to content

Commit

Permalink
feat: 여러 카메라 스트리밍 적용
Browse files Browse the repository at this point in the history
  • Loading branch information
sukkyun2 committed Aug 10, 2024
1 parent d3e495e commit 958992d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 28 deletions.
21 changes: 21 additions & 0 deletions app/api_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,24 @@ def bad_request(error: str):

class Config:
arbitrary_types_allowed = True


class ApiListResponse(BaseModel, Generic[T]):
code: int = 200
message: str = "OK"
items: List[T] = None

@staticmethod
def ok():
return ApiListResponse(code=200, message="OK")

@staticmethod
def ok_with_data(data: List[T]):
return ApiListResponse(code=200, message="OK", items=data)

@staticmethod
def bad_request(error: str):
return ApiListResponse(code=400, message=error)

class Config:
arbitrary_types_allowed = True
29 changes: 15 additions & 14 deletions app/connection_manager.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
from typing import List
from collections import defaultdict

from starlette.websockets import WebSocket, WebSocketDisconnect


class ConnectionManager:
def __init__(self):
self.publisher = None
self.subscribers: List[WebSocket] = []
self.publishers: defaultdict = defaultdict(WebSocket)
self.subscribers: defaultdict = defaultdict(list)

async def connect(self, websocket: WebSocket):
async def connect(self, location_name: str, websocket: WebSocket):
await websocket.accept()
self.publisher = websocket
self.publishers[location_name] = websocket

def disconnect(self):
self.publisher = None
def disconnect(self, location_name: str):
del self.publishers[location_name]

async def subscribe(self, websocket: WebSocket):
async def subscribe(self, location_name: str, websocket: WebSocket):
await websocket.accept()
self.subscribers.append(websocket)
self.subscribers[location_name].append(websocket)

def unsubscribe(self, websocket: WebSocket):
self.subscribers.remove(websocket)
def unsubscribe(self, location_name: str, websocket: WebSocket):
self.subscribers[location_name].remove(websocket)

async def broadcast(self, message: bytes):
for subscriber in self.subscribers:
async def broadcast(self, location_name: str, message: bytes):
subscribers_by_location = self.subscribers[location_name]
for subscriber in subscribers_by_location:
try:
await subscriber.send_bytes(message)
except WebSocketDisconnect:
self.unsubscribe(subscriber)
self.unsubscribe(location_name, subscriber)
29 changes: 15 additions & 14 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fastapi import UploadFile, File
from fastapi.middleware.cors import CORSMiddleware

from app.api_response import ApiResponse
from app.api_response import ApiResponse, ApiListResponse
from app.config import settings
from app.connection_manager import ConnectionManager
from app.history import async_save_history
Expand Down Expand Up @@ -43,14 +43,14 @@ async def detect_image(file: UploadFile = File(...)) -> ApiResponse:
return ApiResponse.ok()


@app.get("/api/exists-publisher")
def exists_publisher() -> ApiResponse[bool]:
return ApiResponse[bool].ok_with_data(bool(manager.publisher))
@app.get("/api/publishers")
def exists_publisher() -> ApiListResponse[str]:
return ApiListResponse[str].ok_with_data(list(manager.publishers.keys()))


@app.websocket("/ws/publisher")
async def websocket_publisher(websocket: WebSocket):
await manager.connect(websocket)
@app.websocket("/ws/publishers/{location_name}")
async def websocket_publisher(websocket: WebSocket, location_name: str):
await manager.connect(location_name, websocket)
try:
while True:
data = await websocket.receive_bytes()
Expand All @@ -61,22 +61,23 @@ async def websocket_publisher(websocket: WebSocket):
# TODO 별도 이상상황으로 교체
cell_phone_detected = any(det.class_name == 'cell phone' for det in result.detections)
if cell_phone_detected:
await async_save_history(result)
pass
# await async_save_history(result)
# video_recorder.save_frame(result.predict_image_np)

await manager.broadcast(result.get_encoded_nparr().tobytes())
await manager.broadcast(location_name, result.get_encoded_nparr().tobytes())
except WebSocketDisconnect:
manager.disconnect()
manager.disconnect(location_name)
print("Publisher disconnected")


@app.websocket("/ws/subscriber")
async def websocket_subscriber(websocket: WebSocket):
await manager.subscribe(websocket)
@app.websocket("/ws/subscribers/{location_name}")
async def websocket_subscriber(location_name: str, websocket: WebSocket):
await manager.subscribe(location_name, websocket)
try:
while True:
await asyncio.sleep(1)
except WebSocketDisconnect:
print("Subscriber disconnected")
finally:
manager.unsubscribe(websocket)
manager.unsubscribe(location_name, websocket)

0 comments on commit 958992d

Please sign in to comment.