Skip to content

Commit

Permalink
Merge pull request #2863 from InfinityPacer/feature/setup
Browse files Browse the repository at this point in the history
  • Loading branch information
jxxghp authored Oct 17, 2024
2 parents a4bf59a + 4accd5d commit 1190d8d
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 76 deletions.
88 changes: 54 additions & 34 deletions app/api/endpoints/system.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import asyncio
import io
import json
import tempfile
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Optional, Union

import tailer
import aiofiles
from PIL import Image
from fastapi import APIRouter, Depends, HTTPException, Header, Response
from fastapi import APIRouter, Depends, HTTPException, Header, Request, Response
from fastapi.responses import StreamingResponse

from app import schemas
Expand Down Expand Up @@ -224,19 +225,22 @@ def set_env_setting(env: dict,


@router.get("/progress/{process_type}", summary="实时进度")
def get_progress(process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_progress(request: Request, process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取处理进度,返回格式为SSE
"""
progress = ProgressHelper()

def event_generator():
while True:
if global_vars.is_system_stopped:
break
detail = progress.get(process_type)
yield 'data: %s\n\n' % json.dumps(detail)
time.sleep(0.2)
async def event_generator():
try:
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
detail = progress.get(process_type)
yield f"data: {json.dumps(detail)}\n\n"
await asyncio.sleep(0.2)
except asyncio.CancelledError:
return

return StreamingResponse(event_generator(), media_type="text/event-stream")

Expand Down Expand Up @@ -273,26 +277,29 @@ def set_setting(key: str, value: Union[list, dict, bool, int, str] = None,


@router.get("/message", summary="实时消息")
def get_message(role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_message(request: Request, role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取系统消息,返回格式为SSE
"""
message = MessageHelper()

def event_generator():
while True:
if global_vars.is_system_stopped:
break
detail = message.get(role)
yield 'data: %s\n\n' % (detail or '')
time.sleep(3)
async def event_generator():
try:
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
detail = message.get(role)
yield f"data: {detail or ''}\n\n"
await asyncio.sleep(3)
except asyncio.CancelledError:
return

return StreamingResponse(event_generator(), media_type="text/event-stream")


@router.get("/logging", summary="实时日志")
def get_logging(length: int = 50, logfile: str = "moviepilot.log",
_: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_logging(request: Request, length: int = 50, logfile: str = "moviepilot.log",
_: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取系统日志
length = -1 时, 返回text/plain
Expand All @@ -306,27 +313,40 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log",
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Not Found")

def log_generator():
# 读取文件末尾50行,不使用tailer模块
with open(log_path, 'r', encoding='utf-8') as f:
for line in f.readlines()[-max(length, 50):]:
yield 'data: %s\n\n' % line
while True:
if global_vars.is_system_stopped:
break
for t in tailer.follow(open(log_path, 'r', encoding='utf-8')):
yield 'data: %s\n\n' % (t or '')
time.sleep(1)
async def log_generator():
try:
# 使用固定大小的双向队列来限制内存使用
lines_queue = deque(maxlen=max(length, 50))
# 使用 aiofiles 异步读取文件
async with aiofiles.open(log_path, mode="r", encoding="utf-8") as f:
# 逐行读取文件,将每一行存入队列
file_content = await f.read()
for line in file_content.splitlines():
lines_queue.append(line)
for line in lines_queue:
yield f"data: {line}\n\n"
# 移动文件指针到文件末尾,继续监听新增内容
await f.seek(0, 2)
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
line = await f.readline()
if not line:
await asyncio.sleep(0.5)
continue
yield f"data: {line}\n\n"
except asyncio.CancelledError:
return

# 根据length参数返回不同的响应
if length == -1:
# 返回全部日志作为文本响应
if not log_path.exists():
return Response(content="日志文件不存在!", media_type="text/plain")
with open(log_path, 'r', encoding='utf-8') as file:
with open(log_path, "r", encoding='utf-8') as file:
text = file.read()
# 倒序输出
text = '\n'.join(text.split('\n')[::-1])
text = "\n".join(text.split("\n")[::-1])
return Response(content=text, media_type="text/plain")
else:
# 返回SSE流响应
Expand Down
3 changes: 3 additions & 0 deletions app/chain/mediaserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from app import schemas
from app.chain import ChainBase
from app.core.config import global_vars
from app.db.mediaserver_oper import MediaServerOper
from app.helper.service import ServiceConfigHelper
from app.log import logger
Expand Down Expand Up @@ -134,6 +135,8 @@ def sync(self):
logger.info(f"正在同步 {server_name} 媒体库 {library.name} ...")
library_count = 0
for item in self.items(server=server_name, library_id=library.id):
if global_vars.is_system_stopped:
return
if not item or not item.item_id:
continue
logger.debug(f"正在同步 {item.title} ...")
Expand Down
2 changes: 1 addition & 1 deletion app/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数
INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 1 # 事件队列空闲时的初始超时时间(秒)
MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的最大超时时间(秒)
MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 5 # 事件队列空闲时的最大超时时间(秒)


class Event:
Expand Down
3 changes: 2 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

# uvicorn服务
Server = uvicorn.Server(Config(app, host=settings.HOST, port=settings.PORT,
reload=settings.DEV, workers=multiprocessing.cpu_count()))
reload=settings.DEV, workers=multiprocessing.cpu_count(),
timeout_graceful_shutdown=5))


def start_tray():
Expand Down
2 changes: 1 addition & 1 deletion app/modules/telegram/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def run_polling():
logger.error(f"Telegram消息接收服务异常:{str(err)}")

# 启动线程来运行 infinity_polling
self._polling_thread = threading.Thread(target=run_polling)
self._polling_thread = threading.Thread(target=run_polling, daemon=True)
self._polling_thread.start()
logger.info("Telegram消息接收服务启动")

Expand Down
2 changes: 1 addition & 1 deletion app/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def init(self):
self.stop()

# 启动文件整理线程
self._transfer_thread = threading.Thread(target=self.__start_transfer)
self._transfer_thread = threading.Thread(target=self.__start_transfer, daemon=True)
self._transfer_thread.start()

# 读取目录配置
Expand Down
6 changes: 6 additions & 0 deletions app/startup/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@ async def lifespan(app: FastAPI):
定义应用的生命周期事件
"""
print("Starting up...")
# 启动模块
start_modules(app)
# 初始化路由
init_routers(app)
# 初始化插件
plugin_init_task = asyncio.create_task(init_plugins_async())
try:
# 在此处 yield,表示应用已经启动,控制权交回 FastAPI 主事件循环
yield
finally:
print("Shutting down...")
try:
# 取消插件初始化
plugin_init_task.cancel()
await plugin_init_task
except asyncio.CancelledError:
print("Plugin installation task cancelled.")
except Exception as e:
print(f"Error during plugin installation shutdown: {e}")
# 清理模块
shutdown_modules(app)
25 changes: 3 additions & 22 deletions app/startup/modules_initializer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import signal
import sys
from types import FrameType

from fastapi import FastAPI

from app.core.config import settings, global_vars
from app.core.config import global_vars, settings
from app.core.module import ModuleManager
from app.utils.system import SystemUtils

Expand Down Expand Up @@ -89,27 +87,12 @@ def check_auth():
)


def singal_handle():
"""
监听停止信号
"""

def stop_event(signum: int, _: FrameType):
"""
SIGTERM信号处理
"""
print(f"接收到停止信号:{signum},正在停止系统...")
global_vars.stop_system()

# 设置信号处理程序
signal.signal(signal.SIGTERM, stop_event)
signal.signal(signal.SIGINT, stop_event)


def shutdown_modules(_: FastAPI):
"""
服务关闭
"""
# 停止信号
global_vars.stop_system()
# 停止模块
ModuleManager().stop()
# 停止插件
Expand Down Expand Up @@ -159,5 +142,3 @@ def start_modules(_: FastAPI):
start_frontend()
# 检查认证状态
check_auth()
# 监听停止信号
singal_handle()
4 changes: 2 additions & 2 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ psutil~=5.9.4
python-dotenv~=1.0.1
python-hosts~=1.0.7
watchdog~=3.0.0
tailer~=0.4.1
openai~=0.27.2
cacheout~=0.14.1
click~=8.1.6
Expand All @@ -61,4 +60,5 @@ Pinyin2Hanzi~=0.1.1
pywebpush~=2.0.0
py115j~=0.0.7
oss2~=2.18.6
aligo~=6.2.4
aligo~=6.2.4
aiofiles~=24.1.0
32 changes: 18 additions & 14 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#
# pip-compile requirements.in
#
aiofiles==24.1.0
# via -r requirements.in
aiohappyeyeballs==2.4.0
# via aiohttp
aiohttp==3.10.5
Expand Down Expand Up @@ -63,6 +65,11 @@ click==8.1.7
# uvicorn
cn2an==0.5.22
# via -r requirements.in
colorama==0.4.6
# via
# click
# qrcode
# tqdm
coloredlogs==15.0.1
# via aligo
crcmod==1.7
Expand Down Expand Up @@ -95,7 +102,9 @@ frozenlist==1.4.1
func-timeout==4.3.5
# via -r requirements.in
greenlet==2.0.2
# via playwright
# via
# playwright
# sqlalchemy
h11==0.14.0
# via
# httpcore
Expand Down Expand Up @@ -139,7 +148,9 @@ openai==0.27.10
oss2==2.18.6
# via -r requirements.in
packaging==24.1
# via docker
# via
# docker
# qbittorrent-api
parse==1.19.1
# via -r requirements.in
passlib==1.7.4
Expand Down Expand Up @@ -181,14 +192,6 @@ pyee==9.0.4
# via playwright
pyjwt==2.7.0
# via -r requirements.in
pyobjc-core==10.3.1
# via
# pyobjc-framework-cocoa
# pyobjc-framework-quartz
pyobjc-framework-cocoa==10.3.1
# via pyobjc-framework-quartz
pyobjc-framework-quartz==10.3.1
# via pystray
pyotp==2.9.0
# via -r requirements.in
pyparsing==3.0.9
Expand All @@ -197,6 +200,8 @@ pypng==0.20220715.0
# via qrcode
pyquery==2.0.0
# via -r requirements.in
pyreadline3==3.5.4
# via humanfriendly
pysocks==1.7.1
# via requests
pystray==0.19.5
Expand All @@ -223,7 +228,7 @@ pyvirtualdisplay==3.0
# via -r requirements.in
pywebpush==2.0.0
# via -r requirements.in
qbittorrent-api==2023.5.48
qbittorrent-api==2024.9.67
# via -r requirements.in
qrcode[pil]==7.4.2
# via
Expand Down Expand Up @@ -262,7 +267,6 @@ six==1.16.0
# pystray
# python-dateutil
# pywebpush
# qbittorrent-api
slack-bolt==1.18.0
# via -r requirements.in
slack-sdk==3.21.3
Expand All @@ -283,8 +287,6 @@ starlette==0.27.0
# via
# -r requirements.in
# fastapi
tailer==0.4.1
# via -r requirements.in
torrentool==1.2.0
# via -r requirements.in
tqdm==4.66.5
Expand All @@ -301,6 +303,8 @@ typing-extensions==4.12.2
# qrcode
# sqlalchemy
# transmission-rpc
tzdata==2024.2
# via tzlocal
tzlocal==5.2
# via
# apscheduler
Expand Down

0 comments on commit 1190d8d

Please sign in to comment.