Skip to content

Commit

Permalink
Merge pull request #2860 from InfinityPacer/feature/setup
Browse files Browse the repository at this point in the history
  • Loading branch information
jxxghp authored Oct 17, 2024
2 parents aeed9fb + e233bc6 commit 8234c29
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 43 deletions.
6 changes: 3 additions & 3 deletions app/api/endpoints/plugin.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from typing import Any, List, Annotated, Optional
from typing import Annotated, Any, List, Optional

from fastapi import APIRouter, Depends, Header

from app import schemas
from app.factory import app
from app.core.config import settings
from app.core.plugin import PluginManager
from app.core.security import verify_token, verify_apikey
from app.core.security import verify_apikey, verify_token
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
from app.factory import app
from app.helper.plugin import PluginHelper
from app.log import logger
from app.scheduler import Scheduler
Expand Down
82 changes: 57 additions & 25 deletions app/core/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import importlib.util
import inspect
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
Expand All @@ -19,7 +21,7 @@
from app.helper.plugin import PluginHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas.types import SystemConfigKey, EventType
from app.schemas.types import EventType, SystemConfigKey
from app.utils.crypto import RSAUtils
from app.utils.limit import rate_limit_window
from app.utils.object import ObjectUtils
Expand Down Expand Up @@ -271,34 +273,63 @@ def reload_plugin(self, plugin_id: str):
# 广播事件
eventmanager.send_event(EventType.PluginReload, data={"plugin_id": plugin_id})

def sync(self):
def sync(self) -> List[str]:
"""
安装本地不存在的在线插件
"""

def install_plugin(plugin):
start_time = time.time()
state, msg = self.pluginhelper.install(pid=plugin.id, repo_url=plugin.repo_url, force_install=True)
elapsed_time = time.time() - start_time
if state:
logger.info(
f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version},耗时:{elapsed_time:.2f} 秒")
sync_plugins.append(plugin.id)
else:
logger.error(
f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg},耗时:{elapsed_time:.2f} 秒")
failed_plugins.append(plugin.id)

if SystemUtils.is_frozen():
return
logger.info("开始安装第三方插件...")
# 已安装插件
return []

# 获取已安装插件列表
install_plugins = self.systemconfig.get(SystemConfigKey.UserInstalledPlugins) or []
# 在线插件
# 获取在线插件列表
online_plugins = self.get_online_plugins()
if not online_plugins:
logger.error("未获取到第三方插件")
return
# 支持更新的插件自动更新
for plugin in online_plugins:
# 只处理已安装的插件
if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id):
# 下载安装
state, msg = self.pluginhelper.install(pid=plugin.id,
repo_url=plugin.repo_url)
# 安装失败
if not state:
logger.error(
f"插件 {plugin.plugin_name} v{plugin.plugin_version} 安装失败:{msg}")
continue
logger.info(f"插件 {plugin.plugin_name} 安装成功,版本:{plugin.plugin_version}")
logger.info("第三方插件安装完成")
# 确定需要安装的插件
plugins_to_install = [
plugin for plugin in online_plugins
if plugin.id in install_plugins and not self.is_plugin_exists(plugin.id)
]

if not plugins_to_install:
return []
logger.info("开始安装第三方插件...")
sync_plugins = []
failed_plugins = []

# 使用 ThreadPoolExecutor 进行并发安装
total_start_time = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(install_plugin, plugin): plugin
for plugin in plugins_to_install
}
for future in as_completed(futures):
plugin = futures[future]
try:
future.result()
except Exception as exc:
logger.error(f"插件 {plugin.plugin_name} 安装过程中出现异常: {exc}")

total_elapsed_time = time.time() - total_start_time
logger.info(
f"第三方插件安装完成,成功:{len(sync_plugins)} 个,"
f"失败:{len(failed_plugins)} 个,总耗时:{total_elapsed_time:.2f} 秒"
)
return sync_plugins

def get_plugin_config(self, pid: str) -> dict:
"""
Expand Down Expand Up @@ -689,7 +720,8 @@ def is_plugin_exists(pid: str) -> bool:
# 构建包名
package_name = f"app.plugins.{pid.lower()}"
# 检查包是否存在
package_exists = importlib.util.find_spec(package_name) is not None
spec = importlib.util.find_spec(package_name)
package_exists = spec is not None and spec.origin is not None
logger.debug(f"{pid} exists: {package_exists}")
return package_exists
except Exception as e:
Expand Down
68 changes: 63 additions & 5 deletions app/helper/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import shutil
import traceback
from pathlib import Path
from typing import Dict, Tuple, Optional, List, Any
from typing import Any, Dict, List, Optional, Tuple

from cachetools import TTLCache, cached

Expand Down Expand Up @@ -148,18 +148,20 @@ def install_report(self) -> bool:
json={"plugins": [{"plugin_id": plugin} for plugin in plugins]})
return True if res else False

def install(self, pid: str, repo_url: str, package_version: str = None) -> Tuple[bool, str]:
def install(self, pid: str, repo_url: str, package_version: str = None, force_install: bool = False) \
-> Tuple[bool, str]:
"""
安装插件,包括依赖安装和文件下载,相关资源支持自动降级策略
1. 检查并获取插件的指定版本,确认版本兼容性
1. 检查并获取插件的指定版本,确认版本兼容性
2. 从 GitHub 获取文件列表(包括 requirements.txt)
3. 删除旧的插件目录
3. 删除旧的插件目录(如非强制安装则进行备份)
4. 下载并预安装 requirements.txt 中的依赖(如果存在)
5. 下载并安装插件的其他文件
6. 再次尝试安装依赖(确保安装完整)
:param pid: 插件 ID
:param repo_url: 插件仓库地址
:param package_version: 首选插件版本 (如 "v2", "v3"),如不指定则默认使用系统配置的版本
:param force_install: 是否强制安装插件,默认不启用,启用时不进行备份和恢复操作
:return: (是否成功, 错误信息)
"""
if SystemUtils.is_frozen():
Expand Down Expand Up @@ -197,7 +199,11 @@ def install(self, pid: str, repo_url: str, package_version: str = None) -> Tuple
if not file_list:
return False, msg

# 3. 删除旧的插件目录
# 3. 删除旧的插件目录,如果不强制安装则备份
backup_dir = None
if not force_install:
backup_dir = self.__backup_plugin(pid.lower())

self.__remove_old_plugin(pid.lower())

# 4. 查找并安装 requirements.txt 中的依赖,确保插件环境的依赖尽可能完整。依赖安装可能失败且不影响插件安装,目前只记录日志
Expand All @@ -216,6 +222,13 @@ def install(self, pid: str, repo_url: str, package_version: str = None) -> Tuple
success, message = self.__download_files(pid.lower(), file_list, user_repo, package_version, True)
if not success:
logger.error(f"{pid} 下载插件文件失败:{message}")
if backup_dir:
self.__restore_plugin(pid.lower(), backup_dir)
logger.warning(f"{pid} 插件安装失败,已还原备份插件")
else:
self.__remove_old_plugin(pid.lower())
logger.warning(f"{pid} 已清理对应插件目录,请尝试重新安装")

return False, message
else:
logger.info(f"{pid} 下载插件文件成功")
Expand All @@ -225,6 +238,12 @@ def install(self, pid: str, repo_url: str, package_version: str = None) -> Tuple
if dependencies_exist:
if not success:
logger.error(f"{pid} 依赖安装失败:{message}")
if backup_dir:
self.__restore_plugin(pid.lower(), backup_dir)
logger.warning(f"{pid} 插件安装失败,已还原备份插件")
else:
self.__remove_old_plugin(pid.lower())
logger.warning(f"{pid} 已清理对应插件目录,请尝试重新安装")
else:
logger.info(f"{pid} 依赖安装成功")

Expand Down Expand Up @@ -371,6 +390,45 @@ def __install_dependencies_if_required(self, pid: str) -> Tuple[bool, bool, str]

return False, False, "不存在依赖"

@staticmethod
def __backup_plugin(pid: str) -> str:
"""
备份旧插件目录
:param pid: 插件 ID
:return: 备份目录路径
"""
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid
backup_dir = Path(settings.TEMP_PATH) / "plugins_backup" / pid

if plugin_dir.exists():
# 备份时清理已有的备份目录,防止残留文件影响
if backup_dir.exists():
shutil.rmtree(backup_dir, ignore_errors=True)
logger.debug(f"{pid} 旧的备份目录已清理 {backup_dir}")

shutil.copytree(plugin_dir, backup_dir, dirs_exist_ok=True)
logger.debug(f"{pid} 插件已备份到 {backup_dir}")

return str(backup_dir) if backup_dir.exists() else None

@staticmethod
def __restore_plugin(pid: str, backup_dir: str):
"""
还原旧插件目录
:param pid: 插件 ID
:param backup_dir: 备份目录路径
"""
plugin_dir = Path(settings.ROOT_PATH) / "app" / "plugins" / pid
if plugin_dir.exists():
shutil.rmtree(plugin_dir, ignore_errors=True)
logger.debug(f"{pid} 已清理插件目录 {plugin_dir}")

if Path(backup_dir).exists():
shutil.copytree(backup_dir, plugin_dir, dirs_exist_ok=True)
logger.debug(f"{pid} 已还原插件目录 {plugin_dir}")
shutil.rmtree(backup_dir, ignore_errors=True)
logger.debug(f"{pid} 已删除备份目录 {backup_dir}")

@staticmethod
def __remove_old_plugin(pid: str):
"""
Expand Down
11 changes: 8 additions & 3 deletions app/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,7 @@ def user_auth():
}
)

# 注册插件公共服务
for pid in PluginManager().get_running_plugin_ids():
self.update_plugin_job(pid)
self.init_plugin_jobs()

# 打印服务
logger.debug(self._scheduler.print_jobs())
Expand Down Expand Up @@ -410,6 +408,13 @@ def start(self, job_id: str, *args, **kwargs):
except KeyError:
pass

def init_plugin_jobs(self):
"""
注册插件公共服务
"""
for pid in PluginManager().get_running_plugin_ids():
self.update_plugin_job(pid)

def update_plugin_job(self, pid: str):
"""
更新插件定时服务
Expand Down
22 changes: 17 additions & 5 deletions app/startup/lifecycle.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.startup.module_initializer import start_modules, shutdown_modules
from app.startup.routers import init_routers
from app.startup.modules_initializer import shutdown_modules, start_modules
from app.startup.plugins_initializer import init_plugins_async
from app.startup.routers_initializer import init_routers


@asynccontextmanager
Expand All @@ -14,6 +16,16 @@ async def lifespan(app: FastAPI):
print("Starting up...")
start_modules(app)
init_routers(app)
yield
print("Shutting down...")
shutdown_modules(app)
plugin_init_task = asyncio.create_task(init_plugins_async())
try:
yield
finally:
print("Shutting down...")
try:
plugin_init_task.cancel()
await plugin_init_task
except asyncio.CancelledError:
print("Plugin installation task cancelled.")
except Exception as e:
print(f"Error during plugin installation shutdown: {e}")
shutdown_modules(app)
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ def start_modules(_: FastAPI):
ModuleManager()
# 启动事件消费
EventManager().start()
# 安装在线插件
PluginManager().sync()
# 加载插件
PluginManager().start()
# 启动监控任务
Expand Down
37 changes: 37 additions & 0 deletions app/startup/plugins_initializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio

from app.core.plugin import PluginManager
from app.log import logger
from app.scheduler import Scheduler


async def init_plugins_async():
"""
初始化安装插件,并动态注册后台任务及API
"""
try:
loop = asyncio.get_event_loop()
plugin_manager = PluginManager()
scheduler = Scheduler()
sync_plugins = await loop.run_in_executor(None, plugin_manager.sync)
if not sync_plugins:
return
# 为避免初始化插件异常,这里所有插件都进行初始化
logger.info(f"已同步安装 {len(sync_plugins)} 个在线插件,正在初始化所有插件")
# 安装完成后重新初始化插件
plugin_manager.init_config()
# 插件启动后注册后台任务
scheduler.init_plugin_jobs()
# 插件启动后注册插件API
register_plugin_api()
logger.info("所有插件初始化完成")
except Exception as e:
logger.error(f"插件初始化过程中出现异常: {e}")


def register_plugin_api():
"""
插件启动后注册插件API
"""
from app.api.endpoints import plugin
plugin.register_plugin_api()
File renamed without changes.

0 comments on commit 8234c29

Please sign in to comment.