Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
A.Shpak committed Apr 2, 2024
1 parent 06026d7 commit 30dd313
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 179 deletions.
1 change: 1 addition & 0 deletions chatushka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from chatushka._models import Chat, ChatPermissions, Events, Message, Update, User
from chatushka._transport import TelegramBotAPI

__version__ = "0.1.0"
__all__ = [
"Chatushka",
# errors
Expand Down
1 change: 0 additions & 1 deletion chatushka/__version__.py

This file was deleted.

162 changes: 162 additions & 0 deletions chatushka/_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import signal
from asyncio import ensure_future, gather, get_event_loop
from collections.abc import AsyncGenerator, MutableMapping, Sequence
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from datetime import timezone, tzinfo
from functools import partial
from traceback import print_exception

import aiocron # type: ignore

from chatushka._constants import HTTP_POOLING_TIMEOUT, HTTP_REGULAR_TIMEOUT
from chatushka._errors import ChatushkaResponseError
from chatushka._logger import logger
from chatushka._matchers import CommandMatcher, Matcher, TelegramBotAPI
from chatushka._sentry import report_exc


@asynccontextmanager
async def _default_lifespan(
_: "MatchersBot",
) -> AsyncGenerator[None, None]:
yield


class MatchersBot:
def __init__(
self,
*,
token: str,
cmd_prefixes: str | Sequence[str] = (),
lifespan: AbstractAsyncContextManager | None = None,
) -> None:
self._state: MutableMapping = {}
self._lifespan = lifespan or _default_lifespan
self._token = token
if isinstance(cmd_prefixes, str):
cmd_prefixes = [cmd_prefixes]
self._cmd_prefixes = cmd_prefixes
self._matchers: list[Matcher] = [] # type: ignore
self._schedulers: list[aiocron.Cron] = []

def __repr__(
self,
) -> str:
return f"<{self.__class__.__name__}: {len(self._matchers)} matchers, {len(self._schedulers)} schedulers>"

def add(
self,
matcher: Matcher,
) -> None:
if isinstance(matcher, CommandMatcher):
matcher.add_commands_prefixes(
self._cmd_prefixes,
)
logger.info(f"{self} + {matcher}")
self._matchers.append(matcher)

async def _call_scheduled(
self,
func,
):
async with self._make_api_client() as client:
await func(
api=client,
)

def schedule(
self,
cron: str,
tz: tzinfo = timezone.utc,
):
def _wrapper(
func,
):
job = aiocron.Cron(
cron,
func=partial(self._call_scheduled, func=func),
start=False,
tz=tz,
)
logger.info(f"{self} + {job}")
self._schedulers.append(job)

return _wrapper

async def _check_updates(
self,
api: TelegramBotAPI,
offset: int | None,
) -> int | None:
try:
updates, offset = await api.get_updates(offset)
except (Exception, ChatushkaResponseError) as exc:
report_exc(exc)
return offset
if not updates:
return offset
logger.debug(f"{self} <<< {len(updates)} updates from {offset=}")
results = await gather(
*[
matcher( # type: ignore
api=api,
update=update,
)
for update in updates
for matcher in self._matchers
],
return_exceptions=True,
)
for result in results:
if isinstance(result, Exception):
print_exception(result)
report_exc(result)
return offset

async def _loop(
self,
) -> None:
offset: int | None = None
while True:
async with TelegramBotAPI(
token=self._token,
timeout=HTTP_POOLING_TIMEOUT,
) as api:
offset = await self._check_updates(
api=api,
offset=offset,
)

def _make_api_client(
self,
timeout: int = HTTP_REGULAR_TIMEOUT,
) -> TelegramBotAPI:
return TelegramBotAPI(
token=self._token,
timeout=timeout,
)

async def _close(
self,
) -> None:
if self._schedulers:
logger.info(f"{self} (っ◔◡◔)っ stop schedulers")
for scheduler in self._schedulers:
scheduler.stop()

async def run(
self,
) -> None:
loop = get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, callback=lambda: ensure_future(self._close()))
except NotImplementedError:
break
if self._schedulers:
logger.info(f"{self} (っ◔◡◔)っ start schedulers")
for scheduler in self._schedulers:
scheduler.start()
logger.info(f"{self} (っ◔◡◔)っ start polling")
async with self._lifespan(self): # type: ignore
await self._loop()
Loading

0 comments on commit 30dd313

Please sign in to comment.