-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
200 additions
and
104 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from typing import List | ||
|
||
from models.db import Accounts | ||
from models.enums import MemberStatus | ||
from models.types import JoinedGroupMemberInfo | ||
|
||
|
||
async def sync_in_group(members: List[JoinedGroupMemberInfo]): | ||
await ( | ||
Accounts | ||
.filter( | ||
id__in=[members['id'] for members in members], | ||
status=MemberStatus.Removed | ||
) | ||
.update( | ||
status=MemberStatus.InGroup | ||
) | ||
) | ||
for member in members: | ||
member['status'] = MemberStatus.InGroup \ | ||
if member['status'] == MemberStatus.Removed \ | ||
else member['status'] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import os | ||
|
||
import dotenv | ||
from loguru import logger | ||
from nonebot import on_command | ||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot | ||
from nonebot.adapters.onebot.v11.message import Message | ||
from nonebot.internal.matcher import Matcher | ||
from nonebot.internal.params import Arg | ||
from nonebot.params import CommandArg | ||
from nonebot.rule import Rule | ||
from nonebot.typing import T_State | ||
from result import as_result | ||
|
||
from models import Accounts | ||
from plugins.sb_kicker import merge_onebot_data_with_db_result | ||
from utils import J_G_M_to_saveable | ||
from utils.qq_helper import fmt_user, is_sender_admin | ||
|
||
dotenv.load_dotenv() | ||
SB_GROUP_ID = int(os.getenv("SB_GROUP_ID") or "-1") | ||
|
||
bind = on_command( | ||
"bind", | ||
rule=Rule(is_sender_admin), | ||
) | ||
|
||
|
||
@bind.handle() | ||
def _(matcher: Matcher, state: T_State, args: Message = CommandArg()) -> None: | ||
state['parsed'] = {} | ||
v = as_result(Exception)(lambda: args.extract_plain_text().split(" "))().unwrap_or([]) | ||
v = [i.strip() for i in v] | ||
sb_id = as_result(IndexError, ValueError)(lambda: int(v[0]))().ok() | ||
qq_id = as_result(IndexError, ValueError)(lambda: int(v[1]))().ok() | ||
|
||
if sb_id is not None: | ||
state['parsed']['sb_id'] = sb_id | ||
matcher.set_arg('sb_id', Message(str(sb_id))) | ||
|
||
if qq_id is not None: | ||
state['parsed']['qq_id'] = qq_id | ||
matcher.set_arg('qq_id', Message(str(qq_id))) | ||
|
||
|
||
@bind.got('sb_id', 'sb-id') | ||
async def read_sb_id(state: T_State, sb_id: str = Arg()) -> None: | ||
s_id = as_result(ValueError)(lambda: int(sb_id))().ok() | ||
|
||
if s_id is None: | ||
await bind.finish('invalid sb id') | ||
|
||
state['parsed']['sb_id'] = s_id | ||
|
||
|
||
@bind.got('qq_id', 'qq') | ||
async def read_qq_id(bot: OnebotV11Bot, state: T_State, qq_id: str = Arg()) -> None: | ||
s_id = as_result(ValueError)(lambda: int(qq_id))().ok() | ||
|
||
if s_id is None: | ||
await bind.finish('invalid qq id') | ||
|
||
state['parsed']['qq_id'] = s_id | ||
|
||
|
||
@bind.handle() | ||
async def fin(bot: OnebotV11Bot, state: T_State): | ||
parsed = state['parsed'] | ||
if parsed['qq_id'] is None or parsed['sb_id'] is None: | ||
await bind.finish('missing args') | ||
|
||
try: | ||
qq_user = await bot.get_group_member_info(group_id=SB_GROUP_ID, user_id=int(parsed['qq_id'])) | ||
db_user = await Accounts.get_or_none(group_id=SB_GROUP_ID, qq_id=parsed['qq_id']) | ||
|
||
merged_view = merge_onebot_data_with_db_result(qq_user, db_user) | ||
merged_view['sb_id'] = parsed['sb_id'] or merged_view['sb_id'] | ||
|
||
write_data = J_G_M_to_saveable(merged_view) | ||
await Accounts.update_or_create(id=merged_view['id'], defaults=write_data) | ||
await bind.send(fmt_user(merged_view)) | ||
except Exception as e: | ||
logger.opt(exception=True).error(e) | ||
await bind.send('error occurred') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,21 @@ | ||
from enum import Enum | ||
|
||
|
||
class PluginStatus(Enum): | ||
class PluginStatus(str, Enum): | ||
Idle = "idle" | ||
Running = "running" | ||
|
||
|
||
class Action(str, Enum): | ||
SyncEntries = "S" | ||
MarkPendingRemoval = "M" | ||
Kick = "K" | ||
|
||
|
||
cn_names = { | ||
Action.SyncEntries: '同步数据库', | ||
Action.MarkPendingRemoval: '通知并标记', | ||
Action.Kick: '移除', | ||
} | ||
|
||
actions = set(item for item in Action) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,3 @@ | ||
from datetime import datetime | ||
|
||
import pytest | ||
import nonebot | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from models import JoinedGroupMemberInfo, Accounts | ||
|
||
|
||
def parse_ranges(input_str: str, min_range: int, max_range: int): | ||
# Split the input string by commas | ||
parts = input_str.split(',') | ||
result = set() | ||
|
||
# Process each part | ||
for part in parts: | ||
part = part.strip() | ||
if '-' in part: | ||
start, end = part.split('-') | ||
start = int(start) if start else min_range | ||
end = int(end) if end else max_range | ||
|
||
if start is not None and end is not None: | ||
result.update([*range(start, end)]) | ||
elif start is not None: | ||
result.add(start) | ||
else: | ||
if part: | ||
result.add(int(part)) | ||
|
||
# Convert the set to a sorted list | ||
return sorted(result) | ||
|
||
|
||
def J_G_M_to_saveable(member: JoinedGroupMemberInfo): | ||
return { | ||
k: v for k, v in member.items() if ( | ||
v is not None and | ||
k in Accounts._meta.fields_map.keys() | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,33 @@ | ||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot | ||
import os | ||
|
||
from nonebot.adapters.onebot.v11 import Bot as OnebotV11Bot, MessageEvent | ||
|
||
from models.db import Admins | ||
from models.types import JoinedGroupMemberInfo | ||
|
||
import dotenv | ||
|
||
dotenv.load_dotenv() | ||
SB_GROUP_ID = int(os.getenv("SB_GROUP_ID") or "-1") | ||
|
||
|
||
async def is_admin(bot: OnebotV11Bot, group_id: int): | ||
self_info = await bot.get_group_member_info(group_id=group_id, user_id=int(bot.self_id)) | ||
if self_info["role"] in ["owner", "admin"]: | ||
return True | ||
return False | ||
try: | ||
self_info = await bot.get_group_member_info(group_id=group_id, user_id=int(bot.self_id)) | ||
if self_info["role"] in ["owner", "admin"]: | ||
return True | ||
return False | ||
except Exception as e: | ||
return False | ||
|
||
|
||
def fmt_user(member: JoinedGroupMemberInfo): | ||
return f"{member['nickname']}(qq = {member['qq_id']}, sb = {member['sb_id']})" | ||
|
||
|
||
async def is_sender_admin(event: MessageEvent) -> bool: | ||
return await Admins.exists(qq_id=event.sender.user_id) | ||
|
||
|
||
async def is_bot_group_admin(bot: OnebotV11Bot): | ||
return await is_admin(bot, SB_GROUP_ID) |