feat: add support for wecom customer service (#1304)

This commit is contained in:
Guanchao Wang
2025-04-16 15:02:01 +08:00
committed by GitHub
parent 2782c8cebe
commit 6e3514c0b2
6 changed files with 754 additions and 0 deletions

View File

@@ -0,0 +1,337 @@
from quart import request
from ..wecom_api.WXBizMsgCrypt3 import WXBizMsgCrypt
import base64
import binascii
import httpx
import traceback
from quart import Quart
import xml.etree.ElementTree as ET
from typing import Callable, Dict, Any
from .wecomcsevent import WecomCSEvent
from pkg.platform.types import events as platform_events, message as platform_message
import aiofiles
class WecomCSClient():
def __init__(self,corpid:str,secret:str,token:str,EncodingAESKey:str):
self.corpid = corpid
self.secret = secret
self.access_token_for_contacts =''
self.token = token
self.aes = EncodingAESKey
self.base_url = 'https://qyapi.weixin.qq.com/cgi-bin'
self.access_token = ''
self.app = Quart(__name__)
self.wxcpt = WXBizMsgCrypt(self.token, self.aes, self.corpid)
self.app.add_url_rule('/callback/command', 'handle_callback', self.handle_callback_request, methods=['GET', 'POST'])
self._message_handlers = {
"example":[],
}
async def get_pic_url(self, media_id: str):
if not await self.check_access_token():
self.access_token = await self.get_access_token(self.secret)
url = f"{self.base_url}/media/get?access_token={self.access_token}&media_id={media_id}"
async with httpx.AsyncClient() as client:
response = await client.get(url)
if response.headers.get("Content-Type", "").startswith("application/json"):
data = response.json()
if data.get('errcode') in [40014, 42001]:
self.access_token = await self.get_access_token(self.secret)
return await self.get_pic_url(media_id)
else:
raise Exception("Failed to get image: " + str(data))
# 否则是图片,转成 base64
image_bytes = response.content
content_type = response.headers.get("Content-Type", "")
base64_str = base64.b64encode(image_bytes).decode("utf-8")
base64_str = f"data:{content_type};base64,{base64_str}"
return base64_str
#access——token操作
async def check_access_token(self):
return bool(self.access_token and self.access_token.strip())
async def check_access_token_for_contacts(self):
return bool(self.access_token_for_contacts and self.access_token_for_contacts.strip())
async def get_access_token(self,secret):
url = f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.corpid}&corpsecret={secret}'
async with httpx.AsyncClient() as client:
response = await client.get(url)
data = response.json()
if 'access_token' in data:
return data['access_token']
else:
raise Exception(f"未获取access token: {data}")
async def get_detailed_message_list(self,xml_msg:str):
# 在本方法中解析消息,并且获得消息的具体内容
root = ET.fromstring(xml_msg)
token = root.find("Token").text
open_kfid = root.find("OpenKfId").text
# if open_kfid in self.openkfid_list:
# return None
# else:
# self.openkfid_list.append(open_kfid)
if not await self.check_access_token():
self.access_token = await self.get_access_token(self.secret)
url = self.base_url+'/kf/sync_msg?access_token='+ self.access_token
async with httpx.AsyncClient() as client:
params = {
"token": token,
"voice_format": 0,
"open_kfid": open_kfid,
}
response = await client.post(url,json=params)
data = response.json()
if data['errcode'] != 0:
raise Exception("Failed to get message")
if data['errcode'] == 40014 or data['errcode'] == 42001:
self.access_token = await self.get_access_token(self.secret)
return await self.get_detailed_message_list(xml_msg)
last_msg_data = data['msg_list'][-1]
open_kfid = last_msg_data.get("open_kfid")
# 进行获取图片操作
if last_msg_data.get("msgtype") == "image":
media_id = last_msg_data.get("image").get("media_id")
picurl = await self.get_pic_url(media_id)
last_msg_data["picurl"] = picurl
# await self.change_service_status(userid=external_userid,openkfid=open_kfid,servicer=servicer)
return last_msg_data
async def change_service_status(self,userid:str,openkfid:str,servicer:str):
if not await self.check_access_token():
self.access_token = await self.get_access_token(self.secret)
url = self.base_url+"/kf/service_state/get?access_token="+self.access_token
async with httpx.AsyncClient() as client:
params = {
"open_kfid" : openkfid,
"external_userid" : userid,
"service_state" : 1,
"servicer_userid" : servicer,
}
response = await client.post(url,json=params)
data = response.json()
if data['errcode'] == 40014 or data['errcode'] == 42001:
self.access_token = await self.get_access_token(self.secret)
return await self.change_service_status(userid,openkfid)
if data['errcode'] != 0:
raise Exception("Failed to change service status: "+str(data))
async def send_image(self,user_id:str,agent_id:int,media_id:str):
if not await self.check_access_token():
self.access_token = await self.get_access_token(self.secret)
url = self.base_url+'/media/upload?access_token='+self.access_token
async with httpx.AsyncClient() as client:
params = {
"touser" : user_id,
"toparty" : "",
"totag":"",
"agentid" : agent_id,
"msgtype" : "image",
"image" : {
"media_id" : media_id,
},
"safe":0,
"enable_id_trans": 0,
"enable_duplicate_check": 0,
"duplicate_check_interval": 1800
}
try:
response = await client.post(url,json=params)
data = response.json()
except Exception as e:
raise Exception("Failed to send image: "+str(e))
# 企业微信错误码40014和42001代表accesstoken问题
if data['errcode'] == 40014 or data['errcode'] == 42001:
self.access_token = await self.get_access_token(self.secret)
return await self.send_image(user_id,agent_id,media_id)
if data['errcode'] != 0:
raise Exception("Failed to send image: "+str(data))
async def send_text_msg(self, open_kfid: str, external_userid: str, msgid: str,content:str):
if not await self.check_access_token():
self.access_token = await self.get_access_token(self.secret)
url = f"https://qyapi.weixin.qq.com/cgi-bin/kf/send_msg?access_token={self.access_token}"
payload = {
"touser": external_userid,
"open_kfid": open_kfid,
"msgid": msgid,
"msgtype": "text",
"text": {
"content": content,
}
}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload)
data = response.json()
if data.get("errcode") != 0:
raise Exception(f"消息发送失败: {data}")
return data
async def handle_callback_request(self):
"""
处理回调请求,包括 GET 验证和 POST 消息接收。
"""
try:
msg_signature = request.args.get("msg_signature")
timestamp = request.args.get("timestamp")
nonce = request.args.get("nonce")
if request.method == "GET":
echostr = request.args.get("echostr")
ret, reply_echo_str = self.wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
if ret != 0:
raise Exception(f"验证失败,错误码: {ret}")
return reply_echo_str
elif request.method == "POST":
encrypt_msg = await request.data
ret, xml_msg = self.wxcpt.DecryptMsg(encrypt_msg, msg_signature, timestamp, nonce)
if ret != 0:
raise Exception(f"消息解密失败,错误码: {ret}")
# 解析消息并处理
message_data = await self.get_detailed_message_list(xml_msg)
if message_data is not None:
event = WecomCSEvent.from_payload(message_data)
if event:
await self._handle_message(event)
return "success"
except Exception as e:
traceback.print_exc()
return f"Error processing request: {str(e)}", 400
async def run_task(self, host: str, port: int, *args, **kwargs):
"""
启动 Quart 应用。
"""
await self.app.run_task(host=host, port=port, *args, **kwargs)
def on_message(self, msg_type: str):
"""
注册消息类型处理器。
"""
def decorator(func: Callable[[WecomCSEvent], None]):
if msg_type not in self._message_handlers:
self._message_handlers[msg_type] = []
self._message_handlers[msg_type].append(func)
return func
return decorator
async def _handle_message(self, event: WecomCSEvent):
"""
处理消息事件。
"""
msg_type = event.type
if msg_type in self._message_handlers:
for handler in self._message_handlers[msg_type]:
await handler(event)
@staticmethod
async def get_image_type(image_bytes: bytes) -> str:
"""
通过图片的magic numbers判断图片类型
"""
magic_numbers = {
b'\xFF\xD8\xFF': 'jpg',
b'\x89\x50\x4E\x47': 'png',
b'\x47\x49\x46': 'gif',
b'\x42\x4D': 'bmp',
b'\x00\x00\x01\x00': 'ico'
}
for magic, ext in magic_numbers.items():
if image_bytes.startswith(magic):
return ext
return 'jpg' # 默认返回jpg
async def upload_to_work(self, image: platform_message.Image):
"""
获取 media_id
"""
if not await self.check_access_token():
self.access_token = await self.get_access_token(self.secret)
url = self.base_url + '/media/upload?access_token=' + self.access_token + '&type=file'
file_bytes = None
file_name = "uploaded_file.txt"
# 获取文件的二进制数据
if image.path:
async with aiofiles.open(image.path, 'rb') as f:
file_bytes = await f.read()
file_name = image.path.split('/')[-1]
elif image.url:
file_bytes = await self.download_image_to_bytes(image.url)
file_name = image.url.split('/')[-1]
elif image.base64:
try:
base64_data = image.base64
if ',' in base64_data:
base64_data = base64_data.split(',', 1)[1]
padding = 4 - (len(base64_data) % 4) if len(base64_data) % 4 else 0
padded_base64 = base64_data + '=' * padding
file_bytes = base64.b64decode(padded_base64)
except binascii.Error as e:
raise ValueError(f"Invalid base64 string: {str(e)}")
else:
raise ValueError("image对象出错")
# 设置 multipart/form-data 格式的文件
boundary = "-------------------------acebdf13572468"
headers = {
'Content-Type': f'multipart/form-data; boundary={boundary}'
}
body = (
f"--{boundary}\r\n"
f"Content-Disposition: form-data; name=\"media\"; filename=\"{file_name}\"; filelength={len(file_bytes)}\r\n"
f"Content-Type: application/octet-stream\r\n\r\n"
).encode('utf-8') + file_bytes + f"\r\n--{boundary}--\r\n".encode('utf-8')
# 上传文件
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, content=body)
data = response.json()
if data['errcode'] == 40014 or data['errcode'] == 42001:
self.access_token = await self.get_access_token(self.secret)
media_id = await self.upload_to_work(image)
if data.get('errcode', 0) != 0:
raise Exception("failed to upload file")
media_id = data.get('media_id')
return media_id
async def download_image_to_bytes(self,url:str) -> bytes:
async with httpx.AsyncClient() as client:
response = await client.get(url)
response.raise_for_status()
return response.content
#进行media_id的获取
async def get_media_id(self, image: platform_message.Image):
media_id = await self.upload_to_work(image=image)
return media_id

View File

@@ -0,0 +1,134 @@
from typing import Dict, Any, Optional
class WecomCSEvent(dict):
"""
封装从企业微信收到的事件数据对象(字典),提供属性以获取其中的字段。
除 `type` 和 `detail_type` 属性对于任何事件都有效外,其它属性是否存在(若不存在则返回 `None`)依事件类型不同而不同。
"""
@staticmethod
def from_payload(payload: Dict[str, Any]) -> Optional["WecomCSEvent"]:
"""
从企业微信(客服会话)事件数据构造 `WecomEvent` 对象。
Args:
payload (Dict[str, Any]): 解密后的企业微信事件数据。
Returns:
Optional[WecomEvent]: 如果事件数据合法,则返回 WecomEvent 对象;否则返回 None。
"""
try:
event = WecomCSEvent(payload)
_ = event.type,
return event
except KeyError:
return None
@property
def type(self) -> str:
"""
事件类型,例如 "message""event""text" 等。
Returns:
str: 事件类型。
"""
return self.get("msgtype", "")
@property
def user_id(self) -> Optional[str]:
"""
用户 ID例如消息的发送者或事件的触发者。
Returns:
Optional[str]: 用户 ID。
"""
return self.get("external_userid")
@property
def receiver_id(self) -> Optional[str]:
"""
接收者 ID例如机器人自身的企业微信 ID。
Returns:
Optional[str]: 接收者 ID。
"""
return self.get("open_kfid","")
@property
def picurl(self) -> Optional[str]:
"""
图片 URL仅在图片消息中存在。
base64格式
Returns:
Optional[str]: 图片 URL。
"""
return self.get("picurl","")
@property
def message_id(self) -> Optional[str]:
"""
消息 ID仅在消息类型事件中存在。
Returns:
Optional[str]: 消息 ID。
"""
return self.get("msgid")
@property
def message(self) -> Optional[str]:
"""
消息内容,仅在消息类型事件中存在。
Returns:
Optional[str]: 消息内容。
"""
if self.get("msgtype") == 'text':
return self.get("text").get("content","")
else:
return None
@property
def timestamp(self) -> Optional[int]:
"""
事件发生的时间戳。
Returns:
Optional[int]: 时间戳。
"""
return self.get("send_time")
def __getattr__(self, key: str) -> Optional[Any]:
"""
允许通过属性访问数据中的任意字段。
Args:
key (str): 字段名。
Returns:
Optional[Any]: 字段值。
"""
return self.get(key)
def __setattr__(self, key: str, value: Any) -> None:
"""
允许通过属性设置数据中的任意字段。
Args:
key (str): 字段名。
value (Any): 字段值。
"""
self[key] = value
def __repr__(self) -> str:
"""
生成事件对象的字符串表示。
Returns:
str: 字符串表示。
"""
return f"<WecomEvent {super().__repr__()}>"