diff --git a/app/api/endpoints/plugin.py b/app/api/endpoints/plugin.py index f280d4130..3d83a4ed3 100644 --- a/app/api/endpoints/plugin.py +++ b/app/api/endpoints/plugin.py @@ -1,14 +1,14 @@ -from typing import Any, List, Annotated, Optional +from typing import Annotated, Any, List, Optional from fastapi import APIRouter, Depends, Header from app import schemas -from app.factory import app from app.core.config import settings from app.core.plugin import PluginManager -from app.core.security import verify_token, verify_apikey +from app.core.security import verify_apikey, verify_token from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_superuser +from app.factory import app from app.helper.plugin import PluginHelper from app.log import logger from app.scheduler import Scheduler diff --git a/app/core/plugin.py b/app/core/plugin.py index 271be6aee..7536d0c9f 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -3,9 +3,11 @@ import importlib.util import inspect import os +import time import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer @@ -19,7 +21,7 @@ from app.helper.plugin import PluginHelper from app.helper.sites import SitesHelper from app.log import logger -from app.schemas.types import SystemConfigKey, EventType +from app.schemas.types import EventType, SystemConfigKey from app.utils.crypto import RSAUtils from app.utils.limit import rate_limit_window from app.utils.object import ObjectUtils @@ -271,34 +273,63 @@ def reload_plugin(self, plugin_id: str): # 广播事件 eventmanager.send_event(EventType.PluginReload, data={"plugin_id": plugin_id}) - def sync(self): + def sync(self) -> List[str]: """ 安装本地不存在的在线插件 """ + + def install_plugin(plugin): + start_time = time.time() + state, msg = self.pluginhelper.install(pid=plugin.id, repo_url=plugin.repo_url, force_install=True) + elapsed_time = time.time() - start_time + if state: + logger.info( + f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version},耗时:{elapsed_time:.2f} 秒") + sync_plugins.append(plugin.id) + else: + logger.error( + f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg},耗时:{elapsed_time:.2f} 秒") + failed_plugins.append(plugin.id) + if SystemUtils.is_frozen(): - return - logger.info("开始安装第三方插件...") - # 已安装插件 + return [] + + # 获取已安装插件列表 install_plugins = self.systemconfig.get(SystemConfigKey.UserInstalledPlugins) or [] - # 在线插件 + # 获取在线插件列表 online_plugins = self.get_online_plugins() - if not online_plugins: - logger.error("未获取到第三方插件") - return - # 支持更新的插件自动更新 - for plugin in online_plugins: - # 只处理已安装的插件 - if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id): - # 下载安装 - state, msg = self.pluginhelper.install(pid=plugin.id, - repo_url=plugin.repo_url) - # 安装失败 - if not state: - logger.error( - f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg}") - continue - logger.info(f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version}") - logger.info("第三方插件安装完成") + # 确定需要安装的插件 + plugins_to_install = [ + plugin for plugin in online_plugins + if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id) + ] + + if not plugins_to_install: + return [] + logger.info("开始安装第三方插件...") + sync_plugins = [] + failed_plugins = [] + + # 使用 ThreadPoolExecutor 进行并发安装 + total_start_time = time.time() + with ThreadPoolExecutor(max_workers=5) as executor: + futures = { + executor.submit(install_plugin, plugin): plugin + for plugin in plugins_to_install + } + for future in as_completed(futures): + plugin = futures[future] + try: + future.result() + except Exception as exc: + logger.error(f"插件 {plugin.plugin_name} 安装过程中出现异常: {exc}") + + total_elapsed_time = time.time() - total_start_time + logger.info( + f"第三方插件安装完成,成功:{len(sync_plugins)} 个," + f"失败:{len(failed_plugins)} 个,总耗时:{total_elapsed_time:.2f} 秒" + ) + return sync_plugins def get_plugin_config(self, pid: str) -> dict: """ @@ -689,7 +720,8 @@ def is_plugin_exists(pid: str) -> bool: # 构建包名 package_name = f"app.plugins.{pid.lower()}" # 检查包是否存在 - package_exists = importlib.util.find_spec(package_name) is not None + spec = importlib.util.find_spec(package_name) + package_exists = spec is not None and spec.origin is not None logger.debug(f"{pid} exists: {package_exists}") return package_exists except Exception as e: diff --git a/app/helper/plugin.py b/app/helper/plugin.py index caacfff86..a9a2e7c5a 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -2,7 +2,7 @@ import shutil import traceback from pathlib import Path -from typing import Dict, Tuple, Optional, List, Any +from typing import Any, Dict, List, Optional, Tuple from cachetools import TTLCache, cached @@ -148,18 +148,20 @@ def install_report(self) -> bool: json={"plugins": [{"plugin_id": plugin} for plugin in plugins]}) return True if res else False - def install(self, pid: str, repo_url: str, package_version: str = None) -> Tuple[bool, str]: + def install(self, pid: str, repo_url: str, package_version: str = None, force_install: bool = False) \ + -> Tuple[bool, str]: """ 安装插件,包括依赖安装和文件下载,相关资源支持自动降级策略 - 1. 检查并获取插件的指定版本,确认版本兼容性。 + 1. 检查并获取插件的指定版本,确认版本兼容性 2. 从 GitHub 获取文件列表(包括 requirements.txt) - 3. 删除旧的插件目录 + 3. 删除旧的插件目录(如非强制安装则进行备份) 4. 下载并预安装 requirements.txt 中的依赖(如果存在) 5. 下载并安装插件的其他文件 6. 再次尝试安装依赖(确保安装完整) :param pid: 插件 ID :param repo_url: 插件仓库地址 :param package_version: 首选插件版本 (如 "v2", "v3"),如不指定则默认使用系统配置的版本 + :param force_install: 是否强制安装插件,默认不启用,启用时不进行备份和恢复操作 :return: (是否成功, 错误信息) """ if SystemUtils.is_frozen(): @@ -197,7 +199,11 @@ def install(self, pid: str, repo_url: str, package_version: str = None) -> Tuple if not file_list: return False, msg - # 3. 删除旧的插件目录 + # 3. 删除旧的插件目录,如果不强制安装则备份 + backup_dir = None + if not force_install: + backup_dir = self.__backup_plugin(pid.lower()) + self.__remove_old_plugin(pid.lower()) # 4. 查找并安装 requirements.txt 中的依赖,确保插件环境的依赖尽可能完整。依赖安装可能失败且不影响插件安装,目前只记录日志 @@ -216,6 +222,13 @@ def install(self, pid: str, repo_url: str, package_version: str = None) -> Tuple success, message = self.__download_files(pid.lower(), file_list, user_repo, package_version, True) if not success: logger.error(f"{pid} 下载插件文件失败:{message}") + if backup_dir: + self.__restore_plugin(pid.lower(), backup_dir) + logger.warning(f"{pid} 插件安装失败,已还原备份插件") + else: + self.__remove_old_plugin(pid.lower()) + logger.warning(f"{pid} 已清理对应插件目录,请尝试重新安装") + return False, message else: logger.info(f"{pid} 下载插件文件成功") @@ -225,6 +238,12 @@ def install(self, pid: str, repo_url: str, package_version: str = None) -> Tuple if dependencies_exist: if not success: logger.error(f"{pid} 依赖安装失败:{message}") + if backup_dir: + self.__restore_plugin(pid.lower(), backup_dir) + logger.warning(f"{pid} 插件安装失败,已还原备份插件") + else: + self.__remove_old_plugin(pid.lower()) + logger.warning(f"{pid} 已清理对应插件目录,请尝试重新安装") else: logger.info(f"{pid} 依赖安装成功") @@ -371,6 +390,45 @@ def __install_dependencies_if_required(self, pid: str) -> Tuple[bool, bool, str] return False, False, "不存在依赖" + @staticmethod + def __backup_plugin(pid: str) -> str: + """ + 备份旧插件目录 + :param pid: 插件 ID + :return: 备份目录路径 + """ + plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid + backup_dir = Path(settings.TEMP_PATH) / "plugins_backup" / pid + + if plugin_dir.exists(): + # 备份时清理已有的备份目录,防止残留文件影响 + if backup_dir.exists(): + shutil.rmtree(backup_dir, ignore_errors=True) + logger.debug(f"{pid} 旧的备份目录已清理 {backup_dir}") + + shutil.copytree(plugin_dir, backup_dir, dirs_exist_ok=True) + logger.debug(f"{pid} 插件已备份到 {backup_dir}") + + return str(backup_dir) if backup_dir.exists() else None + + @staticmethod + def __restore_plugin(pid: str, backup_dir: str): + """ + 还原旧插件目录 + :param pid: 插件 ID + :param backup_dir: 备份目录路径 + """ + plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid + if plugin_dir.exists(): + shutil.rmtree(plugin_dir, ignore_errors=True) + logger.debug(f"{pid} 已清理插件目录 {plugin_dir}") + + if Path(backup_dir).exists(): + shutil.copytree(backup_dir, plugin_dir, dirs_exist_ok=True) + logger.debug(f"{pid} 已还原插件目录 {plugin_dir}") + shutil.rmtree(backup_dir, ignore_errors=True) + logger.debug(f"{pid} 已删除备份目录 {backup_dir}") + @staticmethod def __remove_old_plugin(pid: str): """ diff --git a/app/scheduler.py b/app/scheduler.py index 4f151f604..ae4c42df1 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -359,9 +359,7 @@ def user_auth(): } ) - # 注册插件公共服务 - for pid in PluginManager().get_running_plugin_ids(): - self.update_plugin_job(pid) + self.init_plugin_jobs() # 打印服务 logger.debug(self._scheduler.print_jobs()) @@ -410,6 +408,13 @@ def start(self, job_id: str, *args, **kwargs): except KeyError: pass + def init_plugin_jobs(self): + """ + 注册插件公共服务 + """ + for pid in PluginManager().get_running_plugin_ids(): + self.update_plugin_job(pid) + def update_plugin_job(self, pid: str): """ 更新插件定时服务 diff --git a/app/startup/lifecycle.py b/app/startup/lifecycle.py index 7bb0e7d3f..71e0011c6 100644 --- a/app/startup/lifecycle.py +++ b/app/startup/lifecycle.py @@ -1,9 +1,11 @@ +import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI -from app.startup.module_initializer import start_modules, shutdown_modules -from app.startup.routers import init_routers +from app.startup.modules_initializer import shutdown_modules, start_modules +from app.startup.plugins_initializer import init_plugins_async +from app.startup.routers_initializer import init_routers @asynccontextmanager @@ -14,6 +16,16 @@ async def lifespan(app: FastAPI): print("Starting up...") start_modules(app) init_routers(app) - yield - print("Shutting down...") - shutdown_modules(app) + plugin_init_task = asyncio.create_task(init_plugins_async()) + try: + yield + finally: + print("Shutting down...") + try: + plugin_init_task.cancel() + await plugin_init_task + except asyncio.CancelledError: + print("Plugin installation task cancelled.") + except Exception as e: + print(f"Error during plugin installation shutdown: {e}") + shutdown_modules(app) diff --git a/app/startup/module_initializer.py b/app/startup/modules_initializer.py similarity index 98% rename from app/startup/module_initializer.py rename to app/startup/modules_initializer.py index 1bd86a89f..1e06892fb 100644 --- a/app/startup/module_initializer.py +++ b/app/startup/modules_initializer.py @@ -147,8 +147,6 @@ def start_modules(_: FastAPI): ModuleManager() # 启动事件消费 EventManager().start() - # 安装在线插件 - PluginManager().sync() # 加载插件 PluginManager().start() # 启动监控任务 diff --git a/app/startup/plugins_initializer.py b/app/startup/plugins_initializer.py new file mode 100644 index 000000000..c359d3241 --- /dev/null +++ b/app/startup/plugins_initializer.py @@ -0,0 +1,37 @@ +import asyncio + +from app.core.plugin import PluginManager +from app.log import logger +from app.scheduler import Scheduler + + +async def init_plugins_async(): + """ + 初始化安装插件,并动态注册后台任务及API + """ + try: + loop = asyncio.get_event_loop() + plugin_manager = PluginManager() + scheduler = Scheduler() + sync_plugins = await loop.run_in_executor(None, plugin_manager.sync) + if not sync_plugins: + return + # 为避免初始化插件异常,这里所有插件都进行初始化 + logger.info(f"已同步安装 {len(sync_plugins)} 个在线插件,正在初始化所有插件") + # 安装完成后重新初始化插件 + plugin_manager.init_config() + # 插件启动后注册后台任务 + scheduler.init_plugin_jobs() + # 插件启动后注册插件API + register_plugin_api() + logger.info("所有插件初始化完成") + except Exception as e: + logger.error(f"插件初始化过程中出现异常: {e}") + + +def register_plugin_api(): + """ + 插件启动后注册插件API + """ + from app.api.endpoints import plugin + plugin.register_plugin_api() diff --git a/app/startup/routers.py b/app/startup/routers_initializer.py similarity index 100% rename from app/startup/routers.py rename to app/startup/routers_initializer.py