diff --git a/libs/wecom_customer_service_api/__init__.py b/libs/wecom_customer_service_api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/libs/wecom_customer_service_api/api.py b/libs/wecom_customer_service_api/api.py new file mode 100644 index 00000000..279585b3 --- /dev/null +++ b/libs/wecom_customer_service_api/api.py @@ -0,0 +1,337 @@ +from quart import request +from ..wecom_api.WXBizMsgCrypt3 import WXBizMsgCrypt +import base64 +import binascii +import httpx +import traceback +from quart import Quart +import xml.etree.ElementTree as ET +from typing import Callable, Dict, Any +from .wecomcsevent import WecomCSEvent +from pkg.platform.types import events as platform_events, message as platform_message +import aiofiles + + +class WecomCSClient(): + def __init__(self,corpid:str,secret:str,token:str,EncodingAESKey:str): + self.corpid = corpid + self.secret = secret + self.access_token_for_contacts ='' + self.token = token + self.aes = EncodingAESKey + self.base_url = 'https://qyapi.weixin.qq.com/cgi-bin' + self.access_token = '' + self.app = Quart(__name__) + self.wxcpt = WXBizMsgCrypt(self.token, self.aes, self.corpid) + self.app.add_url_rule('/callback/command', 'handle_callback', self.handle_callback_request, methods=['GET', 'POST']) + self._message_handlers = { + "example":[], + } + + async def get_pic_url(self, media_id: str): + if not await self.check_access_token(): + self.access_token = await self.get_access_token(self.secret) + + url = f"{self.base_url}/media/get?access_token={self.access_token}&media_id={media_id}" + + async with httpx.AsyncClient() as client: + response = await client.get(url) + if response.headers.get("Content-Type", "").startswith("application/json"): + data = response.json() + if data.get('errcode') in [40014, 42001]: + self.access_token = await self.get_access_token(self.secret) + return await self.get_pic_url(media_id) + else: + raise Exception("Failed to get image: " + str(data)) + + # 否则是图片,转成 base64 + image_bytes = response.content + content_type = response.headers.get("Content-Type", "") + base64_str = base64.b64encode(image_bytes).decode("utf-8") + base64_str = f"data:{content_type};base64,{base64_str}" + return base64_str + + + #access——token操作 + async def check_access_token(self): + return bool(self.access_token and self.access_token.strip()) + + async def check_access_token_for_contacts(self): + return bool(self.access_token_for_contacts and self.access_token_for_contacts.strip()) + + async def get_access_token(self,secret): + url = f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.corpid}&corpsecret={secret}' + async with httpx.AsyncClient() as client: + response = await client.get(url) + data = response.json() + if 'access_token' in data: + return data['access_token'] + else: + raise Exception(f"未获取access token: {data}") + + async def get_detailed_message_list(self,xml_msg:str): + # 在本方法中解析消息,并且获得消息的具体内容 + root = ET.fromstring(xml_msg) + token = root.find("Token").text + open_kfid = root.find("OpenKfId").text + + # if open_kfid in self.openkfid_list: + # return None + # else: + # self.openkfid_list.append(open_kfid) + + if not await self.check_access_token(): + self.access_token = await self.get_access_token(self.secret) + + url = self.base_url+'/kf/sync_msg?access_token='+ self.access_token + async with httpx.AsyncClient() as client: + params = { + "token": token, + "voice_format": 0, + "open_kfid": open_kfid, + } + response = await client.post(url,json=params) + data = response.json() + if data['errcode'] != 0: + raise Exception("Failed to get message") + if data['errcode'] == 40014 or data['errcode'] == 42001: + self.access_token = await self.get_access_token(self.secret) + return await self.get_detailed_message_list(xml_msg) + last_msg_data = data['msg_list'][-1] + open_kfid = last_msg_data.get("open_kfid") + # 进行获取图片操作 + if last_msg_data.get("msgtype") == "image": + media_id = last_msg_data.get("image").get("media_id") + picurl = await self.get_pic_url(media_id) + last_msg_data["picurl"] = picurl + # await self.change_service_status(userid=external_userid,openkfid=open_kfid,servicer=servicer) + return last_msg_data + + + async def change_service_status(self,userid:str,openkfid:str,servicer:str): + if not await self.check_access_token(): + self.access_token = await self.get_access_token(self.secret) + url = self.base_url+"/kf/service_state/get?access_token="+self.access_token + async with httpx.AsyncClient() as client: + params = { + "open_kfid" : openkfid, + "external_userid" : userid, + "service_state" : 1, + "servicer_userid" : servicer, + } + response = await client.post(url,json=params) + data = response.json() + if data['errcode'] == 40014 or data['errcode'] == 42001: + self.access_token = await self.get_access_token(self.secret) + return await self.change_service_status(userid,openkfid) + if data['errcode'] != 0: + raise Exception("Failed to change service status: "+str(data)) + + + async def send_image(self,user_id:str,agent_id:int,media_id:str): + if not await self.check_access_token(): + self.access_token = await self.get_access_token(self.secret) + url = self.base_url+'/media/upload?access_token='+self.access_token + async with httpx.AsyncClient() as client: + params = { + "touser" : user_id, + "toparty" : "", + "totag":"", + "agentid" : agent_id, + "msgtype" : "image", + "image" : { + "media_id" : media_id, + }, + "safe":0, + "enable_id_trans": 0, + "enable_duplicate_check": 0, + "duplicate_check_interval": 1800 + } + try: + response = await client.post(url,json=params) + data = response.json() + except Exception as e: + raise Exception("Failed to send image: "+str(e)) + + # 企业微信错误码40014和42001,代表accesstoken问题 + if data['errcode'] == 40014 or data['errcode'] == 42001: + self.access_token = await self.get_access_token(self.secret) + return await self.send_image(user_id,agent_id,media_id) + + if data['errcode'] != 0: + raise Exception("Failed to send image: "+str(data)) + + + async def send_text_msg(self, open_kfid: str, external_userid: str, msgid: str,content:str): + if not await self.check_access_token(): + self.access_token = await self.get_access_token(self.secret) + + url = f"https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg?access_token={self.access_token}" + + payload = { + "touser": external_userid, + "open_kfid": open_kfid, + "msgid": msgid, + "msgtype": "text", + "text": { + "content": content, + } + } + + async with httpx.AsyncClient() as client: + response = await client.post(url, json=payload) + + data = response.json() + if data.get("errcode") != 0: + raise Exception(f"消息发送失败: {data}") + return data + + + async def handle_callback_request(self): + """ + 处理回调请求,包括 GET 验证和 POST 消息接收。 + """ + try: + + msg_signature = request.args.get("msg_signature") + timestamp = request.args.get("timestamp") + nonce = request.args.get("nonce") + + if request.method == "GET": + echostr = request.args.get("echostr") + ret, reply_echo_str = self.wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr) + if ret != 0: + raise Exception(f"验证失败,错误码: {ret}") + return reply_echo_str + + elif request.method == "POST": + encrypt_msg = await request.data + ret, xml_msg = self.wxcpt.DecryptMsg(encrypt_msg, msg_signature, timestamp, nonce) + if ret != 0: + raise Exception(f"消息解密失败,错误码: {ret}") + + # 解析消息并处理 + message_data = await self.get_detailed_message_list(xml_msg) + if message_data is not None: + event = WecomCSEvent.from_payload(message_data) + if event: + await self._handle_message(event) + + return "success" + except Exception as e: + traceback.print_exc() + return f"Error processing request: {str(e)}", 400 + + async def run_task(self, host: str, port: int, *args, **kwargs): + """ + 启动 Quart 应用。 + """ + await self.app.run_task(host=host, port=port, *args, **kwargs) + + def on_message(self, msg_type: str): + """ + 注册消息类型处理器。 + """ + def decorator(func: Callable[[WecomCSEvent], None]): + if msg_type not in self._message_handlers: + self._message_handlers[msg_type] = [] + self._message_handlers[msg_type].append(func) + return func + return decorator + + async def _handle_message(self, event: WecomCSEvent): + """ + 处理消息事件。 + """ + msg_type = event.type + if msg_type in self._message_handlers: + for handler in self._message_handlers[msg_type]: + await handler(event) + + + @staticmethod + async def get_image_type(image_bytes: bytes) -> str: + """ + 通过图片的magic numbers判断图片类型 + """ + magic_numbers = { + b'\xFF\xD8\xFF': 'jpg', + b'\x89\x50\x4E\x47': 'png', + b'\x47\x49\x46': 'gif', + b'\x42\x4D': 'bmp', + b'\x00\x00\x01\x00': 'ico' + } + + for magic, ext in magic_numbers.items(): + if image_bytes.startswith(magic): + return ext + return 'jpg' # 默认返回jpg + + + async def upload_to_work(self, image: platform_message.Image): + """ + 获取 media_id + """ + if not await self.check_access_token(): + self.access_token = await self.get_access_token(self.secret) + + url = self.base_url + '/media/upload?access_token=' + self.access_token + '&type=file' + file_bytes = None + file_name = "uploaded_file.txt" + + # 获取文件的二进制数据 + if image.path: + async with aiofiles.open(image.path, 'rb') as f: + file_bytes = await f.read() + file_name = image.path.split('/')[-1] + elif image.url: + file_bytes = await self.download_image_to_bytes(image.url) + file_name = image.url.split('/')[-1] + elif image.base64: + try: + base64_data = image.base64 + if ',' in base64_data: + base64_data = base64_data.split(',', 1)[1] + padding = 4 - (len(base64_data) % 4) if len(base64_data) % 4 else 0 + padded_base64 = base64_data + '=' * padding + file_bytes = base64.b64decode(padded_base64) + except binascii.Error as e: + raise ValueError(f"Invalid base64 string: {str(e)}") + else: + raise ValueError("image对象出错") + + # 设置 multipart/form-data 格式的文件 + boundary = "-------------------------acebdf13572468" + headers = { + 'Content-Type': f'multipart/form-data; boundary={boundary}' + } + body = ( + f"--{boundary}\r\n" + f"Content-Disposition: form-data; name=\"media\"; filename=\"{file_name}\"; filelength={len(file_bytes)}\r\n" + f"Content-Type: application/octet-stream\r\n\r\n" + ).encode('utf-8') + file_bytes + f"\r\n--{boundary}--\r\n".encode('utf-8') + + # 上传文件 + async with httpx.AsyncClient() as client: + response = await client.post(url, headers=headers, content=body) + data = response.json() + if data['errcode'] == 40014 or data['errcode'] == 42001: + self.access_token = await self.get_access_token(self.secret) + media_id = await self.upload_to_work(image) + if data.get('errcode', 0) != 0: + raise Exception("failed to upload file") + + media_id = data.get('media_id') + return media_id + + async def download_image_to_bytes(self,url:str) -> bytes: + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + return response.content + + #进行media_id的获取 + async def get_media_id(self, image: platform_message.Image): + + media_id = await self.upload_to_work(image=image) + return media_id diff --git a/libs/wecom_customer_service_api/wecomcsevent.py b/libs/wecom_customer_service_api/wecomcsevent.py new file mode 100644 index 00000000..8dc0e30d --- /dev/null +++ b/libs/wecom_customer_service_api/wecomcsevent.py @@ -0,0 +1,134 @@ +from typing import Dict, Any, Optional + + +class WecomCSEvent(dict): + """ + 封装从企业微信收到的事件数据对象(字典),提供属性以获取其中的字段。 + + 除 `type` 和 `detail_type` 属性对于任何事件都有效外,其它属性是否存在(若不存在则返回 `None`)依事件类型不同而不同。 + """ + + @staticmethod + def from_payload(payload: Dict[str, Any]) -> Optional["WecomCSEvent"]: + """ + 从企业微信(客服会话)事件数据构造 `WecomEvent` 对象。 + + Args: + payload (Dict[str, Any]): 解密后的企业微信事件数据。 + + Returns: + Optional[WecomEvent]: 如果事件数据合法,则返回 WecomEvent 对象;否则返回 None。 + """ + try: + event = WecomCSEvent(payload) + _ = event.type, + return event + except KeyError: + return None + + @property + def type(self) -> str: + """ + 事件类型,例如 "message"、"event"、"text" 等。 + + Returns: + str: 事件类型。 + """ + return self.get("msgtype", "") + + @property + def user_id(self) -> Optional[str]: + """ + 用户 ID,例如消息的发送者或事件的触发者。 + + Returns: + Optional[str]: 用户 ID。 + """ + return self.get("external_userid") + + @property + def receiver_id(self) -> Optional[str]: + """ + 接收者 ID,例如机器人自身的企业微信 ID。 + + Returns: + Optional[str]: 接收者 ID。 + """ + return self.get("open_kfid","") + + @property + def picurl(self) -> Optional[str]: + """ + 图片 URL,仅在图片消息中存在。 + base64格式 + Returns: + Optional[str]: 图片 URL。 + """ + + return self.get("picurl","") + + @property + def message_id(self) -> Optional[str]: + """ + 消息 ID,仅在消息类型事件中存在。 + + Returns: + Optional[str]: 消息 ID。 + """ + return self.get("msgid") + + @property + def message(self) -> Optional[str]: + """ + 消息内容,仅在消息类型事件中存在。 + + Returns: + Optional[str]: 消息内容。 + """ + if self.get("msgtype") == 'text': + return self.get("text").get("content","") + else: + return None + + + @property + def timestamp(self) -> Optional[int]: + """ + 事件发生的时间戳。 + + Returns: + Optional[int]: 时间戳。 + """ + return self.get("send_time") + + + def __getattr__(self, key: str) -> Optional[Any]: + """ + 允许通过属性访问数据中的任意字段。 + + Args: + key (str): 字段名。 + + Returns: + Optional[Any]: 字段值。 + """ + return self.get(key) + + def __setattr__(self, key: str, value: Any) -> None: + """ + 允许通过属性设置数据中的任意字段。 + + Args: + key (str): 字段名。 + value (Any): 字段值。 + """ + self[key] = value + + def __repr__(self) -> str: + """ + 生成事件对象的字符串表示。 + + Returns: + str: 字符串表示。 + """ + return f"" diff --git a/pkg/platform/sources/wecomcs.py b/pkg/platform/sources/wecomcs.py new file mode 100644 index 00000000..532d7470 --- /dev/null +++ b/pkg/platform/sources/wecomcs.py @@ -0,0 +1,223 @@ +from __future__ import annotations +import typing +import asyncio +import traceback + +import datetime + +from libs.wecom_customer_service_api.api import WecomCSClient +from pkg.platform.adapter import MessagePlatformAdapter +from pkg.platform.types import events as platform_events, message as platform_message +from libs.wecom_customer_service_api.wecomcsevent import WecomCSEvent +from pkg.core import app +from .. import adapter +from ...core import app +from ..types import message as platform_message +from ..types import events as platform_events +from ..types import entities as platform_entities +from ...command.errors import ParamNotEnoughError +from ...utils import image + +class WecomMessageConverter(adapter.MessageConverter): + + @staticmethod + async def yiri2target( + message_chain: platform_message.MessageChain, bot: WecomCSClient + ): + content_list = [] + + for msg in message_chain: + if type(msg) is platform_message.Plain: + content_list.append({ + "type": "text", + "content": msg.text, + }) + elif type(msg) is platform_message.Image: + content_list.append({ + "type": "image", + "media_id": await bot.get_media_id(msg), + }) + elif type(msg) is platform_message.Forward: + for node in msg.node_list: + content_list.extend((await WecomMessageConverter.yiri2target(node.message_chain, bot))) + else: + content_list.append({ + "type": "text", + "content": str(msg), + }) + + return content_list + + @staticmethod + async def target2yiri(message: str, message_id: int = -1): + yiri_msg_list = [] + yiri_msg_list.append( + platform_message.Source(id=message_id, time=datetime.datetime.now()) + ) + + yiri_msg_list.append(platform_message.Plain(text=message)) + chain = platform_message.MessageChain(yiri_msg_list) + + return chain + + @staticmethod + async def target2yiri_image(picurl: str, message_id: int = -1): + yiri_msg_list = [] + yiri_msg_list.append( + platform_message.Source(id=message_id, time=datetime.datetime.now()) + ) + yiri_msg_list.append(platform_message.Image(base64=picurl)) + chain = platform_message.MessageChain(yiri_msg_list) + + return chain + + +class WecomEventConverter: + + @staticmethod + async def yiri2target( + event: platform_events.Event, bot_account_id: int, bot: WecomCSClient + ) -> WecomCSEvent: + # only for extracting user information + + if type(event) is platform_events.GroupMessage: + pass + + if type(event) is platform_events.FriendMessage: + return event.source_platform_object + + @staticmethod + async def target2yiri(event: WecomCSEvent): + """ + 将 WecomEvent 转换为平台的 FriendMessage 对象。 + + Args: + event (WecomEvent): 企业微信客服事件。 + + Returns: + platform_events.FriendMessage: 转换后的 FriendMessage 对象。 + """ + # 转换消息链 + if event.type == "text": + yiri_chain = await WecomMessageConverter.target2yiri( + event.message, event.message_id + ) + friend = platform_entities.Friend( + id=f"u{event.user_id}", + nickname=str(event.user_id), + remark="", + ) + + return platform_events.FriendMessage( + sender=friend, message_chain=yiri_chain, time=event.timestamp, source_platform_object=event + ) + elif event.type == "image": + friend = platform_entities.Friend( + id=f"u{event.user_id}", + nickname=str(event.user_id), + remark="", + ) + + yiri_chain = await WecomMessageConverter.target2yiri_image( + picurl=event.picurl, message_id=event.message_id + ) + + return platform_events.FriendMessage( + sender=friend, message_chain=yiri_chain, time=event.timestamp, source_platform_object=event + ) + + +class WecomCSAdapter(adapter.MessagePlatformAdapter): + + bot: WecomCSClient + ap: app.Application + bot_account_id: str + message_converter: WecomMessageConverter = WecomMessageConverter() + event_converter: WecomEventConverter = WecomEventConverter() + config: dict + + def __init__(self, config: dict, ap: app.Application): + self.config = config + + self.ap = ap + + required_keys = [ + "corpid", + "secret", + "token", + "EncodingAESKey", + ] + missing_keys = [key for key in required_keys if key not in config] + if missing_keys: + raise ParamNotEnoughError("企业微信客服缺少相关配置项,请查看文档或联系管理员") + + self.bot = WecomCSClient( + corpid=config["corpid"], + secret=config["secret"], + token=config["token"], + EncodingAESKey=config["EncodingAESKey"], + ) + + async def reply_message( + self, + message_source: platform_events.MessageEvent, + message: platform_message.MessageChain, + quote_origin: bool = False, + ): + + Wecom_event = await WecomEventConverter.yiri2target( + message_source, self.bot_account_id, self.bot + ) + content_list = await WecomMessageConverter.yiri2target(message, self.bot) + + for content in content_list: + if content["type"] == "text": + await self.bot.send_text_msg(open_kfid=Wecom_event.receiver_id,external_userid=Wecom_event.user_id,msgid=Wecom_event.message_id,content=content["content"]) + + async def send_message( + self, target_type: str, target_id: str, message: platform_message.MessageChain + ): + pass + + def register_listener( + self, + event_type: typing.Type[platform_events.Event], + callback: typing.Callable[ + [platform_events.Event, adapter.MessagePlatformAdapter], None + ], + ): + async def on_message(event: WecomCSEvent): + self.bot_account_id = event.receiver_id + try: + return await callback( + await self.event_converter.target2yiri(event), self + ) + except: + traceback.print_exc() + + if event_type == platform_events.FriendMessage: + self.bot.on_message("text")(on_message) + self.bot.on_message("image")(on_message) + elif event_type == platform_events.GroupMessage: + pass + + async def run_async(self): + async def shutdown_trigger_placeholder(): + while True: + await asyncio.sleep(1) + + await self.bot.run_task( + host="0.0.0.0", + port=self.config["port"], + shutdown_trigger=shutdown_trigger_placeholder, + ) + + async def kill(self) -> bool: + return False + + async def unregister_listener( + self, + event_type: type, + callback: typing.Callable[[platform_events.Event, MessagePlatformAdapter], None], + ): + return super().unregister_listener(event_type, callback) \ No newline at end of file diff --git a/pkg/platform/sources/wecomcs.yaml b/pkg/platform/sources/wecomcs.yaml new file mode 100644 index 00000000..fb93d0b6 --- /dev/null +++ b/pkg/platform/sources/wecomcs.yaml @@ -0,0 +1,51 @@ +apiVersion: v1 +kind: MessagePlatformAdapter +metadata: + name: wecomcs + label: + en_US: WeComCustomerService + zh_CN: 企业微信客服 + description: + en_US: WeComCSAdapter + zh_CN: 企业微信客服适配器 +spec: + config: + - name: port + label: + en_US: Port + zh_CN: 监听端口 + type: int + required: true + default: 2289 + - name: corpid + label: + en_US: Corpid + zh_CN: 企业ID + type: string + required: true + default: "" + - name: secret + label: + en_US: Secret + zh_CN: 密钥 + type: string + required: true + default: "" + - name: token + label: + en_US: Token + zh_CN: 令牌 + type: string + required: true + default: "" + - name: EncodingAESKey + label: + en_US: EncodingAESKey + zh_CN: 消息加解密密钥 + type: string + required: true + default: "" +execution: + python: + path: ./wecomcs.py + attr: WecomCSAdapter \ No newline at end of file diff --git a/templates/platform.json b/templates/platform.json index d3f23b43..3fa4e3bf 100644 --- a/templates/platform.json +++ b/templates/platform.json @@ -103,6 +103,15 @@ "bot_token": "", "signing_secret": "", "port": 2288 + }, + { + "adapter": "wecomcs", + "enable": false, + "port": 2289, + "corpid": "", + "secret": "", + "token": "", + "EncodingAESKey": "" } ], "track-function-calls": true,