diff --git a/pkg/platform/sources/gewechat.py b/pkg/platform/sources/gewechat.py index 06e1378b..8c1b7d6f 100644 --- a/pkg/platform/sources/gewechat.py +++ b/pkg/platform/sources/gewechat.py @@ -26,7 +26,7 @@ from ..types import events as platform_events from ..types import entities as platform_entities from ...utils import image import xml.etree.ElementTree as ET - +from typing import Optional, List, Tuple class GewechatMessageConverter(adapter.MessageConverter): @@ -60,13 +60,18 @@ class GewechatMessageConverter(adapter.MessageConverter): 'link_thumb_url': component.link_thumb_url, 'link_url': component.link_url}) elif isinstance(component, platform_message.WeChatForwardLink): content_list.append({'type': 'WeChatForwardLink', 'xml_data': component.xml_data}) - - elif isinstance(component, platform_message.Voice): - content_list.append({"type": "voice", "url": component.url, "length": component.length}) + content_list.append({"type": "voice", "url": component.url, "length": component.length}) + elif isinstance(component, platform_message.WeChatForwardImage): + content_list.append({'type': 'WeChatForwardImage', 'xml_data': component.xml_data}) + elif isinstance(component, platform_message.WeChatForwardFile): + content_list.append({'type': 'WeChatForwardFile', 'xml_data': component.xml_data}) + elif isinstance(component, platform_message.WeChatAppMsg): + content_list.append({'type': 'WeChatAppMsg', 'app_msg': component.app_msg}) elif isinstance(component, platform_message.Forward): for node in component.node_list: - content_list.extend(await GewechatMessageConverter.yiri2target(node.message_chain)) + if node.message_chain: + content_list.extend(await GewechatMessageConverter.yiri2target(node.message_chain)) return content_list @@ -76,49 +81,38 @@ class GewechatMessageConverter(adapter.MessageConverter): bot_account_id: str ) -> platform_message.MessageChain: - - - if message["Data"]["MsgType"] == 1: - # 检查消息开头,如果有 wxid_sbitaz0mt65n22:\n 则删掉 - regex = re.compile(r"^wxid_.*:") - # print(message) - - line_split = message["Data"]["Content"]["string"].split("\n") - - if len(line_split) > 0 and regex.match(line_split[0]): - message["Data"]["Content"]["string"] = "\n".join(line_split[1:]) - - - # 正则表达式模式,匹配'@'后跟任意数量的非空白字符 - pattern = r'@\S+' - at_string = f"@{bot_account_id}" - content_list = [] - if at_string in message["Data"]["Content"]["string"]: + # 预处理 + content_list = [] + ats_bot = False + raw_content = message["Data"]["Content"]["string"] + is_group_message = self.__is_group_message(message) + if is_group_message: + ats_bot = self.__ats_bot(message, bot_account_id) + # 优先处理艾特全体成员, + if "@所有人" in raw_content: ## at全员时候传入atll不当作at自己 + content_list.append(platform_message.AtAll()) + elif ats_bot: content_list.append(platform_message.At(target=bot_account_id)) - content_list.append(platform_message.Plain(message["Data"]["Content"]["string"].replace(at_string, '', 1))) - # 更优雅的替换改名后@机器人,仅仅限于单独AT的情况 - elif "PushContent" in message['Data'] and '在群聊中@了你' in message["Data"]["PushContent"]: - if '@所有人' in message["Data"]["Content"]["string"]: # at全员时候传入atll不当作at自己 - content_list.append(platform_message.AtAll()) - else: - content_list.append(platform_message.At(target=bot_account_id)) - content_list.append(platform_message.Plain(re.sub(pattern, '', message["Data"]["Content"]["string"]))) - else: - content_list = [platform_message.Plain(message["Data"]["Content"]["string"])] + raw_content, sender_id = self.__extract_content_and_sender(raw_content) + # 消息类型 + msg_type = message["Data"]["MsgType"] + + # 文本消息 + if msg_type == 1: + # 文本清洗,仅替换群文本中的@文本[空格],的文本 + if is_group_message and ats_bot: + pattern = r'@\S+' + raw_content = re.sub(pattern, '',raw_content) + content_list.append(platform_message.Plain(raw_content)) return platform_message.MessageChain(content_list) - - elif message["Data"]["MsgType"] == 3: - image_xml = message["Data"]["Content"]["string"] - if image_xml.startswith('wxid'): # 此处处理群聊发送图片会有微信id开头 - xml_list = image_xml.split('\n')[2:] - image_xml = '\n'.join(xml_list) + + # 图像 + elif msg_type == 3: + image_xml = raw_content # 已经去除群聊消息前缀 if not image_xml: - return platform_message.MessageChain([ - platform_message.Plain(text="[图片内容为空]") - ]) - - + content_list.append(platform_message.Plain(text="[图片内容为空]")) + return platform_message.MessageChain(content_list) try: base64_str, image_format = await image.get_gewechat_image_base64( gewechat_url=self.config["gewechat_url"], @@ -129,17 +123,20 @@ class GewechatMessageConverter(adapter.MessageConverter): image_type=2, ) - return platform_message.MessageChain([ - platform_message.Image( - base64=f"data:image/{image_format};base64,{base64_str}" - ) - ]) + content_list.append(platform_message.Image( + base64=f"data:image/{image_format};base64,{base64_str}" + )) + # 消息链中加一个WeChatForwardImage的xml用于转发 + content_list.append(platform_message.WeChatForwardImage( + xml_data = image_xml + )) + return platform_message.MessageChain(content_list) except Exception as e: print(f"处理图片消息失败: {str(e)}") - return platform_message.MessageChain([ - platform_message.Plain(text=f"[图片处理失败]") - ]) - elif message["Data"]["MsgType"] == 34: + content_list.append(platform_message.Plain(text=f"[图片处理失败]")) + return platform_message.MessageChain(content_list) + # 语音消息 + elif msg_type == 34: try: audio_base64 = message["Data"]["ImgBuf"]["buffer"] return platform_message.MessageChain( @@ -149,23 +146,20 @@ class GewechatMessageConverter(adapter.MessageConverter): return platform_message.MessageChain( [platform_message.Plain(text="[无法解析群聊语音的消息]")] # 小测了一下,免费版拿不到群聊语音消息的base64,或者用什么办法解析xml里的url? ) - elif message["Data"]["MsgType"] == 49: + finally: + return platform_message.MessageChain(content_list) + elif msg_type == 49: # 支持微信聊天记录的消息类型,将 XML 内容转换为 MessageChain 传递 - content = message["Data"]["Content"]["string"] - try: - # content = message["Data"]["Content"]["string"] - # 有三种可能的消息结构weid开头,私聊直接和直接 - if content.startswith('wxid'): - xml_list = content.split('\n')[2:] + try: + # 下方是移除 bool: + ats_bot = False + try: + to_user_name = message['Wxid'] # 接收方: 所属微信的wxid + raw_content = message["Data"]["Content"]["string"] # 原始消息内容 + # step 1 + ats_bot = ats_bot or (f"@{bot_account_id}" in raw_content) + # step 2 + push_content = message.get('Data', {}).get('PushContent', '') + ats_bot = ats_bot or ('在群聊中@了你' in push_content) + # step 3 + msg_source = message.get('Data', {}).get('MsgSource', '') or '' + if len(msg_source) > 0: + msg_source_data = ET.fromstring(msg_source) + at_user_list = msg_source_data.findtext("atuserlist") or "" + ats_bot = ats_bot or (to_user_name in at_user_list) + except Exception as e: + print(f"__ats_bot got except: {e}") + finally: + return ats_bot + + # 提取一下content前面的sender_id, 和去掉前缀的内容 + def __extract_content_and_sender(self, raw_content: str) -> Tuple[str, Optional[str]]: + try: + # 检查消息开头,如果有 wxid_sbitaz0mt65n22:\n 则删掉 + # add: 有些用户的wxid不是上述格式。换成user_name: + regex = re.compile(r"^[a-zA-Z0-9_\-]{5,20}:") + line_split = raw_content.split("\n") + if len(line_split) > 0 and regex.match(line_split[0]): + raw_content = "\n".join(line_split[1:]) + sender_id = line_split[0].strip(":") + return raw_content, sender_id + except Exception as e: + print(f"__extract_content_and_sender got except: {e}") + finally: + return raw_content, None + + # 是否是群消息 + def __is_group_message(self, message: dict)->bool: + from_user_name = message['Data']['FromUserName']['string'] + return from_user_name.endswith("@chatroom") class GewechatEventConverter(adapter.EventConverter): @@ -365,55 +423,118 @@ class GeWeChatAdapter(adapter.MessagePlatformAdapter): return 'ok' + async def _handle_message( + self, + message: platform_message.MessageChain, + target_id: str + ): + """统一消息处理核心逻辑""" + content_list = await self.message_converter.yiri2target(message) + at_targets = [item["target"] for item in content_list if item["type"] == "at"] + + # 处理@逻辑 + at_targets = at_targets or [] + member_info = [] + if at_targets: + member_info = self.bot.get_chatroom_member_detail( + self.config["app_id"], + target_id, + at_targets[::-1] + )["data"] + + # 处理消息组件 + for msg in content_list: + # 文本消息处理@ + if msg['type'] == 'text' and at_targets: + for member in member_info: + msg['content'] = f'@{member["nickName"]} {msg["content"]}' + + # 统一消息派发 + handler_map = { + 'text': lambda msg: self.bot.post_text( + app_id=self.config['app_id'], + to_wxid=target_id, + content=msg['content'], + ats=",".join(at_targets) + ), + 'image': lambda msg: self.bot.post_image( + app_id=self.config['app_id'], + to_wxid=target_id, + img_url=msg["image"] + ), + 'WeChatForwardMiniPrograms': lambda msg: self.bot.forward_mini_app( + app_id=self.config['app_id'], + to_wxid=target_id, + xml=msg['xml_data'], + cover_img_url=msg.get('image_url') + ), + 'WeChatEmoji': lambda msg: self.bot.post_emoji( + app_id=self.config['app_id'], + to_wxid=target_id, + emoji_md5=msg['emoji_md5'], + emoji_size=msg['emoji_size'] + ), + 'WeChatLink': lambda msg: self.bot.post_link( + app_id=self.config['app_id'], + to_wxid=target_id, + title=msg['link_title'], + desc=msg['link_desc'], + link_url=msg['link_url'], + thumb_url=msg['link_thumb_url'], + ), + 'WeChatMiniPrograms': lambda msg: self.bot.post_mini_app( + app_id=self.config['app_id'], + to_wxid=target_id, + mini_app_id=msg['mini_app_id'], + display_name=msg['display_name'], + page_path=msg['page_path'], + cover_img_url=msg['cover_img_url'], + title=msg['title'], + user_name=msg['user_name'] + ), + 'WeChatForwardLink': lambda msg: self.bot.forward_url( + app_id=self.config['app_id'], + to_wxid=target_id, + xml=msg['xml_data'] + ), + 'WeChatForwardImage': lambda msg: self.bot.forward_image( + app_id=self.config['app_id'], + to_wxid=target_id, + xml=msg['xml_data'] + ), + 'WeChatForwardFile': lambda msg: self.bot.forward_file( + app_id=self.config['app_id'], + to_wxid=target_id, + xml=msg['xml_data'] + ), + 'voice': lambda msg: self.bot.post_voice( + app_id=self.config['app_id'], + to_wxid=target_id, + voice_url=msg['url'], + voice_duration=msg['length'] + ), + 'WeChatAppMsg': lambda msg: self.bot.post_app_msg( + app_id=self.config['app_id'], + to_wxid=target_id, + appmsg=msg['app_msg'] + ), + 'at': lambda msg: None + } + + if handler := handler_map.get(msg['type']): + handler(msg) + else: + self.ap.logger.warning(f"未处理的消息类型: {msg['type']}") + continue + async def send_message( self, target_type: str, target_id: str, message: platform_message.MessageChain ): - geweap_msg = await self.message_converter.yiri2target(message) - # 此处加上群消息at处理 - ats = [item["target"] for item in geweap_msg if item["type"] == "at"] - - - for msg in geweap_msg: - # at主动发送消息 - if msg['type'] == 'text': - if ats: - member_info = self.bot.get_chatroom_member_detail( - self.config["app_id"], - target_id, - ats[::-1] - )["data"] - - for member in member_info: - msg['content'] = f'@{member["nickName"]} {msg["content"]}' - self.bot.post_text(app_id=self.config['app_id'], to_wxid=target_id, content=msg['content'], - ats=",".join(ats)) - - elif msg['type'] == 'image': - - self.bot.post_image(app_id=self.config['app_id'], to_wxid=target_id, img_url=msg["image"]) - elif msg['type'] == 'WeChatMiniPrograms': - self.bot.post_mini_app(app_id=self.config['app_id'], to_wxid=target_id, mini_app_id=msg['mini_app_id'] - , display_name=msg['display_name'], page_path=msg['page_path'] - , cover_img_url=msg['cover_img_url'], title=msg['title'], user_name=msg['user_name']) - elif msg['type'] == 'WeChatForwardMiniPrograms': - self.bot.forward_mini_app(app_id=self.config['app_id'], to_wxid=target_id, xml=msg['xml_data'], cover_img_url=msg['image_url']) - elif msg['type'] == 'WeChatEmoji': - self.bot.post_emoji(app_id=self.config['app_id'], to_wxid=target_id, - emoji_md5=msg['emoji_md5'], emoji_size=msg['emoji_size']) - elif msg['type'] == 'WeChatLink': - self.bot.post_link(app_id=self.config['app_id'], to_wxid=target_id - ,title=msg['link_title'], desc=msg['link_desc'] - , link_url=msg['link_url'], thumb_url=msg['link_thumb_url']) - - elif msg['type'] == 'WeChatForwardLink': - self.bot.forward_url(app_id=self.config['app_id'], to_wxid=target_id, xml=msg['xml_data']) - elif msg['type'] == 'voice': - self.bot.post_voice(app_id=self.config['app_id'], to_wxid=target_id, voice_url=msg['url'],voice_duration=msg['length']) - - + """主动发送消息""" + return await self._handle_message(message, target_id) async def reply_message( self, @@ -421,51 +542,10 @@ class GeWeChatAdapter(adapter.MessagePlatformAdapter): message: platform_message.MessageChain, quote_origin: bool = False ): - content_list = await self.message_converter.yiri2target(message) - - ats = [item["target"] for item in content_list if item["type"] == "at"] - target_id = message_source.source_platform_object["Data"]["FromUserName"]["string"] - - for msg in content_list: - if msg["type"] == "text": - - if ats: - member_info = self.bot.get_chatroom_member_detail( - self.config["app_id"], - message_source.source_platform_object["Data"]["FromUserName"]["string"], - ats[::-1] - )["data"] - - for member in member_info: - msg['content'] = f'@{member["nickName"]} {msg["content"]}' - - self.bot.post_text( - app_id=self.config["app_id"], - to_wxid=message_source.source_platform_object["Data"]["FromUserName"]["string"], - content=msg["content"], - ats=",".join(ats) - ) - elif msg['type'] == 'image': - - self.bot.post_image(app_id=self.config['app_id'], to_wxid=target_id, img_url=msg["image"]) - elif msg['type'] == 'WeChatMiniPrograms': - self.bot.post_mini_app(app_id=self.config['app_id'], to_wxid=target_id, mini_app_id=msg['mini_app_id'] - , display_name=msg['display_name'], page_path=msg['page_path'] - , cover_img_url=msg['cover_img_url'], title=msg['title'], user_name=msg['user_name']) - elif msg['type'] == 'WeChatForwardMiniPrograms': - self.bot.forward_mini_app(app_id=self.config['app_id'], to_wxid=target_id, xml=msg['xml_data'], cover_img_url=msg['image_url']) - elif msg['type'] == 'WeChatEmoji': - self.bot.post_emoji(app_id=self.config['app_id'], to_wxid=target_id, - emoji_md5=msg['emoji_md5'], emoji_size=msg['emoji_size']) - elif msg['type'] == 'WeChatLink': - self.bot.post_link(app_id=self.config['app_id'], to_wxid=target_id - , title=msg['link_title'], desc=msg['link_desc'] - , link_url=msg['link_url'], thumb_url=msg['link_thumb_url']) - elif msg['type'] == 'WeChatForwardLink': - self.bot.forward_url(app_id=self.config['app_id'], to_wxid=target_id, xml=msg['xml_data']) - elif msg['type'] == 'voice': - self.bot.post_voice(app_id=self.config['app_id'], to_wxid=target_id, voice_url=msg['url'], - voice_duration=msg['length']) + """回复消息""" + if message_source.source_platform_object: + target_id = message_source.source_platform_object["Data"]["FromUserName"]["string"] + return await self._handle_message(message, target_id) async def is_muted(self, group_id: int) -> bool: pass @@ -519,8 +599,13 @@ class GeWeChatAdapter(adapter.MessagePlatformAdapter): time.sleep(2) - ret = self.bot.set_callback(self.config["token"], self.config["callback_url"]) - print('设置 Gewechat 回调:', ret) + try: + # gewechat-server容器重启, token会变,但是还会登录成功 + # 换新token也会收不到回调,要重新登陆下。 + ret = self.bot.set_callback(self.config["token"], self.config["callback_url"]) + except Exception as e: + raise Exception(f"设置 Gewechat 回调失败, token失效: {e}") + threading.Thread(target=gewechat_login_process).start() diff --git a/pkg/platform/types/message.py b/pkg/platform/types/message.py index 20ca399b..0396dad9 100644 --- a/pkg/platform/types/message.py +++ b/pkg/platform/types/message.py @@ -1,16 +1,15 @@ import itertools import logging +import typing from datetime import datetime from enum import Enum from pathlib import Path -import typing import pydantic.v1 as pydantic from . import entities as platform_entities from .base import PlatformBaseModel, PlatformIndexedMetaclass, PlatformIndexedModel - logger = logging.getLogger(__name__) @@ -642,7 +641,8 @@ class Unknown(MessageComponent): """消息组件类型。""" text: str """文本。""" - + def __str__(self): + return f'Unknown Message: {self.text}' class Voice(MessageComponent): """语音。""" @@ -837,6 +837,8 @@ class WeChatForwardMiniPrograms(MessageComponent): xml_data: str """首页图片""" image_url: typing.Optional[str] = None + def __str__(self): + return self.xml_data class WeChatEmoji(MessageComponent): @@ -866,4 +868,29 @@ class WeChatForwardLink(MessageComponent): type: str = 'WeChatForwardLink' """xml数据""" xml_data: str + def __str__(self): + return self.xml_data +class WeChatForwardImage(MessageComponent): + """转发图片。个人微信专用组件。""" + type: str = 'WeChatForwardImage' + """xml数据""" + xml_data: str + def __str__(self): + return self.xml_data + +class WeChatForwardFile(MessageComponent): + """转发文件。个人微信专用组件。""" + type: str = 'WeChatForwardFile' + """xml数据""" + xml_data: str + def __str__(self): + return self.xml_data + +class WeChatAppMsg(MessageComponent): + """通用appmsg发送。个人微信专用组件。""" + type: str = 'WeChatAppMsg' + """xml数据""" + app_msg: str + def __str__(self): + return self.app_msg