diff --git a/chatushka/bot/libs/mute.py b/chatushka/bot/libs/mute.py new file mode 100644 index 0000000..fff9761 --- /dev/null +++ b/chatushka/bot/libs/mute.py @@ -0,0 +1,73 @@ +from datetime import datetime, timedelta, timezone +from enum import Enum +from random import choice + +from chatushka.bot.settings import get_settings +from chatushka.core.matchers import CommandsMatcher +from chatushka.core.transports.models import ChatPermissions, Message, User +from chatushka.core.transports.telegram_bot_api import TelegramBotApi + +RESTRICT_PERMISSION = ChatPermissions( + can_send_messages=False, + can_send_media_messages=False, + can_send_polls=False, + can_send_other_messages=False, +) +settings = get_settings() +mute_matcher = CommandsMatcher( + prefixes=settings.command_prefixes, + postfixes=settings.command_postfixes, +) + + +class MuteMessages(Enum): + ACCIDENT = ( + '🧐 {name} споткнулся, упал и попал в больницу на {time} минут.', + '🧐 {name} заигрался револьвером и угодил в травмпункт на {time} минут.', + '🧐 {name} переводил бабушку через дорогу и теперь отдыхает {time} минут.', + '🧐 {name} решил подумать о жизни {time} минут.', + ) + LOOSER = ( + '🧐 {looser_name} хотел убить ' + '{victim_name}, но что-то пошло не так и он ' + "вынужден провести в тюрьме {time} минут", + '🧐 У {looser_name} лапки коротковаты ' + 'чтоб убить {victim_name}', + ) + + +async def send_mute_request( + api: TelegramBotApi, + message: Message, + initiator: User, + restrict_user: User, + restrict_time: timedelta, +) -> None: + text_tmpl = choice(MuteMessages.ACCIDENT.value) + is_success = await api.restrict_chat_member( + chat_id=message.chat.id, + user_id=restrict_user.id, + permissions=RESTRICT_PERMISSION, + until_date=datetime.now(tz=timezone.utc) + restrict_time, + ) + if is_success: + await api.send_message( + chat_id=message.chat.id, + text=text_tmpl.format( + user=restrict_user.id, + name=restrict_user.readable_name, + time=int(restrict_time.total_seconds() // 60), + ), + ) + return + text_tmpl = choice(MuteMessages.LOOSER.value) + await api.send_message( + chat_id=message.chat.id, + text=text_tmpl.format( + looser_id=initiator.id, + looser_name=initiator.readable_name, + victim_id=restrict_user.id, + victim_name=restrict_user.readable_name, + ), + reply_to_message_id=message.message_id, + ) diff --git a/chatushka/bot/main.py b/chatushka/bot/main.py index 8883f7a..c8e97a9 100644 --- a/chatushka/bot/main.py +++ b/chatushka/bot/main.py @@ -4,7 +4,14 @@ from click import command, option from chatushka import ChatushkaBot -from chatushka.bot.matchers import eight_ball_matchers, helpers_matcher, heroes_matchers, jokes_matcher, suicide_matcher +from chatushka.bot.matchers import ( + admin_matcher, + eight_ball_matcher, + helpers_matcher, + heroes_matcher, + jokes_matcher, + suicide_matcher, +) from chatushka.bot.settings import get_settings from chatushka.core.services.mongodb.wrapper import MongoDBWrapper @@ -19,16 +26,14 @@ def make_bot( instance = ChatushkaBot(token=token, debug=debug) wrapper = MongoDBWrapper() wrapper.add_event_handlers(instance) - - instance.add_matchers( + instance.add_matcher( + admin_matcher, jokes_matcher, - *eight_ball_matchers, + eight_ball_matcher, helpers_matcher, - *heroes_matchers, + heroes_matcher, suicide_matcher, ) - - instance.add_matchers() return instance diff --git a/chatushka/bot/matchers/__init__.py b/chatushka/bot/matchers/__init__.py index 95530ea..fb46a94 100644 --- a/chatushka/bot/matchers/__init__.py +++ b/chatushka/bot/matchers/__init__.py @@ -1,13 +1,15 @@ +from chatushka.bot.matchers.admin import admin_matcher from chatushka.bot.matchers.bobuk_jokes import jokes_matcher -from chatushka.bot.matchers.eight_ball import eight_ball_matchers +from chatushka.bot.matchers.eight_ball import eight_ball_matcher from chatushka.bot.matchers.helpers import helpers_matcher -from chatushka.bot.matchers.heroes import heroes_matchers +from chatushka.bot.matchers.heroes import heroes_matcher from chatushka.bot.matchers.suicide import suicide_matcher __all__ = ( + "admin_matcher", "jokes_matcher", - "eight_ball_matchers", + "eight_ball_matcher", "helpers_matcher", - "heroes_matchers", + "heroes_matcher", "suicide_matcher", ) diff --git a/chatushka/bot/matchers/admin/__init__.py b/chatushka/bot/matchers/admin/__init__.py index e69de29..d45c457 100644 --- a/chatushka/bot/matchers/admin/__init__.py +++ b/chatushka/bot/matchers/admin/__init__.py @@ -0,0 +1,12 @@ +from chatushka.bot.matchers.admin.mute import mute_matcher +from chatushka.bot.settings import get_settings +from chatushka.core.matchers import CommandsMatcher + +settings = get_settings() +admin_matcher = CommandsMatcher( + prefixes=settings.command_prefixes, + postfixes=settings.command_postfixes, +) +admin_matcher.add_matcher(mute_matcher) + +__all__ = ("admin_matcher",) diff --git a/chatushka/bot/matchers/admin/_matcher.py b/chatushka/bot/matchers/admin/_matcher.py deleted file mode 100644 index 53659cb..0000000 --- a/chatushka/bot/matchers/admin/_matcher.py +++ /dev/null @@ -1,8 +0,0 @@ -from chatushka.bot.settings import get_settings -from chatushka.core.matchers import CommandsMatcher - -settings = get_settings() -admin_matcher = CommandsMatcher( - prefixes=settings.command_prefixes, - postfixes=settings.command_postfixes, -) diff --git a/chatushka/bot/matchers/admin/mute.py b/chatushka/bot/matchers/admin/mute.py index 6c45277..6fc4de3 100644 --- a/chatushka/bot/matchers/admin/mute.py +++ b/chatushka/bot/matchers/admin/mute.py @@ -1,115 +1,58 @@ -from datetime import datetime, timedelta, timezone -from enum import Enum -from random import choice, randrange -from typing import Optional +from datetime import timedelta +from random import randrange -from chatushka.bot.matchers.admin._matcher import admin_matcher -from chatushka.core.transports.models import ChatPermissions, Message, User +from chatushka.bot.libs.mute import send_mute_request +from chatushka.bot.settings import get_settings +from chatushka.core.matchers import CommandsMatcher +from chatushka.core.transports.models import Message from chatushka.core.transports.telegram_bot_api import TelegramBotApi -RESTRICT_PERMISSION = ChatPermissions( - can_send_messages=False, - can_send_media_messages=False, - can_send_polls=False, - can_send_other_messages=False, +settings = get_settings() +mute_matcher = CommandsMatcher( + prefixes=settings.command_prefixes, + postfixes=settings.command_postfixes, ) -class MuteMessages(Enum): - ACCIDENT = ( - '🧐 {name} споткнулся и упал. Попал в больницу на {time} минут.', - '🧐 {name} заигрался револьвером и угодил в травмпункт на {time} минут.', - '🧐 {name} переводил бабушку через дорогу и теперь отдыхает {time} минут.', - ) - LOOSER = ( - '🧐 {looser_name} хотел убить ' - '{victim_name}, но что-то пошло не так и он ' - "вынужден провести в тюрьме {time} минут", - ) - - -@admin_matcher() +@mute_matcher("mute", "shutup") async def mute_handler( api: TelegramBotApi, message: Message, args: list[str], ) -> None: + restrict_time = timedelta(minutes=randrange(10, 30)) admins = await api.get_chat_administrators(message.chat.id) + privileges = None for admin in admins: if admin.user.id == message.user.id: privileges = admin break - restrict_text = None - restrict_user: Optional[User] = None - restrict_time = None - - if not privileges or not (privileges.can_restrict_members or privileges.status.CREATOR): - restrict_user = message.user - restrict_time = timedelta(minutes=randrange(10, 30)) - text_tmpl = choice(MuteMessages.ACCIDENT.value) - restrict_text = text_tmpl.format( - user=restrict_user.id, - name=restrict_user.readable_name, - time=restrict_time.total_seconds() // 60, - ) - - if not message.reply_to_message: - restrict_user = message.user - restrict_time = timedelta(minutes=randrange(10, 30)) - text_tmpl = choice(MuteMessages.LOOSER.value) - restrict_text = text_tmpl.format( - looser_id=restrict_user.id, - looser_name=restrict_user.readable_name, - victim_id=message.reply_to_message.user.id, - victim_name=message.reply_to_message.user.readable_name, - time=restrict_time.total_seconds() // 60, + if ( + not message.reply_to_message + or not privileges + or not privileges.status.CREATOR + or not privileges.can_restrict_members + ): + await send_mute_request( + api=api, + message=message, + initiator=message.user, + restrict_user=message.user, + restrict_time=restrict_time, ) - - if not restrict_user: - restrict_user = message.reply_to_message.user + return try: - if not restrict_time: - restrict_time = timedelta(hours=int(args[0])) + restrict_time = timedelta(hours=int(args[0])) except (ValueError, IndexError): - restrict_time = timedelta(minutes=randrange(10, 30)) - text = ( - f'🧐 {restrict_user.readable_name} будет молчать ровно' - f" {restrict_time.total_seconds() // 60} минут" - ) - await api.send_message( - chat_id=message.chat.id, - text=text, - reply_to_message_id=message.message_id, - ) - - try: - is_success = await api.restrict_chat_member( - chat_id=message.chat.id, - user_id=restrict_user.id, - permissions=RESTRICT_PERMISSION, - until_date=datetime.now(tz=timezone.utc) + restrict_time, - ) - except ValueError: - is_success = False - - if not restrict_text: - restrict_text = ( - f'Пользователь {restrict_user.readable_name} ' - f"принял обет молчания" - ) - - if is_success: - await api.send_message( - chat_id=message.chat.id, - text=restrict_text, - ) - return None - await api.send_message( - chat_id=message.chat.id, - text=f"Лапки коротковаты чтоб покарать " - f'{restrict_user.readable_name}', - reply_to_message_id=message.message_id, + pass + + await send_mute_request( + api=api, + message=message, + initiator=message.user, + restrict_user=message.reply_to_message.user, + restrict_time=restrict_time, ) diff --git a/chatushka/bot/matchers/admin/pin.py b/chatushka/bot/matchers/admin/pin.py new file mode 100644 index 0000000..37e4a95 --- /dev/null +++ b/chatushka/bot/matchers/admin/pin.py @@ -0,0 +1,58 @@ +from asyncio import sleep + +from chatushka.bot.settings import get_settings +from chatushka.core.matchers import CommandsMatcher +from chatushka.core.transports.models import Message +from chatushka.core.transports.telegram_bot_api import TelegramBotApi + +settings = get_settings() +pin_matcher = CommandsMatcher( + prefixes=settings.command_prefixes, + postfixes=settings.command_postfixes, +) + + +@pin_matcher("pin") +async def pin_handler( + api: TelegramBotApi, + message: Message, + args: list[str], +) -> None: + if not message.reply_to_message: + await api.send_message( + chat_id=message.chat.id, + text="Для закрепа необходимо написать команду реплаем", + reply_to_message_id=message.message_id, + ) + + try: + pin_hours = int(args[0]) + except (ValueError, IndexError): + pin_hours = None + + _ = await api.pin_chat_message( + chat_id=message.chat.id, + message_id=message.reply_to_message.message_id, + ) + if pin_hours: + await api.send_message( + chat_id=message.chat.id, + text="Через {} часа закреп будет убран", + reply_to_message_id=message.message_id, + ) + await sleep(pin_hours * 60) + await api.unpin_chat_message( + chat_id=message.chat.id, + message_id=message.reply_to_message.message_id, + ) + + +@pin_matcher("unpin") +async def unpin_handler( + api: TelegramBotApi, + message: Message, +) -> None: + await api.unpin_chat_message( + chat_id=message.chat.id, + message_id=message.reply_to_message.message_id, + ) diff --git a/chatushka/bot/matchers/eight_ball.py b/chatushka/bot/matchers/eight_ball.py index 05905d1..767d928 100644 --- a/chatushka/bot/matchers/eight_ball.py +++ b/chatushka/bot/matchers/eight_ball.py @@ -5,6 +5,10 @@ from chatushka.core.transports.models import Message from chatushka.core.transports.telegram_bot_api import TelegramBotApi +HELP_MESSAGE = ( + 'Just think of a question that can be answered "Yes" or "No", concentrate very, very hard, and type command!' +) + EIGHT_BALL_EN = ( "It is certain", "It is decidedly so", @@ -51,18 +55,15 @@ ) settings = get_settings() -eight_ball_command_matcher = CommandsMatcher( +eight_ball_matcher = CommandsMatcher( prefixes=settings.command_prefixes, postfixes=settings.command_postfixes, ) -eight_ball_question_matcher = RegexMatcher() -eight_ball_matchers = ( - eight_ball_command_matcher, - eight_ball_question_matcher, -) +question_matcher = RegexMatcher() +eight_ball_matcher.add_matcher(question_matcher) -@eight_ball_command_matcher("8ball", "ball8", "b8", "8b") +@eight_ball_matcher("8ball", "ball8", "b8", "8b", help_message=HELP_MESSAGE) async def eight_ball_handler( api: TelegramBotApi, message: Message, @@ -74,7 +75,7 @@ async def eight_ball_handler( ) -@eight_ball_question_matcher(r"\?") +@question_matcher(r"\?") async def eight_ball_answer_handler( api: TelegramBotApi, message: Message, diff --git a/chatushka/bot/matchers/heroes.py b/chatushka/bot/matchers/heroes.py index 02c6bc4..8b2b06d 100644 --- a/chatushka/bot/matchers/heroes.py +++ b/chatushka/bot/matchers/heroes.py @@ -9,18 +9,15 @@ MESSAGE_TITLE = "✨💫 ГЕРОЙСКИЙ КАЛЕНДАРЬ 💫✨" settings = get_settings() -heroes_commands_matcher = CommandsMatcher( +heroes_matcher = CommandsMatcher( prefixes=settings.command_prefixes, postfixes=settings.command_postfixes, ) -heroes_regex_matcher = RegexMatcher() -heroes_matchers = ( - heroes_commands_matcher, - heroes_regex_matcher, -) +regex_matcher = RegexMatcher() +heroes_matcher.add_matcher(regex_matcher) -@heroes_commands_matcher("homm", "heroes") +@heroes_matcher("homm", "heroes") async def activate_heroes_handler( api: TelegramBotApi, message: Message, @@ -61,7 +58,7 @@ async def activate_heroes_handler( ) -@heroes_regex_matcher("2 4 1 * *") +@regex_matcher("2 4 1 * *") async def heroes_month_handler( api: TelegramBotApi, ) -> None: @@ -77,7 +74,7 @@ async def heroes_month_handler( continue -@heroes_regex_matcher("0 8 * * mon") +@regex_matcher("0 8 * * mon") async def heroes_week_handler( api: TelegramBotApi, ) -> None: diff --git a/chatushka/core/bot.py b/chatushka/core/bot.py index 79eeb67..ce56c42 100644 --- a/chatushka/core/bot.py +++ b/chatushka/core/bot.py @@ -1,9 +1,10 @@ import signal from asyncio import ensure_future, get_event_loop, sleep from logging import getLogger -from typing import Iterable, Optional +from typing import Optional -from chatushka.core.matchers import EventsMatcher, EventTypes, MatcherProtocol +from chatushka.core.matchers import EventsMatcher, EventTypes +from chatushka.core.models import MatchedToken from chatushka.core.transports.telegram_bot_api import TelegramBotApi from chatushka.core.transports.utils import check_preconditions @@ -14,33 +15,41 @@ class ChatushkaBot(EventsMatcher): def __init__( self, token: str, - matchers: Iterable[MatcherProtocol] = None, debug: bool = False, ) -> None: super().__init__() self.debug = debug self.api = TelegramBotApi(token) - self.matchers = list(matchers) if matchers else list() self.add_handler(EventTypes.STARTUP, check_preconditions) - def add_matchers( - self, - *matchers: MatcherProtocol, - ): - self.matchers += matchers - + # pylint: disable=too-many-nested-blocks async def _loop(self) -> None: offset: Optional[int] = None while True: - updates, latest_update_id = await self.api.get_updates(timeout=60, offset=offset) + try: + updates, latest_update_id = await self.api.get_updates(timeout=60, offset=offset) + except Exception as err: # noqa, pylint: disable=broad-except + logger.error(err) + await sleep(1) + continue if updates: for update in updates: + message = update.message + logger.debug( + f'[ {message.chat.id} "{message.chat.title}" ] ' + f'[ {message.user.id} "{message.user.readable_name}" ] -> ' + f"{message.text}" + ) try: for matcher in self.matchers: - await matcher.match(self.api, update.message) + matched_handlers = await matcher.match(self.api, message, should_call_matched=True) + for handler in matched_handlers: # type: MatchedToken + logger.debug( + f'[ {message.chat.id} "{message.chat.title}" ] ' + f'[ {message.user.id} "{message.user.readable_name}" ] <- ' + f"{handler.token}" + ) except Exception as err: # noqa, pylint: disable=broad-except - if self.debug: - raise logger.error(err) offset = latest_update_id + 1 await sleep(1) diff --git a/chatushka/core/matchers/base.py b/chatushka/core/matchers/base.py index af85c17..8874a92 100644 --- a/chatushka/core/matchers/base.py +++ b/chatushka/core/matchers/base.py @@ -2,35 +2,53 @@ from asyncio import iscoroutinefunction from collections import defaultdict from inspect import signature -from typing import Any, Callable, Hashable, Iterable, Optional, Union +from typing import Any, Callable, Hashable, Iterable, NamedTuple, Optional, Union from chatushka.core.models import HANDLER_TYPING, MatchedToken +from chatushka.core.protocols import MatcherProtocol from chatushka.core.transports.models import Message from chatushka.core.transports.telegram_bot_api import TelegramBotApi -class MatcherBase(ABC): +class HelpMessage(NamedTuple): + tokens: tuple[str] + message: Optional[str] - handlers: dict[Hashable, list[HANDLER_TYPING]] +class MatcherBase(ABC): def __init__(self) -> None: - self.handlers = defaultdict(list) + self.handlers: dict[Hashable, list[HANDLER_TYPING]] = defaultdict(list) + self.matchers: list[MatcherProtocol] = list() + self.help_messages: list[HelpMessage] = list() def __call__( self, *tokens: Hashable, + help_message: Optional[str] = None, ) -> Callable[[Callable[[], None]], None]: def decorator( func: HANDLER_TYPING, ) -> None: - self.add_handler(tokens=tokens, handler=func) + self.add_handler( + tokens=tokens, + handler=func, + help_message=help_message, + ) return decorator + def make_help_message(self): + messages = self.help_messages + for matcher in self.matchers: + messages += matcher.make_help_message() + return messages + def add_handler( self, tokens: Union[Hashable, Iterable[Hashable]], handler: HANDLER_TYPING, + help_message: Optional[str] = None, + include_in_help: bool = True, ) -> None: if not isinstance(tokens, (list, tuple, set)): tokens = (tokens,) @@ -42,22 +60,36 @@ def add_handler( prepared = (prepared,) for token in prepared: self.handlers[token].append(handler) + if include_in_help: + self.help_messages.append(HelpMessage(tokens, help_message)) + + def add_matcher( + self, + *matchers: MatcherProtocol, + ): + self.matchers += matchers async def match( self, api: TelegramBotApi, message: Message, - ) -> Optional[MatchedToken]: + *, + should_call_matched: bool = False, + ) -> list[MatchedToken]: + matched_handlers = list() for token in self.handlers.keys(): if matched := await self._check(token, message): - await self.call( - api=api, - token=matched.token, - message=message, - kwargs=matched.kwargs | dict(args=matched.args), - ) - return matched - return + matched_handlers.append(matched) + if should_call_matched: + await self.call( + api=api, + token=matched.token, + message=message, + kwargs=matched.kwargs | dict(args=matched.args), + ) + for matcher in self.matchers: + matched_handlers += await matcher.match(api, message, should_call_matched=should_call_matched) + return matched_handlers async def call( self, @@ -85,7 +117,7 @@ def _cast_token( self, token: Hashable, ) -> Union[Any, Iterable[Any]]: - return (token,) + return (token,) # noqa # pylint: disable=unused-argument async def _check( diff --git a/chatushka/core/transports/telegram_bot_api.py b/chatushka/core/transports/telegram_bot_api.py index f752059..9a9ef89 100644 --- a/chatushka/core/transports/telegram_bot_api.py +++ b/chatushka/core/transports/telegram_bot_api.py @@ -144,3 +144,39 @@ async def get_chat_administrators( if status == ChatMemberStatuses.ADMINISTRATOR: admins.append(ChatMemberAdministrator(**result)) return admins + + async def pin_chat_message( + self, + chat_id: int, + message_id: int, + disable_notification: bool = True, + ) -> bool: + result = await self._call_api( + "pinChatMessage", + chat_id=chat_id, + message_id=message_id, + disable_notification=disable_notification, + ) + return result # noqa, type: ignore + + async def unpin_chat_message( + self, + chat_id: int, + message_id: int, + ) -> bool: + result = await self._call_api( + "unpinChatMessage", + chat_id=chat_id, + message_id=message_id, + ) + return result # noqa, type: ignore + + async def unpin_all_chat_messages( + self, + chat_id: int, + ) -> bool: + result = await self._call_api( + "unpinAllChatMessages", + chat_id=chat_id, + ) + return result # noqa, type: ignore