From ac086a7640a98357c46370f43db0ad958a85abab Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Thu, 24 Oct 2024 20:27:41 +0800 Subject: [PATCH 1/3] refactor(wechat): optimize message handling and add menu deletion --- app/modules/wechat/wechat.py | 175 ++++++++++++++++++++++------------- 1 file changed, 113 insertions(+), 62 deletions(-) diff --git a/app/modules/wechat/wechat.py b/app/modules/wechat/wechat.py index 4c675f9de..5ee719b02 100644 --- a/app/modules/wechat/wechat.py +++ b/app/modules/wechat/wechat.py @@ -10,6 +10,7 @@ from app.utils.common import retry from app.utils.http import RequestUtils from app.utils.string import StringUtils +from app.utils.url import UrlUtils lock = threading.Lock() @@ -31,14 +32,16 @@ class WeChat: _proxy = None # 企业微信发送消息URL - _send_msg_url = "/cgi-bin/message/send?access_token=%s" + _send_msg_url = "/cgi-bin/message/send?access_token={access_token}" # 企业微信获取TokenURL - _token_url = "/cgi-bin/gettoken?corpid=%s&corpsecret=%s" - # 企业微信创新菜单URL - _create_menu_url = "/cgi-bin/menu/create?access_token=%s&agentid=%s" - - def __init__(self, WECHAT_CORPID: str = None, WECHAT_APP_SECRET: str = None, WECHAT_APP_ID: str = None, - WECHAT_PROXY: str = None, **kwargs): + _token_url = "/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}" + # 企业微信创建菜单URL + _create_menu_url = "/cgi-bin/menu/create?access_token={access_token}&agentid={agentid}" + # 企业微信删除菜单URL + _delete_menu_url = "/cgi-bin/menu/delete?access_token={access_token}&agentid={agentid}" + + def __init__(self, WECHAT_CORPID: str = None, WECHAT_APP_SECRET: str = None, + WECHAT_APP_ID: str = None, WECHAT_PROXY: str = None, **kwargs): """ 初始化 """ @@ -51,10 +54,10 @@ def __init__(self, WECHAT_CORPID: str = None, WECHAT_APP_SECRET: str = None, WEC self._proxy = WECHAT_PROXY or "https://qyapi.weixin.qq.com" if self._proxy: - self._proxy = self._proxy.rstrip("/") - self._send_msg_url = f"{self._proxy}/{self._send_msg_url}" - self._token_url = f"{self._proxy}/{self._token_url}" - self._create_menu_url = f"{self._proxy}/{self._create_menu_url}" + self._send_msg_url = UrlUtils.adapt_request_url(self._proxy, self._send_msg_url) + self._token_url = UrlUtils.adapt_request_url(self._proxy, self._token_url) + self._create_menu_url = UrlUtils.adapt_request_url(self._proxy, self._create_menu_url) + self._delete_menu_url = UrlUtils.adapt_request_url(self._proxy, self._delete_menu_url) if self._corpid and self._appsecret and self._appid: self.__get_access_token() @@ -82,13 +85,13 @@ def __get_access_token(self, force=False): if not self._corpid or not self._appsecret: return None try: - token_url = self._token_url % (self._corpid, self._appsecret) + token_url = self._token_url.format(corpid=self._corpid, corpsecret=self._appsecret) res = RequestUtils().get_res(token_url) if res: ret_json = res.json() - if ret_json.get('errcode') == 0: - self._access_token = ret_json.get('access_token') - self._expires_in = ret_json.get('expires_in') + if ret_json.get("errcode") == 0: + self._access_token = ret_json.get("access_token") + self._expires_in = ret_json.get("expires_in") self._access_token_time = datetime.now() elif res is not None: logger.error(f"获取微信access_token失败,错误码:{res.status_code},错误原因:{res.reason}") @@ -100,8 +103,64 @@ def __get_access_token(self, force=False): return None return self._access_token + @staticmethod + def __split_content(content: str, max_bytes: int = 2048) -> List[str]: + """ + 将内容分块为不超过 max_bytes 字节的块 + :param content: 待拆分的内容 + :param max_bytes: 最大字节数 + :return: 分块后的内容列表 + """ + content_chunks = [] + current_chunk = bytearray() + + for line in content.splitlines(): + encoded_line = (line + "\n").encode("utf-8") + line_length = len(encoded_line) + + if line_length > max_bytes: + # 在处理长行之前,先将 current_chunk 添加到 content_chunks + if current_chunk: + content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip()) + current_chunk = bytearray() + + # 处理长行,拆分为多个不超过 max_bytes 的块 + start = 0 + while start < line_length: + end = start + max_bytes # 不再需要为 "..." 预留空间 + if end >= line_length: + end = line_length + else: + # 调整以避免拆分多字节字符 + while end > start and (encoded_line[end] & 0xC0) == 0x80: + end -= 1 + if end == start: + # 单个字符超过了 max_bytes,强制包含整个字符 + end = start + 1 + while end < line_length and (encoded_line[end] & 0xC0) == 0x80: + end += 1 + truncated_line = encoded_line[start:end].decode("utf-8", errors="replace") + content_chunks.append(truncated_line.strip()) + start = end + continue # 继续处理下一行 + + # 检查添加当前行后是否会超过 max_bytes + if len(current_chunk) + line_length > max_bytes: + # 将 current_chunk 添加到 content_chunks + content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip()) + current_chunk = bytearray() + + # 将当前行添加到 current_chunk + current_chunk += encoded_line + + # 处理剩余的 current_chunk + if current_chunk: + content_chunks.append(current_chunk.decode("utf-8", errors="replace").strip()) + + return content_chunks + def __send_message(self, title: str, text: str = None, - userid: str = None, link: str = None) -> Optional[bool]: + userid: str = None, link: str = None) -> bool: """ 发送文本消息 :param title: 消息标题 @@ -110,9 +169,14 @@ def __send_message(self, title: str, text: str = None, :param link: 跳转链接 :return: 发送状态,错误信息 """ - message_url = self._send_msg_url % self.__get_access_token() + if not title: + logger.error("消息标题不能为空") + return False + + message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if text: - content = "%s\n%s" % (title, text.replace("\n\n", "\n")) + formatted_text = text.replace("\n\n", "\n") + content = f"{title}\n{formatted_text}" else: content = title @@ -122,50 +186,28 @@ def __send_message(self, title: str, text: str = None, if not userid: userid = "@all" - # Check if content exceeds 2048 bytes and split if necessary - if len(content.encode('utf-8')) > 2048: - content_chunks = [] - current_chunk = "" - for line in content.splitlines(): - if len(current_chunk.encode('utf-8')) + len(line.encode('utf-8')) > 2048: - content_chunks.append(current_chunk.strip()) - current_chunk = "" - current_chunk += line + "\n" - if current_chunk: - content_chunks.append(current_chunk.strip()) - - # Send each chunk as a separate message - result = True - for chunk in content_chunks: - req_json = { - "touser": userid, - "msgtype": "text", - "agentid": self._appid, - "text": { - "content": chunk - }, - "safe": 0, - "enable_id_trans": 0, - "enable_duplicate_check": 0 - } - result = self.__post_request(message_url, req_json) - if not result: - return False - else: + # 分块处理逻辑 + content_chunks = self.__split_content(content) + + # 逐块发送消息 + for chunk in content_chunks: req_json = { "touser": userid, "msgtype": "text", "agentid": self._appid, "text": { - "content": content + "content": chunk }, "safe": 0, "enable_id_trans": 0, "enable_duplicate_check": 0 } result = self.__post_request(message_url, req_json) + if not result: + logger.error(f"发送消息块失败: {chunk}") + return False - return result + return True def __send_image_message(self, title: str, text: str, image_url: str, userid: str = None, link: str = None) -> Optional[bool]: @@ -178,7 +220,7 @@ def __send_image_message(self, title: str, text: str, image_url: str, :param link: 跳转链接 :return: 发送状态,错误信息 """ - message_url = self._send_msg_url % self.__get_access_token() + message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if text: text = text.replace("\n\n", "\n") if not userid: @@ -230,7 +272,7 @@ def send_medias_msg(self, medias: List[MediaInfo], userid: str = "") -> Optional logger.error("获取微信access_token失败,请检查参数配置") return None - message_url = self._send_msg_url % self.__get_access_token() + message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if not userid: userid = "@all" articles = [] @@ -272,7 +314,7 @@ def send_torrents_msg(self, torrents: List[Context], self.__send_message(title=title, userid=userid, link=link) # 发送列表 - message_url = self._send_msg_url % self.__get_access_token() + message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if not userid: userid = "@all" articles = [] @@ -289,11 +331,11 @@ def send_torrents_msg(self, torrents: List[Context], f"{StringUtils.str_filesize(torrent.size)} " \ f"{torrent.volume_factor} " \ f"{torrent.seeders}↑" - title = re.sub(r"\s+", " ", title).strip() + torrent_title = re.sub(r"\s+", " ", torrent_title).strip() articles.append({ "title": torrent_title, - "description": torrent.description if index == 1 else '', - "picurl": mediainfo.get_message_image() if index == 1 else '', + "description": torrent.description if index == 1 else "", + "picurl": mediainfo.get_message_image() if index == 1 else "", "url": torrent.page_url }) index += 1 @@ -313,16 +355,16 @@ def __post_request(self, message_url: str, req_json: dict) -> bool: 向微信发送请求 """ try: - res = RequestUtils(content_type='application/json').post( + res = RequestUtils(content_type="application/json").post( message_url, - data=json.dumps(req_json, ensure_ascii=False).encode('utf-8') + data=json.dumps(req_json, ensure_ascii=False).encode("utf-8") ) if res and res.status_code == 200: ret_json = res.json() - if ret_json.get('errcode') == 0: + if ret_json.get("errcode") == 0: return True else: - if ret_json.get('errcode') == 42001: + if ret_json.get("errcode") == 42001: self.__get_access_token(force=True) logger.error(f"发送请求失败,错误信息:{ret_json.get('errmsg')}") return False @@ -376,12 +418,12 @@ def create_menus(self, commands: Dict[str, dict]): } """ # 请求URL - req_url = self._create_menu_url % (self.__get_access_token(), self._appid) + req_url = self._create_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid) # 对commands按category分组 category_dict = {} for key, value in commands.items(): - category: Dict[str, dict] = value.get("category") + category: str = value.get("category") if category: if not category_dict.get(category): category_dict[category] = {} @@ -408,3 +450,12 @@ def create_menus(self, commands: Dict[str, dict]): self.__post_request(req_url, { "button": buttons[:3] }) + + def delete_menus(self): + """ + 删除微信菜单 + """ + # 请求URL + req_url = self._delete_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid) + # 发送请求 + RequestUtils().get(req_url) From 5d89ad965f8797fb045bdb9d02f11527f8754166 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 25 Oct 2024 17:26:56 +0800 Subject: [PATCH 2/3] fix(telegram): ensure image cache path exists --- app/modules/telegram/telegram.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/app/modules/telegram/telegram.py b/app/modules/telegram/telegram.py index 9a6d8226e..3652840bc 100644 --- a/app/modules/telegram/telegram.py +++ b/app/modules/telegram/telegram.py @@ -206,14 +206,15 @@ def __send_request(self, userid: str = None, image="", caption="") -> bool: """ 向Telegram发送报文 """ - if image: res = RequestUtils(proxies=settings.PROXY).get_res(image) if res is None: raise Exception("获取图片失败") if res.content: # 使用随机标识构建图片文件的完整路径,并写入图片内容到文件 - image_file = Path(settings.TEMP_PATH) / str(uuid.uuid4()) + image_file = Path(settings.TEMP_PATH) / "telegram" / str(uuid.uuid4()) + if not image_file.parent.exists(): + image_file.parent.mkdir(parents=True, exist_ok=True) image_file.write_bytes(res.content) photo = InputFile(image_file) # 发送图片到Telegram @@ -223,8 +224,7 @@ def __send_request(self, userid: str = None, image="", caption="") -> bool: parse_mode="Markdown") if ret is None: raise Exception("发送图片消息失败") - if ret: - return True + return True # 按4096分段循环发送消息 ret = None if len(caption) > 4095: From eff8a6c497479a77269e6e7c819d1d875dcb1f4f Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 25 Oct 2024 17:27:58 +0800 Subject: [PATCH 3/3] feat(wechat): add retry mechanism for message requests --- app/modules/wechat/wechat.py | 97 ++++++++++++++++++++---------------- 1 file changed, 54 insertions(+), 43 deletions(-) diff --git a/app/modules/wechat/wechat.py b/app/modules/wechat/wechat.py index 5ee719b02..89d13507e 100644 --- a/app/modules/wechat/wechat.py +++ b/app/modules/wechat/wechat.py @@ -66,7 +66,7 @@ def get_state(self): """ 获取状态 """ - return True if self.__get_access_token else False + return True if self.__get_access_token() else False @retry(Exception, logger=logger) def __get_access_token(self, force=False): @@ -172,23 +172,17 @@ def __send_message(self, title: str, text: str = None, if not title: logger.error("消息标题不能为空") return False - - message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if text: formatted_text = text.replace("\n\n", "\n") content = f"{title}\n{formatted_text}" else: content = title - if link: content = f"{content}\n点击查看:{link}" - if not userid: userid = "@all" - # 分块处理逻辑 content_chunks = self.__split_content(content) - # 逐块发送消息 for chunk in content_chunks: req_json = { @@ -202,11 +196,13 @@ def __send_message(self, title: str, text: str = None, "enable_id_trans": 0, "enable_duplicate_check": 0 } - result = self.__post_request(message_url, req_json) - if not result: - logger.error(f"发送消息块失败: {chunk}") + try: + # 如果是超长消息,有一个发送失败就全部失败 + if not self.__post_request(self._send_msg_url, req_json): + return False + except Exception as e: + logger.error(f"发送消息块失败:{e}") return False - return True def __send_image_message(self, title: str, text: str, image_url: str, @@ -220,7 +216,6 @@ def __send_image_message(self, title: str, text: str, image_url: str, :param link: 跳转链接 :return: 发送状态,错误信息 """ - message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if text: text = text.replace("\n\n", "\n") if not userid: @@ -240,7 +235,11 @@ def __send_image_message(self, title: str, text: str, image_url: str, ] } } - return self.__post_request(message_url, req_json) + try: + return self.__post_request(self._send_msg_url, req_json) + except Exception as e: + logger.error(f"发送图文消息失败:{e}") + return False def send_msg(self, title: str, text: str = "", image: str = "", userid: str = None, link: str = None) -> Optional[bool]: @@ -272,7 +271,6 @@ def send_medias_msg(self, medias: List[MediaInfo], userid: str = "") -> Optional logger.error("获取微信access_token失败,请检查参数配置") return None - message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if not userid: userid = "@all" articles = [] @@ -298,7 +296,11 @@ def send_medias_msg(self, medias: List[MediaInfo], userid: str = "") -> Optional "articles": articles } } - return self.__post_request(message_url, req_json) + try: + return self.__post_request(self._send_msg_url, req_json) + except Exception as e: + logger.error(f"发送消息失败:{e}") + return False def send_torrents_msg(self, torrents: List[Context], userid: str = "", title: str = "", link: str = None) -> Optional[bool]: @@ -314,7 +316,6 @@ def send_torrents_msg(self, torrents: List[Context], self.__send_message(title=title, userid=userid, link=link) # 发送列表 - message_url = self._send_msg_url.format(access_token=self.__get_access_token()) if not userid: userid = "@all" articles = [] @@ -348,35 +349,41 @@ def send_torrents_msg(self, torrents: List[Context], "articles": articles } } - return self.__post_request(message_url, req_json) + try: + return self.__post_request(self._send_msg_url, req_json) + except Exception as e: + logger.error(f"发送消息失败:{e}") + return False - def __post_request(self, message_url: str, req_json: dict) -> bool: + @retry(Exception, logger=logger) + def __post_request(self, url: str, req_json: dict) -> bool: """ 向微信发送请求 """ - try: - res = RequestUtils(content_type="application/json").post( - message_url, - data=json.dumps(req_json, ensure_ascii=False).encode("utf-8") - ) - if res and res.status_code == 200: - ret_json = res.json() - if ret_json.get("errcode") == 0: - return True - else: - if ret_json.get("errcode") == 42001: - self.__get_access_token(force=True) - logger.error(f"发送请求失败,错误信息:{ret_json.get('errmsg')}") - return False - elif res is not None: - logger.error(f"发送请求失败,错误码:{res.status_code},错误原因:{res.reason}") - return False + url = url.format(access_token=self.__get_access_token()) + res = RequestUtils(content_type="application/json").post( + url=url, + data=json.dumps(req_json, ensure_ascii=False).encode("utf-8") + ) + if res is None: + error_msg = "发送请求失败,未获取到返回信息" + raise Exception(error_msg) + if res.status_code != 200: + error_msg = f"发送请求失败,错误码:{res.status_code},错误原因:{res.reason}" + raise Exception(error_msg) + + ret_json = res.json() + if ret_json.get("errcode") == 0: + return True + else: + if ret_json.get("errcode") == 42001: + self.__get_access_token(force=True) + error_msg = (f"access_token 已过期,尝试重新获取 access_token," + f"errcode: {ret_json.get('errcode')}, errmsg: {ret_json.get('errmsg')}") + raise Exception(error_msg) else: - logger.error(f"发送请求失败,未获取到返回信息") + logger.error(f"发送请求失败,错误信息:{ret_json.get('errmsg')}") return False - except Exception as err: - logger.error(f"发送请求失败,错误信息:{str(err)}") - return False def create_menus(self, commands: Dict[str, dict]): """ @@ -418,7 +425,7 @@ def create_menus(self, commands: Dict[str, dict]): } """ # 请求URL - req_url = self._create_menu_url.format(access_token=self.__get_access_token(), agentid=self._appid) + req_url = self._create_menu_url.format(access_token="{access_token}", agentid=self._appid) # 对commands按category分组 category_dict = {} @@ -447,9 +454,13 @@ def create_menus(self, commands: Dict[str, dict]): if buttons: # 发送请求 - self.__post_request(req_url, { - "button": buttons[:3] - }) + try: + self.__post_request(req_url, { + "button": buttons[:3] + }) + except Exception as e: + logger.error(f"创建菜单失败:{e}") + return False def delete_menus(self): """