From 8c1159cca36f1848843cad2b47d9a0c030a89481 Mon Sep 17 00:00:00 2001 From: arily Date: Wed, 26 Jun 2024 18:00:05 +0800 Subject: [PATCH] add binding feature --- bot.py | 1 - models/__init__.py | 22 ++++++++ plugins/sb_bind/__init__.py | 84 ++++++++++++++++++++++++++++ plugins/sb_kicker/__init__.py | 101 ++++------------------------------ plugins/sb_kicker/enum.py | 17 +++++- plugins/sb_kicker/utils.py | 7 +-- requirements.txt | Bin 2248 -> 2280 bytes test_source/test_sb_kicker.py | 2 - utils/__init__.py | 35 ++++++++++++ utils/qq_helper.py | 35 ++++++++++-- 10 files changed, 200 insertions(+), 104 deletions(-) create mode 100644 plugins/sb_bind/__init__.py create mode 100644 utils/__init__.py diff --git a/bot.py b/bot.py index 4119069..69cf113 100644 --- a/bot.py +++ b/bot.py @@ -1,4 +1,3 @@ -import dotenv import nonebot from nonebot.adapters.onebot.v11 import Adapter as OneBotV11Adapter from init.database import init as database_init, disconnect as database_disconnect diff --git a/models/__init__.py b/models/__init__.py index e69de29..4e432d3 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -0,0 +1,22 @@ +from typing import List + +from models.db import Accounts +from models.enums import MemberStatus +from models.types import JoinedGroupMemberInfo + + +async def sync_in_group(members: List[JoinedGroupMemberInfo]): + await ( + Accounts + .filter( + id__in=[members['id'] for members in members], + status=MemberStatus.Removed + ) + .update( + status=MemberStatus.InGroup + ) + ) + for member in members: + member['status'] = MemberStatus.InGroup \ + if member['status'] == MemberStatus.Removed \ + else member['status'] diff --git a/plugins/sb_bind/__init__.py b/plugins/sb_bind/__init__.py new file mode 100644 index 0000000..f5d8055 --- /dev/null +++ b/plugins/sb_bind/__init__.py @@ -0,0 +1,84 @@ +import os + +import dotenv +from loguru import logger +from nonebot import on_command +from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot +from nonebot.adapters.onebot.v11.message import Message +from nonebot.internal.matcher import Matcher +from nonebot.internal.params import Arg +from nonebot.params import CommandArg +from nonebot.rule import Rule +from nonebot.typing import T_State +from result import as_result + +from models import Accounts +from plugins.sb_kicker import merge_onebot_data_with_db_result +from utils import J_G_M_to_saveable +from utils.qq_helper import fmt_user, is_sender_admin + +dotenv.load_dotenv() +SB_GROUP_ID = int(os.getenv("SB_GROUP_ID") or "-1") + +bind = on_command( + "bind", + rule=Rule(is_sender_admin), +) + + +@bind.handle() +def _(matcher: Matcher, state: T_State, args: Message = CommandArg()) -> None: + state['parsed'] = {} + v = as_result(Exception)(lambda: args.extract_plain_text().split(" "))().unwrap_or([]) + v = [i.strip() for i in v] + sb_id = as_result(IndexError, ValueError)(lambda: int(v[0]))().ok() + qq_id = as_result(IndexError, ValueError)(lambda: int(v[1]))().ok() + + if sb_id is not None: + state['parsed']['sb_id'] = sb_id + matcher.set_arg('sb_id', Message(str(sb_id))) + + if qq_id is not None: + state['parsed']['qq_id'] = qq_id + matcher.set_arg('qq_id', Message(str(qq_id))) + + +@bind.got('sb_id', 'sb-id') +async def read_sb_id(state: T_State, sb_id: str = Arg()) -> None: + s_id = as_result(ValueError)(lambda: int(sb_id))().ok() + + if s_id is None: + await bind.finish('invalid sb id') + + state['parsed']['sb_id'] = s_id + + +@bind.got('qq_id', 'qq') +async def read_qq_id(bot: OnebotV11Bot, state: T_State, qq_id: str = Arg()) -> None: + s_id = as_result(ValueError)(lambda: int(qq_id))().ok() + + if s_id is None: + await bind.finish('invalid qq id') + + state['parsed']['qq_id'] = s_id + + +@bind.handle() +async def fin(bot: OnebotV11Bot, state: T_State): + parsed = state['parsed'] + if parsed['qq_id'] is None or parsed['sb_id'] is None: + await bind.finish('missing args') + + try: + qq_user = await bot.get_group_member_info(group_id=SB_GROUP_ID, user_id=int(parsed['qq_id'])) + db_user = await Accounts.get_or_none(group_id=SB_GROUP_ID, qq_id=parsed['qq_id']) + + merged_view = merge_onebot_data_with_db_result(qq_user, db_user) + merged_view['sb_id'] = parsed['sb_id'] or merged_view['sb_id'] + + write_data = J_G_M_to_saveable(merged_view) + await Accounts.update_or_create(id=merged_view['id'], defaults=write_data) + await bind.send(fmt_user(merged_view)) + except Exception as e: + logger.opt(exception=True).error(e) + await bind.send('error occurred') diff --git a/plugins/sb_kicker/__init__.py b/plugins/sb_kicker/__init__.py index 4f97847..ad03be8 100644 --- a/plugins/sb_kicker/__init__.py +++ b/plugins/sb_kicker/__init__.py @@ -1,10 +1,8 @@ -from enum import Enum - from loguru import logger -from models.db import Admins + +from models import sync_in_group from nonebot import on_fullmatch from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot -from nonebot.adapters.onebot.v11 import MessageEvent from nonebot.adapters.onebot.v11.message import Message from nonebot.internal.matcher import Matcher from nonebot.internal.params import Arg @@ -12,44 +10,16 @@ from nonebot.rule import Rule from nonebot.typing import T_State -from utils.qq_helper import is_admin - -from .enum import PluginStatus +from utils import parse_ranges, J_G_M_to_saveable +from utils.qq_helper import fmt_user, is_sender_admin, is_bot_group_admin +from .enum import PluginStatus, Action, cn_names, actions from .utils import * +dotenv.load_dotenv() SB_GROUP_ID = int(os.getenv("SB_GROUP_ID") or "-1") -class Action(str, Enum): - SyncEntries = "S" - MarkPendingRemoval = "M" - Kick = "K" - - -cn_names = { - Action.SyncEntries: '同步数据库', - Action.MarkPendingRemoval: '通知并标记', - Action.Kick: '移除', -} - -actions = set(item for item in Action) - - -# class State(TypedDict): -# trigger_time: float -# sorted: List[JoinedGroupMemberInfo] -# action: Action - - -async def checker_common(event: MessageEvent) -> bool: - return await Admins.exists(qq_id=event.sender.user_id) - - -async def checker_is_admin(bot: OnebotV11Bot): - return await is_admin(bot, SB_GROUP_ID) - - -async def checker_is_plugin_idle() -> bool: +async def is_plugin_idle() -> bool: cache, _ = await Caches.get_or_create( key="sb_kicker_status", defaults={"value": PluginStatus.Idle.value} ) @@ -58,7 +28,7 @@ async def checker_is_plugin_idle() -> bool: sb_kicker = on_fullmatch( "sb服送人", - rule=Rule(checker_common, checker_is_admin, checker_is_plugin_idle), + rule=Rule(is_sender_admin, is_bot_group_admin, is_plugin_idle), ) sb_kicker_force = on_fullmatch("sb服送人 --force") @@ -124,7 +94,7 @@ async def prompt_range(bot: OnebotV11Bot, state: T_State, pending_range: Message if max(*picked_ids, len(members)) > len(members): await sb_kicker.finish("取消本次操作, 尝试踢出太多群友") - picked = find_all(members, lambda m, i: i in picked_ids) + picked = filter_in(members, lambda m, i: i in picked_ids) state['marked'] = picked except Exception as e: logger.opt(exception=True).error(e) @@ -161,12 +131,7 @@ async def work(bot: OnebotV11Bot, state: T_State, confirm: Message = Arg()): case Action.SyncEntries: for member in picked: member['status'] = MemberStatus.PendingRemoval - await Accounts.update_or_create(**{ - k: v for k, v in member.items() if ( - v is not None and - k in Accounts._meta.fields_map.keys() - ) - }) + await Accounts.update_or_create(**J_G_M_to_saveable(member)) case Action.MarkPendingRemoval: try: await ( @@ -222,49 +187,3 @@ async def work(bot: OnebotV11Bot, state: T_State, confirm: Message = Arg()): except Exception as e: logger.opt(exception=True).error(e) await sb_kicker.finish("出现未知错误,请查看日志。") - - -def parse_ranges(input_str: str, min_range: int, max_range: int): - # Split the input string by commas - parts = input_str.split(',') - result = set() - - # Process each part - for part in parts: - part = part.strip() - if '-' in part: - start, end = part.split('-') - start = int(start) if start else min_range - end = int(end) if end else max_range - - if start is not None and end is not None: - result.update([*range(start, end)]) - elif start is not None: - result.add(start) - else: - if part: - result.add(int(part)) - - # Convert the set to a sorted list - return sorted(result) - - -def fmt_user(member: JoinedGroupMemberInfo): - return f"{member['nickname']}(qq = {member['qq_id']}, sb = {member['sb_id']})" - - -async def sync_in_group(members: List[JoinedGroupMemberInfo]): - await ( - Accounts - .filter( - id__in=[members['id'] for members in members], - status=MemberStatus.Removed - ) - .update( - status=MemberStatus.InGroup - ) - ) - for member in members: - member['status'] = MemberStatus.InGroup \ - if member['status'] == MemberStatus.Removed \ - else member['status'] diff --git a/plugins/sb_kicker/enum.py b/plugins/sb_kicker/enum.py index 4a7b712..98d8c4d 100644 --- a/plugins/sb_kicker/enum.py +++ b/plugins/sb_kicker/enum.py @@ -1,6 +1,21 @@ from enum import Enum -class PluginStatus(Enum): +class PluginStatus(str, Enum): Idle = "idle" Running = "running" + + +class Action(str, Enum): + SyncEntries = "S" + MarkPendingRemoval = "M" + Kick = "K" + + +cn_names = { + Action.SyncEntries: '同步数据库', + Action.MarkPendingRemoval: '通知并标记', + Action.Kick: '移除', +} + +actions = set(item for item in Action) diff --git a/plugins/sb_kicker/utils.py b/plugins/sb_kicker/utils.py index 6313fc4..a7fc42b 100644 --- a/plugins/sb_kicker/utils.py +++ b/plugins/sb_kicker/utils.py @@ -15,6 +15,8 @@ from models.types import GroupMemberInfo, JoinedGroupMemberInfo from inspect import signature +T = TypeVar("T") + dotenv.load_dotenv() SB_GROUP_ID = int(os.getenv("SB_GROUP_ID")) @@ -124,9 +126,6 @@ async def get_accounts_with_db_data(onebot_data: List[GroupMemberInfo]): ] -T = TypeVar("T") - - def find_in( _list: List[T], matcher: Callable[[], bool] | Callable[[T], bool] | Callable[[T, int], bool] | Callable[[T, int, List[T]], bool] @@ -137,7 +136,7 @@ def find_in( return item -def find_all( +def filter_in( _list: List[T], matcher: Callable[[], bool] | Callable[[T], bool] | Callable[[T, int], bool] | Callable[[T, int, List[T]], bool] ) -> List[T]: diff --git a/requirements.txt b/requirements.txt index 7c0f0a5c74308d6819288c24823b4c156ca5cacf..35e213b65300426710dd33d6a72456dfea068418 100644 GIT binary patch delta 40 ucmX>h_(E{Q2@VBb1}=sohE#@PhEj$ch7yK423rOL20aEt26G^3zyJW!Ob7@7 delta 7 OcmaDMctUW)2@U`bnFCV* diff --git a/test_source/test_sb_kicker.py b/test_source/test_sb_kicker.py index fc85f82..216cd19 100644 --- a/test_source/test_sb_kicker.py +++ b/test_source/test_sb_kicker.py @@ -1,5 +1,3 @@ -from datetime import datetime - import pytest import nonebot diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..1dc3a7c --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1,35 @@ +from models import JoinedGroupMemberInfo, Accounts + + +def parse_ranges(input_str: str, min_range: int, max_range: int): + # Split the input string by commas + parts = input_str.split(',') + result = set() + + # Process each part + for part in parts: + part = part.strip() + if '-' in part: + start, end = part.split('-') + start = int(start) if start else min_range + end = int(end) if end else max_range + + if start is not None and end is not None: + result.update([*range(start, end)]) + elif start is not None: + result.add(start) + else: + if part: + result.add(int(part)) + + # Convert the set to a sorted list + return sorted(result) + + +def J_G_M_to_saveable(member: JoinedGroupMemberInfo): + return { + k: v for k, v in member.items() if ( + v is not None and + k in Accounts._meta.fields_map.keys() + ) + } diff --git a/utils/qq_helper.py b/utils/qq_helper.py index e0a7864..6360eef 100644 --- a/utils/qq_helper.py +++ b/utils/qq_helper.py @@ -1,8 +1,33 @@ -from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot +import os + +from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot, MessageEvent + +from models.db import Admins +from models.types import JoinedGroupMemberInfo + +import dotenv + +dotenv.load_dotenv() +SB_GROUP_ID = int(os.getenv("SB_GROUP_ID") or "-1") async def is_admin(bot: OnebotV11Bot, group_id: int): - self_info = await bot.get_group_member_info(group_id=group_id, user_id=int(bot.self_id)) - if self_info["role"] in ["owner", "admin"]: - return True - return False + try: + self_info = await bot.get_group_member_info(group_id=group_id, user_id=int(bot.self_id)) + if self_info["role"] in ["owner", "admin"]: + return True + return False + except Exception as e: + return False + + +def fmt_user(member: JoinedGroupMemberInfo): + return f"{member['nickname']}(qq = {member['qq_id']}, sb = {member['sb_id']})" + + +async def is_sender_admin(event: MessageEvent) -> bool: + return await Admins.exists(qq_id=event.sender.user_id) + + +async def is_bot_group_admin(bot: OnebotV11Bot): + return await is_admin(bot, SB_GROUP_ID)