diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index cf91c33a0..3251b2f55 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -1,14 +1,15 @@ +import asyncio import io import json import tempfile -import time +from collections import deque from datetime import datetime from pathlib import Path from typing import Optional, Union -import tailer +import aiofiles from PIL import Image -from fastapi import APIRouter, Depends, HTTPException, Header, Response +from fastapi import APIRouter, Depends, HTTPException, Header, Request, Response from fastapi.responses import StreamingResponse from app import schemas @@ -224,19 +225,22 @@ def set_env_setting(env: dict, @router.get("/progress/{process_type}", summary="实时进度") -def get_progress(process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)): +async def get_progress(request: Request, process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)): """ 实时获取处理进度,返回格式为SSE """ progress = ProgressHelper() - def event_generator(): - while True: - if global_vars.is_system_stopped: - break - detail = progress.get(process_type) - yield 'data: %s\n\n' % json.dumps(detail) - time.sleep(0.2) + async def event_generator(): + try: + while not global_vars.is_system_stopped: + if await request.is_disconnected(): + break + detail = progress.get(process_type) + yield f"data: {json.dumps(detail)}\n\n" + await asyncio.sleep(0.2) + except asyncio.CancelledError: + return return StreamingResponse(event_generator(), media_type="text/event-stream") @@ -273,26 +277,29 @@ def set_setting(key: str, value: Union[list, dict, bool, int, str] = None, @router.get("/message", summary="实时消息") -def get_message(role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)): +async def get_message(request: Request, role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)): """ 实时获取系统消息,返回格式为SSE """ message = MessageHelper() - def event_generator(): - while True: - if global_vars.is_system_stopped: - break - detail = message.get(role) - yield 'data: %s\n\n' % (detail or '') - time.sleep(3) + async def event_generator(): + try: + while not global_vars.is_system_stopped: + if await request.is_disconnected(): + break + detail = message.get(role) + yield f"data: {detail or ''}\n\n" + await asyncio.sleep(3) + except asyncio.CancelledError: + return return StreamingResponse(event_generator(), media_type="text/event-stream") @router.get("/logging", summary="实时日志") -def get_logging(length: int = 50, logfile: str = "moviepilot.log", - _: schemas.TokenPayload = Depends(verify_resource_token)): +async def get_logging(request: Request, length: int = 50, logfile: str = "moviepilot.log", + _: schemas.TokenPayload = Depends(verify_resource_token)): """ 实时获取系统日志 length = -1 时, 返回text/plain @@ -306,27 +313,40 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log", if not log_path.exists() or not log_path.is_file(): raise HTTPException(status_code=404, detail="Not Found") - def log_generator(): - # 读取文件末尾50行,不使用tailer模块 - with open(log_path, 'r', encoding='utf-8') as f: - for line in f.readlines()[-max(length, 50):]: - yield 'data: %s\n\n' % line - while True: - if global_vars.is_system_stopped: - break - for t in tailer.follow(open(log_path, 'r', encoding='utf-8')): - yield 'data: %s\n\n' % (t or '') - time.sleep(1) + async def log_generator(): + try: + # 使用固定大小的双向队列来限制内存使用 + lines_queue = deque(maxlen=max(length, 50)) + # 使用 aiofiles 异步读取文件 + async with aiofiles.open(log_path, mode="r", encoding="utf-8") as f: + # 逐行读取文件,将每一行存入队列 + file_content = await f.read() + for line in file_content.splitlines(): + lines_queue.append(line) + for line in lines_queue: + yield f"data: {line}\n\n" + # 移动文件指针到文件末尾,继续监听新增内容 + await f.seek(0, 2) + while not global_vars.is_system_stopped: + if await request.is_disconnected(): + break + line = await f.readline() + if not line: + await asyncio.sleep(0.5) + continue + yield f"data: {line}\n\n" + except asyncio.CancelledError: + return # 根据length参数返回不同的响应 if length == -1: # 返回全部日志作为文本响应 if not log_path.exists(): return Response(content="日志文件不存在!", media_type="text/plain") - with open(log_path, 'r', encoding='utf-8') as file: + with open(log_path, "r", encoding='utf-8') as file: text = file.read() # 倒序输出 - text = '\n'.join(text.split('\n')[::-1]) + text = "\n".join(text.split("\n")[::-1]) return Response(content=text, media_type="text/plain") else: # 返回SSE流响应 diff --git a/app/chain/mediaserver.py b/app/chain/mediaserver.py index 89b40cec9..8b9a5186c 100644 --- a/app/chain/mediaserver.py +++ b/app/chain/mediaserver.py @@ -3,6 +3,7 @@ from app import schemas from app.chain import ChainBase +from app.core.config import global_vars from app.db.mediaserver_oper import MediaServerOper from app.helper.service import ServiceConfigHelper from app.log import logger @@ -134,6 +135,8 @@ def sync(self): logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...") library_count = 0 for item in self.items(server=server_name, library_id=library.id): + if global_vars.is_system_stopped: + return if not item or not item.item_id: continue logger.debug(f"正在同步 {item.title} ...") diff --git a/app/core/event.py b/app/core/event.py index 43ea8f7c4..f04d2c2a0 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -20,7 +20,7 @@ DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级 MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数 INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 1 # 事件队列空闲时的初始超时时间(秒) -MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的最大超时时间(秒) +MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 5 # 事件队列空闲时的最大超时时间(秒) class Event: diff --git a/app/main.py b/app/main.py index f6de21807..1081ac03d 100644 --- a/app/main.py +++ b/app/main.py @@ -20,7 +20,8 @@ # uvicorn服务 Server = uvicorn.Server(Config(app, host=settings.HOST, port=settings.PORT, - reload=settings.DEV, workers=multiprocessing.cpu_count())) + reload=settings.DEV, workers=multiprocessing.cpu_count(), + timeout_graceful_shutdown=5)) def start_tray(): diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index 940a34e5f..9a6d8226e 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -64,7 +64,7 @@ def run_polling(): logger.error(f"Telegram消息接收服务异常:{str(err)}") # 启动线程来运行 infinity_polling - self._polling_thread = threading.Thread(target=run_polling) + self._polling_thread = threading.Thread(target=run_polling, daemon=True) self._polling_thread.start() logger.info("Telegram消息接收服务启动") diff --git a/app/monitor.py b/app/monitor.py index cdaafff0b..f82456262 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -113,7 +113,7 @@ def init(self): self.stop() # 启动文件整理线程 - self._transfer_thread = threading.Thread(target=self.__start_transfer) + self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True) self._transfer_thread.start() # 读取目录配置 diff --git a/app/startup/lifecycle.py b/app/startup/lifecycle.py index 71e0011c6..9a6fd09ee 100644 --- a/app/startup/lifecycle.py +++ b/app/startup/lifecycle.py @@ -14,18 +14,24 @@ async def lifespan(app: FastAPI): 定义应用的生命周期事件 """ print("Starting up...") + # 启动模块 start_modules(app) + # 初始化路由 init_routers(app) + # 初始化插件 plugin_init_task = asyncio.create_task(init_plugins_async()) try: + # 在此处 yield,表示应用已经启动,控制权交回 FastAPI 主事件循环 yield finally: print("Shutting down...") try: + # 取消插件初始化 plugin_init_task.cancel() await plugin_init_task except asyncio.CancelledError: print("Plugin installation task cancelled.") except Exception as e: print(f"Error during plugin installation shutdown: {e}") + # 清理模块 shutdown_modules(app) diff --git a/app/startup/modules_initializer.py b/app/startup/modules_initializer.py index 1e06892fb..d7dc0d3f2 100644 --- a/app/startup/modules_initializer.py +++ b/app/startup/modules_initializer.py @@ -1,10 +1,8 @@ -import signal import sys -from types import FrameType from fastapi import FastAPI -from app.core.config import settings, global_vars +from app.core.config import global_vars, settings from app.core.module import ModuleManager from app.utils.system import SystemUtils @@ -89,27 +87,12 @@ def check_auth(): ) -def singal_handle(): - """ - 监听停止信号 - """ - - def stop_event(signum: int, _: FrameType): - """ - SIGTERM信号处理 - """ - print(f"接收到停止信号:{signum},正在停止系统...") - global_vars.stop_system() - - # 设置信号处理程序 - signal.signal(signal.SIGTERM, stop_event) - signal.signal(signal.SIGINT, stop_event) - - def shutdown_modules(_: FastAPI): """ 服务关闭 """ + # 停止信号 + global_vars.stop_system() # 停止模块 ModuleManager().stop() # 停止插件 @@ -159,5 +142,3 @@ def start_modules(_: FastAPI): start_frontend() # 检查认证状态 check_auth() - # 监听停止信号 - singal_handle() diff --git a/requirements.in b/requirements.in index 82c50a610..d1497279e 100644 --- a/requirements.in +++ b/requirements.in @@ -45,7 +45,6 @@ psutil~=5.9.4 python-dotenv~=1.0.1 python-hosts~=1.0.7 watchdog~=3.0.0 -tailer~=0.4.1 openai~=0.27.2 cacheout~=0.14.1 click~=8.1.6 @@ -61,4 +60,5 @@ Pinyin2Hanzi~=0.1.1 pywebpush~=2.0.0 py115j~=0.0.7 oss2~=2.18.6 -aligo~=6.2.4 \ No newline at end of file +aligo~=6.2.4 +aiofiles~=24.1.0 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5d220da2d..a6fde3b91 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,8 @@ # # pip-compile requirements.in # +aiofiles==24.1.0 + # via -r requirements.in aiohappyeyeballs==2.4.0 # via aiohttp aiohttp==3.10.5 @@ -63,6 +65,11 @@ click==8.1.7 # uvicorn cn2an==0.5.22 # via -r requirements.in +colorama==0.4.6 + # via + # click + # qrcode + # tqdm coloredlogs==15.0.1 # via aligo crcmod==1.7 @@ -95,7 +102,9 @@ frozenlist==1.4.1 func-timeout==4.3.5 # via -r requirements.in greenlet==2.0.2 - # via playwright + # via + # playwright + # sqlalchemy h11==0.14.0 # via # httpcore @@ -139,7 +148,9 @@ openai==0.27.10 oss2==2.18.6 # via -r requirements.in packaging==24.1 - # via docker + # via + # docker + # qbittorrent-api parse==1.19.1 # via -r requirements.in passlib==1.7.4 @@ -181,14 +192,6 @@ pyee==9.0.4 # via playwright pyjwt==2.7.0 # via -r requirements.in -pyobjc-core==10.3.1 - # via - # pyobjc-framework-cocoa - # pyobjc-framework-quartz -pyobjc-framework-cocoa==10.3.1 - # via pyobjc-framework-quartz -pyobjc-framework-quartz==10.3.1 - # via pystray pyotp==2.9.0 # via -r requirements.in pyparsing==3.0.9 @@ -197,6 +200,8 @@ pypng==0.20220715.0 # via qrcode pyquery==2.0.0 # via -r requirements.in +pyreadline3==3.5.4 + # via humanfriendly pysocks==1.7.1 # via requests pystray==0.19.5 @@ -223,7 +228,7 @@ pyvirtualdisplay==3.0 # via -r requirements.in pywebpush==2.0.0 # via -r requirements.in -qbittorrent-api==2023.5.48 +qbittorrent-api==2024.9.67 # via -r requirements.in qrcode[pil]==7.4.2 # via @@ -262,7 +267,6 @@ six==1.16.0 # pystray # python-dateutil # pywebpush - # qbittorrent-api slack-bolt==1.18.0 # via -r requirements.in slack-sdk==3.21.3 @@ -283,8 +287,6 @@ starlette==0.27.0 # via # -r requirements.in # fastapi -tailer==0.4.1 - # via -r requirements.in torrentool==1.2.0 # via -r requirements.in tqdm==4.66.5 @@ -301,6 +303,8 @@ typing-extensions==4.12.2 # qrcode # sqlalchemy # transmission-rpc +tzdata==2024.2 + # via tzlocal tzlocal==5.2 # via # apscheduler