From bd564aa0eae70675300dd4f899d070ba4af20931 Mon Sep 17 00:00:00 2001 From: Lunarmagpie Date: Fri, 22 Nov 2024 13:32:13 -0500 Subject: [PATCH] ruff --- bot/__init__.py | 2 +- bot/__main__.py | 100 ++--- bot/config.py | 44 +-- bot/exts/card.py | 790 +++++++++++++++++++------------------- bot/exts/dotd.py | 286 +++++++------- bot/exts/forums.py | 468 +++++++++++----------- bot/exts/game.py | 274 ++++++------- bot/exts/utils.py | 138 +++---- bot/util/__init__.py | 10 +- bot/util/card_palettes.py | 50 +-- bot/util/datagen.py | 532 ++++++++++++------------- bot/util/probability.py | 112 +++--- bot/util/server.py | 492 ++++++++++++------------ 13 files changed, 1649 insertions(+), 1649 deletions(-) diff --git a/bot/__init__.py b/bot/__init__.py index edf7d7d..610a953 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -1 +1 @@ -"""HC-TCG online bot.""" +"""HC-TCG online bot.""" diff --git a/bot/__main__.py b/bot/__main__.py index 6f33094..68b14a3 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -1,50 +1,50 @@ -"""Run the bot.""" - -from importlib import import_module -from os import listdir, path -from time import time - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from interactions import Client, listen - -from bot.config import CONFIG -from bot.util import ServerManager - -start = time() - - -class Bot(Client): - """Slightly modified discord client.""" - - @listen() - async def on_ready(self: "Bot") -> None: - """Handle bot starting.""" - await server_manager.reload_all_generators() - - await bot.change_presence() - scheduler.start() - - print(f"Bot started in {round(time()-start, 2)}s") - - @listen() - async def on_disconnect(self: "Bot") -> None: - """Handle bot disconnection.""" - await server_manager.close_all_sessions() - scheduler.shutdown() - - -bot = Bot() - -scheduler = AsyncIOScheduler() - -servers = [] -for file in listdir("servers"): - if not path.isfile(f"servers/{file}"): - continue - servers.append(import_module(f"servers.{file.removesuffix(".py")}").server) - -server_manager = ServerManager(bot, servers) - -bot.load_extensions("bot\\exts", manager=server_manager, scheduler=scheduler) - -bot.start(CONFIG.SECRET) +"""Run the bot.""" + +from importlib import import_module +from os import listdir, path +from time import time + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from interactions import Client, listen + +from bot.config import CONFIG +from bot.util import ServerManager + +start = time() + + +class Bot(Client): + """Slightly modified discord client.""" + + @listen() + async def on_ready(self: "Bot") -> None: + """Handle bot starting.""" + await server_manager.reload_all_generators() + + await bot.change_presence() + scheduler.start() + + print(f"Bot started in {round(time()-start, 2)}s") + + @listen() + async def on_disconnect(self: "Bot") -> None: + """Handle bot disconnection.""" + await server_manager.close_all_sessions() + scheduler.shutdown() + + +bot = Bot() + +scheduler = AsyncIOScheduler() + +servers = [] +for file in listdir("servers"): + if not path.isfile(f"servers/{file}"): + continue + servers.append(import_module(f"servers.{file.removesuffix(".py")}").server) + +server_manager = ServerManager(bot, servers) + +bot.load_extensions("bot\\exts", manager=server_manager, scheduler=scheduler) + +bot.start(CONFIG.SECRET) diff --git a/bot/config.py b/bot/config.py index 62b5e68..e0d9726 100644 --- a/bot/config.py +++ b/bot/config.py @@ -1,22 +1,22 @@ -"""Load environment config.""" - -from __future__ import annotations - -import os - -import dotenv - - -class Config: - """Load environement variables as class.""" - - def __init__(self: Config) -> None: - """Load environement variables as class.""" - dotenv.load_dotenv() - - env = os.environ - - self.SECRET: str = env.get("DISCORD_SECRET") or "" - - -CONFIG = Config() +"""Load environment config.""" + +from __future__ import annotations + +import os + +import dotenv + + +class Config: + """Load environement variables as class.""" + + def __init__(self: Config) -> None: + """Load environement variables as class.""" + dotenv.load_dotenv() + + env = os.environ + + self.SECRET: str = env.get("DISCORD_SECRET") or "" + + +CONFIG = Config() diff --git a/bot/exts/card.py b/bot/exts/card.py index 320718b..a5b952e 100644 --- a/bot/exts/card.py +++ b/bot/exts/card.py @@ -1,395 +1,395 @@ -"""Get information about cards and decks.""" - -from __future__ import annotations - -from asyncio import gather -from collections import Counter -from datetime import datetime as dt -from datetime import timezone -from io import BytesIO -from itertools import islice -from math import ceil, sqrt -from re import compile as re_compile -from typing import Iterable - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from interactions import ( - AutocompleteContext, - Button, - ButtonStyle, - Client, - ComponentContext, - Embed, - Extension, - File, - OptionType, - SlashContext, - component_callback, - global_autocomplete, - slash_command, - slash_option, - spread_to_rows, -) -from matplotlib import pyplot as plt -from PIL import Image - -from bot.util import TYPE_COLORS, Card, EffectCard, HermitCard, Server, ServerManager, probability -from bot.util.datagen import ItemCard - - -def take(items: int, iterable: Iterable) -> list: - """Return first `items` items of the iterable as a list.""" - return list(islice(iterable, items)) - - -beige = (226, 202, 139) - - -def rgb_to_int(rgb: tuple[int, int, int]) -> int: - """Convert an rgb tuple to an integer. - - Args: - ---- - rgb (tuple): RGB color to convert - """ - return (rgb[0] << 16) + (rgb[1] << 8) + rgb[2] - - -def count(s: str) -> str: - """Count the number of items required.""" - final = [] - for k, v in Counter(s).most_common(): - final.append(f"{v}x {k}") - return ", ".join(final) if len(final) else "None" - - -def best_factors(number: int) -> tuple[int, int]: - """Get as close to being square as possible.""" - x = sqrt(number) // 1 - return ceil(x), ceil(x if number - x**2 == 0 else (number - x**2) / x + x) - - -class CardExt(Extension): - """Get information about cards and decks.""" - - def __init__( - self: CardExt, - _: Client, - manager: ServerManager, - _scheduler: AsyncIOScheduler, - ) -> None: - """Get information about cards and decks. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - _scheduler (AsyncIOScheduler): Event scheduler - generator (DataGenerator): Card data generator - """ - self.manager: ServerManager = manager - - async def get_stats( - self: CardExt, server: Server, deck: list[Card] - ) -> tuple[Image.Image, tuple[int, int, int], dict[str, int], int]: - """Get information and an image of a deck. - - Args: - ---- - server (Server): The server the deck comes from - deck (list): List of card ids in the deck - """ - type_counts: dict[str, int] = { - "miner": 0, - "terraform": 0, - "speedrunner": 0, - "pvp": 0, - "builder": 0, - "balanced": 0, - "explorer": 0, - "prankster": 0, - "redstone": 0, - "farm": 0, - } - cost: int = 0 - hermits, items, effects = ([] for _ in range(3)) - for card in deck: - if card.category == "item": - items.append(card) - elif isinstance(card, HermitCard): - hermits.append(card) - if card.text_id in server.data_generator.universe.keys(): - type_counts[card.hermit_type] += 1 - else: - effects.append(card) - cost += card.cost - - hermits.sort(key=lambda x: x.text_id) - items.sort(key=lambda x: x.text_id) - effects.sort(key=lambda x: x.text_id) - - width, height = best_factors(len(deck)) - im = Image.new("RGBA", (width * 200, height * 200)) - - card_images = await gather( - *( - server.data_generator.get_image(card.token_image_url) - for card in hermits + effects + items - ) - ) - for i, card_image in enumerate(card_images): - card_image = card_image.resize((200, 200)).convert("RGBA") - im.paste(card_image, ((i % width) * 200, (i // width) * 200), card_image) - return im, (len(hermits), len(effects), len(items)), type_counts, cost - - @global_autocomplete("card_name") - async def card_autocomplete(self: CardExt, ctx: AutocompleteContext) -> None: - """Autocomplete a card name.""" - server = self.manager.get_server(ctx.guild_id) - if not ctx.input_text: - await ctx.send( - [card.rarityName for card in take(25, server.data_generator.universe.values())] - ) - return - await ctx.send( - [ - card.rarityName - for card in server.data_generator.universe.values() - if ctx.input_text.lower() in card.rarityName.lower() - ][0:25] - ) - - @slash_command() - async def card(self: CardExt, _: SlashContext) -> None: - """Get information about cards and decks.""" - - @card.subcommand() - @slash_option("code", "The deck's export code", OptionType.STRING, required=True) - @slash_option("hide_hash", "If the deck's hash should be hidden", OptionType.BOOLEAN) - async def deck(self: CardExt, ctx: SlashContext, code: str, *, hide_hash: bool = False) -> None: - """Get information about a deck.""" - server = self.manager.get_server(ctx.guild_id) - - deck = await server.get_deck(code) - if not deck: - await ctx.send("Invalid deck: Perhaps you're looking for /card info") - return - if len(deck["cards"]) > 100: - await ctx.send(f"A deck of {len(deck["cards"])} cards is too large!", ephemeral=True) - return - - if hide_hash: - await ctx.send("This message handily obscures your deck hash!", ephemeral=True) - - col = 0 if len(deck["tags"]) == 0 else int(deck["tags"][0]["color"].lstrip("#"), 16) - e = Embed( - title=deck["name"], - description=None if hide_hash else f"Code: {deck["code"]}", - timestamp=dt.now(tz=timezone.utc), - color=col, - ).add_field("Deck loading", "Please wait") - message = await ctx.send(embed=e) - - im, card_type_counts, hermit_type_counts, cost = await self.get_stats( - server, [server.data_generator.universe[card["props"]["id"]] for card in deck["cards"]] - ) - if len(deck["tags"]) == 0: - e.color = rgb_to_int(TYPE_COLORS[Counter(hermit_type_counts).most_common()[0][0]]) - - e.fields.clear() - e = ( - e.set_image("attachment://deck.png") - .add_field("Token cost", str(cost), inline=True) - .add_field( - "HEI ratio", - f"{card_type_counts[0]}:{card_type_counts[1]}:{card_type_counts[2]}", - inline=True, - ) - .add_field( - "Types", - len([typeList for typeList in hermit_type_counts.values() if typeList != 0]), - inline=True, - ) - .set_footer("Bot by Tyrannicodin16") - ) - with BytesIO() as im_binary: - im.save(im_binary, "PNG") - im_binary.seek(0) - delete_button = Button( - style=ButtonStyle.DANGER, - label="Delete", - emoji=":wastebasket:", - custom_id=f"delete_deck:{ctx.author_id}", - ) - await message.edit( - embed=e, - file=File(im_binary, "deck.png"), - components=spread_to_rows(delete_button), - ) - - @component_callback(re_compile("delete_deck:[0-9]")) - async def handle_delete(self: CardExt, ctx: ComponentContext) -> None: - """Handle the delete button being pressed on the deck info.""" - if str(ctx.author_id) == ctx.custom_id.split(":")[-1]: - await ctx.message.delete() - await ctx.send("Deleted!", ephemeral=True) - else: - await ctx.send("You can't delete this deck message!", ephemeral=True) - - @card.subcommand() - @slash_option( - "card_name", - "The card to get", - OptionType.STRING, - required=True, - autocomplete=True, - ) - async def info(self: CardExt, ctx: SlashContext, card_name: str) -> None: - """Get information about a card.""" - server = self.manager.get_server(ctx.guild_id) - cards = [ - card - for card in server.data_generator.universe.values() - if card_name.lower() in card.rarityName.lower() - ] - cards.sort(key=lambda val: val.rarityName) - if len(cards) > 0: - card = cards[0] - if isinstance(card, HermitCard): # Special for hermits - col = TYPE_COLORS[card.hermit_type] - e = ( - Embed( - title=f"{card.name} ({card.rarity})", - description=f"{card.name} ({card.rarity}) - {card.cost} tokens", - timestamp=dt.now(tz=timezone.utc), - color=rgb_to_int(col), - ) - .add_field("Rarity", card.rarity, inline=True) - .add_field( - "Primary attack", - card.attacks[0]["name"] - if card.attacks[0]["power"] is None - else card.attacks[0]["name"] + " - " + card.attacks[0]["power"], - inline=False, - ) - .add_field("Attack damage", card.attacks[0]["damage"], inline=True) - .add_field("Items required", count(card.attacks[0]["cost"]), inline=True) - .add_field( - "Secondary attack", - card.attacks[1]["name"] - if card.attacks[1]["power"] is None - else card.attacks[1]["name"] - + " - " - + card.attacks[1]["power"].replace("\n\n", "\n"), - inline=False, - ) - .add_field("Attack damage", card.attacks[1]["damage"], inline=True) - .add_field("Items required", count(card.attacks[1]["cost"]), inline=True) - ) - else: - description: str - color: tuple[int, int, int] - if isinstance(card, ItemCard): - description = ( - card.energy[0] + f" x{len(card.energy)}" - if len(card.energy) - else "" + " item card" - ) - color = TYPE_COLORS[card.energy[0]] - elif isinstance(card, EffectCard): - description = card.description - color = beige - e = Embed( - title=card.name, - description=description, - timestamp=dt.now(tz=timezone.utc), - color=rgb_to_int(color), - ).add_field("Rarity", card.rarity, inline=True) - e.set_thumbnail(card.token_image_url) - e.set_footer("Bot by Tyrannicodin16") - await ctx.send(embeds=e) - else: - await ctx.send("Couldn't find that card!", ephemeral=True) - - @card.subcommand() - @slash_option( - "hermits", - "The number of hermits in your deck", - OptionType.INTEGER, - required=True, - ) - @slash_option( - "desired_chance", - "The chance of getting a number of hermits (default 2) on a turn", - OptionType.INTEGER, - ) - @slash_option("desired_hermits", "The number of hermits you want", OptionType.INTEGER) - async def two_hermits( - self: CardExt, - ctx: SlashContext, - hermits: int, - desired_chance: int = 50, - desired_hermits: int = 2, - ) -> None: - """View probability to have a number of hermits in your hand after a certain number of draws.""" # noqa: E501 - if hermits < 1 or hermits > 36: - await ctx.send("Invalid hermit count (1-36)", ephemeral=True) - return - plt.figure() - xs = list(range(35)) - ys = [probability(hermits, i, desired_hermits) * 100 for i in xs] - surpass = next((idx[0] for idx in enumerate(ys) if idx[1] >= desired_chance), None) - plt.plot(xs, list(ys)) - plt.xlabel("Draws") - plt.ylabel("Probability") - plt.title( - f"Chance of having {desired_hermits} hermits in your hand after x draws for {hermits} hermits" # noqa: E501 - ) - plt.grid(visible=True) - e = Embed( - title=f"Chance of having {desired_hermits} hermits in your hand after x draws for {hermits} hermits", # noqa: E501 - timestamp=dt.now(tz=timezone.utc), - color=rgb_to_int((178, 178, 255)), - ).add_field("Initial draw chance", f"{ys[0]}%", inline=True) - if surpass or surpass == 0: - e.add_field(f"Hits {desired_chance}%", f"{surpass} draw(s)", inline=True) - else: - e.add_field(f"Hits {desired_chance}%", "Never", inline=True) - e.set_footer("Bot by Tyrannicodin | Probability calculations by Allophony") - e.set_image("attachment://graph.png") - with BytesIO() as figure_bytes: - plt.savefig(figure_bytes, format="png") - figure_bytes.seek(0) - await ctx.send(embeds=e, files=File(figure_bytes, "graph.png")) - plt.close() - - @card.subcommand() - async def chart(self: CardExt, ctx: SlashContext) -> None: - """Display the type chart by u/itsNizart.""" - e = ( - Embed(title="Type chart", timestamp=dt.now(tz=timezone.utc)) - .set_image("attachment://typechart.png") - .set_author( - "u/itsNizart", - "https://www.reddit.com/user/itsNizart", - "https://styles.redditmedia.com/t5_2efni5/styles/profileIcon_jzv50kvrvlb71.png", - ) - .set_footer("Bot by Tyrannicodin") - ) - await ctx.send(embeds=e, files=File("resources/typechart.png")) - - -def setup( - client: Client, - manager: ServerManager, - scheduler: AsyncIOScheduler, -) -> Extension: - """Create the extension. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - scheduler (AsyncIOScheduler): Event scheduler - """ - return CardExt(client, manager, scheduler) +"""Get information about cards and decks.""" + +from __future__ import annotations + +from asyncio import gather +from collections import Counter +from datetime import datetime as dt +from datetime import timezone +from io import BytesIO +from itertools import islice +from math import ceil, sqrt +from re import compile as re_compile +from typing import Iterable + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from interactions import ( + AutocompleteContext, + Button, + ButtonStyle, + Client, + ComponentContext, + Embed, + Extension, + File, + OptionType, + SlashContext, + component_callback, + global_autocomplete, + slash_command, + slash_option, + spread_to_rows, +) +from matplotlib import pyplot as plt +from PIL import Image + +from bot.util import TYPE_COLORS, Card, EffectCard, HermitCard, Server, ServerManager, probability +from bot.util.datagen import ItemCard + + +def take(items: int, iterable: Iterable) -> list: + """Return first `items` items of the iterable as a list.""" + return list(islice(iterable, items)) + + +beige = (226, 202, 139) + + +def rgb_to_int(rgb: tuple[int, int, int]) -> int: + """Convert an rgb tuple to an integer. + + Args: + ---- + rgb (tuple): RGB color to convert + """ + return (rgb[0] << 16) + (rgb[1] << 8) + rgb[2] + + +def count(s: str) -> str: + """Count the number of items required.""" + final = [] + for k, v in Counter(s).most_common(): + final.append(f"{v}x {k}") + return ", ".join(final) if len(final) else "None" + + +def best_factors(number: int) -> tuple[int, int]: + """Get as close to being square as possible.""" + x = sqrt(number) // 1 + return ceil(x), ceil(x if number - x**2 == 0 else (number - x**2) / x + x) + + +class CardExt(Extension): + """Get information about cards and decks.""" + + def __init__( + self: CardExt, + _: Client, + manager: ServerManager, + _scheduler: AsyncIOScheduler, + ) -> None: + """Get information about cards and decks. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + _scheduler (AsyncIOScheduler): Event scheduler + generator (DataGenerator): Card data generator + """ + self.manager: ServerManager = manager + + async def get_stats( + self: CardExt, server: Server, deck: list[Card] + ) -> tuple[Image.Image, tuple[int, int, int], dict[str, int], int]: + """Get information and an image of a deck. + + Args: + ---- + server (Server): The server the deck comes from + deck (list): List of card ids in the deck + """ + type_counts: dict[str, int] = { + "miner": 0, + "terraform": 0, + "speedrunner": 0, + "pvp": 0, + "builder": 0, + "balanced": 0, + "explorer": 0, + "prankster": 0, + "redstone": 0, + "farm": 0, + } + cost: int = 0 + hermits, items, effects = ([] for _ in range(3)) + for card in deck: + if card.category == "item": + items.append(card) + elif isinstance(card, HermitCard): + hermits.append(card) + if card.text_id in server.data_generator.universe.keys(): + type_counts[card.hermit_type] += 1 + else: + effects.append(card) + cost += card.cost + + hermits.sort(key=lambda x: x.text_id) + items.sort(key=lambda x: x.text_id) + effects.sort(key=lambda x: x.text_id) + + width, height = best_factors(len(deck)) + im = Image.new("RGBA", (width * 200, height * 200)) + + card_images = await gather( + *( + server.data_generator.get_image(card.token_image_url) + for card in hermits + effects + items + ) + ) + for i, card_image in enumerate(card_images): + card_image = card_image.resize((200, 200)).convert("RGBA") + im.paste(card_image, ((i % width) * 200, (i // width) * 200), card_image) + return im, (len(hermits), len(effects), len(items)), type_counts, cost + + @global_autocomplete("card_name") + async def card_autocomplete(self: CardExt, ctx: AutocompleteContext) -> None: + """Autocomplete a card name.""" + server = self.manager.get_server(ctx.guild_id) + if not ctx.input_text: + await ctx.send( + [card.rarityName for card in take(25, server.data_generator.universe.values())] + ) + return + await ctx.send( + [ + card.rarityName + for card in server.data_generator.universe.values() + if ctx.input_text.lower() in card.rarityName.lower() + ][0:25] + ) + + @slash_command() + async def card(self: CardExt, _: SlashContext) -> None: + """Get information about cards and decks.""" + + @card.subcommand() + @slash_option("code", "The deck's export code", OptionType.STRING, required=True) + @slash_option("hide_hash", "If the deck's hash should be hidden", OptionType.BOOLEAN) + async def deck(self: CardExt, ctx: SlashContext, code: str, *, hide_hash: bool = False) -> None: + """Get information about a deck.""" + server = self.manager.get_server(ctx.guild_id) + + deck = await server.get_deck(code) + if not deck: + await ctx.send("Invalid deck: Perhaps you're looking for /card info") + return + if len(deck["cards"]) > 100: + await ctx.send(f"A deck of {len(deck["cards"])} cards is too large!", ephemeral=True) + return + + if hide_hash: + await ctx.send("This message handily obscures your deck hash!", ephemeral=True) + + col = 0 if len(deck["tags"]) == 0 else int(deck["tags"][0]["color"].lstrip("#"), 16) + e = Embed( + title=deck["name"], + description=None if hide_hash else f"Code: {deck["code"]}", + timestamp=dt.now(tz=timezone.utc), + color=col, + ).add_field("Deck loading", "Please wait") + message = await ctx.send(embed=e) + + im, card_type_counts, hermit_type_counts, cost = await self.get_stats( + server, [server.data_generator.universe[card["props"]["id"]] for card in deck["cards"]] + ) + if len(deck["tags"]) == 0: + e.color = rgb_to_int(TYPE_COLORS[Counter(hermit_type_counts).most_common()[0][0]]) + + e.fields.clear() + e = ( + e.set_image("attachment://deck.png") + .add_field("Token cost", str(cost), inline=True) + .add_field( + "HEI ratio", + f"{card_type_counts[0]}:{card_type_counts[1]}:{card_type_counts[2]}", + inline=True, + ) + .add_field( + "Types", + len([typeList for typeList in hermit_type_counts.values() if typeList != 0]), + inline=True, + ) + .set_footer("Bot by Tyrannicodin16") + ) + with BytesIO() as im_binary: + im.save(im_binary, "PNG") + im_binary.seek(0) + delete_button = Button( + style=ButtonStyle.DANGER, + label="Delete", + emoji=":wastebasket:", + custom_id=f"delete_deck:{ctx.author_id}", + ) + await message.edit( + embed=e, + file=File(im_binary, "deck.png"), + components=spread_to_rows(delete_button), + ) + + @component_callback(re_compile("delete_deck:[0-9]")) + async def handle_delete(self: CardExt, ctx: ComponentContext) -> None: + """Handle the delete button being pressed on the deck info.""" + if str(ctx.author_id) == ctx.custom_id.split(":")[-1]: + await ctx.message.delete() + await ctx.send("Deleted!", ephemeral=True) + else: + await ctx.send("You can't delete this deck message!", ephemeral=True) + + @card.subcommand() + @slash_option( + "card_name", + "The card to get", + OptionType.STRING, + required=True, + autocomplete=True, + ) + async def info(self: CardExt, ctx: SlashContext, card_name: str) -> None: + """Get information about a card.""" + server = self.manager.get_server(ctx.guild_id) + cards = [ + card + for card in server.data_generator.universe.values() + if card_name.lower() in card.rarityName.lower() + ] + cards.sort(key=lambda val: val.rarityName) + if len(cards) > 0: + card = cards[0] + if isinstance(card, HermitCard): # Special for hermits + col = TYPE_COLORS[card.hermit_type] + e = ( + Embed( + title=f"{card.name} ({card.rarity})", + description=f"{card.name} ({card.rarity}) - {card.cost} tokens", + timestamp=dt.now(tz=timezone.utc), + color=rgb_to_int(col), + ) + .add_field("Rarity", card.rarity, inline=True) + .add_field( + "Primary attack", + card.attacks[0]["name"] + if card.attacks[0]["power"] is None + else card.attacks[0]["name"] + " - " + card.attacks[0]["power"], + inline=False, + ) + .add_field("Attack damage", card.attacks[0]["damage"], inline=True) + .add_field("Items required", count(card.attacks[0]["cost"]), inline=True) + .add_field( + "Secondary attack", + card.attacks[1]["name"] + if card.attacks[1]["power"] is None + else card.attacks[1]["name"] + + " - " + + card.attacks[1]["power"].replace("\n\n", "\n"), + inline=False, + ) + .add_field("Attack damage", card.attacks[1]["damage"], inline=True) + .add_field("Items required", count(card.attacks[1]["cost"]), inline=True) + ) + else: + description: str + color: tuple[int, int, int] + if isinstance(card, ItemCard): + description = ( + card.energy[0] + f" x{len(card.energy)}" + if len(card.energy) + else "" + " item card" + ) + color = TYPE_COLORS[card.energy[0]] + elif isinstance(card, EffectCard): + description = card.description + color = beige + e = Embed( + title=card.name, + description=description, + timestamp=dt.now(tz=timezone.utc), + color=rgb_to_int(color), + ).add_field("Rarity", card.rarity, inline=True) + e.set_thumbnail(card.token_image_url) + e.set_footer("Bot by Tyrannicodin16") + await ctx.send(embeds=e) + else: + await ctx.send("Couldn't find that card!", ephemeral=True) + + @card.subcommand() + @slash_option( + "hermits", + "The number of hermits in your deck", + OptionType.INTEGER, + required=True, + ) + @slash_option( + "desired_chance", + "The chance of getting a number of hermits (default 2) on a turn", + OptionType.INTEGER, + ) + @slash_option("desired_hermits", "The number of hermits you want", OptionType.INTEGER) + async def two_hermits( + self: CardExt, + ctx: SlashContext, + hermits: int, + desired_chance: int = 50, + desired_hermits: int = 2, + ) -> None: + """View probability to have a number of hermits in your hand after a certain number of draws.""" # noqa: E501 + if hermits < 1 or hermits > 36: + await ctx.send("Invalid hermit count (1-36)", ephemeral=True) + return + plt.figure() + xs = list(range(35)) + ys = [probability(hermits, i, desired_hermits) * 100 for i in xs] + surpass = next((idx[0] for idx in enumerate(ys) if idx[1] >= desired_chance), None) + plt.plot(xs, list(ys)) + plt.xlabel("Draws") + plt.ylabel("Probability") + plt.title( + f"Chance of having {desired_hermits} hermits in your hand after x draws for {hermits} hermits" # noqa: E501 + ) + plt.grid(visible=True) + e = Embed( + title=f"Chance of having {desired_hermits} hermits in your hand after x draws for {hermits} hermits", # noqa: E501 + timestamp=dt.now(tz=timezone.utc), + color=rgb_to_int((178, 178, 255)), + ).add_field("Initial draw chance", f"{ys[0]}%", inline=True) + if surpass or surpass == 0: + e.add_field(f"Hits {desired_chance}%", f"{surpass} draw(s)", inline=True) + else: + e.add_field(f"Hits {desired_chance}%", "Never", inline=True) + e.set_footer("Bot by Tyrannicodin | Probability calculations by Allophony") + e.set_image("attachment://graph.png") + with BytesIO() as figure_bytes: + plt.savefig(figure_bytes, format="png") + figure_bytes.seek(0) + await ctx.send(embeds=e, files=File(figure_bytes, "graph.png")) + plt.close() + + @card.subcommand() + async def chart(self: CardExt, ctx: SlashContext) -> None: + """Display the type chart by u/itsNizart.""" + e = ( + Embed(title="Type chart", timestamp=dt.now(tz=timezone.utc)) + .set_image("attachment://typechart.png") + .set_author( + "u/itsNizart", + "https://www.reddit.com/user/itsNizart", + "https://styles.redditmedia.com/t5_2efni5/styles/profileIcon_jzv50kvrvlb71.png", + ) + .set_footer("Bot by Tyrannicodin") + ) + await ctx.send(embeds=e, files=File("resources/typechart.png")) + + +def setup( + client: Client, + manager: ServerManager, + scheduler: AsyncIOScheduler, +) -> Extension: + """Create the extension. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + scheduler (AsyncIOScheduler): Event scheduler + """ + return CardExt(client, manager, scheduler) diff --git a/bot/exts/dotd.py b/bot/exts/dotd.py index d8f3faf..b16bbdc 100644 --- a/bot/exts/dotd.py +++ b/bot/exts/dotd.py @@ -1,143 +1,143 @@ -"""Commands for recording dotd results.""" - -from __future__ import annotations - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from interactions import ( - Client, - Extension, - Member, - OptionType, - SlashContext, - User, - slash_command, - slash_option, -) - -from bot.util import ServerManager - - -class DotdExt(Extension): - """Commands for recording dotd results.""" - - def __init__( - self: DotdExt, - client: Client, - manager: ServerManager, - _scheduler: AsyncIOScheduler, - ) -> None: - """Commands for recording dotd results. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - _scheduler (AsyncIOScheduler): Event scheduler - _generator (DataGenerator): Card data generator - """ - self.client: Client = client - self.manager = manager - - self.data: dict[str, tuple[int, int, int]] = {} - - @slash_command() - async def dotd(self: DotdExt, _: SlashContext) -> None: - """Commands for recording dotd results.""" - - @dotd.subcommand() - @slash_option("wins", "The number of games you won", OptionType.INTEGER, required=True) - @slash_option("ties", "The number of games you tied (can be blank)", OptionType.INTEGER) - async def submit(self: DotdExt, ctx: SlashContext, wins: int, ties: int = 0) -> None: - """Submit a dotd result, this will overwrite any previous results.""" - if wins > 5 or ties > 5 - wins or wins < 0 or ties < 0: - await ctx.send("Invalid wins or ties", ephemeral=True) - return - self.data[str(ctx.author_id)] = ( - wins, - ties, - 5 - wins - ties, - ) - await ctx.send( - f"{ctx.author.display_name}: {wins} wins, {ties} ties and {5-wins-ties} losses" - ) - - @dotd.subcommand() - @slash_option("player", "The player to add", OptionType.USER, required=True) - @slash_option("wins", "The number of games the player won", OptionType.INTEGER, required=True) - @slash_option("ties", "The number of games the player tied (can be blank)", OptionType.INTEGER) - async def add_other( - self: DotdExt, ctx: SlashContext, player: Member, wins: int, ties: int = 0 - ) -> None: - """Add a score for a player that is not you.""" - if ctx.member is None: - await ctx.send("You can't do that!", ephemeral=True) - return - if not self.manager.get_server(ctx.guild_id).authorize_user(ctx.member): - await ctx.send("You can't do that!", ephemeral=True) - return - - if wins > 5 or ties > 5 - wins or wins < 0 or ties < 0: - await ctx.send("Invalid wins or ties", ephemeral=True) - return - self.data[str(player.id)] = (wins, ties, 5 - wins - ties) - await ctx.send( - f"{player.display_name}: {wins} wins, {ties} ties and {5-wins-ties} losses", - ephemeral=True, - ) - - @dotd.subcommand("list") - async def list_results(self: DotdExt, ctx: SlashContext) -> None: - """List today's dotd results.""" - reversed_data: dict[tuple[int, int, int], str] = { - value: key for key, value in self.data.items() - } - data_sorted: list[tuple[str, int, int, int]] = [ - (key, *value) for key, value in self.data.items() - ] - - data_sorted.sort(key=lambda x: (-x[1], -x[2])) - output: str = "" - for i, user in enumerate(data_sorted, 1): - discord_member: User | Member | None = await self.client.fetch_member( - reversed_data[user[1:4]], ctx.guild_id - ) - if not discord_member: - discord_member = await self.client.fetch_user(reversed_data[user[1:4]]) - output = ( - f"{output}\n{i}. {discord_member.display_name if discord_member else user[0]} - " - + f"{user[1]} wins, {user[2]} ties and {user[3]} losses" - ) - if output: - await ctx.send(output) - else: - await ctx.send("No results submitted yet", ephemeral=True) - - @dotd.subcommand() - async def clear(self: DotdExt, ctx: SlashContext) -> None: - """Clear all results.""" - if ctx.member is None: - await ctx.send("You can't do that!", ephemeral=True) - return - if not self.manager.get_server(ctx.guild_id).authorize_user(ctx.member): - await ctx.send("You can't do that!", ephemeral=True) - return - - self.data = {} - await ctx.send("Cleared all results") - - -def setup( - client: Client, - manager: ServerManager, - scheduler: AsyncIOScheduler, -) -> Extension: - """Create the extension. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - scheduler (AsyncIOScheduler): Event scheduler - generator (DataGenerator): Card data generator - """ - return DotdExt(client, manager, scheduler) +"""Commands for recording dotd results.""" + +from __future__ import annotations + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from interactions import ( + Client, + Extension, + Member, + OptionType, + SlashContext, + User, + slash_command, + slash_option, +) + +from bot.util import ServerManager + + +class DotdExt(Extension): + """Commands for recording dotd results.""" + + def __init__( + self: DotdExt, + client: Client, + manager: ServerManager, + _scheduler: AsyncIOScheduler, + ) -> None: + """Commands for recording dotd results. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + _scheduler (AsyncIOScheduler): Event scheduler + _generator (DataGenerator): Card data generator + """ + self.client: Client = client + self.manager = manager + + self.data: dict[str, tuple[int, int, int]] = {} + + @slash_command() + async def dotd(self: DotdExt, _: SlashContext) -> None: + """Commands for recording dotd results.""" + + @dotd.subcommand() + @slash_option("wins", "The number of games you won", OptionType.INTEGER, required=True) + @slash_option("ties", "The number of games you tied (can be blank)", OptionType.INTEGER) + async def submit(self: DotdExt, ctx: SlashContext, wins: int, ties: int = 0) -> None: + """Submit a dotd result, this will overwrite any previous results.""" + if wins > 5 or ties > 5 - wins or wins < 0 or ties < 0: + await ctx.send("Invalid wins or ties", ephemeral=True) + return + self.data[str(ctx.author_id)] = ( + wins, + ties, + 5 - wins - ties, + ) + await ctx.send( + f"{ctx.author.display_name}: {wins} wins, {ties} ties and {5-wins-ties} losses" + ) + + @dotd.subcommand() + @slash_option("player", "The player to add", OptionType.USER, required=True) + @slash_option("wins", "The number of games the player won", OptionType.INTEGER, required=True) + @slash_option("ties", "The number of games the player tied (can be blank)", OptionType.INTEGER) + async def add_other( + self: DotdExt, ctx: SlashContext, player: Member, wins: int, ties: int = 0 + ) -> None: + """Add a score for a player that is not you.""" + if ctx.member is None: + await ctx.send("You can't do that!", ephemeral=True) + return + if not self.manager.get_server(ctx.guild_id).authorize_user(ctx.member): + await ctx.send("You can't do that!", ephemeral=True) + return + + if wins > 5 or ties > 5 - wins or wins < 0 or ties < 0: + await ctx.send("Invalid wins or ties", ephemeral=True) + return + self.data[str(player.id)] = (wins, ties, 5 - wins - ties) + await ctx.send( + f"{player.display_name}: {wins} wins, {ties} ties and {5-wins-ties} losses", + ephemeral=True, + ) + + @dotd.subcommand("list") + async def list_results(self: DotdExt, ctx: SlashContext) -> None: + """List today's dotd results.""" + reversed_data: dict[tuple[int, int, int], str] = { + value: key for key, value in self.data.items() + } + data_sorted: list[tuple[str, int, int, int]] = [ + (key, *value) for key, value in self.data.items() + ] + + data_sorted.sort(key=lambda x: (-x[1], -x[2])) + output: str = "" + for i, user in enumerate(data_sorted, 1): + discord_member: User | Member | None = await self.client.fetch_member( + reversed_data[user[1:4]], ctx.guild_id + ) + if not discord_member: + discord_member = await self.client.fetch_user(reversed_data[user[1:4]]) + output = ( + f"{output}\n{i}. {discord_member.display_name if discord_member else user[0]} - " + + f"{user[1]} wins, {user[2]} ties and {user[3]} losses" + ) + if output: + await ctx.send(output) + else: + await ctx.send("No results submitted yet", ephemeral=True) + + @dotd.subcommand() + async def clear(self: DotdExt, ctx: SlashContext) -> None: + """Clear all results.""" + if ctx.member is None: + await ctx.send("You can't do that!", ephemeral=True) + return + if not self.manager.get_server(ctx.guild_id).authorize_user(ctx.member): + await ctx.send("You can't do that!", ephemeral=True) + return + + self.data = {} + await ctx.send("Cleared all results") + + +def setup( + client: Client, + manager: ServerManager, + scheduler: AsyncIOScheduler, +) -> Extension: + """Create the extension. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + scheduler (AsyncIOScheduler): Event scheduler + generator (DataGenerator): Card data generator + """ + return DotdExt(client, manager, scheduler) diff --git a/bot/exts/forums.py b/bot/exts/forums.py index 4957adb..d3c6b05 100644 --- a/bot/exts/forums.py +++ b/bot/exts/forums.py @@ -1,234 +1,234 @@ -"""Commands to help manage forums.""" - -from __future__ import annotations - -from asyncio import sleep -from collections import defaultdict -from json import dump, load - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from interactions import ( - TYPE_THREAD_CHANNEL, - Button, - ButtonStyle, - Client, - ComponentContext, - Extension, - GuildForum, - GuildForumPost, - GuildText, - SlashContext, - Snowflake_Type, - StringSelectMenu, - StringSelectOption, - ThreadTag, - component_callback, - events, - listen, - slash_command, - spread_to_rows, -) - -from bot.util import Server, ServerManager - - -class DummyPost: - """A fake post.""" - - def __init__(self: DummyPost, ctx: SlashContext) -> None: - """Fake post for manually tracking a forum post.""" - self.thread = ctx.channel - self.author = ctx.author - self.channel = ctx.channel - - -class ForumExt(Extension): - """Commands to help manage forums.""" - - def __init__( - self: ForumExt, - client: Client, - manager: ServerManager, - _scheduler: AsyncIOScheduler, - ) -> None: - """Commands to help manage forums. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - _scheduler (AsyncIOScheduler): Event scheduler - """ - self.client: Client = client - self.manager: ServerManager = manager - - self.to_close: defaultdict[str, list] = defaultdict(list) - with open("forums.json") as f: - self.to_close.update(load(f)) - - @listen() - async def disconnect(self: ForumExt, _: str) -> None: - """Handle bot disconnection.""" - with open("forums.json", "w") as f: - dump(self.to_close, f) - - @slash_command() - async def forum(self: ForumExt, _: SlashContext) -> None: - """Commands to help manage forums.""" - - @forum.subcommand() - async def close_done(self: ForumExt, ctx: SlashContext) -> None: - """Close all forums that are complete for the next update.""" - if ctx.member is None: - await ctx.send("You can't do that!", ephemeral=True) - return - if not self.manager.get_server(ctx.guild_id).authorize_user(ctx.member): - await ctx.send("You can't do that!", ephemeral=True) - return - - for parent, threads in self.to_close.items(): - parent_channel = await self.client.fetch_channel(parent) - if not isinstance(parent_channel, GuildForum): - continue - for thread_id in threads: - thread = await parent_channel.fetch_post(thread_id) - if thread: - await thread.archive(locked=True) - self.to_close = defaultdict(list) - await ctx.send("Closed all posts", ephemeral=True) - - @forum.subcommand() - async def manual(self: ForumExt, ctx: SlashContext) -> None: - """Manually add a forum to be tracked.""" - if ctx.member is None: - await ctx.send("You can't do that!", ephemeral=True) - return - authed = self.manager.get_server(ctx.guild_id).authorize_user(ctx.member) - if not authed: - await ctx.send("You can't do that!", ephemeral=True) - return - await ctx.send("Creating message", ephemeral=True) - - await self.new_post(self, DummyPost(ctx)) - - @listen("new_thread_create") - async def new_post(self: ForumExt, event: events.NewThreadCreate) -> None: - """Track a new thread when posted.""" - thread: TYPE_THREAD_CHANNEL = event.thread - if not isinstance(thread, GuildForumPost): # Must be a forum post - return - - server: Server = self.manager.get_server(thread.guild.id) - if ( - str(thread.parent_id) not in server.tracked_forums.keys() - ): # Ensure this forum is tracked - return - - forum: GuildText | GuildForum = thread.parent_channel - if isinstance(forum, GuildText): # This should never happen since we know it's a forum post - return - - await sleep(1) - await thread.join() - - final_tags: list[Snowflake_Type | ThreadTag] = [ - tag - for tag in thread.applied_tags - if tag.name in server.tracked_forums[str(thread.parent_id)] - ] - open_tag = forum.get_tag("open", case_insensitive=True) - if open_tag: - final_tags.append(open_tag) - await thread.edit(applied_tags=final_tags) - - select_option = [] - for tag in forum.available_tags: - if not ( - tag.name in server.tracked_forums[str(thread.parent_id)] - or tag.name.lower() in ["open", "closed"] - ): - select_option.append( - StringSelectOption(label=tag.name, value=str(tag.id), emoji=tag.emoji_name) - ) - await thread.send( - "Thanks for submitting a post", - components=spread_to_rows( - StringSelectMenu(*select_option, custom_id="post_tagged"), - Button( - style=ButtonStyle.DANGER, - label="Close thread", - emoji=":wastebasket:", - custom_id="close_thread", - ), - ), - ) - - @component_callback("post_tagged") - async def change_tags(self: ForumExt, ctx: ComponentContext) -> None: - """Change the status tag on a post.""" - server: Server = self.manager.get_server(ctx.guild_id) - if not ( - server.authorize_user(ctx.author) - or (ctx.channel.initial_post and ctx.author == ctx.channel.initial_post.author) - ): - await ctx.send("You can't do that!", ephemeral=True) - return - - post: GuildForumPost = ctx.channel - parent: GuildForum | GuildText = post.parent_channel - if isinstance(parent, GuildText): # This will never happen (type checking is fun I swear) - return - - selected_tag = parent.get_tag(ctx.values[0]) - final_tags: list[Snowflake_Type | ThreadTag] = [ - tag - for tag in post.applied_tags - if tag.name in server.tracked_forums[str(post.parent_id)] - or tag.name in ["open", "closed"] - ] - - if selected_tag in final_tags: - final_tags.remove(selected_tag) - await ctx.send("Removed tag", ephemeral=True) - elif selected_tag: - final_tags.append(selected_tag) - await ctx.send("Added tag", ephemeral=True) - else: - await ctx.send("Couldn't find tag!") - await post.edit(applied_tags=final_tags) - - @component_callback("close_thread") - async def close_thread(self: ForumExt, ctx: ComponentContext) -> None: - """Close a thread.""" - server: Server = self.manager.get_server(ctx.guild_id) - if not ( - server.authorize_user(ctx.author) - or (ctx.channel.initial_post and ctx.author == ctx.channel.initial_post.author) - ): - await ctx.send("You can't do that!", ephemeral=True) - return - - final_tags = [tag for tag in ctx.channel.applied_tags if tag.name not in ["open", "closed"]] - closed_tag = ctx.channel.parent_channel.get_tag("closed", case_insensitive=True) - if closed_tag: - final_tags.append(closed_tag) - - await ctx.send("Closed post") - await ctx.channel.edit(locked=True, applied_tags=final_tags) - self.to_close[ctx.channel.parent_id].append(ctx.channel_id) - - -def setup( - client: Client, - manager: ServerManager, - scheduler: AsyncIOScheduler, -) -> Extension: - """Create the extension. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - scheduler (AsyncIOScheduler): Event scheduler - """ - return ForumExt(client, manager, scheduler) +"""Commands to help manage forums.""" + +from __future__ import annotations + +from asyncio import sleep +from collections import defaultdict +from json import dump, load + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from interactions import ( + TYPE_THREAD_CHANNEL, + Button, + ButtonStyle, + Client, + ComponentContext, + Extension, + GuildForum, + GuildForumPost, + GuildText, + SlashContext, + Snowflake_Type, + StringSelectMenu, + StringSelectOption, + ThreadTag, + component_callback, + events, + listen, + slash_command, + spread_to_rows, +) + +from bot.util import Server, ServerManager + + +class DummyPost: + """A fake post.""" + + def __init__(self: DummyPost, ctx: SlashContext) -> None: + """Fake post for manually tracking a forum post.""" + self.thread = ctx.channel + self.author = ctx.author + self.channel = ctx.channel + + +class ForumExt(Extension): + """Commands to help manage forums.""" + + def __init__( + self: ForumExt, + client: Client, + manager: ServerManager, + _scheduler: AsyncIOScheduler, + ) -> None: + """Commands to help manage forums. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + _scheduler (AsyncIOScheduler): Event scheduler + """ + self.client: Client = client + self.manager: ServerManager = manager + + self.to_close: defaultdict[str, list] = defaultdict(list) + with open("forums.json") as f: + self.to_close.update(load(f)) + + @listen() + async def disconnect(self: ForumExt, _: str) -> None: + """Handle bot disconnection.""" + with open("forums.json", "w") as f: + dump(self.to_close, f) + + @slash_command() + async def forum(self: ForumExt, _: SlashContext) -> None: + """Commands to help manage forums.""" + + @forum.subcommand() + async def close_done(self: ForumExt, ctx: SlashContext) -> None: + """Close all forums that are complete for the next update.""" + if ctx.member is None: + await ctx.send("You can't do that!", ephemeral=True) + return + if not self.manager.get_server(ctx.guild_id).authorize_user(ctx.member): + await ctx.send("You can't do that!", ephemeral=True) + return + + for parent, threads in self.to_close.items(): + parent_channel = await self.client.fetch_channel(parent) + if not isinstance(parent_channel, GuildForum): + continue + for thread_id in threads: + thread = await parent_channel.fetch_post(thread_id) + if thread: + await thread.archive(locked=True) + self.to_close = defaultdict(list) + await ctx.send("Closed all posts", ephemeral=True) + + @forum.subcommand() + async def manual(self: ForumExt, ctx: SlashContext) -> None: + """Manually add a forum to be tracked.""" + if ctx.member is None: + await ctx.send("You can't do that!", ephemeral=True) + return + authed = self.manager.get_server(ctx.guild_id).authorize_user(ctx.member) + if not authed: + await ctx.send("You can't do that!", ephemeral=True) + return + await ctx.send("Creating message", ephemeral=True) + + await self.new_post(self, DummyPost(ctx)) + + @listen("new_thread_create") + async def new_post(self: ForumExt, event: events.NewThreadCreate) -> None: + """Track a new thread when posted.""" + thread: TYPE_THREAD_CHANNEL = event.thread + if not isinstance(thread, GuildForumPost): # Must be a forum post + return + + server: Server = self.manager.get_server(thread.guild.id) + if ( + str(thread.parent_id) not in server.tracked_forums.keys() + ): # Ensure this forum is tracked + return + + forum: GuildText | GuildForum = thread.parent_channel + if isinstance(forum, GuildText): # This should never happen since we know it's a forum post + return + + await sleep(1) + await thread.join() + + final_tags: list[Snowflake_Type | ThreadTag] = [ + tag + for tag in thread.applied_tags + if tag.name in server.tracked_forums[str(thread.parent_id)] + ] + open_tag = forum.get_tag("open", case_insensitive=True) + if open_tag: + final_tags.append(open_tag) + await thread.edit(applied_tags=final_tags) + + select_option = [] + for tag in forum.available_tags: + if not ( + tag.name in server.tracked_forums[str(thread.parent_id)] + or tag.name.lower() in ["open", "closed"] + ): + select_option.append( + StringSelectOption(label=tag.name, value=str(tag.id), emoji=tag.emoji_name) + ) + await thread.send( + "Thanks for submitting a post", + components=spread_to_rows( + StringSelectMenu(*select_option, custom_id="post_tagged"), + Button( + style=ButtonStyle.DANGER, + label="Close thread", + emoji=":wastebasket:", + custom_id="close_thread", + ), + ), + ) + + @component_callback("post_tagged") + async def change_tags(self: ForumExt, ctx: ComponentContext) -> None: + """Change the status tag on a post.""" + server: Server = self.manager.get_server(ctx.guild_id) + if not ( + server.authorize_user(ctx.author) + or (ctx.channel.initial_post and ctx.author == ctx.channel.initial_post.author) + ): + await ctx.send("You can't do that!", ephemeral=True) + return + + post: GuildForumPost = ctx.channel + parent: GuildForum | GuildText = post.parent_channel + if isinstance(parent, GuildText): # This will never happen (type checking is fun I swear) + return + + selected_tag = parent.get_tag(ctx.values[0]) + final_tags: list[Snowflake_Type | ThreadTag] = [ + tag + for tag in post.applied_tags + if tag.name in server.tracked_forums[str(post.parent_id)] + or tag.name in ["open", "closed"] + ] + + if selected_tag in final_tags: + final_tags.remove(selected_tag) + await ctx.send("Removed tag", ephemeral=True) + elif selected_tag: + final_tags.append(selected_tag) + await ctx.send("Added tag", ephemeral=True) + else: + await ctx.send("Couldn't find tag!") + await post.edit(applied_tags=final_tags) + + @component_callback("close_thread") + async def close_thread(self: ForumExt, ctx: ComponentContext) -> None: + """Close a thread.""" + server: Server = self.manager.get_server(ctx.guild_id) + if not ( + server.authorize_user(ctx.author) + or (ctx.channel.initial_post and ctx.author == ctx.channel.initial_post.author) + ): + await ctx.send("You can't do that!", ephemeral=True) + return + + final_tags = [tag for tag in ctx.channel.applied_tags if tag.name not in ["open", "closed"]] + closed_tag = ctx.channel.parent_channel.get_tag("closed", case_insensitive=True) + if closed_tag: + final_tags.append(closed_tag) + + await ctx.send("Closed post") + await ctx.channel.edit(locked=True, applied_tags=final_tags) + self.to_close[ctx.channel.parent_id].append(ctx.channel_id) + + +def setup( + client: Client, + manager: ServerManager, + scheduler: AsyncIOScheduler, +) -> Extension: + """Create the extension. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + scheduler (AsyncIOScheduler): Event scheduler + """ + return ForumExt(client, manager, scheduler) diff --git a/bot/exts/game.py b/bot/exts/game.py index fb7aecf..a4224ad 100644 --- a/bot/exts/game.py +++ b/bot/exts/game.py @@ -1,137 +1,137 @@ -"""Commands for matches.""" - -from __future__ import annotations - -from typing import Any - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.triggers.interval import IntervalTrigger -from interactions import ( - Activity, - ActivityType, - Button, - ButtonStyle, - Client, - ComponentContext, - Extension, - Message, - OptionType, - SlashContext, - component_callback, - slash_command, - slash_option, -) - -from bot.util import QueueGame, Server, ServerManager - - -class GameExt(Extension): - """Commands linked to games.""" - - def __init__( - self: GameExt, - client: Client, - manager: ServerManager, - scheduler: AsyncIOScheduler, - ) -> None: - """Commands linked to games. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - scheduler (AsyncIOScheduler): Event scheduler - """ - self.client: Client = client - self.manager: ServerManager = manager - self.scheduler: AsyncIOScheduler = scheduler - - self.scheduler.add_job(self.update_status, IntervalTrigger(minutes=1)) - - self.games: dict[str, QueueGame] = {} - - @slash_command() - async def game(self: GameExt, _: SlashContext) -> None: - """Commands linked to games.""" - - @game.subcommand() - @slash_option("spectators", "Should the spectator code be shown", OptionType.BOOLEAN) - async def create(self: GameExt, ctx: SlashContext, *, spectators: bool = False) -> None: - """Create a match for someone to join.""" - server: Server = self.manager.get_server(ctx.guild_id) - - game: QueueGame | None = await server.create_game() - if not game: - await ctx.send("Failed to create a game, seems to be a server problem.") - return - - cancel_button = Button( - style=ButtonStyle.GRAY, label="Cancel", emoji="🚫", custom_id="cancel_game" - ) - - message: Message = await ctx.send( - embed=game.create_embed(spectators=spectators), components=cancel_button - ) - self.games[str(message.id)] = game - - @component_callback("cancel_game") - async def cancel_game(self: GameExt, ctx: ComponentContext) -> None: - """Cancel a game.""" - if str(ctx.message_id) not in self.games.keys(): - await ctx.send("Couldn't find game.", ephemeral=True) - return - - server: Server = self.manager.get_server(ctx.guild_id) - - target_game = self.games[str(ctx.message_id)] - await server.cancel_game(target_game) - await ctx.send("Cancelled game") - - @game.subcommand() - async def count(self: GameExt, ctx: ComponentContext) -> None: - """Get the number of games being played on this server.""" - server: Server = self.manager.get_server(ctx.guild_id) - - game_count = await server.get_game_count() - - if (game_count == 1): - game_message = "is 1 game" - else: - game_message = f"are {game_count} games" - - await ctx.send(f"There {game_message} on this server") - - async def update_status(self: GameExt) -> None: - """Update the bots status.""" - game_count: int = 0 - for server in self.manager.servers: - game_count += await server.get_game_count() - - if (game_count == 1): - game_word = "game" - else: - game_word = "games" - - await self.client.change_presence( - activity=Activity( - f"{game_count} {game_word}", - ActivityType.WATCHING, - self.manager.servers[0].server_url, - ) - ) - - -def setup( - client: Client, - manager: ServerManager, - scheduler: AsyncIOScheduler, -) -> Extension: - """Create the extension. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - scheduler (AsyncIOScheduler): Event scheduler - """ - return GameExt(client, manager, scheduler) +"""Commands for matches.""" + +from __future__ import annotations + +from typing import Any + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.interval import IntervalTrigger +from interactions import ( + Activity, + ActivityType, + Button, + ButtonStyle, + Client, + ComponentContext, + Extension, + Message, + OptionType, + SlashContext, + component_callback, + slash_command, + slash_option, +) + +from bot.util import QueueGame, Server, ServerManager + + +class GameExt(Extension): + """Commands linked to games.""" + + def __init__( + self: GameExt, + client: Client, + manager: ServerManager, + scheduler: AsyncIOScheduler, + ) -> None: + """Commands linked to games. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + scheduler (AsyncIOScheduler): Event scheduler + """ + self.client: Client = client + self.manager: ServerManager = manager + self.scheduler: AsyncIOScheduler = scheduler + + self.scheduler.add_job(self.update_status, IntervalTrigger(minutes=1)) + + self.games: dict[str, QueueGame] = {} + + @slash_command() + async def game(self: GameExt, _: SlashContext) -> None: + """Commands linked to games.""" + + @game.subcommand() + @slash_option("spectators", "Should the spectator code be shown", OptionType.BOOLEAN) + async def create(self: GameExt, ctx: SlashContext, *, spectators: bool = False) -> None: + """Create a match for someone to join.""" + server: Server = self.manager.get_server(ctx.guild_id) + + game: QueueGame | None = await server.create_game() + if not game: + await ctx.send("Failed to create a game, seems to be a server problem.") + return + + cancel_button = Button( + style=ButtonStyle.GRAY, label="Cancel", emoji="🚫", custom_id="cancel_game" + ) + + message: Message = await ctx.send( + embed=game.create_embed(spectators=spectators), components=cancel_button + ) + self.games[str(message.id)] = game + + @component_callback("cancel_game") + async def cancel_game(self: GameExt, ctx: ComponentContext) -> None: + """Cancel a game.""" + if str(ctx.message_id) not in self.games.keys(): + await ctx.send("Couldn't find game.", ephemeral=True) + return + + server: Server = self.manager.get_server(ctx.guild_id) + + target_game = self.games[str(ctx.message_id)] + await server.cancel_game(target_game) + await ctx.send("Cancelled game") + + @game.subcommand() + async def count(self: GameExt, ctx: ComponentContext) -> None: + """Get the number of games being played on this server.""" + server: Server = self.manager.get_server(ctx.guild_id) + + game_count = await server.get_game_count() + + if game_count == 1: + game_message = "is 1 game" + else: + game_message = f"are {game_count} games" + + await ctx.send(f"There {game_message} on this server") + + async def update_status(self: GameExt) -> None: + """Update the bots status.""" + game_count: int = 0 + for server in self.manager.servers: + game_count += await server.get_game_count() + + if game_count == 1: + game_word = "game" + else: + game_word = "games" + + await self.client.change_presence( + activity=Activity( + f"{game_count} {game_word}", + ActivityType.WATCHING, + self.manager.servers[0].server_url, + ) + ) + + +def setup( + client: Client, + manager: ServerManager, + scheduler: AsyncIOScheduler, +) -> Extension: + """Create the extension. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + scheduler (AsyncIOScheduler): Event scheduler + """ + return GameExt(client, manager, scheduler) diff --git a/bot/exts/utils.py b/bot/exts/utils.py index a598cef..43b0e72 100644 --- a/bot/exts/utils.py +++ b/bot/exts/utils.py @@ -1,69 +1,69 @@ -"""Commands for the bot.""" - -from __future__ import annotations - -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from interactions import Client, Extension, SlashContext, Status, User, slash_command - -from bot.util import DataGenerator, ServerManager - - -class UtilExt(Extension): - """Commands for the bot.""" - - def __init__( - self: UtilExt, - client: Client, - _manager: ServerManager, - _scheduler: AsyncIOScheduler, - ) -> None: - """Commands for the bot. - - Args: - ---- - client (Client): The discord bot client - _manager (ServerManager): The server connection manager - _scheduler (AsyncIOScheduler): Event scheduler - _generator (DataGenerator): Card data generator - """ - self.client: Client = client - - @slash_command() - async def util(self: UtilExt, _: SlashContext) -> None: - """Commands for the bot.""" - - @util.subcommand() - async def ping(self: UtilExt, ctx: SlashContext) -> None: - """Get the latency of the bot.""" - await ctx.send(f"Pong!\nLatency:{round(self.client.latency, 3)}ms", ephemeral=True) - - @util.subcommand() - async def stop(self: UtilExt, ctx: SlashContext) -> None: - """Gracefully shutdown the bot.""" - owner: User | None = self.client.owner - if not owner: - await ctx.send("You aren't allowed to do this.", ephemeral=True) - return - if ctx.author_id != owner.id: - await ctx.send(f"You aren't allowed to do this ||{owner.mention}||") - return - - await self.client.change_presence(Status.OFFLINE) - await ctx.send("Stopping!", ephemeral=True) - await self.client.stop() - - -def setup( - client: Client, - manager: ServerManager, - scheduler: AsyncIOScheduler, -) -> Extension: - """Create the extension. - - Args: - ---- - client (Client): The discord bot client - manager (ServerManager): The server connection manager - scheduler (AsyncIOScheduler): Event scheduler - """ - return UtilExt(client, manager, scheduler) +"""Commands for the bot.""" + +from __future__ import annotations + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from interactions import Client, Extension, SlashContext, Status, User, slash_command + +from bot.util import DataGenerator, ServerManager + + +class UtilExt(Extension): + """Commands for the bot.""" + + def __init__( + self: UtilExt, + client: Client, + _manager: ServerManager, + _scheduler: AsyncIOScheduler, + ) -> None: + """Commands for the bot. + + Args: + ---- + client (Client): The discord bot client + _manager (ServerManager): The server connection manager + _scheduler (AsyncIOScheduler): Event scheduler + _generator (DataGenerator): Card data generator + """ + self.client: Client = client + + @slash_command() + async def util(self: UtilExt, _: SlashContext) -> None: + """Commands for the bot.""" + + @util.subcommand() + async def ping(self: UtilExt, ctx: SlashContext) -> None: + """Get the latency of the bot.""" + await ctx.send(f"Pong!\nLatency:{round(self.client.latency, 3)}ms", ephemeral=True) + + @util.subcommand() + async def stop(self: UtilExt, ctx: SlashContext) -> None: + """Gracefully shutdown the bot.""" + owner: User | None = self.client.owner + if not owner: + await ctx.send("You aren't allowed to do this.", ephemeral=True) + return + if ctx.author_id != owner.id: + await ctx.send(f"You aren't allowed to do this ||{owner.mention}||") + return + + await self.client.change_presence(Status.OFFLINE) + await ctx.send("Stopping!", ephemeral=True) + await self.client.stop() + + +def setup( + client: Client, + manager: ServerManager, + scheduler: AsyncIOScheduler, +) -> Extension: + """Create the extension. + + Args: + ---- + client (Client): The discord bot client + manager (ServerManager): The server connection manager + scheduler (AsyncIOScheduler): Event scheduler + """ + return UtilExt(client, manager, scheduler) diff --git a/bot/util/__init__.py b/bot/util/__init__.py index 88ec458..b8c81bb 100644 --- a/bot/util/__init__.py +++ b/bot/util/__init__.py @@ -1,5 +1,5 @@ -"""Utility function for the bot to use.""" - -from .datagen import * -from .probability import * -from .server import * +"""Utility function for the bot to use.""" + +from .datagen import * +from .probability import * +from .server import * diff --git a/bot/util/card_palettes.py b/bot/util/card_palettes.py index 07a554d..d61a3b7 100644 --- a/bot/util/card_palettes.py +++ b/bot/util/card_palettes.py @@ -1,25 +1,25 @@ -"""Contains palettes of cards.""" - -from dataclasses import dataclass - - -@dataclass -class Palette: - """Palette information.""" - - BACKGROUND: tuple[int, int, int] = (226, 202, 139) - NAME: tuple[int, int, int] = (0, 0, 0) - BASIC_ATTACK: tuple[int, int, int] = (0, 0, 0) - SPECIAL_ATTACK: tuple[int, int, int] = (0, 0, 0) - HEALTH: tuple[int, int, int] = (246, 4, 1) - TYPE_BACKGROUND: tuple[int, int, int] = (255, 255, 255) - BASIC_DAMAGE: tuple[int, int, int] = (246, 4, 1) - SPECIAL_DAMAGE: tuple[int, int, int] = (23, 66, 234) - - -palettes: dict[str, Palette] = { - "base": Palette(), - "alter_egos": Palette((25, 25, 25), (255, 255, 255), (255, 255, 255), (255, 255, 255)), - "pharaoh": Palette((239, 228, 103), (246, 4, 1), (246, 4, 1), (23, 66, 234)), - "advent_of_tcg": Palette((206, 211, 206)), -} +"""Contains palettes of cards.""" + +from dataclasses import dataclass + + +@dataclass +class Palette: + """Palette information.""" + + BACKGROUND: tuple[int, int, int] = (226, 202, 139) + NAME: tuple[int, int, int] = (0, 0, 0) + BASIC_ATTACK: tuple[int, int, int] = (0, 0, 0) + SPECIAL_ATTACK: tuple[int, int, int] = (0, 0, 0) + HEALTH: tuple[int, int, int] = (246, 4, 1) + TYPE_BACKGROUND: tuple[int, int, int] = (255, 255, 255) + BASIC_DAMAGE: tuple[int, int, int] = (246, 4, 1) + SPECIAL_DAMAGE: tuple[int, int, int] = (23, 66, 234) + + +palettes: dict[str, Palette] = { + "base": Palette(), + "alter_egos": Palette((25, 25, 25), (255, 255, 255), (255, 255, 255), (255, 255, 255)), + "pharaoh": Palette((239, 228, 103), (246, 4, 1), (246, 4, 1), (23, 66, 234)), + "advent_of_tcg": Palette((206, 211, 206)), +} diff --git a/bot/util/datagen.py b/bot/util/datagen.py index 16f8ef8..ee9a377 100644 --- a/bot/util/datagen.py +++ b/bot/util/datagen.py @@ -1,266 +1,266 @@ -"""Generation of card images.""" - -from __future__ import annotations - -from io import BytesIO -from json import load, loads -from typing import Any, Literal - -from aiohttp import ClientResponse, ClientSession -from numpy import array -from PIL import Image, ImageDraw -from PIL.ImageFilter import GaussianBlur - -from .card_palettes import Palette, palettes - -try: - has_progression = True - from tqdm import tqdm -except ImportError: - has_progression = False - - -def change_color( - im: Image.Image, origin: tuple[int, int, int], new: tuple[int, int, int] -) -> Image.Image: - """Change one color to another in an image. - - Args: - ---- - im (Image): The target image to change - origin (tuple): The original color - new (tuple): The new color - """ - data = array(im) - - alpha = len(data.T) == 4 - if alpha: - red, blue, green, _1 = data.T - else: - red, blue, green = data.T - white_areas = (red == origin[0]) & (blue == origin[1]) & (green == origin[2]) - data[..., :3][white_areas.T] = new # Transpose back needed - return Image.fromarray(data) - - -def draw_no_fade( - image: Image.Image, - method: str, - color: tuple[int, int, int], - *args: tuple, - **kwargs: dict, -) -> None: - """Perform an image modification ensuring no fade is made between two colors. - - Args: - ---- - image (Image): The image to modify - method (str): The method to perform on the image - color (tuple): The color of the modification - *args (tuple): Other method arguments - **kwargs (dict): Other keyword method arguments - """ - bw_im = Image.new("1", image.size) - bw_im_draw = ImageDraw.Draw(bw_im) - - getattr(bw_im_draw, method)(*args, **kwargs, fill=1) - - rgba = array(bw_im.convert("RGBA")) - rgba[rgba[..., 0] == 0] = [0, 0, 0, 0] # Convert black to transparrent - rgba[rgba[..., 0] == 255] = (*color, 255) # Convert white to desired colour - image.paste(Image.fromarray(rgba), (0, 0), Image.fromarray(rgba)) - - -def drop_shadow( - image: Image.Image, radius: int, color: tuple[int, int, int, Literal[0]] -) -> Image.Image: - """Generate a drop shadow for an image. - - Args: - ---- - image (Image): The image to create the shaadow for - radius (int): The size of the shadow in pixels - color (tuple): The color of the shadow - - Returns: - ------- - Image containg the drop shadow - """ - base = Image.new("RGBA", (image.width + radius * 2, image.height + radius * 2), color) - alpha = Image.new("L", (image.width + radius * 2, image.height + radius * 2)) - alpha.paste(image.getchannel("A"), (radius, radius)) - base.putalpha(alpha.filter(GaussianBlur(radius))) - return base - - -class Colors: - """Usefull colors.""" - - WHITE = (255, 255, 255) - REPLACE = (0, 172, 96) - REPLACE_2 = (1, 172, 96) - HEALTH_HI = (124, 205, 17) - HEALTH_MID = (213, 118, 39) - HEALTH_LOW = (150, 41, 40) - SHADOW = (0, 0, 0) - - -TYPE_COLORS = { - "miner": (110, 105, 108), - "terraform": (217, 119, 147), - "speedrunner": (223, 226, 36), - "pvp": (85, 202, 194), - "builder": (184, 162, 154), - "balanced": (101, 124, 50), - "explorer": (103, 138, 190), - "prankster": (116, 55, 168), - "redstone": (185, 33, 42), - "farm": (124, 204, 12), - "any": (0, 0, 0), -} - - -class Card: - """Basic image generator for a card.""" - - def __init__(self: Card, data: dict, generator: DataGenerator) -> None: - """Init card. - - Args: - ---- - data (dict): card informtaion - generator (dict): generator this card is part of - """ - self._raw_data: dict = data - self.generator: DataGenerator = generator - - self.text_id: str = data["id"] - self.category: str = data["category"] - - self.cost: int = data["tokens"] - self.image_url: str = data["images"]["default"] - self.token_image_url: str = data["images"]["with-token-cost"] - - self.rarity: str = ( - "Ultra rare" if data["rarity"] == "ultra_rare" else data["rarity"].capitalize() - ) - self.name: str = data["name"] - self.rarityName: str = f"{data['name']} ({self.rarity})" - - self.palette: Palette = palettes[data["palette"] if "palette" in data.keys() else "base"] - - -class HermitCard(Card): - """Image creator for a hermit card.""" - - def __init__(self: HermitCard, data: dict, generator: DataGenerator) -> None: - """Init card. - - Args: - ---- - data (dict): card informtaion - generator (dict): generator this card is part of - """ - super().__init__(data, generator) - - self.hermit_type: str = data["type"] - self.health: int = data["health"] - self.attacks: list[dict[str, Any]] = [data["primary"], data["secondary"]] - - -class EffectCard(Card): - """Image creator for an effect card.""" - - def __init__(self: EffectCard, data: dict, generator: DataGenerator) -> None: - """Init card. - - Args: - ---- - data (dict): card informtaion - generator (dict): generator this card is part of - """ - super().__init__(data, generator) - - self.description = data["description"] - - -class ItemCard(Card): - """Image creator for an item card.""" - - def __init__(self: ItemCard, data: dict, generator: DataGenerator) -> None: - """Init card. - - Args: - ---- - data (dict): card informtaion - generator (dict): generator this card is part of - """ - self.energy: list[str] = data["energy"] - - super().__init__(data, generator) - - -def get_card(data: dict, data_generator: DataGenerator) -> Card: - """Create a card class of the correct type.""" - if data["category"] == "hermit": - return HermitCard(data, data_generator) - if data["category"] == "attach" or data["category"] == "single_use": - return EffectCard(data, data_generator) - if data["category"] == "item": - return ItemCard(data, data_generator) - invalid_folder = "Invalid category: " + data["category"] - raise ValueError(invalid_folder) - - -class DataGenerator: - """Generate card images for hc-tcg.""" - - def __init__(self: DataGenerator, session: ClientSession) -> None: - """Init generator.""" - self.http_session = session - - self.exclude: list[int] = [] - - async def reload_all(self: DataGenerator) -> None: - """Reload all card information.""" - self.cache: dict[str, Image.Image] = {} - self.universe: dict[str, Card] = {} - - for card in await self.load_data(): - self.universe[card.text_id] = card - - async def get_image(self: DataGenerator, path: str) -> Image.Image: - """Get an image from the server. - - Args: - ---- - path (str): The path to the image - """ - try: - url = self.http_session._base_url - if url: - path = path.removeprefix(str(url.origin())) - if not path.startswith("/"): - path = "/" + path - - if path in self.cache.keys(): - return self.cache[path] - async with self.http_session.get(path) as response: - self.cache[path] = Image.open(BytesIO(await response.content.read())) - return self.cache[path] - except Image.UnidentifiedImageError: - return Image.new("RGBA", (0, 0)) - - async def load_data(self: DataGenerator) -> list[Card]: - """Load all card data.""" - cards = [] - async with self.http_session.get("cards") as response: - content = await response.content.read() - - iterator = loads(content.decode()) - if has_progression: - iterator = tqdm(iterator, "Loading cards") - - for card in iterator: - cards.append(get_card(card, self)) - return cards +"""Generation of card images.""" + +from __future__ import annotations + +from io import BytesIO +from json import load, loads +from typing import Any, Literal + +from aiohttp import ClientResponse, ClientSession +from numpy import array +from PIL import Image, ImageDraw +from PIL.ImageFilter import GaussianBlur + +from .card_palettes import Palette, palettes + +try: + has_progression = True + from tqdm import tqdm +except ImportError: + has_progression = False + + +def change_color( + im: Image.Image, origin: tuple[int, int, int], new: tuple[int, int, int] +) -> Image.Image: + """Change one color to another in an image. + + Args: + ---- + im (Image): The target image to change + origin (tuple): The original color + new (tuple): The new color + """ + data = array(im) + + alpha = len(data.T) == 4 + if alpha: + red, blue, green, _1 = data.T + else: + red, blue, green = data.T + white_areas = (red == origin[0]) & (blue == origin[1]) & (green == origin[2]) + data[..., :3][white_areas.T] = new # Transpose back needed + return Image.fromarray(data) + + +def draw_no_fade( + image: Image.Image, + method: str, + color: tuple[int, int, int], + *args: tuple, + **kwargs: dict, +) -> None: + """Perform an image modification ensuring no fade is made between two colors. + + Args: + ---- + image (Image): The image to modify + method (str): The method to perform on the image + color (tuple): The color of the modification + *args (tuple): Other method arguments + **kwargs (dict): Other keyword method arguments + """ + bw_im = Image.new("1", image.size) + bw_im_draw = ImageDraw.Draw(bw_im) + + getattr(bw_im_draw, method)(*args, **kwargs, fill=1) + + rgba = array(bw_im.convert("RGBA")) + rgba[rgba[..., 0] == 0] = [0, 0, 0, 0] # Convert black to transparrent + rgba[rgba[..., 0] == 255] = (*color, 255) # Convert white to desired colour + image.paste(Image.fromarray(rgba), (0, 0), Image.fromarray(rgba)) + + +def drop_shadow( + image: Image.Image, radius: int, color: tuple[int, int, int, Literal[0]] +) -> Image.Image: + """Generate a drop shadow for an image. + + Args: + ---- + image (Image): The image to create the shaadow for + radius (int): The size of the shadow in pixels + color (tuple): The color of the shadow + + Returns: + ------- + Image containg the drop shadow + """ + base = Image.new("RGBA", (image.width + radius * 2, image.height + radius * 2), color) + alpha = Image.new("L", (image.width + radius * 2, image.height + radius * 2)) + alpha.paste(image.getchannel("A"), (radius, radius)) + base.putalpha(alpha.filter(GaussianBlur(radius))) + return base + + +class Colors: + """Usefull colors.""" + + WHITE = (255, 255, 255) + REPLACE = (0, 172, 96) + REPLACE_2 = (1, 172, 96) + HEALTH_HI = (124, 205, 17) + HEALTH_MID = (213, 118, 39) + HEALTH_LOW = (150, 41, 40) + SHADOW = (0, 0, 0) + + +TYPE_COLORS = { + "miner": (110, 105, 108), + "terraform": (217, 119, 147), + "speedrunner": (223, 226, 36), + "pvp": (85, 202, 194), + "builder": (184, 162, 154), + "balanced": (101, 124, 50), + "explorer": (103, 138, 190), + "prankster": (116, 55, 168), + "redstone": (185, 33, 42), + "farm": (124, 204, 12), + "any": (0, 0, 0), +} + + +class Card: + """Basic image generator for a card.""" + + def __init__(self: Card, data: dict, generator: DataGenerator) -> None: + """Init card. + + Args: + ---- + data (dict): card informtaion + generator (dict): generator this card is part of + """ + self._raw_data: dict = data + self.generator: DataGenerator = generator + + self.text_id: str = data["id"] + self.category: str = data["category"] + + self.cost: int = data["tokens"] + self.image_url: str = data["images"]["default"] + self.token_image_url: str = data["images"]["with-token-cost"] + + self.rarity: str = ( + "Ultra rare" if data["rarity"] == "ultra_rare" else data["rarity"].capitalize() + ) + self.name: str = data["name"] + self.rarityName: str = f"{data['name']} ({self.rarity})" + + self.palette: Palette = palettes[data["palette"] if "palette" in data.keys() else "base"] + + +class HermitCard(Card): + """Image creator for a hermit card.""" + + def __init__(self: HermitCard, data: dict, generator: DataGenerator) -> None: + """Init card. + + Args: + ---- + data (dict): card informtaion + generator (dict): generator this card is part of + """ + super().__init__(data, generator) + + self.hermit_type: str = data["type"] + self.health: int = data["health"] + self.attacks: list[dict[str, Any]] = [data["primary"], data["secondary"]] + + +class EffectCard(Card): + """Image creator for an effect card.""" + + def __init__(self: EffectCard, data: dict, generator: DataGenerator) -> None: + """Init card. + + Args: + ---- + data (dict): card informtaion + generator (dict): generator this card is part of + """ + super().__init__(data, generator) + + self.description = data["description"] + + +class ItemCard(Card): + """Image creator for an item card.""" + + def __init__(self: ItemCard, data: dict, generator: DataGenerator) -> None: + """Init card. + + Args: + ---- + data (dict): card informtaion + generator (dict): generator this card is part of + """ + self.energy: list[str] = data["energy"] + + super().__init__(data, generator) + + +def get_card(data: dict, data_generator: DataGenerator) -> Card: + """Create a card class of the correct type.""" + if data["category"] == "hermit": + return HermitCard(data, data_generator) + if data["category"] == "attach" or data["category"] == "single_use": + return EffectCard(data, data_generator) + if data["category"] == "item": + return ItemCard(data, data_generator) + invalid_folder = "Invalid category: " + data["category"] + raise ValueError(invalid_folder) + + +class DataGenerator: + """Generate card images for hc-tcg.""" + + def __init__(self: DataGenerator, session: ClientSession) -> None: + """Init generator.""" + self.http_session = session + + self.exclude: list[int] = [] + + async def reload_all(self: DataGenerator) -> None: + """Reload all card information.""" + self.cache: dict[str, Image.Image] = {} + self.universe: dict[str, Card] = {} + + for card in await self.load_data(): + self.universe[card.text_id] = card + + async def get_image(self: DataGenerator, path: str) -> Image.Image: + """Get an image from the server. + + Args: + ---- + path (str): The path to the image + """ + try: + url = self.http_session._base_url + if url: + path = path.removeprefix(str(url.origin())) + if not path.startswith("/"): + path = "/" + path + + if path in self.cache.keys(): + return self.cache[path] + async with self.http_session.get(path) as response: + self.cache[path] = Image.open(BytesIO(await response.content.read())) + return self.cache[path] + except Image.UnidentifiedImageError: + return Image.new("RGBA", (0, 0)) + + async def load_data(self: DataGenerator) -> list[Card]: + """Load all card data.""" + cards = [] + async with self.http_session.get("cards") as response: + content = await response.content.read() + + iterator = loads(content.decode()) + if has_progression: + iterator = tqdm(iterator, "Loading cards") + + for card in iterator: + cards.append(get_card(card, self)) + return cards diff --git a/bot/util/probability.py b/bot/util/probability.py index e26761f..c68335d 100644 --- a/bot/util/probability.py +++ b/bot/util/probability.py @@ -1,56 +1,56 @@ -"""Calculate probability of certain events, by Allophony on discord.""" - -from math import comb - -deck_size = 42 -opening_hand_size = 7 - - -def allophony_formula(hermits: int, hand_size: int, desired: int, deck_size: int) -> float: - """Allophony maths (idk).""" - return ( - comb(hermits, desired) - * comb(deck_size - hermits, hand_size - desired) - / comb(deck_size, hand_size) - ) - - -def initial_hand_chance(hermits_in_deck: int, desired_hermits: int) -> float: - """Get the chance of having `desired_hermits` hermits - in your inital hand when you have `hermits_in_deck` in your deck. - """ # noqa: D205 - valid_hands = sum( - allophony_formula(hermits_in_deck, opening_hand_size, k, deck_size) - for k in range(1, min(hermits_in_deck + 1, opening_hand_size + 1)) - ) - good_hands = allophony_formula(hermits_in_deck, opening_hand_size, desired_hermits, deck_size) - return good_hands / valid_hands - - -def probability(hermits_in_deck: int, draws: int, desired_hermits: int) -> float: - """Get the probability of having x hermits in your hands after d draws. - - Args: - ---- - hermits_in_deck (int): The number of hermits in the deck - draws (int): The number of draws taken - desired_hermits (int): The target hermit count - """ - if ( - draws + opening_hand_size < desired_hermits - or hermits_in_deck < desired_hermits - or hermits_in_deck > deck_size - or draws > deck_size - opening_hand_size - ): - return 0 - res: float - for i in range(1, opening_hand_size + 1): - hermits_in_first_hand = initial_hand_chance(hermits_in_deck, i) - if i >= desired_hermits: - res = res + hermits_in_first_hand - else: - res = res + hermits_in_first_hand * sum( - allophony_formula(hermits_in_deck - i, draws, k, deck_size - opening_hand_size) - for k in range(desired_hermits - i, min(draws + 1, hermits_in_deck - i + 1)) - ) - return res +"""Calculate probability of certain events, by Allophony on discord.""" + +from math import comb + +deck_size = 42 +opening_hand_size = 7 + + +def allophony_formula(hermits: int, hand_size: int, desired: int, deck_size: int) -> float: + """Allophony maths (idk).""" + return ( + comb(hermits, desired) + * comb(deck_size - hermits, hand_size - desired) + / comb(deck_size, hand_size) + ) + + +def initial_hand_chance(hermits_in_deck: int, desired_hermits: int) -> float: + """Get the chance of having `desired_hermits` hermits + in your inital hand when you have `hermits_in_deck` in your deck. + """ # noqa: D205 + valid_hands = sum( + allophony_formula(hermits_in_deck, opening_hand_size, k, deck_size) + for k in range(1, min(hermits_in_deck + 1, opening_hand_size + 1)) + ) + good_hands = allophony_formula(hermits_in_deck, opening_hand_size, desired_hermits, deck_size) + return good_hands / valid_hands + + +def probability(hermits_in_deck: int, draws: int, desired_hermits: int) -> float: + """Get the probability of having x hermits in your hands after d draws. + + Args: + ---- + hermits_in_deck (int): The number of hermits in the deck + draws (int): The number of draws taken + desired_hermits (int): The target hermit count + """ + if ( + draws + opening_hand_size < desired_hermits + or hermits_in_deck < desired_hermits + or hermits_in_deck > deck_size + or draws > deck_size - opening_hand_size + ): + return 0 + res: float + for i in range(1, opening_hand_size + 1): + hermits_in_first_hand = initial_hand_chance(hermits_in_deck, i) + if i >= desired_hermits: + res = res + hermits_in_first_hand + else: + res = res + hermits_in_first_hand * sum( + allophony_formula(hermits_in_deck - i, draws, k, deck_size - opening_hand_size) + for k in range(desired_hermits - i, min(draws + 1, hermits_in_deck - i + 1)) + ) + return res diff --git a/bot/util/server.py b/bot/util/server.py index 68d6f72..2b4025e 100644 --- a/bot/util/server.py +++ b/bot/util/server.py @@ -1,246 +1,246 @@ -"""Handles interactions and linking discord and hc-tcg servers.""" - -from __future__ import annotations - -from datetime import datetime as dt -from datetime import timezone -from json import JSONDecodeError, loads -from time import time -from typing import Any - -from aiohttp import ClientSession -from interactions import Client, Embed, Member, Snowflake - -from bot.util.datagen import DataGenerator - - -class GamePlayer: - """A representation of a player in a game.""" - - def __init__(self: GamePlayer, data: dict[str, Any]) -> None: - """Represent a player in a game. - - Args: - ---- - data (dict): Player information - """ - self.id: str = data["playerId"] - self.name: str = data["censoredPlayerName"] - self.minecraft_name: str = data["minecraftName"] - self.lives: int = data["lives"] - - self.deck: list[str] = data["deck"] - - -class Game: - """Store data about a game.""" - - def __init__(self: Game, data: dict[str, Any]) -> None: - """Store data about a game. - - Args: - ---- - data (dict): The game data dict - """ - self.players: list[GamePlayer] = [GamePlayer(player) for player in data["players"]] - self.player_names = [player.name for player in self.players] - self.id = data["id"] - self.spectator_code: str | None = data["spectatorCode"] - self.created: dt = dt.fromtimestamp(data["createdTime"] / 1000, tz=timezone.utc) - self.spectators = data["viewers"] - len(self.players) - - print(data["state"]) - - -class QueueGame: - """Information about a private queued game.""" - - def __init__(self: QueueGame, data: dict[str, Any]) -> None: - """Information about a private queued game. - - Args: - ---- - data (dict): The game data dict - """ - self.joinCode: str = data["gameCode"] - self.spectatorCode: str = data["spectatorCode"] - self.secret: str = data["apiSecret"] - self.timeout: str = data["timeOutAt"] / 1000 - - def create_embed(self: QueueGame, *, spectators: bool = False) -> Embed: - """Create an embed with information about the game.""" - e = ( - Embed( - "Game", - f"Expires ", - timestamp=dt.now(tz=timezone.utc), - ) - .add_field("Join code", self.joinCode, inline=True) - .set_footer("Bot by Tyrannicodin16") - ) - if spectators: - e.add_field("Spectate code", self.spectatorCode, inline=True) - return e - - -class Server: - """An interface between a discord and hc-tcg server.""" - - http_session: ClientSession - data_generator: DataGenerator - - def __init__( - self: Server, - server_id: str, - server_url: str, - guild_id: str, - admins: list[str] | None = None, - tracked_forums: dict[str, list[str]] | None = None, - ) -> None: - """Create a Server object. - - Args: - ---- - server_id (str): Unique name for the server - server_url (str): The url of the hc-tcg server - server_key (str): The api key to send to the server - guild_id (str): The id of the discord server - guild_key (str): The api key sent from the server - admins (list[str]): List of users and/or roles that can use privileged - features, if blank allows all users to use privileged features - tracked_forums (list[str]): Dictionary with channel ids and tags to ignore - update_channel (str): The channel to get server updates from - """ - if admins is None: - admins = [] - if tracked_forums is None: - tracked_forums = {} - - self.server_id: str = server_id - self.last_game_count: int = 0 - self.last_game_count_time: int = 0 - - self.server_url: str = server_url - self.guild_id: str = guild_id - self.admin_roles: list[str] = admins - self.tracked_forums: dict[str, list[str]] = tracked_forums - - def create_session(self: Server) -> None: - """Create http session and data generator.""" - self.http_session = ClientSession(self.server_url + "/api/") - self.data_generator = DataGenerator(self.http_session) - - def authorize_user(self: Server, member: Member) -> bool: - """Check if a user is allowed to use privileged commands.""" - if self.admin_roles is []: - return True - admin_user = str(member.id) in self.admin_roles - admin_role = any(str(role.id) in self.admin_roles for role in member.roles) - - return admin_user or admin_role - - async def get_deck(self: Server, code: str) -> dict | None: - """Get information about a deck from the server. - - Args: - ---- - code (str): The export code of the deck to retrieve - """ - try: - async with self.http_session.get(f"deck/{code}") as response: - result = loads((await response.content.read()).decode()) - except (TimeoutError, JSONDecodeError): - return None - if result["type"] == "success": - return result - return None - - async def create_game(self: Server) -> QueueGame | None: - """Create a server game.""" - try: - async with self.http_session.get("games/create") as response: - data: dict[str, str | int] = loads((await response.content.read()).decode()) - return QueueGame(data) - except ( - ConnectionError, - JSONDecodeError, - KeyError, - ): - return None - - async def cancel_game(self: Server, game: QueueGame) -> bool: - """Cancel a queued game.""" - try: - async with self.http_session.delete( - "games/cancel", json={"code": game.secret} - ) as response: - data: dict[str, str | None] = loads((await response.content.read()).decode()) - return "success" in data.keys() - except ( - ConnectionError, - JSONDecodeError, - KeyError, - ): - return False - - async def get_game_count(self: Server) -> int: - """Get the number of games.""" - try: - if self.last_game_count_time > time() - 60: - return self.last_game_count - - async with self.http_session.get("games/count") as response: - data: dict[str, int] = loads((await response.content.read()).decode()) - self.last_game_count = data["games"] - self.last_game_count_time = round(time()) - return self.last_game_count - except ( - ConnectionError, - JSONDecodeError, - KeyError, - ): - return 0 - - -class ServerManager: - """Manage multiple servers and their functionality.""" - - def __init__(self: ServerManager, client: Client, servers: list[Server]) -> None: - """Manage multiple servers and their functionality. - - Args: - ---- - client (Client): The bot client - servers (list[Server]): A list of server objects to manage - bot_server (Application): The web server hc-tcg servers send requests to - scheduler (AsyncIOScheduler): Sheduler for repeating tasks - universe (dict): Dictionary that converts card ids to Card objects - """ - self._discord_links = {server.guild_id: server for server in servers} - - self.client = client - self.servers = servers - - def get_server(self: ServerManager, guild_id: Snowflake | None) -> Server: - """Get a server by its discord guild id. - - Args: - ---- - guild_id (str): The guild id of the discord server - """ - return ( - self._discord_links[str(guild_id)] - if guild_id in self._discord_links.keys() - else self.servers[0] - ) - - async def close_all_sessions(self: ServerManager) -> None: - """Close all server ClientSessions.""" - for server in self.servers: - await server.http_session.close() - - async def reload_all_generators(self: ServerManager) -> None: - """Close all server DataGenerators.""" - for server in self.servers: - server.create_session() - await server.data_generator.reload_all() +"""Handles interactions and linking discord and hc-tcg servers.""" + +from __future__ import annotations + +from datetime import datetime as dt +from datetime import timezone +from json import JSONDecodeError, loads +from time import time +from typing import Any + +from aiohttp import ClientSession +from interactions import Client, Embed, Member, Snowflake + +from bot.util.datagen import DataGenerator + + +class GamePlayer: + """A representation of a player in a game.""" + + def __init__(self: GamePlayer, data: dict[str, Any]) -> None: + """Represent a player in a game. + + Args: + ---- + data (dict): Player information + """ + self.id: str = data["playerId"] + self.name: str = data["censoredPlayerName"] + self.minecraft_name: str = data["minecraftName"] + self.lives: int = data["lives"] + + self.deck: list[str] = data["deck"] + + +class Game: + """Store data about a game.""" + + def __init__(self: Game, data: dict[str, Any]) -> None: + """Store data about a game. + + Args: + ---- + data (dict): The game data dict + """ + self.players: list[GamePlayer] = [GamePlayer(player) for player in data["players"]] + self.player_names = [player.name for player in self.players] + self.id = data["id"] + self.spectator_code: str | None = data["spectatorCode"] + self.created: dt = dt.fromtimestamp(data["createdTime"] / 1000, tz=timezone.utc) + self.spectators = data["viewers"] - len(self.players) + + print(data["state"]) + + +class QueueGame: + """Information about a private queued game.""" + + def __init__(self: QueueGame, data: dict[str, Any]) -> None: + """Information about a private queued game. + + Args: + ---- + data (dict): The game data dict + """ + self.joinCode: str = data["gameCode"] + self.spectatorCode: str = data["spectatorCode"] + self.secret: str = data["apiSecret"] + self.timeout: str = data["timeOutAt"] / 1000 + + def create_embed(self: QueueGame, *, spectators: bool = False) -> Embed: + """Create an embed with information about the game.""" + e = ( + Embed( + "Game", + f"Expires ", + timestamp=dt.now(tz=timezone.utc), + ) + .add_field("Join code", self.joinCode, inline=True) + .set_footer("Bot by Tyrannicodin16") + ) + if spectators: + e.add_field("Spectate code", self.spectatorCode, inline=True) + return e + + +class Server: + """An interface between a discord and hc-tcg server.""" + + http_session: ClientSession + data_generator: DataGenerator + + def __init__( + self: Server, + server_id: str, + server_url: str, + guild_id: str, + admins: list[str] | None = None, + tracked_forums: dict[str, list[str]] | None = None, + ) -> None: + """Create a Server object. + + Args: + ---- + server_id (str): Unique name for the server + server_url (str): The url of the hc-tcg server + server_key (str): The api key to send to the server + guild_id (str): The id of the discord server + guild_key (str): The api key sent from the server + admins (list[str]): List of users and/or roles that can use privileged + features, if blank allows all users to use privileged features + tracked_forums (list[str]): Dictionary with channel ids and tags to ignore + update_channel (str): The channel to get server updates from + """ + if admins is None: + admins = [] + if tracked_forums is None: + tracked_forums = {} + + self.server_id: str = server_id + self.last_game_count: int = 0 + self.last_game_count_time: int = 0 + + self.server_url: str = server_url + self.guild_id: str = guild_id + self.admin_roles: list[str] = admins + self.tracked_forums: dict[str, list[str]] = tracked_forums + + def create_session(self: Server) -> None: + """Create http session and data generator.""" + self.http_session = ClientSession(self.server_url + "/api/") + self.data_generator = DataGenerator(self.http_session) + + def authorize_user(self: Server, member: Member) -> bool: + """Check if a user is allowed to use privileged commands.""" + if self.admin_roles is []: + return True + admin_user = str(member.id) in self.admin_roles + admin_role = any(str(role.id) in self.admin_roles for role in member.roles) + + return admin_user or admin_role + + async def get_deck(self: Server, code: str) -> dict | None: + """Get information about a deck from the server. + + Args: + ---- + code (str): The export code of the deck to retrieve + """ + try: + async with self.http_session.get(f"deck/{code}") as response: + result = loads((await response.content.read()).decode()) + except (TimeoutError, JSONDecodeError): + return None + if result["type"] == "success": + return result + return None + + async def create_game(self: Server) -> QueueGame | None: + """Create a server game.""" + try: + async with self.http_session.get("games/create") as response: + data: dict[str, str | int] = loads((await response.content.read()).decode()) + return QueueGame(data) + except ( + ConnectionError, + JSONDecodeError, + KeyError, + ): + return None + + async def cancel_game(self: Server, game: QueueGame) -> bool: + """Cancel a queued game.""" + try: + async with self.http_session.delete( + "games/cancel", json={"code": game.secret} + ) as response: + data: dict[str, str | None] = loads((await response.content.read()).decode()) + return "success" in data.keys() + except ( + ConnectionError, + JSONDecodeError, + KeyError, + ): + return False + + async def get_game_count(self: Server) -> int: + """Get the number of games.""" + try: + if self.last_game_count_time > time() - 60: + return self.last_game_count + + async with self.http_session.get("games/count") as response: + data: dict[str, int] = loads((await response.content.read()).decode()) + self.last_game_count = data["games"] + self.last_game_count_time = round(time()) + return self.last_game_count + except ( + ConnectionError, + JSONDecodeError, + KeyError, + ): + return 0 + + +class ServerManager: + """Manage multiple servers and their functionality.""" + + def __init__(self: ServerManager, client: Client, servers: list[Server]) -> None: + """Manage multiple servers and their functionality. + + Args: + ---- + client (Client): The bot client + servers (list[Server]): A list of server objects to manage + bot_server (Application): The web server hc-tcg servers send requests to + scheduler (AsyncIOScheduler): Sheduler for repeating tasks + universe (dict): Dictionary that converts card ids to Card objects + """ + self._discord_links = {server.guild_id: server for server in servers} + + self.client = client + self.servers = servers + + def get_server(self: ServerManager, guild_id: Snowflake | None) -> Server: + """Get a server by its discord guild id. + + Args: + ---- + guild_id (str): The guild id of the discord server + """ + return ( + self._discord_links[str(guild_id)] + if guild_id in self._discord_links.keys() + else self.servers[0] + ) + + async def close_all_sessions(self: ServerManager) -> None: + """Close all server ClientSessions.""" + for server in self.servers: + await server.http_session.close() + + async def reload_all_generators(self: ServerManager) -> None: + """Close all server DataGenerators.""" + for server in self.servers: + server.create_session() + await server.data_generator.reload_all()