diff --git a/bot.py b/bot.py index 780e94c7ab..9c9fb81a4b 100644 --- a/bot.py +++ b/bot.py @@ -24,6 +24,8 @@ from emoji import UNICODE_EMOJI from pkg_resources import parse_version +from core.audit_logger import AuditLogger + try: # noinspection PyUnresolvedReferences from colorama import init @@ -87,6 +89,8 @@ def __init__(self): self._configure_logging() self.plugin_db = PluginDatabaseClient(self) # Deprecated + + self.audit_logger = AuditLogger(bot=self) self.startup() def get_guild_icon( diff --git a/cogs/modmail.py b/cogs/modmail.py index e41ecbe321..bf5934371b 100644 --- a/cogs/modmail.py +++ b/cogs/modmail.py @@ -13,6 +13,7 @@ from discord.ext.commands.view import StringView from core import checks +from core.audit_logger import construct_from_ctx from core.models import DMDisabled, PermissionLevel, SimilarCategoryConverter, getLogger from core.paginator import EmbedPaginatorSession from core.thread import Thread @@ -245,6 +246,7 @@ async def snippet_add(self, ctx, name: str.lower, *, value: commands.clean_conte color=self.bot.main_color, description="Successfully created snippet.", ) + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.snippet.add", description=f"Added snippet {name}")) return await ctx.send(embed=embed) def _fix_aliases(self, snippet_being_deleted: str) -> Tuple[List[str]]: @@ -347,6 +349,7 @@ async def snippet_remove(self, ctx, *, name: str.lower): ) self.bot.snippets.pop(name) await self.bot.config.update() + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.snippet.remove", description=f"removed snippet {name}")) else: embed = create_not_found_embed(name, self.bot.snippets.keys(), "Snippet") await ctx.send(embed=embed) @@ -370,6 +373,7 @@ async def snippet_edit(self, ctx, name: str.lower, *, value): color=self.bot.main_color, description=f'`{name}` will now send "{value}".', ) + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.snippet.edit", description=f"edited snippet {name}")) else: embed = create_not_found_embed(name, self.bot.snippets.keys(), "Snippet") await ctx.send(embed=embed) @@ -420,6 +424,12 @@ async def move(self, ctx, *, arguments): category=category, end=True, sync_permissions=True, reason=f"{ctx.author} moved this thread." ) + self.bot.audit_logger.push( + await construct_from_ctx( + ctx, + action="modmail.thread.move", + description=f"{'silently ' if silent else ''}moved thread {thread.id} to {category.name}")) + if self.bot.config["thread_move_notify"] and not silent: embed = discord.Embed( title=self.bot.config["thread_move_title"], @@ -512,6 +522,14 @@ async def close( if after and after.dt > after.now: await self.send_scheduled_close_message(ctx, after, silent) + # Generate audit event + description = None + if close_after == 0: + description = f"{'silently ' if silent else ''}closed thread {thread.id}" + else: + description = f"{'silently ' if silent else ''}scheduled to close thread {thread.id} in {human_timedelta(after.dt)}" + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.thread.close", description=description)) + await thread.close(closer=ctx.author, after=close_after, message=message, silent=silent) @staticmethod @@ -677,7 +695,9 @@ async def unsubscribe(self, ctx, *, user_or_role: Union[discord.Role, User, str. @checks.thread_only() async def nsfw(self, ctx): """Flags a Modmail thread as NSFW (not safe for work).""" + # TODO set the thread as NSFW in the database await ctx.channel.edit(nsfw=True) + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.thread.set.nsfw", description=f"marked thread as nsfw")) sent_emoji, _ = await self.bot.retrieve_emoji() await self.bot.add_reaction(ctx.message, sent_emoji) @@ -686,7 +706,9 @@ async def nsfw(self, ctx): @checks.thread_only() async def sfw(self, ctx): """Flags a Modmail thread as SFW (safe for work).""" + # TODO unset the thread as NSFW in the database await ctx.channel.edit(nsfw=False) + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.thread.set.sfw", description=f"marked thread as sfw")) sent_emoji, _ = await self.bot.retrieve_emoji() await self.bot.add_reaction(ctx.message, sent_emoji) @@ -769,9 +791,14 @@ def format_log_embeds(self, logs, avatar_url): @commands.cooldown(1, 600, BucketType.channel) async def title(self, ctx, *, name: str): """Sets title for a thread""" + # todo update database with new title await ctx.thread.set_title(name) sent_emoji, _ = await self.bot.retrieve_emoji() await ctx.message.pin() + + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.thread.set.title", description=f"set thread title to {name}")) + await self.bot.add_reaction(ctx.message, sent_emoji) @commands.command(usage=" [options]", cooldown_after_parsing=True) @@ -865,6 +892,12 @@ async def adduser(self, ctx, *users_arg: Union[discord.Member, discord.Role, str if i not in users: to_exec.append(i.send(embed=em)) + # Push audit event + users_string: str = "" + for user in users: + users_string += f"{user.name}({user.id})," + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.thread.user.add", description=f"added user(s) {users_string} to thread")) + await ctx.thread.add_users(users) if to_exec: await asyncio.gather(*to_exec) @@ -962,6 +995,12 @@ async def removeuser(self, ctx, *users_arg: Union[discord.Member, discord.Role, if to_exec: await asyncio.gather(*to_exec) + # Push audit event + users_string: str = "" + for user in users: + users_string += f"{user.name}({user.id})," + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.thread.user.remove", description=f"removed user(s) {users_string} to thread")) + sent_emoji, _ = await self.bot.retrieve_emoji() await self.bot.add_reaction(ctx.message, sent_emoji) @@ -1052,6 +1091,12 @@ async def anonadduser(self, ctx, *users_arg: Union[discord.Member, discord.Role, if i not in users: to_exec.append(i.send(embed=em)) + users_string: str = "" + for user in users: + users_string += f"{user.name}({user.id})," + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.thread.user.add.anon", description=f"anonymously added user(s) {users_string} to thread")) + + await ctx.thread.add_users(users) if to_exec: await asyncio.gather(*to_exec) @@ -1145,6 +1190,13 @@ async def anonremoveuser(self, ctx, *users_arg: Union[discord.Member, discord.Ro if to_exec: await asyncio.gather(*to_exec) + # Push audit event + users_string: str = "" + for user in users: + users_string += f"{user.name}({user.id})," + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.thread.user.remove.anon", + description=f"anonymously removed user(s) {users_string} to thread")) + sent_emoji, _ = await self.bot.retrieve_emoji() await self.bot.add_reaction(ctx.message, sent_emoji) @@ -1772,6 +1824,10 @@ async def blocked_whitelist(self, ctx, *, user: User = None): color=self.bot.main_color, ) self.bot.blocked_whitelisted_users.remove(str(user.id)) + + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.whitelist.remove", + description=f"removed user {user.name}({user.id}) from whitelist")) + return await ctx.send(embed=embed) self.bot.blocked_whitelisted_users.append(str(user.id)) @@ -1782,6 +1838,9 @@ async def blocked_whitelist(self, ctx, *, user: User = None): await self.bot.config.update() + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.whitelist.add", description=f"added user {user.name}({user.id}) to whitelist")) + + if msg.startswith("System Message: "): # If the user is blocked internally (for example: below minimum account age) # Show an extended message stating the original internal message @@ -1869,6 +1928,9 @@ async def send_embed(title: str, message: str): f"{__name__}: cannot block user, user is neither an instance of Discord Role or User" ) + desc = f"added {'role' if isinstance(user_or_role, discord.Role) else 'user'} {user_or_role.id} to blocklist" + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.user.block", description=desc)) + await self.bot.config.update() return await send_embed("Success", desc) @@ -1916,6 +1978,9 @@ async def send_embed(title: str, message: str): await self.bot.config.update() + desc: str = f"removed {'role' if isinstance(user_or_role, discord.Role) else 'user'} {user_or_role.id} from blocklist" + self.bot.audit_logger.push(construct_from_ctx(ctx, action="modmail.user.unblock", description=desc)) + return await send_embed("Success", f"{mention} has been unblocked.") @commands.command() @@ -1943,6 +2008,9 @@ async def delete(self, ctx, message_id: int = None): color=self.bot.error_color, ) ) + # Push audit event + user = await self.bot.get_or_fetch_user(thread.recipient_id) + self.bot.audit_logger.push(construct_from_ctx(ctx, action="modmail.thread.delete_message", description=f"deleted message {message_id} sent by {user.name}({user.id}) from thread {thread.id}")) sent_emoji, _ = await self.bot.retrieve_emoji() await self.bot.add_reaction(ctx.message, sent_emoji) @@ -2078,6 +2146,8 @@ async def enable(self, ctx): self.bot.config["dm_disabled"] = DMDisabled.NONE await self.bot.config.update() + self.bot.audit_logger.push(construct_from_ctx(ctx, action="modmail.dm.enable", description=f"enabled inbound direct messages")) + return await ctx.send(embed=embed) @commands.group(invoke_without_command=True) @@ -2109,6 +2179,8 @@ async def disable_new(self, ctx): self.bot.config["dm_disabled"] = DMDisabled.NEW_THREADS await self.bot.config.update() + self.bot.audit_logger.push(construct_from_ctx(ctx, action="modmail.dm.disable.new", description="disabled new thread creation")) + return await ctx.send(embed=embed) @disable.command(name="all") @@ -2129,6 +2201,8 @@ async def disable_all(self, ctx): self.bot.config["dm_disabled"] = DMDisabled.ALL_THREADS await self.bot.config.update() + self.bot.audit_logger.push(construct_from_ctx(ctx, action="modmail.dm.disable.all", description="disabled inbound direct message handling")) + return await ctx.send(embed=embed) @commands.command() diff --git a/cogs/utility.py b/cogs/utility.py index f4c1a5f663..ab4c793cba 100644 --- a/cogs/utility.py +++ b/cogs/utility.py @@ -10,6 +10,7 @@ from io import BytesIO, StringIO from itertools import takewhile, zip_longest from json import JSONDecodeError, loads +from pprint import pformat from subprocess import PIPE from textwrap import indent from types import SimpleNamespace @@ -23,10 +24,11 @@ from pkg_resources import parse_version from core import checks, utils +from core.audit_logger import AuditEvent, audit_event_source_from_user, construct_from_ctx from core.changelog import Changelog from core.models import HostingMethod, InvalidConfigError, PermissionLevel, UnseenFormatter, getLogger from core.paginator import EmbedPaginatorSession, MessagePaginatorSession -from core.utils import trigger_typing, truncate +from core.utils import trigger_typing, truncate, get_permission_level, explain_permissions_level logger = getLogger(__name__) @@ -455,6 +457,8 @@ async def debug(self, ctx): embed = discord.Embed(color=self.bot.main_color) embed.set_footer(text="Debug logs - Navigate using the reactions below.") + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.debug.show", description=f"viewed debug logs in discord ({ctx.channel.id})")) + session = MessagePaginatorSession(ctx, *messages, embed=embed) session.current = len(messages) - 1 return await session.run() @@ -485,7 +489,7 @@ async def debug_hastebin(self, ctx): embed = discord.Embed( title="Debug Logs", color=self.bot.main_color, - description=f"{haste_url}/" + key, + description=f"{haste_url}/{key}", ) except (JSONDecodeError, ClientResponseError, IndexError, KeyError): embed = discord.Embed( @@ -494,6 +498,10 @@ async def debug_hastebin(self, ctx): description="Something's wrong. We're unable to upload your logs to hastebin.", ) embed.set_footer(text="Go to Heroku to see your logs.") + + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.debug.hastebin", + description=f"uploaded logs to hastebin at ({haste_url + '/' + key})")) + await ctx.send(embed=embed) @debug.command(name="clear", aliases=["wipe"]) @@ -513,6 +521,8 @@ async def debug_clear(self, ctx): embed=discord.Embed(color=self.bot.main_color, description="Cached logs are now cleared.") ) + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.debug.clear", description=f"cleared local log file")) + @commands.command(aliases=["presence"]) @checks.has_permissions(PermissionLevel.ADMINISTRATOR) async def activity(self, ctx, activity_type: str.lower, *, message: str = ""): @@ -570,6 +580,15 @@ async def activity(self, ctx, activity_type: str.lower, *, message: str = ""): msg += f"{activity.name}." embed = discord.Embed(title="Activity Changed", description=msg, color=self.bot.main_color) + + # Push audit log event + event : AuditEvent + if activity_type == "clear": + event = await construct_from_ctx(ctx, action=f"modmail.activity.clear", description=f"removed bot activity") + else: + event = await construct_from_ctx(ctx, action=f"modmail.activity.set", description=msg) + self.bot.audit_logger.push(event) + return await ctx.send(embed=embed) @commands.command() @@ -606,6 +625,15 @@ async def status(self, ctx, *, status_type: str.lower): await self.bot.config.update() msg = f"Status set to: {status.value}." + + # Push audit log event + event : AuditEvent + if status_type == "clear": + event = await construct_from_ctx(ctx, action=f"modmail.status.clear", description=f"removed bot activity") + else: + event = await construct_from_ctx(ctx, action=f"modmail.status.set", description=msg) + self.bot.audit_logger.push(event) + embed = discord.Embed(title="Status Changed", description=msg, color=self.bot.main_color) return await ctx.send(embed=embed) @@ -727,6 +755,7 @@ async def mention(self, ctx, *user_or_role: Union[discord.Role, discord.Member, color=self.bot.main_color, ) self.bot.config["mention_message"] = None + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.mention.disable", description="disabled mention on thread creation")) else: msg = " ".join(user_or_role[1:]) mention_msg = msg.replace("[prefix]", self.bot.prefix) @@ -736,6 +765,8 @@ async def mention(self, ctx, *user_or_role: Union[discord.Role, discord.Member, color=self.bot.main_color, ) self.bot.config["mention_message"] = msg + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.mention.set", description=f"set thread creation message to {current} {mention_msg}")) + await self.bot.config.update() elif ( len(user_or_role) == 1 @@ -749,12 +780,15 @@ async def mention(self, ctx, *user_or_role: Union[discord.Role, discord.Member, color=self.bot.main_color, ) self.bot.config["mention"] = None + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.mention.disable", description="disabled mention on thread creation")) else: embed = discord.Embed( description="`mention` is reset to default.", color=self.bot.main_color, ) self.bot.config.remove("mention") + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.mention.set", description="reset mention to default")) + await self.bot.config.update() else: mention = [] @@ -774,6 +808,9 @@ async def mention(self, ctx, *user_or_role: Union[discord.Role, discord.Member, color=self.bot.main_color, ) self.bot.config["mention"] = mention + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.mention.set", + description=f"set mention to {mention}")) + await self.bot.config.update() return await ctx.send(embed=embed) @@ -805,6 +842,9 @@ async def prefix(self, ctx, *, prefix=None): await self.bot.config.update() await ctx.send(embed=embed) + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.prefix.set", description=f"Set prefix to {match.group(1) if match else prefix}")) + @commands.group(aliases=["configuration"], invoke_without_command=True) @checks.has_permissions(PermissionLevel.OWNER) async def config(self, ctx): @@ -858,6 +898,8 @@ async def config_set(self, ctx, key: str.lower, *, value: str): color=self.bot.main_color, description=f"Set `{key}` to `{self.bot.config[key]}`.", ) + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.config.set", description=f"Set {key} to {self.bot.config[key]}.")) except InvalidConfigError as exc: embed = exc.embed else: @@ -882,6 +924,7 @@ async def config_remove(self, ctx, *, key: str.lower): color=self.bot.main_color, description=f"`{key}` had been reset to default.", ) + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.config.remove", description=f"reset {key} to default.")) else: embed = discord.Embed( title="Error", color=self.bot.error_color, description=f"{key} is an invalid key." @@ -1205,6 +1248,10 @@ async def alias_add(self, ctx, name: str.lower, *, value): if embed is None: embed = await self.make_alias(name, value, "Added") + + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.alias.add", description=f"added alias {name} for {value}")) + return await ctx.send(embed=embed) @alias.command(name="remove", aliases=["del", "delete"]) @@ -1221,6 +1268,11 @@ async def alias_remove(self, ctx, *, name: str.lower): color=self.bot.main_color, description=f"Successfully deleted `{name}`.", ) + + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.alias.remove", + description=f"removed alias {name}")) + else: embed = utils.create_not_found_embed(name, self.bot.aliases.keys(), "Alias") @@ -1237,6 +1289,10 @@ async def alias_edit(self, ctx, name: str.lower, *, value): return await ctx.send(embed=embed) embed = await self.make_alias(name, value, "Edited") + + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.alias.edit", description=f"edited alias {name} for {value}")) + return await ctx.send(embed=embed) @commands.group(aliases=["perms"], invoke_without_command=True) @@ -1336,11 +1392,15 @@ async def permissions_override(self, ctx, command_name: str.lower, *, level_name self.bot.config["override_command_level"][command.qualified_name] = level.name await self.bot.config.update() + + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.permissions.override", + description=f"Updated command permission level for '{command.qualified_name}' to '{level.name}'")) + embed = discord.Embed( title="Success", color=self.bot.main_color, description="Successfully set command permission level for " - f"`{command.qualified_name}` to `{level.name}`.", + f"`{command.qualified_name}` to `{level.name}`.", ) return await ctx.send(embed=embed) @@ -1407,6 +1467,9 @@ async def permissions_add( logger.info("Granting %s access to Modmail category.", key.name) await self.bot.main_category.set_permissions(key, read_messages=True) + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.permissions.add", description=f"Updated '{user_or_role}' {type_} permissions for {name}")) + + embed = discord.Embed( title="Success", color=self.bot.main_color, @@ -1508,6 +1571,8 @@ async def permissions_remove( logger.info("Denying %s access to Modmail category.", member.name) await self.bot.main_category.set_permissions(member, overwrite=None) + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.permissions.remove", description=f"Updated '{user_or_role}' {type_} permissions for {name}")) + embed = discord.Embed( title="Success", color=self.bot.main_color, @@ -1760,6 +1825,12 @@ async def oauth_whitelist(self, ctx, target: Union[discord.Role, utils.User]): target = self.bot.get_user(target.id) or self.bot.modmail_guild.get_role(target.id) embed.description = f"{'Un-w' if removed else 'W'}hitelisted {target.mention} to view logs." + if removed: + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.oauth.whitelist.remove", + description=f"Removed {target.name}({target.id}) from the oauth whitelist.")) + else: + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.oauth.whitelist.add", + description=f"Added {target.name}({target.id}) to the whitelist.")) await ctx.send(embed=embed) @@ -1823,11 +1894,16 @@ async def autotrigger_add(self, ctx, keyword, *, command): self.bot.auto_triggers[keyword] = command await self.bot.config.update() + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.autotrigger.add", + description=f"added autotrigger {keyword} for {command}.")) + embed = discord.Embed( title="Success", color=self.bot.main_color, description=f"Keyword `{keyword}` has been linked to `{command}`.", ) + else: embed = discord.Embed( title="Error", @@ -1867,6 +1943,11 @@ async def autotrigger_edit(self, ctx, keyword, *, command): color=self.bot.main_color, description=f"Keyword `{keyword}` has been linked to `{command}`.", ) + + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.autotrigger.edit", + description=f"edited autotrigger {keyword} to {command}.")) + else: embed = discord.Embed( title="Error", @@ -1897,6 +1978,11 @@ async def autotrigger_remove(self, ctx, keyword): color=self.bot.main_color, description=f"Keyword `{keyword}` has been removed.", ) + + # Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.autotrigger.remove", + description=f"removed autotrigger {keyword}")) + await ctx.send(embed=embed) @autotrigger.command(name="test") @@ -1966,6 +2052,8 @@ async def github(self, ctx): embed.set_author(name=user["username"], icon_url=user["avatar_url"], url=user["url"]) embed.set_thumbnail(url=user["avatar_url"]) await ctx.send(embed=embed) + + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.github.show", description=f"showed github user info in channel {ctx.channel.id}")) else: await ctx.send(embed=discord.Embed(title="Invalid Github Token", color=self.bot.error_color)) @@ -1983,6 +2071,9 @@ async def update(self, ctx, *, flag: str = ""): changelog = await Changelog.from_url(self.bot) latest = changelog.latest_version + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.update", + description=f"ran update command in channel {ctx.channel.id}")) + desc = ( f"The latest version is [`{self.bot.version}`]" "(https://github.com/raidensakura/modmail/blob/stable/bot.py#L1)" @@ -2104,6 +2195,10 @@ async def eval_(self, ctx, *, body: str): logger.warning("Running eval command:\n%s", body) + #Push audit event + self.bot.audit_logger.push(await construct_from_ctx(ctx, action="modmail.eval", + description=f"evaled {body}")) + env = { "ctx": ctx, "bot": self.bot, @@ -2177,6 +2272,68 @@ def paginate(text: str): await self.bot.add_reaction(ctx.message, "\u2705") + @commands.command(name="audit", usage="[event id]") + @checks.has_permissions(PermissionLevel.ADMINISTRATOR) + async def print_audit_event(self, ctx: discord.ext.commands.context.Context, event_id: str): + if len(event_id) != 24: + await ctx.send(embed=discord.Embed( + title="Error", + color=self.bot.error_color, + description="Invalid audit event id. Should be 24 characters long.", + )) + return + + log: AuditEvent = await self.bot.audit_logger.get_audit_event(event_id) + + if log is None: + await ctx.send(embed=discord.Embed( + title="404", + color=self.bot.error_color, + description=f"Audit event of id {event_id} not found." + )) + return + + embed = discord.Embed(title="Audit log", color=self.bot.main_color) + + for k, v in log.__dict__.items(): + if v != "" and v is not None and k != "actor": + embed.add_field(name=k, value=v, inline=False) + for k, v in log.actor.__dict__.items(): + if v != "" and v is not None: + embed.add_field(name=k, value=v, inline=False) + + await ctx.send(content=f"```{pformat(log)}```", embed=embed) + + @permissions.command(name="list", usage="[user]") + @checks.has_permissions(PermissionLevel.ADMINISTRATOR) + async def get_user_permission_level_command(self, ctx: discord.ext.commands.context.Context, user: discord.Member): + """ + See the permission level of a user. + """ + # copilot wrote this entire method, so it's not my fault if it's bad + # wow it wrote broken code + level = await get_permission_level(bot=self.bot, user=user) + await ctx.send(embed=discord.Embed( + title="Permission level", + color=self.bot.main_color, + description=f"{user.mention} has permission level {level} ({level.name})." + )) + + @permissions.command(name="explain", usage="[user]") + @checks.has_permissions(PermissionLevel.ADMINISTRATOR) + async def explain_user_permission_level_command(self, ctx: discord.ext.commands.context.Context, user: discord.Member): + """ + Explain the permission level of a user. + """ + # copilot wrote this entire method, so it's not my fault if it's bad + # wow it wrote broken code + level = await explain_permissions_level(bot=self.bot, user=user) + await ctx.send(embed=discord.Embed( + title="Permission level", + color=self.bot.main_color, + description=level[1] + ).add_field(name="summary", value=level[0], inline=False)) + async def setup(bot): await bot.add_cog(Utility(bot)) diff --git a/core/audit_logger.py b/core/audit_logger.py new file mode 100644 index 0000000000..5454847e8a --- /dev/null +++ b/core/audit_logger.py @@ -0,0 +1,103 @@ +from dataclasses import dataclass, asdict +from datetime import datetime, timezone +from typing import Self + +import discord +import pymongo +from bson import ObjectId, CodecOptions +from motor.motor_asyncio import AsyncIOMotorCollection + +from core.utils import get_permission_level + + +@dataclass(frozen=True) +class AuditEventSource: + user_id: int + username: str + ip: str + country: str + user_agent: str + role: int + source: str + + +async def audit_event_source_from_user(bot: object, user: discord.Member) -> AuditEventSource: + """Creates an AuditEventSource object from a discord.Member object with all fields filled out""" + return AuditEventSource(user_id=user.id, + username=user.name, + ip="", + country="", + user_agent="Discord", + role=await get_permission_level(bot, user), + source="modmail") + + +@dataclass(frozen=True) +class AuditEvent: + action: str + description: str + actor: AuditEventSource + timestamp: datetime = datetime.now(tz=timezone.utc) + id: ObjectId = None + + def to_short_dict(self) -> dict: + """Returns the object as a dictionary, but without the id field""" + result = asdict(self) + result.pop("id") + return result + + +def _construct_audit_event_from_dict(dict: dict) -> AuditEvent: + return AuditEvent(action=dict["action"], + description=dict["description"], + actor=_construct_audit_event_source_from_dict(dict["actor"]), + timestamp=dict["timestamp"], + id=dict["_id"]) + + +def _construct_audit_event_source_from_dict(dict: dict) -> AuditEventSource: + return AuditEventSource(user_id=dict["user_id"], + username=dict["username"], + ip=dict["ip"], + country=dict["country"], + user_agent=dict["user_agent"], + role=dict["role"], + source=dict["source"]) + + +async def construct_from_ctx(ctx: discord.ext.commands.context.Context, action: str, description: str) -> AuditEvent: + """ + Constructs a complete audit event from a context object and the given action and description + Parameters + ---------- + ctx + action + description + + Returns + ------- + AuditEvent + """ + actor = await audit_event_source_from_user(ctx.bot, ctx.author) + return AuditEvent(action=action, + description=description, + actor=actor) + + +class AuditLogger: + audit_log_collection: AsyncIOMotorCollection + + def __init__(self: Self, bot): + # don't create our own connection, use the one from the bot + # We need to use the tz_aware option to ensure that datetimes are retrieved as timezone aware + self.audit_log_collection = bot.api.db.audit_log.with_options( + codec_options=CodecOptions(tz_aware=True, tzinfo=timezone.utc)) + + def push(self, event: AuditEvent) -> pymongo.results.InsertOneResult: + """Pushes an audit event to the database""" + return self.audit_log_collection.insert_one(event.to_short_dict()) + + async def get_audit_event(self: Self, event_id: str) -> AuditEvent: + """Gets an audit event from the database by its id""" + event = await self.audit_log_collection.find_one({"_id": ObjectId(event_id)}) + return _construct_audit_event_from_dict(event) diff --git a/core/utils.py b/core/utils.py index 21ea50d4cd..9008e8d319 100644 --- a/core/utils.py +++ b/core/utils.py @@ -11,7 +11,7 @@ import discord from discord.ext import commands -from core.models import getLogger +from core.models import getLogger, PermissionLevel __all__ = [ "strtobool", @@ -41,6 +41,7 @@ "AcceptButton", "DenyButton", "ConfirmThreadCreationView", + "get_permission_level" ] @@ -565,6 +566,122 @@ def extract_block_timestamp(reason, id_): return end_time, after +async def get_permission_level(bot, user: discord.member.Member) -> PermissionLevel: + """ + Determines the permission level of a user. + First checks if the user is the owner of the bot, then by checking the level permissions + + Parameters + ---------- + bot + user + + Returns + ------- + The permission level of the user. + """ + if await bot.is_owner(user) or user.id == bot.user.id: + return PermissionLevel.OWNER + + level_permissions = bot.config["level_permissions"] + user_permission_level = PermissionLevel.REGULAR + roles = user.roles + print(roles) + + for level, ids in level_permissions.items(): + print(level) + print(ids) + # if the user has a higher permission level, then we don't need to check the rest + if user_permission_level < permission_level_from_string(level): + for user_or_role_id in ids: + if str(user_or_role_id) == str(user.id): + user_permission_level = permission_level_from_string(level) + print(f"{level}, {user.id}, {user_or_role_id}") + continue + # check each role the user has against the id + for role in roles: + print(f"{role.id} == {user_or_role_id}?") + if str(role.id) == str(user_or_role_id): + print("yes") + user_permission_level = permission_level_from_string(level) + print(f"{level}, DISCORD ROLE {role.id}, {user_or_role_id}") + continue + + return user_permission_level + + +async def explain_permissions_level(bot, user: discord.member.Member) -> typing.Tuple[str, str]: + """ + Explains the permission level of a user. + First checks if the user is the owner of the bot, then by checking the level permissions + + Parameters + ---------- + bot + user + + Returns + ------- + An explanation of the permissions level of the user. + The first string is the summary of all matching roles, the second is the explanation of the role used and why. + """ + + level_permissions = bot.config["level_permissions"] + user_permission_level = PermissionLevel.REGULAR + roles = user.roles + summary = "" + result_explanation = "" + + for level, ids in level_permissions.items(): + # if the user has a higher permission level, then we don't need to check the rest + for user_or_role_id in ids: + if str(user_or_role_id) == str(user.id): + if user_permission_level < permission_level_from_string(level): + user_permission_level = permission_level_from_string(level) + result_explanation = f"{user.mention} had the level {user_permission_level.name} manually assigned.\n" + summary += f"MATCH {level}, USER ID {user.id}\n" + + # check each role the user has against the id + for role in roles: + if str(role.id) == str(user_or_role_id): + if user_permission_level < permission_level_from_string(level): + user_permission_level = permission_level_from_string(level) + result_explanation = f"User has the role {role.mention} which was assigned to permissions level {user_permission_level} ({user_permission_level.name}).\n" + summary += f"MATCH {level}, ROLE ID {role.mention}\n" + + if await bot.is_owner(user) or user.id == bot.user.id: + result_explanation = "The user is the owner of the bot or *is* the bot." + + return summary, result_explanation + + +def permission_level_from_string(level: str) -> PermissionLevel: + """ + Converts a string to a permission level. + + Parameters + ---------- + level + + Returns + ------- + The permission level. + """ + level = level.upper() + if level == "OWNER": + return PermissionLevel.OWNER + elif level == "ADMIN" or level == "ADMINISTRATOR": + return PermissionLevel.ADMIN + elif level == "MOD" or level == "MODERATOR": + return PermissionLevel.MOD + elif level == "SUPPORTER" or level == "RESPONDER": + return PermissionLevel.SUPPORTER + elif level == "REGULAR": + return PermissionLevel.REGULAR + else: + return PermissionLevel.INVALID + + class AcceptButton(discord.ui.Button): def __init__(self, emoji): super().__init__(style=discord.ButtonStyle.gray, emoji=emoji)