-
Notifications
You must be signed in to change notification settings - Fork 0
/
aiohubot_telegram.py
155 lines (132 loc) · 5.82 KB
/
aiohubot_telegram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import re
from os import environ
from asyncio import CancelledError, Event, ensure_future, gather, sleep
from collections.abc import Mapping
from aiogram import Bot
from aiohubot import Adapter, TextMessage, CatchAllMessage
# Release version of this adapter module.
__version__ = '0.1.0'

# Update payload fields that are not converted into text messages.  An update
# carrying one of these is handed to the robot wrapped in a CatchAllMessage
# whose ``field`` attribute holds the matching name.
UNSUPPORTED_FIELDS = (
    "inline_query",
    "chosen_inline_result",
    "callback_query",
    "shipping_query",
    "pre_checkout_query",
    "poll",
    "poll_answer",
    "my_chat_member",
    "chat_member",
)
class Telegram(Adapter):
    """aiohubot adapter that exchanges messages with Telegram via long polling.

    Configuration is read from the environment when the adapter is created:

    * ``HUBOT_TELEGRAM_TOKEN`` -- bot API token; :meth:`run` refuses to start
      without it.
    * ``HUBOT_TELEGRAM_INTERVAL`` -- seconds to pause between two polls
      (default ``0.5``).  After a failed poll the pause is ten times as long.
    """

    def __init__(self, robot):
        """Store configuration; no network activity happens here."""
        super().__init__(robot)
        self.bot = self.stream = self._offset = None
        self._polling = Event()
        self.api_token = environ.get("HUBOT_TELEGRAM_TOKEN", "")
        # Environment values are strings.  Convert up front: the interval is
        # multiplied and passed to ``sleep()``, both of which need a number.
        raw_interval = environ.get("HUBOT_TELEGRAM_INTERVAL", 0.5)
        try:
            self.interval = float(raw_interval)
        except ValueError:
            raise ValueError("environment `HUBOT_TELEGRAM_INTERVAL` must be"
                             f" a number, got {raw_interval!r}") from None
        # TODO: support webhook

    async def send(self, envelope, *strings):
        """Send *strings*, joined by newlines, to the chat of the message.

        Only envelopes whose ``'message'`` carries the originating Telegram
        message (``origin``) are supported; anything else logs a warning.
        """
        await self._respond(envelope, "answer", strings)

    async def reply(self, envelope, *strings):
        """Like :meth:`send`, but as a reply to the originating message."""
        await self._respond(envelope, "reply", strings)

    async def _respond(self, envelope, action, strings):
        """Call ``origin.<action>`` with the joined text, or warn if no origin."""
        try:
            msg = envelope['message']
        except KeyError:
            # Envelope built without a source message: nothing to answer to.
            msg = None
        origin = getattr(msg, 'origin', None)
        if origin is None:
            self.robot.logger.warning("Not support, use raw bot directly.")
            return
        await getattr(origin, action)("\n".join(strings))

    async def run(self):
        """Connect to Telegram, start the polling task and emit "connected".

        Raises:
            AttributeError: if ``HUBOT_TELEGRAM_TOKEN`` was not provided.
        """
        if not self.api_token:
            err_msg = "environment `HUBOT_TELEGRAM_TOKEN` is required."
            raise AttributeError(err_msg)

        self.bot = Bot(self.api_token)
        Bot.set_current(self.bot)
        # NOTE(review): `_msg_reformat` later reads `self.bot._me`; presumably
        # awaiting `bot.me` here is what populates it -- confirm with aiogram.
        me = await self.bot.me
        self.robot.logger.info("Connected to Telegram as Bot"
                               f" {me.first_name}(@{me.username})")
        if me.username.lower() != self.robot.name.lower():
            msg = (f"Inconsistent bot name found: {me.username} from Telegram,"
                   f" {self.robot.name} from Hubot."
                   "\nIt will run into problem when using @mention.")
            self.robot.logger.warning(msg)

        self._polling.set()
        self.stream = ensure_future(self._start_polling())
        self.emit("connected")

    async def _start_polling(self, timeout=10, reset_webhook=True):
        """Fetch updates in a loop until the polling flag is cleared or the
        task is cancelled.

        Args:
            timeout: value used both as the request timeout and as the
                ``timeout`` argument of ``get_updates``.
            reset_webhook: delete the bot's webhook before polling starts.
        """
        if reset_webhook:
            await self.bot.delete_webhook()

        self.robot.logger.info("Telegram: Start polling...")
        try:
            while self._polling.is_set():
                try:
                    with self.bot.request_timeout(timeout):
                        updates = await self.bot.get_updates(
                            offset=self._offset, timeout=timeout)
                except CancelledError:
                    # Listed before ``Exception`` so cancellation is never
                    # treated as a client error (on Python versions where
                    # CancelledError derives from Exception).
                    raise
                except Exception as e:
                    self.robot.logger.exception(
                        f"Telegram: client error - {e!r}")
                    self.emit("clientError", e)
                    await sleep(self.interval * 10)
                else:
                    if updates:
                        # Ask only for updates newer than the last one seen.
                        self._offset = updates[-1].update_id + 1
                        ensure_future(self.handle_updates(*updates))
                    await sleep(self.interval)
        except CancelledError:
            # Handled around the whole loop so that a cancellation arriving
            # during one of the sleeps is reported the same way as one
            # arriving during the request itself.
            self.emit("disconnected")
            self.robot.logger.debug("Telegram: Polling Received Cancellation.")
            self._polling.clear()
        self.robot.logger.info("Telegram: Stop polling...")

    async def handle_updates(self, *updates):
        """Convert *updates* into hubot messages and pass them to the robot.

        Updates carrying a (possibly edited) message or channel post become
        ``TextMessage`` objects with the Telegram message attached as
        ``origin``; all others go through :meth:`_handle_unsupported`.
        """
        futs = []
        for update in updates:
            self.robot.logger.debug(f"Received update: {update.to_python()}")
            msg = (update.message or update.edited_message
                   or update.channel_post or update.edited_channel_post)
            if not msg:
                futs.append(self._handle_unsupported(update))
                continue

            # Channel posts have no sender (`from_user` is None); fall back to
            # the chat so they do not abort the whole batch.
            sender = msg.from_user or msg.chat
            user = self.robot.brain.user_for_id(**dict(sender))
            # Called for its side effect: refresh the stored user fields.
            self.diff_user(user, sender, update=True)
            msg_obj = TextMessage(user, self._msg_reformat(msg),
                                  msg.message_id)
            msg_obj.origin = msg
            futs.append(self.receive(msg_obj))
        await gather(*futs)

    def diff_user(self, old, new, update=False):
        """Return the ``(key, value)`` pairs of *new* that are absent from *old*.

        Each argument is either a mapping or an iterable; mappings are
        compared by their items.  Pairs are compared by equality rather than
        by hashing, so unhashable values (lists, dicts) do not raise.

        Args:
            old: previously known user data.
            new: freshly received user data; must yield ``(key, value)``
                pairs when it is not a mapping.
            update: when true and something changed, merge the changes into
                ``robot.brain.data['users'][<id of new>]``.

        Returns:
            dict of the changed pairs (empty when nothing differs).
        """
        old_pairs = list(old.items() if isinstance(old, Mapping) else old)
        new_pairs = list(new.items() if isinstance(new, Mapping) else new)
        changed = dict(pair for pair in new_pairs if pair not in old_pairs)
        if update and changed:
            stored = self.robot.brain.data['users'][dict(new_pairs)['id']]
            stored.update(changed)
        return changed

    def close(self):
        """Cancel the polling task and, if the loop is idle, close the session.

        Safe to call before :meth:`run` or more than once.
        """
        self.robot.logger.debug("Adapter closing...")
        if self.stream is not None:
            self.stream.cancel()
            self.stream = None
        if self.bot is None:
            # run() never created a bot, so there is no session to close.
            return
        loop = self.robot._loop
        if not loop.is_running():
            loop.run_until_complete(self.bot.session.close())

    def _handle_unsupported(self, update):
        """Wrap a non-message update in a ``CatchAllMessage`` and receive it.

        The first field of ``UNSUPPORTED_FIELDS`` present in the update is
        used as the payload, with ``field`` set to its name; if none is
        present the update itself is the payload and ``field`` is ``None``.

        Returns:
            whatever ``self.receive`` returns for the wrapped message.
        """
        for name in UNSUPPORTED_FIELDS:
            if update.values.get(name):
                self.robot.logger.debug(f"Unspported field: {name}")
                obj = update.values[name]
                obj.user = obj.values.get('from')
                msg_obj = CatchAllMessage(obj)
                msg_obj.field = name
                return self.receive(msg_obj)

        self.robot.logger.warning("Update with Unknown field.")
        update.user = None
        msg_obj = CatchAllMessage(update)
        msg_obj.field = None
        return self.receive(msg_obj)

    def _msg_reformat(self, msg):
        """Return the message text in the ``<robot name>: ...`` addressing form.

        A leading ``@<bot username>`` mention (optionally followed by ``,`` or
        ``:``) is replaced by ``<robot name>:``.  In private chats the prefix
        is added when the text does not already start with the robot name.
        Messages without text are treated as an empty string.
        """
        bot_prefix = f"{self.robot.name}:"
        text = msg.text or ""
        # Names are escaped so regex metacharacters in them match literally.
        mention = fr"^@{re.escape(self.bot._me.username)}\b[,:]?"
        # A callable replacement keeps backslashes in the prefix literal.
        hubot_msg = re.sub(mention, lambda _: bot_prefix, text,
                           count=1, flags=re.I)
        addressed = re.match(re.escape(self.robot.name), hubot_msg, re.I)
        if not addressed and msg.chat.type == "private":
            hubot_msg = bot_prefix + hubot_msg
        return hubot_msg
# Module-level alias for the adapter class, kept "to fit the API".
# NOTE(review): presumably `use` is the name aiohubot looks up when loading
# an adapter module -- confirm against aiohubot's loader.
use = Telegram